From 08d6c84b9d69b500c964a8ff66e726e1f01f2095 Mon Sep 17 00:00:00 2001
From: Douglas Rumbaugh
Date: Mon, 22 May 2023 13:25:20 -0400
Subject: Progress towards generalization of shard interface

---
 include/framework/DynamicExtension.h | 393 ++++++++++++-----------------------
 1 file changed, 129 insertions(+), 264 deletions(-)

(limited to 'include/framework/DynamicExtension.h')

diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 53b55b1..2009344 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -17,6 +17,7 @@
 #include "framework/MutableBuffer.h"
 #include "framework/InternalLevel.h"
 #include "framework/ShardInterface.h"
+#include "framework/QueryInterface.h"
 #include "shard/WIRS.h"
 #include "ds/Alias.h"
 
@@ -71,221 +72,88 @@ enum class DeletePolicy {
 
 typedef ssize_t level_index;
 
-template <typename R>
+template <typename R, typename S, typename Q, typename FQ = std::false_type>
 class DynamicExtension {
-    typedef decltype(R::key) K;
-    typedef decltype(R::value) V;
-    typedef decltype(R::weight) W;
-
 public:
-    DynamicExtension(size_t buffer_cap, size_t buffer_delete_cap, size_t scale_factor,
-                     double max_delete_prop, double max_rejection_prop, gsl_rng *rng)
-        : m_active_buffer(0),
-          m_scale_factor(scale_factor),
-          m_max_delete_prop(max_delete_prop),
-          m_max_rejection_rate(max_rejection_prop),
-          m_last_level_idx(-1),
-          m_buffer_1(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
-          m_buffer_2(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
-          m_buffer_1_merging(false), m_buffer_2_merging(false) {}
+    DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop)
+        : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop),
+          m_buffer(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_cap * max_delete_prop))
+    {}
 
     ~DynamicExtension() {
-        delete m_buffer_1;
-        delete m_buffer_2;
+        delete m_buffer;
 
         for (size_t i=0; i<m_levels.size(); i++) {
             delete m_levels[i];
         }
     }
 
-    int delete_record(const K& key, const V& val, gsl_rng *rng) {
-        auto buffer = get_buffer();
-
-        // Check the levels first. This assumes there aren't
-        // any undeleted duplicate records.
-        for (auto level : m_levels) {
-            if (level && level->delete_record(key, val)) {
-                return 1;
-            }
-        }
-
-        // the buffer will take the longest amount of time, and
-        // probably has the lowest probability of having the record,
-        // so we'll check it last.
-        return buffer->delete_record(key, val);
-    }
-
-    int append(R &rec, gsl_rng *rng) {
-        // NOTE: single-threaded implementation only
-        MutableBuffer<R> *buffer;
-        while (!(buffer = get_buffer()))
-            ;
-
-        if (buffer->is_full()) {
-            merge_buffer(rng);
-        }
-
-        return buffer->append(rec);
-    }
+    int insert(const R &rec) {
+        return internal_append(rec, false);
+    }
 
+    int erase(const R &rec) {
+        if constexpr (DELETE_TAGGING) {
+            auto buffer = get_buffer();
+
+            // Check the levels first. This assumes there aren't
+            // any undeleted duplicate records.
+            for (auto level : m_levels) {
+                if (level && level->delete_record(rec)) {
+                    return 1;
+                }
+            }
+
+            // the buffer will take the longest amount of time, and
+            // probably has the lowest probability of having the record,
+            // so we'll check it last.
+            return buffer->delete_record(rec);
+        }
+
+        return internal_append(rec, true);
+    }
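// [Editor's note] A minimal usage sketch of the interface this commit moves
// toward. The record type and the WIRS-based query type below are hypothetical
// stand-ins; only the constructor and the insert()/erase() entry points appear
// in the diff itself:
//
//     typedef WeightedRec<uint64_t, uint32_t, double> Rec;        // assumed record type
//     DynamicExtension<Rec, WIRS<Rec>, WIRSQuery<Rec>> ext(1000, 6, 0.05);
//
//     Rec r = {128, 512, 1.0};
//     ext.insert(r);   // append a new record to the mutable buffer
//     ext.erase(r);    // tag the record deleted, or append a tombstone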
 
-    void range_sample(R *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
-        auto buffer = get_buffer();
-        Alias *buffer_alias = nullptr;
-        std::vector<R *> buffer_records;
-        size_t buffer_cutoff = 0;
-
-        W buffer_weight;
-        if (LSM_REJ_SAMPLE) {
-            buffer_weight = buffer->get_total_weight();
-            buffer_cutoff = buffer->get_record_count() - 1;
-        } else {
-            buffer_weight = buffer->get_sample_range(lower_key, upper_key, buffer_records, &buffer_alias, &buffer_cutoff);
-        }
-
-        // Get the shard weights for each level. Index 0 is the buffer,
-        // represented by nullptr.
-        std::vector<std::pair<ShardID, S *>> shards;
-        std::vector<void *> states;
-        shards.push_back({{-1, -1}, nullptr});
-        states.push_back(nullptr);
-
-        std::vector<double> shard_weights;
-        shard_weights.push_back((double) buffer_weight);
-
-        WIRS<R>::wirs_query_parms parms = {lower_key, upper_key};
-
-        for (auto &level : m_levels) {
-            level->get_query_states(shard_weights, shards, states, &parms);
-        }
-
-        if (shard_weights.size() == 1 && shard_weights[0] == 0) {
-            if (buffer_alias) delete buffer_alias;
-            for (auto& x: states) S::delete_query_state(x);
-            sampling_bailouts++;
-            return; // no records in the sampling range
-        }
-
-        double tot_weight = std::accumulate(shard_weights.begin(), shard_weights.end(), 0);
-        std::vector<double> normalized_weights(shard_weights.size());
-        for (size_t i=0; i<shard_weights.size(); i++) {
-            normalized_weights[i] = shard_weights[i] / tot_weight;
-        }
-
-        auto shard_alias = Alias(normalized_weights);
-        std::vector<size_t> shard_samples(shard_weights.size(), 0);
-
-        size_t rejections = sample_sz;
-        size_t sample_idx = 0;
-
-        size_t buffer_rejections = 0;
-
-        do {
-            for (size_t i=0; i<rejections; i++) {
-                shard_samples[shard_alias.get(rng)]++;
-            }
-
-            rejections = 0;
-
-            while (shard_samples[0] > 0) {
-                const R *rec;
-                if (LSM_REJ_SAMPLE) {
-                    rec = buffer->get_sample(lower_key, upper_key, rng);
-                } else {
-                    rec = buffer_records[buffer_alias->get(rng)];
-                }
-
-                if (DELETE_TAGGING) {
-                    if (rec && !rec->is_deleted()) {
-                        sample_set[sample_idx++] = *rec;
-                    } else {
-                        rejections++;
-                        buffer_rejections++;
-                    }
-                } else {
-                    if (rec && !buffer->check_tombstone(*rec)) {
-                        sample_set[sample_idx++] = *rec;
-                    } else {
-                        rejections++;
-                        buffer_rejections++;
-                    }
-                }
-
-                shard_samples[0]--;
-
-                // Assume nothing in buffer and bail out.
-                // FIXME: rather than a bailout, we could switch to non-rejection
-                // sampling, but that would require rebuilding the full alias structure.
-                // Wouldn't be too hard to do, but for the moment I'll just do this.
-                if (LSM_REJ_SAMPLE && buffer_rejections >= sample_sz && sample_idx == 0 && shard_weights.size() == 1) {
-                    if (buffer_alias) delete buffer_alias;
-                    //for (auto& x: states) delete x;
-                    sampling_bailouts++;
-                    return; // no records in the sampling range
-                }
-            }
-
-            std::vector<R> results;
-            for (size_t i=1; i<shard_samples.size(); i++) {
-                m_levels[shards[i].first.level_idx]->get_samples(states[i], results, lower_key, upper_key, shard_samples[i], rng);
-
-                for (size_t j=0; j<results.size(); j++) {
-                    if (!add_to_sample(results[j], shards[i].first, upper_key, lower_key, nullptr, sample_set, sample_idx, buffer, buffer_cutoff)) {
-                        rejections++;
-                    }
-                }
-
-                shard_samples[i] = 0;
-                results.clear();
-            }
-
-        } while (sample_idx < sample_sz);
-
-        if (buffer_alias) delete buffer_alias;
-        for (auto& x: states) S::delete_query_state(x);
-
-        enforce_rejection_rate_maximum(rng);
-    }
+    std::vector<R> query(void *parms) {
+        // Use the provided top-level query function if one
+        // is specified. Otherwise, use the default framework
+        // behavior.
+        if constexpr (!std::is_same<FQ, std::false_type>::value) {
+            return FQ(parms);
+        }
+
+        auto buffer = get_buffer();
+
+        // Get the buffer query state
+        auto buffer_state = Q::get_buffer_query_state(buffer, parms);
+
+        // Get the shard query states
+        std::vector<std::pair<ShardID, S *>> shards;
+        std::vector<void *> states;
+
+        for (auto &level : m_levels) {
+            level->get_query_states(shards, states, parms);
+        }
+
+        std::vector<std::vector<R>> query_results(shards.size() + 1);
+
+        // Execute the query for the buffer
+        query_results[0] = Q::buffer_query(buffer, buffer_state, parms);
+
+        // Execute the query for each shard
+        for (size_t i=0; i<shards.size(); i++) {
+            query_results[i+1] = Q::query(shards[i].second, states[i], parms);
+        }
+
+        // Merge the results and return
+        auto result = Q::merge(query_results);
+
+        return result;
+    }
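// [Editor's note] QueryInterface.h is not shown in this patch. From the call
// sites in query() above, a conforming Q is assumed to expose roughly this
// static interface. Only the names actually called in the diff are taken from
// the source; the signatures and the per-shard get_query_state() hook (invoked
// indirectly via InternalLevel::get_query_states) are a hypothetical sketch:
//
//     struct ExampleQuery {
//         // per-shard and per-buffer state construction
//         static void *get_query_state(S *shard, void *parms);
//         static void *get_buffer_query_state(MutableBuffer<R> *buffer, void *parms);
//
//         // local query execution against one shard or the buffer
//         static std::vector<R> query(S *shard, void *state, void *parms);
//         static std::vector<R> buffer_query(MutableBuffer<R> *buffer, void *state, void *parms);
//
//         // combine the per-shard result sets into the final answer
//         static std::vector<R> merge(std::vector<std::vector<R>> &results);
//
//         static void delete_query_state(void *state);
//     };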
 
-    // Checks the tree and buffer for a tombstone corresponding to
-    // the provided record in any shard *above* the shid, which
-    // should correspond to the shard containing the record in question
-    //
-    // Passing INVALID_SHID indicates that the record exists within the buffer
-    bool is_deleted(const R &record, const ShardID &shid, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
-        // If tagging is enabled, we just need to check if the record has the delete tag set
-        if (DELETE_TAGGING) {
-            return record.is_deleted();
-        }
-
-        // Otherwise, we need to look for a tombstone.
-
-        // check for tombstone in the buffer. This will require accounting for the cutoff eventually.
-        if (buffer->check_tombstone(record)) {
-            return true;
-        }
-
-        // if the record is in the buffer, then we're done.
-        if (shid == INVALID_SHID) {
-            return false;
-        }
-
-        for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {
-            if (m_levels[lvl]->check_tombstone(0, record)) {
-                return true;
-            }
-        }
-
-        return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record);
-    }
-
     size_t get_record_cnt() {
         size_t cnt = get_buffer()->get_record_count();
 
@@ -296,7 +164,6 @@ public:
         return cnt;
     }
 
-
     size_t get_tombstone_cnt() {
         size_t cnt = get_buffer()->get_tombstone_count();
 
@@ -311,22 +178,22 @@ public:
         return m_levels.size();
    }
 
-    size_t get_memory_utilization() {
-        size_t cnt = m_buffer_1->get_memory_utilization() + m_buffer_2->get_memory_utilization();
+    size_t get_memory_usage() {
+        size_t cnt = m_buffer->get_memory_usage();
 
         for (size_t i=0; i<m_levels.size(); i++) {
-            if (m_levels[i]) cnt += m_levels[i]->get_memory_utilization();
+            if (m_levels[i]) cnt += m_levels[i]->get_memory_usage();
         }
 
         return cnt;
     }
 
-    size_t get_aux_memory_utilization() {
-        size_t cnt = m_buffer_1->get_aux_memory_utilization() + m_buffer_2->get_aux_memory_utilization();
+    size_t get_aux_memory_usage() {
+        size_t cnt = m_buffer->get_aux_memory_usage();
 
         for (size_t i=0; i<m_levels.size(); i++) {
             if (m_levels[i]) {
-                cnt += m_levels[i]->get_aux_memory_utilization();
+                cnt += m_levels[i]->get_aux_memory_usage();
            }
         }
 
@@ -348,10 +215,9 @@ public:
     }
 
     size_t get_buffer_capacity() {
-        return m_buffer_1->get_capacity();
+        return m_buffer->get_capacity();
     }
 
-
     S *create_ssi() {
         std::vector<S *> shards;
 
@@ -384,57 +250,72 @@ public:
     }
 
 private:
-    MutableBuffer<R> *m_buffer_1;
-    MutableBuffer<R> *m_buffer_2;
-    std::atomic<size_t> m_active_buffer;
-    std::atomic<bool> m_buffer_1_merging;
-    std::atomic<bool> m_buffer_2_merging;
+    MutableBuffer<R> *m_buffer;
 
     size_t m_scale_factor;
     double m_max_delete_prop;
-    double m_max_rejection_rate;
 
-    std::vector<InternalLevel<R> *> m_levels;
-
-    level_index m_last_level_idx;
+    std::vector<InternalLevel<R, S> *> m_levels;
 
     MutableBuffer<R> *get_buffer() {
-        if (m_buffer_1_merging && m_buffer_2_merging) {
-            return nullptr;
-        }
-
-        return (m_active_buffer) ? m_buffer_2 : m_buffer_1;
+        return m_buffer;
     }
 
-    inline bool rejection(const R &record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
-        if (record.is_tombstone()) {
-            tombstone_rejections++;
-            return true;
-        } else if (record.key < lower_bound || record.key > upper_bound) {
-            bounds_rejections++;
-            return true;
-        } else if (is_deleted(record, shid, buffer, buffer_cutoff)) {
-            deletion_rejections++;
-            return true;
-        }
-
-        return false;
-    }
+    int internal_append(const R &rec, bool ts) {
+        MutableBuffer<R> *buffer;
+        while (!(buffer = get_buffer()))
+            ;
+
+        if (buffer->is_full()) {
+            merge_buffer();
+        }
+
+        return buffer->append(rec, ts);
+    }
 
-    inline bool add_to_sample(const R &record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer,
-                              R *sample_buffer, size_t &sample_idx, MutableBuffer<R> *buffer, size_t buffer_cutoff) {
-        TIMER_INIT();
-        TIMER_START();
-        sampling_attempts++;
-        if (!record || rejection(record, shid, lower_key, upper_key, io_buffer, buffer, buffer_cutoff)) {
-            sampling_rejections++;
-            return false;
-        }
-        TIMER_STOP();
-        rejection_check_time += TIMER_RESULT();
-
-        sample_buffer[sample_idx++] = *record;
-        return true;
-    }
+    std::vector<R> post_process(std::vector<Wrapped<R>> records, ShardID shid, MutableBuffer<R> *buffer) {
+        std::vector<R> processed_records;
+        processed_records.reserve(records.size());
+
+        // For delete tagging, we just need to check the delete bit on each
+        // record.
+        if constexpr (DELETE_TAGGING) {
+            for (auto &rec : records) {
+                if (rec.is_deleted()) {
+                    continue;
+                }
+
+                processed_records.emplace_back(rec.rec);
+            }
+
+            return processed_records;
+        }
+
+        // For tombstone deletes, we need to search for the corresponding
+        // tombstone for each record.
+        for (auto &rec : records) {
+            if (rec.is_tombstone()) {
+                continue;
+            }
+
+            if (buffer->check_tombstone(rec.rec)) {
+                continue;
+            }
+
+            if (shid != INVALID_SHID) {
+                // a flag is needed here--a bare continue inside the inner
+                // loop would only advance to the next level, not skip the
+                // current record.
+                bool deleted = false;
+                for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {
+                    if (m_levels[lvl]->check_tombstone(0, rec.rec)) {
+                        deleted = true;
+                        break;
+                    }
+                }
+
+                if (deleted || m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) {
+                    continue;
+                }
+            }
+
+            processed_records.emplace_back(rec.rec);
+        }
+
+        return processed_records;
+    }
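// [Editor's note] post_process() assumes records arrive wrapped with delete
// metadata (rec.rec, rec.is_deleted(), rec.is_tombstone()). The wrapper type
// is not part of this excerpt; the Wrapped<R> spelled into the signature
// above, and the layout below, are assumptions for illustration only:
//
//     template <typename R>
//     struct Wrapped {
//         uint32_t header;   // bit 0: tombstone, bit 1: delete tag (assumed layout)
//         R rec;             // the user-visible record payload
//
//         bool is_tombstone() const { return header & 1; }
//         bool is_deleted() const { return header & 2; }
//     };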
 
     /*
@@ -450,24 +331,23 @@ private:
         if (new_idx > 0) {
             assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
         }
-        m_levels.emplace_back(new InternalLevel<R>(new_idx, new_shard_cnt));
+        m_levels.emplace_back(new InternalLevel<R, S>(new_idx, new_shard_cnt));
 
-        m_last_level_idx++;
         return new_idx;
     }
 
     // Merge the memory table down into the tree, completing any required other
     // merges to make room for it.
-    inline void merge_buffer(gsl_rng *rng) {
+    inline void merge_buffer() {
         auto buffer = get_buffer();
 
         if (!can_merge_with(0, buffer->get_record_count())) {
-            merge_down(0, rng);
+            merge_down(0);
         }
 
-        merge_buffer_into_l0(buffer, rng);
-        enforce_delete_maximum(0, rng);
+        merge_buffer_into_l0(buffer);
+        enforce_delete_maximum(0);
 
         buffer->truncate();
 
         return;
@@ -479,15 +359,15 @@ private:
     * routine will recursively perform any necessary merges to make room for the
     * specified level.
     */
-    inline void merge_down(level_index idx, gsl_rng *rng) {
+    inline void merge_down(level_index idx) {
         level_index merge_base_level = find_mergable_level(idx);
         if (merge_base_level == -1) {
             merge_base_level = grow();
        }
 
         for (level_index i=merge_base_level; i>idx; i--) {
-            merge_levels(i, i-1, rng);
-            enforce_delete_maximum(i, rng);
+            merge_levels(i, i-1);
+            enforce_delete_maximum(i);
         }
 
         return;
@@ -509,7 +389,7 @@ private:
         level_index merge_level_idx;
 
         size_t incoming_rec_cnt = get_level_record_count(idx, buffer);
-        for (level_index i=idx+1; i<=m_last_level_idx; i++) {
+        for (level_index i=idx+1; i<m_levels.size(); i++) {
             if (can_merge_with(i, incoming_rec_cnt)) {
                 return i;
             }
         }
@@ -526,35 +406,35 @@ private:
     * are skipped in the merge process--otherwise the tombstone ordering
     * invariant may be violated by the merge operation.
     */
-    inline void merge_levels(level_index base_level, level_index incoming_level, gsl_rng *rng) {
+    inline void merge_levels(level_index base_level, level_index incoming_level) {
         // merging two memory levels
         if (LSM_LEVELING) {
             auto tmp = m_levels[base_level];
-            m_levels[base_level] = InternalLevel<R>::merge_levels(m_levels[base_level], m_levels[incoming_level], rng);
+            m_levels[base_level] = InternalLevel<R, S>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
             mark_as_unused(tmp);
         } else {
-            m_levels[base_level]->append_merged_shards(m_levels[incoming_level], rng);
+            m_levels[base_level]->append_merged_shards(m_levels[incoming_level]);
         }
 
         mark_as_unused(m_levels[incoming_level]);
-        m_levels[incoming_level] = new InternalLevel<R>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
+        m_levels[incoming_level] = new InternalLevel<R, S>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor);
     }
 
-    inline void merge_buffer_into_l0(MutableBuffer<R> *buffer, gsl_rng *rng) {
+    inline void merge_buffer_into_l0(MutableBuffer<R> *buffer) {
         assert(m_levels[0]);
         if (LSM_LEVELING) {
             // FIXME: Kludgey implementation due to interface constraints.
             auto old_level = m_levels[0];
-            auto temp_level = new InternalLevel<R>(0, 1);
-            temp_level->append_mem_table(buffer, rng);
-            auto new_level = InternalLevel<R>::merge_levels(old_level, temp_level, rng);
+            auto temp_level = new InternalLevel<R, S>(0, 1);
+            temp_level->append_buffer(buffer);
+            auto new_level = InternalLevel<R, S>::merge_levels(old_level, temp_level);
 
             m_levels[0] = new_level;
             delete temp_level;
             mark_as_unused(old_level);
         } else {
-            m_levels[0]->append_mem_table(buffer, rng);
+            m_levels[0]->append_buffer(buffer);
         }
     }
 
     /*
     * Mark a level as no-longer in use by the tree. For now this just
     * deletes the level. However, in future versions of the code the
@@ -564,7 +444,7 @@ private:
     * level may not be able to immediately be deleted, depending upon who
     * else is using it.
     */
-    inline void mark_as_unused(InternalLevel<R> *level) {
+    inline void mark_as_unused(InternalLevel<R, S> *level) {
         delete level;
     }
 
     /*
     * Check the tombstone proportion for the specified level and
@@ -573,31 +453,16 @@ private:
     * if the limit is exceeded, forcibly merge levels until all
     * levels below idx are below the limit.
     */
-    inline void enforce_delete_maximum(level_index idx, gsl_rng *rng) {
+    inline void enforce_delete_maximum(level_index idx) {
         long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx);
 
         if (ts_prop > (long double) m_max_delete_prop) {
-            merge_down(idx, rng);
+            merge_down(idx);
         }
 
         return;
     }
 
-    inline void enforce_rejection_rate_maximum(gsl_rng *rng) {
-        if (m_levels.size() == 0) {
-            return;
-        }
-
-        for (size_t i=0; i<m_levels.size(); i++) {
-            if (m_levels[i]) {
-                double ratio = m_levels[i]->get_rejection_rate();
-                if (ratio > m_max_rejection_rate) {
-                    merge_down(i, rng);
-                }
-            }
-        }
-    }
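// [Editor's note] A worked example of the delete bound enforced by
// enforce_delete_maximum() above, under the assumed capacity rule
// calc_level_record_capacity(idx) = buffer_cap * scale_factor^(idx + 1).
// That helper's body is not part of this excerpt; the formula is inferred
// from the constructor's parameters and the trailing comment below:
//
//     buffer_cap = 1000, scale_factor = 6, max_delete_prop = 0.05
//     level 0 capacity: 1000 * 6   = 6000 records
//     level 1 capacity: 1000 * 6^2 = 36000 records
//     enforce_delete_maximum(1) triggers merge_down(1) once level 1
//     holds more than 0.05 * 36000 = 1800 tombstones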
 
     /*
     * Assume that level "0" should be larger than the buffer. The buffer
     * itself is index -1, which should return simply the buffer capacity.
--
cgit v1.2.3