| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-09 12:40:26 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-09 12:40:26 -0400 |
| commit | 963df5b0dcccc686dff9eadee0de4c5d95db84db (patch) | |
| tree | 6b17a76d22b547411483d913a0e4fdb7f0e853a8 /include | |
| parent | 3e5df1ab4f581c795b7d3e1f1ba385efe47d6f15 (diff) | |
| download | dynamic-extension-963df5b0dcccc686dff9eadee0de4c5d95db84db.tar.gz | |
Initial port of DynamicExtension framework
Diffstat (limited to 'include')
| -rw-r--r-- | include/framework/DynamicExtension.h | 633 |
| -rw-r--r-- | include/shard/WIRS.h | 26 |
2 files changed, 643 insertions(+), 16 deletions(-)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
new file mode 100644
index 0000000..93de24f
--- /dev/null
+++ b/include/framework/DynamicExtension.h
@@ -0,0 +1,633 @@
+#pragma once
+
+#include <atomic>
+#include <numeric>
+#include <cstdio>
+#include <vector>
+
+#include "framework/MutableBuffer.h"
+#include "framework/InternalLevel.h"
+#include "ds/Alias.h"
+#include "util/timer.h"
+
+namespace de {
+
+thread_local size_t sampling_attempts = 0;
+thread_local size_t sampling_rejections = 0;
+thread_local size_t deletion_rejections = 0;
+thread_local size_t bounds_rejections = 0;
+thread_local size_t tombstone_rejections = 0;
+thread_local size_t buffer_rejections = 0;
+
+/*
+ * thread_local size_t various_sampling_times go here.
+ */
+thread_local size_t sample_range_time = 0;
+thread_local size_t alias_time = 0;
+thread_local size_t alias_query_time = 0;
+thread_local size_t rejection_check_time = 0;
+thread_local size_t buffer_sample_time = 0;
+thread_local size_t memlevel_sample_time = 0;
+thread_local size_t disklevel_sample_time = 0;
+thread_local size_t sampling_bailouts = 0;
+
+
+/*
+ * LSM Tree configuration global variables
+ */
+
+// True for buffer rejection sampling
+static constexpr bool LSM_REJ_SAMPLE = false;
+
+// True for leveling, false for tiering
+static constexpr bool LSM_LEVELING = false;
+
+static constexpr bool DELETE_TAGGING = true;
+
+// TODO: Replace the constexpr bools above
+// with template parameters based on these
+// enums.
+enum class LayoutPolicy {
+    LEVELING,
+    TIERING
+};
+
+enum class DeletePolicy {
+    TOMBSTONE,
+    TAGGING
+};
+
+typedef ssize_t level_index;
+
+template <typename K, typename V, typename W=void>
+class DynamicExtension {
+    typedef WIRS<K,V,W> Shard;
+    typedef MutableBuffer<K,V,W> MBuffer;
+    typedef Record<K, V, W> Rec;
+
+public:
+    DynamicExtension(size_t buffer_cap, size_t buffer_delete_cap, size_t scale_factor,
+                     double max_delete_prop, double max_rejection_prop, gsl_rng *rng)
+        : m_active_buffer(0),
+          m_scale_factor(scale_factor),
+          m_max_delete_prop(max_delete_prop),
+          m_max_rejection_rate(max_rejection_prop),
+          m_last_level_idx(-1),
+          m_buffer_1(new MutableBuffer<K,V,W>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
+          m_buffer_2(new MutableBuffer<K,V,W>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)),
+          m_buffer_1_merging(false), m_buffer_2_merging(false) {}
+
+    ~DynamicExtension() {
+        delete m_buffer_1;
+        delete m_buffer_2;
+
+        for (size_t i=0; i<m_levels.size(); i++) {
+            delete m_levels[i];
+        }
+    }
+
+    int delete_record(const K& key, const V& val, gsl_rng *rng) {
+        assert(DELETE_TAGGING);
+
+        auto buffer = get_buffer();
+
+        // Check the levels first. This assumes there aren't
+        // any undeleted duplicate records.
+        for (auto level : m_levels) {
+            if (level && level->delete_record(key, val)) {
+                return 1;
+            }
+        }
+
+        // The buffer will take the longest amount of time, and
+        // probably has the lowest probability of having the record,
+        // so we'll check it last.
+        return buffer->delete_record(key, val);
+    }
+
+    int append(const K& key, const V& val, W weight, bool tombstone, gsl_rng *rng) {
+        // NOTE: single-threaded implementation only
+        MutableBuffer<K,V,W> *buffer;
+        while (!(buffer = get_buffer()))
+            ;
+
+        if (buffer->is_full()) {
+            merge_buffer(rng);
+        }
+
+        return buffer->append(key, val, weight, tombstone);
+    }
+
+    void range_sample(Record<K,V,W> *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
+        auto buffer = get_buffer();
+        Alias *buffer_alias = nullptr;
+        std::vector<Record<K,V,W> *> buffer_records;
+        size_t buffer_cutoff = 0;
+
+        W buffer_weight;
+        if (LSM_REJ_SAMPLE) {
+            buffer_weight = buffer->get_total_weight();
+            buffer_cutoff = buffer->get_record_count() - 1;
+        } else {
+            buffer_weight = buffer->get_sample_range(lower_key, upper_key, buffer_records, &buffer_alias, &buffer_cutoff);
+        }
+
+        // Get the shard weights for each level. Index 0 is the buffer,
+        // represented by nullptr.
+        std::vector<std::pair<ShardID, Shard*>> shards;
+        std::vector<void*> states;
+        shards.push_back({{-1, -1}, nullptr});
+        states.push_back(nullptr);
+
+        std::vector<double> shard_weights;
+        shard_weights.push_back(buffer_weight);
+
+        for (auto &level : m_levels) {
+            level->get_shard_weights(shard_weights, shards, states, lower_key, upper_key);
+        }
+
+        if (shard_weights.size() == 1 && shard_weights[0] == 0) {
+            if (buffer_alias) delete buffer_alias;
+            for (auto& x: states) Shard::delete_state(x);
+            sampling_bailouts++;
+            return; // no records in the sampling range
+        }
+
+        // NOTE: accumulate with a double literal; an integer literal would
+        // truncate the fractional weights.
+        double tot_weight = std::accumulate(shard_weights.begin(), shard_weights.end(), 0.0);
+        for (auto& w: shard_weights) w /= tot_weight;
+
+        // Construct alias structure
+        auto alias = Alias(shard_weights);
+
+        std::vector<size_t> shard_samples(shard_weights.size(), 0);
+
+        size_t rejections = sample_sz;
+        size_t sample_idx = 0;
+
+        size_t buffer_rejections = 0;
+
+        do {
+            for (size_t i=0; i<rejections; i++) {
+                shard_samples[alias.get(rng)] += 1;
+            }
+
+            rejections = 0;
+
+            while (shard_samples[0] > 0) {
+                const Record<K,V,W> *rec;
+                if (LSM_REJ_SAMPLE) {
+                    rec = buffer->get_sample(lower_key, upper_key, rng);
+                } else {
+                    rec = buffer_records[buffer_alias->get(rng)];
+                }
+
+                if (DELETE_TAGGING) {
+                    if (rec && !rec->get_delete_status()) {
+                        sample_set[sample_idx++] = *rec;
+                    } else {
+                        rejections++;
+                        buffer_rejections++; // feed the bailout check below
+                    }
+                } else {
+                    if (rec && !buffer->check_tombstone(rec->key, rec->value)) {
+                        sample_set[sample_idx++] = *rec;
+                    } else {
+                        rejections++;
+                        buffer_rejections++; // feed the bailout check below
+                    }
+                }
+
+                shard_samples[0]--;
+
+                // Assume nothing in buffer and bail out.
+                // FIXME: rather than a bailout, we could switch to non-rejection
+                // sampling, but that would require rebuilding the full alias structure.
+                // Wouldn't be too hard to do, but for the moment I'll just do this.
+                if (LSM_REJ_SAMPLE && buffer_rejections >= sample_sz && sample_idx == 0 && shard_weights.size() == 1) {
+                    if (buffer_alias) delete buffer_alias;
+                    for (auto& x: states) Shard::delete_state(x);
+                    sampling_bailouts++;
+                    return; // no records in the sampling range
+                }
+            }
+
+            std::vector<Rec> results;
+            for (size_t i=1; i<shard_samples.size(); i++) {
+                results.reserve(shard_samples[i]);
+
+                shards[i].second->get_samples(states[i], results, lower_key, upper_key, shard_samples[i], rng);
+
+                for (size_t j=0; j<results.size(); j++) {
+                    if (results[j].is_tombstone() || is_deleted(&results[j], shards[i].first, buffer, buffer_cutoff)) {
+                        rejections++;
+                        continue;
+                    }
+
+                    sample_set[sample_idx++] = results[j];
+                }
+
+                shard_samples[i] = 0;
+                results.clear();
+            }
+
+        } while (sample_idx < sample_sz);
+
+        if (buffer_alias) delete buffer_alias;
+        for (auto& x: states) Shard::delete_state(x);
+
+        enforce_rejection_rate_maximum(rng);
+    }
+
+    // Checks the tree and buffer for a tombstone corresponding to
+    // the provided record in any shard *above* the shid, which
+    // should correspond to the shard containing the record in question
+    //
+    // Passing INVALID_SHID indicates that the record exists within the buffer
+    bool is_deleted(const Record<K,V,W> *record, const ShardID &shid, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) {
+        // If tagging is enabled, we just need to check if the record has the delete tag set
+        if (DELETE_TAGGING) {
+            return record->get_delete_status();
+        }
+
+        // Otherwise, we need to look for a tombstone.
+
+        // Check for a tombstone in the buffer. This will require accounting for the cutoff eventually.
+        if (buffer->check_tombstone(record->key, record->value)) {
+            return true;
+        }
+
+        // If the record is in the buffer, then we're done.
+        if (shid == INVALID_SHID) {
+            return false;
+        }
+
+        // Check every shard on the levels above the one containing the record.
+        for (size_t lvl=0; lvl<shid.level_idx; lvl++) {
+            if (m_levels[lvl]->check_tombstone(0, record->key, record->value)) {
+                return true;
+            }
+        }
+
+        // Check the newer shards on the level containing the record.
+        return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record->key, record->value);
+    }
+
+
+    size_t get_record_cnt() {
+        size_t cnt = get_buffer()->get_record_count();
+
+        for (size_t i=0; i<m_levels.size(); i++) {
+            if (m_levels[i]) cnt += m_levels[i]->get_record_cnt();
+        }
+
+        return cnt;
+    }
+
+
+    size_t get_tombstone_cnt() {
+        size_t cnt = get_buffer()->get_tombstone_count();
+
+        for (size_t i=0; i<m_levels.size(); i++) {
+            if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count();
+        }
+
+        return cnt;
+    }
+
+    size_t get_height() {
+        return m_levels.size();
+    }
+
+    size_t get_memory_utilization() {
+        size_t cnt = m_buffer_1->get_memory_utilization() + m_buffer_2->get_memory_utilization();
+
+        for (size_t i=0; i<m_levels.size(); i++) {
+            if (m_levels[i]) cnt += m_levels[i]->get_memory_utilization();
+        }
+
+        return cnt;
+    }
+
+    size_t get_aux_memory_utilization() {
+        size_t cnt = m_buffer_1->get_aux_memory_utilization() + m_buffer_2->get_aux_memory_utilization();
+
+        for (size_t i=0; i<m_levels.size(); i++) {
+            if (m_levels[i]) {
+                cnt += m_levels[i]->get_aux_memory_utilization();
+            }
+        }
+
+        return cnt;
+    }
+
+    bool validate_tombstone_proportion() {
+        long double ts_prop;
+        for (size_t i=0; i<m_levels.size(); i++) {
+            if (m_levels[i]) {
+                ts_prop = (long double) m_levels[i]->get_tombstone_count() / (long double) calc_level_record_capacity(i);
+                if (ts_prop > (long double) m_max_delete_prop) {
+                    return false;
+                }
+            }
+        }
+
+        return true;
+    }
+
+    size_t get_buffer_capacity() {
+        return m_buffer_1->get_capacity();
+    }
+
+
+    Shard *create_ssi() {
+        std::vector<Shard *> shards;
+
+        if (m_levels.size() > 0) {
+            for (int i=m_levels.size() - 1; i >= 0; i--) {
+                if (m_levels[i]) {
+                    shards.emplace_back(m_levels[i]->get_merged_shard());
+                }
+            }
+        }
+
+        shards.emplace_back(new Shard(get_buffer(), nullptr, DELETE_TAGGING));
+
+        Shard *shards_array[shards.size()];
+
+        size_t j = 0;
+        for (size_t i=0; i<shards.size(); i++) {
+            if (shards[i]) {
+                shards_array[j++] = shards[i];
+            }
+        }
+
+        Shard *flattened = new Shard(shards_array, j, nullptr, DELETE_TAGGING);
+
+        for (auto shard : shards) {
+            delete shard;
+        }
+
+        return flattened;
+    }
+
+private:
+    MutableBuffer<K,V,W> *m_buffer_1;
+    MutableBuffer<K,V,W> *m_buffer_2;
+    std::atomic<bool> m_active_buffer;
+    std::atomic<bool> m_buffer_1_merging;
+    std::atomic<bool> m_buffer_2_merging;
+
+    size_t m_scale_factor;
+    double m_max_delete_prop;
+    double m_max_rejection_rate;
+
+    std::vector<InternalLevel<K,V,W> *> m_levels;
+
+    level_index m_last_level_idx;
+
+    MutableBuffer<K,V,W> *get_buffer() {
+        if (m_buffer_1_merging && m_buffer_2_merging) {
+            return nullptr;
+        }
+
+        return (m_active_buffer) ? m_buffer_2 : m_buffer_1;
+    }
+
+    inline bool rejection(const Record<K,V,W> *record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) {
+        if (record->is_tombstone()) {
+            tombstone_rejections++;
+            return true;
+        } else if (record->key < lower_bound || record->key > upper_bound) {
+            bounds_rejections++;
+            return true;
+        } else if (is_deleted(record, shid, buffer, buffer_cutoff)) {
+            deletion_rejections++;
+            return true;
+        }
+
+        return false;
+    }
+
+    inline bool add_to_sample(const Record<K,V,W> *record, ShardID shid, const K& upper_key, const K& lower_key,
+                              Record<K,V,W> *sample_buffer, size_t &sample_idx, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) {
+        TIMER_INIT();
+        TIMER_START();
+        sampling_attempts++;
+        if (!record || rejection(record, shid, lower_key, upper_key, buffer, buffer_cutoff)) {
+            sampling_rejections++;
+            return false;
+        }
+        TIMER_STOP();
+        rejection_check_time += TIMER_RESULT();
+
+        sample_buffer[sample_idx++] = *record;
+        return true;
+    }
+
+    /*
+     * Add a new level to the LSM Tree and return that level's index. Will
+     * automatically determine whether the level should be on memory or on disk,
+     * and act appropriately.
+     */
+    inline level_index grow() {
+        level_index new_idx;
+
+        size_t new_shard_cnt = (LSM_LEVELING) ? 1 : m_scale_factor;
+        new_idx = m_levels.size();
+        if (new_idx > 0) {
+            auto res = m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count();
+            assert(res == 0);
+        }
+        m_levels.emplace_back(new InternalLevel<K,V,W>(new_idx, new_shard_cnt, DELETE_TAGGING));
+
+        m_last_level_idx++;
+        return new_idx;
+    }
+
+
+    // Merge the memory table down into the tree, completing any other
+    // required merges to make room for it.
+    inline void merge_buffer(gsl_rng *rng) {
+        auto buffer = get_buffer();
+
+        if (!can_merge_with(0, buffer->get_record_count())) {
+            merge_down(0, rng);
+        }
+
+        merge_buffer_into_l0(buffer, rng);
+        enforce_delete_maximum(0, rng);
+
+        buffer->truncate();
+        return;
+    }
+
+    /*
+     * Merge the specified level down into the tree. The level index must be
+     * non-negative (i.e., this function cannot be used to merge the buffer). This
+     * routine will recursively perform any necessary merges to make room for the
+     * specified level.
+     */
+    inline void merge_down(level_index idx, gsl_rng *rng) {
+        level_index merge_base_level = find_mergable_level(idx);
+        if (merge_base_level == -1) {
+            merge_base_level = grow();
+        }
+
+        for (level_index i=merge_base_level; i>idx; i--) {
+            merge_levels(i, i-1, rng);
+            enforce_delete_maximum(i, rng);
+        }
+
+        return;
+    }
+
+    /*
+     * Find the first level below the level indicated by idx that
+     * is capable of sustaining a merge operation and return its
+     * level index. If no such level exists, returns -1. Also
+     * returns -1 if idx == 0 and no level exists yet, to simplify
+     * the logic of the first merge.
+     */
+    inline level_index find_mergable_level(level_index idx, MutableBuffer<K,V,W> *buffer=nullptr) {
+
+        if (idx == 0 && m_levels.size() == 0) return -1;
+
+        size_t incoming_rec_cnt = get_level_record_count(idx, buffer);
+        for (level_index i=idx+1; i<=m_last_level_idx; i++) {
+            if (can_merge_with(i, incoming_rec_cnt)) {
+                return i;
+            }
+
+            incoming_rec_cnt = get_level_record_count(i);
+        }
+
+        return -1;
+    }
+
+    /*
+     * Merge the level specified by incoming level into the level specified
+     * by base level. The two levels should be sequential--i.e. no levels
+     * are skipped in the merge process--otherwise the tombstone ordering
+     * invariant may be violated by the merge operation.
+     */
+    inline void merge_levels(level_index base_level, level_index incoming_level, gsl_rng *rng) {
+        // merging two memory levels
+        if (LSM_LEVELING) {
+            auto tmp = m_levels[base_level];
+            m_levels[base_level] = InternalLevel<K,V,W>::merge_levels(m_levels[base_level], m_levels[incoming_level],
+                                                                      DELETE_TAGGING, rng);
+            mark_as_unused(tmp);
+        } else {
+            m_levels[base_level]->append_merged_shards(m_levels[incoming_level], rng);
+        }
+
+        mark_as_unused(m_levels[incoming_level]);
+        m_levels[incoming_level] = new InternalLevel<K,V,W>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor, DELETE_TAGGING);
+    }
+
+
+    inline void merge_buffer_into_l0(MutableBuffer<K,V,W> *buffer, gsl_rng *rng) {
+        assert(m_levels[0]);
+        if (LSM_LEVELING) {
+            // FIXME: Kludgey implementation due to interface constraints.
+            auto old_level = m_levels[0];
+            auto temp_level = new InternalLevel<K,V,W>(0, 1, DELETE_TAGGING);
+            temp_level->append_mem_table(buffer, rng);
+            auto new_level = InternalLevel<K,V,W>::merge_levels(old_level, temp_level, DELETE_TAGGING, rng);
+
+            m_levels[0] = new_level;
+            delete temp_level;
+            mark_as_unused(old_level);
+        } else {
+            m_levels[0]->append_mem_table(buffer, rng);
+        }
+    }
+
+    /*
+     * Mark a given memory level as no-longer in use by the tree. For now this
+     * will just free the level. In future, this will be more complex as the
+     * level may not be able to immediately be deleted, depending upon who
+     * else is using it.
+     */
+    inline void mark_as_unused(InternalLevel<K,V,W> *level) {
+        delete level;
+    }
+
+    /*
+     * Check the tombstone proportion for the specified level and,
+     * if the limit is exceeded, forcibly merge levels until all
+     * levels below idx are below the limit.
+     */
+    inline void enforce_delete_maximum(level_index idx, gsl_rng *rng) {
+        long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx);
+
+        if (ts_prop > (long double) m_max_delete_prop) {
+            merge_down(idx, rng);
+        }
+
+        return;
+    }
+
+    inline void enforce_rejection_rate_maximum(gsl_rng *rng) {
+        if (m_levels.size() == 0) {
+            return;
+        }
+
+        for (size_t i=0; i<m_last_level_idx; i++) {
+            if (m_levels[i]) {
+                double ratio = m_levels[i]->get_rejection_rate();
+                if (ratio > m_max_rejection_rate) {
+                    merge_down(i, rng);
+                }
+            }
+        }
+    }
+
+    /*
+     * Assume that level "0" should be larger than the buffer. The buffer
+     * itself is index -1, which should return simply the buffer capacity.
+     */
+    inline size_t calc_level_record_capacity(level_index idx) {
+        return get_buffer()->get_capacity() * pow(m_scale_factor, idx+1);
+    }
+
+    /*
+     * Returns the actual number of records present on a specified level. An
+     * index value of -1 indicates the memory table. Can optionally pass in
+     * a pointer to the memory table to use, if desired. Otherwise, there are
+     * no guarantees about which buffer will be accessed if level_index is -1.
+     */
+    inline size_t get_level_record_count(level_index idx, MutableBuffer<K,V,W> *buffer=nullptr) {
+        assert(idx >= -1);
+        if (idx == -1) {
+            return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count();
+        }
+
+        return (m_levels[idx]) ? m_levels[idx]->get_record_cnt() : 0;
+    }
+
+    /*
+     * Determines if the specified level can merge with another level containing
+     * incoming_rec_cnt number of records. The provided level index should be
+     * non-negative (i.e., not refer to the buffer) and will be automatically
+     * translated into the appropriate index into either the disk or memory level
+     * vector.
+     */
+    inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) {
+        if (idx >= m_levels.size() || !m_levels[idx]) {
+            return false;
+        }
+
+        if (LSM_LEVELING) {
+            return m_levels[idx]->get_record_cnt() + incoming_rec_cnt <= calc_level_record_capacity(idx);
+        } else {
+            return m_levels[idx]->get_shard_count() < m_scale_factor;
+        }
+
+        // unreachable
+        assert(false);
+    }
+};
+
+}
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index 5d6baeb..d2f728f 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -22,14 +22,9 @@
 namespace de {
 
-struct sample_state;
-template <typename K, typename V, typename W>
-bool check_deleted(Record<K, V, W>* record, sample_state *state);
 
 extern thread_local size_t bounds_rejections;
 extern thread_local size_t tombstone_rejections;
-
-
 thread_local size_t m_wirsrun_cancelations = 0;
 
 template <typename K, typename V, typename W>
@@ -211,8 +206,6 @@ public:
     // Build Alias across the decomposed nodes.
     WIRSState* get_sample_run_state(const K& lower_key, const K& upper_key) {
         WIRSState* res = new WIRSState();
-        //std::vector<struct wirs_node*> nodes;
-        //double tot_weight = decompose_node(m_root, lower_key, upper_key, res->nodes);
 
         // Simulate a stack to unfold recursion.
         double tot_weight = 0.0;
@@ -242,10 +235,15 @@ public:
         return res;
     }
 
+    static void delete_state(void *state) {
+        auto s = (WIRSState *) state;
+        delete s;
+    }
+
     // returns the number of records sampled
     // NOTE: This operation returns records strictly between the lower and upper bounds, not
     // including them.
-    size_t get_samples(WIRSState* run_state, Record<K, V, W> *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, sample_state *state, gsl_rng *rng) {
+    size_t get_samples(WIRSState* run_state, std::vector<Record<K, V, W>> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) {
         if (sample_sz == 0) {
             return 0;
         }
@@ -263,18 +261,14 @@
             size_t rec_offset = fat_point * m_group_size + m_alias[fat_point]->get(rng);
             auto record = m_data + rec_offset;
 
+            // bounds rejection
             if (lower_key > record->key || upper_key < record->key) {
-                // bounds rejection
                 bounds_rejections++;
                 continue;
-            } else if (record->is_tombstone() || (state && check_deleted(record, state))) {
-                // tombstone/delete rejection
-                tombstone_rejections++;
-                continue;
-            }
+            }
 
-            sample_set[cnt++] = *record;
-
+            result_set.emplace_back(*record);
+            cnt++;
         } while (attempts < sample_sz);
 
         return cnt;
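For reference, here is a minimal usage sketch of the public interface introduced by this commit. It is illustrative only: the parameter values are arbitrary, it assumes `Record<K,V,W>` lives in the `de` namespace (it is defined in a header not shown in this diff), and it assumes GSL is available for `gsl_rng`.

```cpp
#include <cstdint>
#include <gsl/gsl_rng.h>

#include "framework/DynamicExtension.h"

int main() {
    gsl_rng *rng = gsl_rng_alloc(gsl_rng_mt19937);

    // (buffer capacity, buffer delete capacity, scale factor,
    //  max tombstone proportion, max rejection rate, rng)
    de::DynamicExtension<uint64_t, uint64_t, uint64_t> ext(12000, 1200, 6, 0.05, 0.25, rng);

    // Weighted inserts: key, value, weight, tombstone flag
    for (uint64_t i = 0; i < 100000; i++) {
        ext.append(i, 2 * i, /*weight=*/1, /*tombstone=*/false, rng);
    }

    // Tagged delete (DELETE_TAGGING is enabled in this build)
    ext.delete_record(5, 10, rng);

    // Draw 100 records, weighted, with keys in [1000, 2000]
    de::Record<uint64_t, uint64_t, uint64_t> samples[100];
    ext.range_sample(samples, 1000, 2000, 100, rng);

    gsl_rng_free(rng);
    return 0;
}
```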
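The TODO above the `LayoutPolicy`/`DeletePolicy` enums proposes replacing the `constexpr` configuration bools with template parameters. One way that could look, as a sketch rather than anything in this commit:

```cpp
// Hypothetical follow-up to the TODO in this commit: policies become non-type
// template parameters, replacing the LSM_LEVELING / DELETE_TAGGING constexpr bools.
namespace de {

template <typename K, typename V, typename W = void,
          LayoutPolicy L = LayoutPolicy::TIERING,
          DeletePolicy D = DeletePolicy::TAGGING>
class DynamicExtension {
    static constexpr bool leveling = (L == LayoutPolicy::LEVELING);
    static constexpr bool tagging  = (D == DeletePolicy::TAGGING);

    void merge_levels(/* ... */) {
        if constexpr (leveling) {
            // merge-and-replace the base level
        } else {
            // append a new shard to the base level (tiering)
        }
    }
    /* ... */
};

}
```

This would let a single binary instantiate both layouts, at the cost of compiling each configuration separately.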
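`range_sample` assigns samples to shards by building an `Alias` structure over the normalized shard weights, giving O(1) draws from the weighted shard distribution. The repository's implementation lives in `ds/Alias.h` (not shown in this diff); for readers unfamiliar with the technique, a self-contained Walker/Vose alias-table sketch (using `std::mt19937` rather than the framework's `gsl_rng`) looks like this:

```cpp
#include <random>
#include <vector>

// Minimal Walker/Vose alias-method sketch (not the repository's ds/Alias.h).
// Build: O(n); sample: O(1) per draw. Input weights are assumed normalized.
struct AliasSketch {
    std::vector<double> prob;
    std::vector<size_t> alias;

    explicit AliasSketch(std::vector<double> w) : prob(w.size()), alias(w.size()) {
        size_t n = w.size();
        for (auto &x : w) x *= n;  // scale so the mean bucket weight is 1
        std::vector<size_t> small, large;
        for (size_t i = 0; i < n; i++) (w[i] < 1.0 ? small : large).push_back(i);
        while (!small.empty() && !large.empty()) {
            size_t s = small.back(); small.pop_back();
            size_t l = large.back(); large.pop_back();
            prob[s] = w[s];
            alias[s] = l;
            w[l] -= (1.0 - w[s]);  // move the deficit onto the large bucket
            (w[l] < 1.0 ? small : large).push_back(l);
        }
        // Numerical leftovers round to probability 1.
        for (size_t i : large) prob[i] = 1.0;
        for (size_t i : small) prob[i] = 1.0;
    }

    size_t get(std::mt19937 &rng) {
        std::uniform_real_distribution<double> u(0.0, 1.0);
        size_t i = (size_t)(u(rng) * prob.size());
        if (i >= prob.size()) i = prob.size() - 1;  // guard u == 1.0
        return (u(rng) < prob[i]) ? i : alias[i];   // keep bucket or take its alias
    }
};
```

Given the normalized `shard_weights` from `range_sample`, `AliasSketch(shard_weights).get(rng)` returns shard index `i` with probability proportional to that shard's weight in the query range, which is exactly the role `alias.get(rng)` plays in the sampling loop above.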