diff options
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 79 |
1 files changed, 40 insertions, 39 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index c65290a..1d9ee76 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -68,11 +68,12 @@ enum class DeletePolicy { typedef ssize_t level_index; -template <typename K, typename V, typename W=void> +template <RecordInterface R> class DynamicExtension { - typedef WIRS<K,V,W> Shard; - typedef MutableBuffer<K,V,W> MBuffer; - typedef Record<K, V, W> Rec; + typedef WIRS<R> Shard; + typedef decltype(R::key) K; + typedef decltype(R::value) V; + typedef decltype(R::weight) W; public: DynamicExtension(size_t buffer_cap, size_t buffer_delete_cap, size_t scale_factor, @@ -82,8 +83,8 @@ public: m_max_delete_prop(max_delete_prop), m_max_rejection_rate(max_rejection_prop), m_last_level_idx(-1), - m_buffer_1(new MutableBuffer<K,V,W>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)), - m_buffer_2(new MutableBuffer<K,V,W>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)), + m_buffer_1(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)), + m_buffer_2(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_delete_cap, rng)), m_buffer_1_merging(false), m_buffer_2_merging(false) {} ~DynamicExtension() { @@ -113,9 +114,9 @@ public: return buffer->delete_record(key, val); } - int append(const K& key, const V& val, W weight, bool tombstone, gsl_rng *rng) { + int append(R &rec, gsl_rng *rng) { // NOTE: single-threaded implementation only - MutableBuffer<K,V,W> *buffer; + MutableBuffer<R> *buffer; while (!(buffer = get_buffer())) ; @@ -123,13 +124,13 @@ public: merge_buffer(rng); } - return buffer->append(key, val, weight, tombstone); + return buffer->append(rec); } - void range_sample(Record<K,V,W> *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) { + void range_sample(R *sample_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) { auto buffer = get_buffer(); Alias *buffer_alias = nullptr; - std::vector<Record<K,V,W> *> buffer_records; + std::vector<R *> buffer_records; size_t buffer_cutoff = 0; W buffer_weight; @@ -185,7 +186,7 @@ public: rejections = 0; while (shard_samples[0] > 0) { - const Record<K,V,W> *rec; + const R *rec; if (LSM_REJ_SAMPLE) { rec = buffer->get_sample(lower_key, upper_key, rng); } else { @@ -193,13 +194,13 @@ public: } if (DELETE_TAGGING) { - if (rec && !rec->get_delete_status()) { + if (rec && !rec->is_deleted()) { sample_set[sample_idx++] = *rec; } else { rejections++; } } else { - if (rec && !buffer->check_tombstone(rec->key, rec->value)) { + if (rec && !buffer->check_tombstone(*rec)) { sample_set[sample_idx++] = *rec; } else { rejections++; @@ -220,14 +221,14 @@ public: } } - std::vector<Rec> results; + std::vector<R> results; for (size_t i=1; i<shard_samples.size(); i++) { results.reserve(shard_samples[i]); shards[i].second->get_samples(states[i], results, lower_key, upper_key, shard_samples[i], rng); for (size_t j=0; j<results.size(); j++) { - if (rejection(&results[j], shards[i].first, lower_key, upper_key, buffer, buffer_cutoff)) { + if (rejection(results[j], shards[i].first, lower_key, upper_key, buffer, buffer_cutoff)) { rejections++; continue; } @@ -252,16 +253,16 @@ public: // should correspond to the shard containing the record in question // // Passing INVALID_SHID indicates that the record exists within the buffer - bool is_deleted(const Record<K,V,W> *record, const ShardID &shid, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) { + bool is_deleted(const R &record, const ShardID &shid, MutableBuffer<R> *buffer, size_t buffer_cutoff) { // If tagging is enabled, we just need to check if the record has the delete tag set if (DELETE_TAGGING) { - return record->get_delete_status(); + return record.is_deleted(); } // Otherwise, we need to look for a tombstone. // check for tombstone in the buffer. This will require accounting for the cutoff eventually. - if (buffer->check_tombstone(record->key, record->value)) { + if (buffer->check_tombstone(record)) { return true; } @@ -271,13 +272,13 @@ public: } for (size_t lvl=0; lvl<=shid.level_idx; lvl++) { - if (m_levels[lvl]->check_tombstone(0, record->key, record->value)) { + if (m_levels[lvl]->check_tombstone(0, record)) { return true; } } // check the level containing the shard - return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record->key, record->value); + return m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, record); } @@ -379,8 +380,8 @@ public: } private: - MutableBuffer<K,V,W> *m_buffer_1; - MutableBuffer<K,V,W> *m_buffer_2; + MutableBuffer<R> *m_buffer_1; + MutableBuffer<R> *m_buffer_2; std::atomic<bool> m_active_buffer; std::atomic<bool> m_buffer_1_merging; std::atomic<bool> m_buffer_2_merging; @@ -389,11 +390,11 @@ private: double m_max_delete_prop; double m_max_rejection_rate; - std::vector<InternalLevel<K,V,W> *> m_levels; + std::vector<InternalLevel<R> *> m_levels; level_index m_last_level_idx; - MutableBuffer<K,V,W> *get_buffer() { + MutableBuffer<R> *get_buffer() { if (m_buffer_1_merging && m_buffer_2_merging) { return nullptr; } @@ -401,11 +402,11 @@ private: return (m_active_buffer) ? m_buffer_2 : m_buffer_1; } - inline bool rejection(const Record<K,V,W> *record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) { - if (record->is_tombstone()) { + inline bool rejection(const R &record, ShardID shid, const K& lower_bound, const K& upper_bound, MutableBuffer<R> *buffer, size_t buffer_cutoff) { + if (record.is_tombstone()) { tombstone_rejections++; return true; - } else if (record->key < lower_bound || record->key > upper_bound) { + } else if (record.key < lower_bound || record.key > upper_bound) { bounds_rejections++; return true; } else if (is_deleted(record, shid, buffer, buffer_cutoff)) { @@ -416,8 +417,8 @@ private: return false; } - inline bool add_to_sample(const Record<K,V,W> *record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer, - Record<K,V,W> *sample_buffer, size_t &sample_idx, MutableBuffer<K,V,W> *buffer, size_t buffer_cutoff) { + inline bool add_to_sample(const R &record, ShardID shid, const K& upper_key, const K& lower_key, char *io_buffer, + R *sample_buffer, size_t &sample_idx, MutableBuffer<R> *buffer, size_t buffer_cutoff) { TIMER_INIT(); TIMER_START(); sampling_attempts++; @@ -445,7 +446,7 @@ private: if (new_idx > 0) { assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); } - m_levels.emplace_back(new InternalLevel<K,V,W>(new_idx, new_shard_cnt, DELETE_TAGGING)); + m_levels.emplace_back(new InternalLevel<R>(new_idx, new_shard_cnt, DELETE_TAGGING)); m_last_level_idx++; return new_idx; @@ -495,7 +496,7 @@ private: * returns -1 if idx==0, and no such level exists, to simplify * the logic of the first merge. */ - inline level_index find_mergable_level(level_index idx, MutableBuffer<K,V,W> *buffer=nullptr) { + inline level_index find_mergable_level(level_index idx, MutableBuffer<R> *buffer=nullptr) { if (idx == 0 && m_levels.size() == 0) return -1; @@ -525,7 +526,7 @@ private: // merging two memory levels if (LSM_LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel<K,V,W>::merge_levels(m_levels[base_level], m_levels[incoming_level], + m_levels[base_level] = InternalLevel<R>::merge_levels(m_levels[base_level], m_levels[incoming_level], DELETE_TAGGING, rng); mark_as_unused(tmp); } else { @@ -533,18 +534,18 @@ private: } mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel<K,V,W>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor, DELETE_TAGGING); + m_levels[incoming_level] = new InternalLevel<R>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor, DELETE_TAGGING); } - inline void merge_buffer_into_l0(MutableBuffer<K,V,W> *buffer, gsl_rng *rng) { + inline void merge_buffer_into_l0(MutableBuffer<R> *buffer, gsl_rng *rng) { assert(m_levels[0]); if (LSM_LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0]; - auto temp_level = new InternalLevel<K,V,W>(0, 1, DELETE_TAGGING); + auto temp_level = new InternalLevel<R>(0, 1, DELETE_TAGGING); temp_level->append_mem_table(buffer, rng); - auto new_level = InternalLevel<K,V,W>::merge_levels(old_level, temp_level, DELETE_TAGGING, rng); + auto new_level = InternalLevel<R>::merge_levels(old_level, temp_level, DELETE_TAGGING, rng); m_levels[0] = new_level; delete temp_level; @@ -560,7 +561,7 @@ private: * level may not be able to immediately be deleted, depending upon who * else is using it. */ - inline void mark_as_unused(InternalLevel<K,V,W> *level) { + inline void mark_as_unused(InternalLevel<R> *level) { delete level; } @@ -608,7 +609,7 @@ private: * a pointer to the memory table to use, if desired. Otherwise, there are * no guarantees about which buffer will be accessed if level_index is -1. */ - inline size_t get_level_record_count(level_index idx, MutableBuffer<K,V,W> *buffer=nullptr) { + inline size_t get_level_record_count(level_index idx, MutableBuffer<R> *buffer=nullptr) { assert(idx >= -1); if (idx == -1) { return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count(); |