diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-22 14:58:22 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-05-22 14:58:22 -0400 |
| commit | 0cf160ee68d37be93665e665ef22ae6e211a157d (patch) | |
| tree | badaca4c5654e7abbe9291b18b08748aeeadc518 /include/framework/DynamicExtension.h | |
| parent | 08d6c84b9d69b500c964a8ff66e726e1f01f2095 (diff) | |
| download | dynamic-extension-0cf160ee68d37be93665e665ef22ae6e211a157d.tar.gz | |
More updates/restructuring
Diffstat (limited to 'include/framework/DynamicExtension.h')
| -rw-r--r-- | include/framework/DynamicExtension.h | 46 |
1 files changed, 25 insertions, 21 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 2009344..a70dda4 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -18,6 +18,7 @@ #include "framework/InternalLevel.h" #include "framework/ShardInterface.h" #include "framework/QueryInterface.h" +#include "framework/RecordInterface.h" #include "shard/WIRS.h" #include "ds/Alias.h" @@ -74,12 +75,15 @@ typedef ssize_t level_index; template <RecordInterface R, ShardInterface S, QueryInterface Q, typename FQ=void> class DynamicExtension { + //typedef typename S<WrappedRecord<R>> Shard; + typedef S Shard; + typedef MutableBuffer<R> Buffer; public: DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop) : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop), - m_buffer(new MutableBuffer<R>(buffer_cap, LSM_REJ_SAMPLE, buffer_cap * max_delete_prop)) - {} + m_buffer(new Buffer(buffer_cap, LSM_REJ_SAMPLE, buffer_cap * max_delete_prop)) + { } ~DynamicExtension() { delete m_buffer; @@ -94,7 +98,7 @@ public: } int erase(const R &rec) { - MutableBuffer<R> *buffer; + Buffer *buffer; if constexpr (DELETE_TAGGING) { auto buffer = get_buffer(); @@ -131,7 +135,7 @@ public: auto buffer_state = Q::get_buffer_query_state(buffer, parms); // Get the shard query states - std::vector<std::pair<ShardID, S*>> shards; + std::vector<std::pair<ShardID, Shard*>> shards; std::vector<void*> states; for (auto &level : m_levels) { @@ -218,8 +222,8 @@ public: return m_buffer->get_capacity(); } - S *create_ssi() { - std::vector<S *> shards; + Shard *create_ssi() { + std::vector<Shard *> shards; if (m_levels.size() > 0) { for (int i=m_levels.size() - 1; i>= 0; i--) { @@ -231,7 +235,7 @@ public: shards.emplace_back(new S(get_buffer(), nullptr)); - S *shards_array[shards.size()]; + Shard *shards_array[shards.size()]; size_t j = 0; for (size_t i=0; i<shards.size(); i++) { @@ -240,7 +244,7 @@ public: } } - S *flattened = new S(shards_array, j, nullptr); + Shard *flattened = new S(shards_array, j, nullptr); for (auto shard : shards) { delete shard; @@ -250,19 +254,19 @@ public: } private: - MutableBuffer<R> *m_buffer; + Buffer *m_buffer; size_t m_scale_factor; double m_max_delete_prop; std::vector<InternalLevel<R, S, Q> *> m_levels; - MutableBuffer<R> *get_buffer() { + Buffer *get_buffer() { return m_buffer; } int internal_append(R &rec, bool ts) { - MutableBuffer<R> *buffer; + Buffer *buffer; while (!(buffer = get_buffer())) ; @@ -273,7 +277,7 @@ private: return buffer->append(rec, ts); } - std::vector<R> post_process(std::vector<R> records, ShardID shid, MutableBuffer<R> *buffer) { + std::vector<R> post_process(std::vector<R> records, ShardID shid, Buffer *buffer) { std::vector<R> processed_records; processed_records.reserve(records.size()); @@ -331,7 +335,7 @@ private: if (new_idx > 0) { assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0); } - m_levels.emplace_back(new InternalLevel<R, S, Q>(new_idx, new_shard_cnt)); + m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)); return new_idx; } @@ -380,7 +384,7 @@ private: * returns -1 if idx==0, and no such level exists, to simplify * the logic of the first merge. */ - inline level_index find_mergable_level(level_index idx, MutableBuffer<R> *buffer=nullptr) { + inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) { if (idx == 0 && m_levels.size() == 0) return -1; @@ -410,25 +414,25 @@ private: // merging two memory levels if (LSM_LEVELING) { auto tmp = m_levels[base_level]; - m_levels[base_level] = InternalLevel<R, S, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); + m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]); mark_as_unused(tmp); } else { m_levels[base_level]->append_merged_shards(m_levels[incoming_level]); } mark_as_unused(m_levels[incoming_level]); - m_levels[incoming_level] = new InternalLevel<R, S, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); + m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (LSM_LEVELING) ? 1 : m_scale_factor); } - inline void merge_buffer_into_l0(MutableBuffer<R> *buffer) { + inline void merge_buffer_into_l0(Buffer *buffer) { assert(m_levels[0]); if (LSM_LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0]; - auto temp_level = new InternalLevel<R, S, Q>(0, 1); + auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); temp_level->append_buffer(buffer); - auto new_level = InternalLevel<R, S, Q>::merge_levels(old_level, temp_level); + auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level); m_levels[0] = new_level; delete temp_level; @@ -444,7 +448,7 @@ private: * level may not be able to immediately be deleted, depending upon who * else is using it. */ - inline void mark_as_unused(InternalLevel<R, S, Q> *level) { + inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) { delete level; } @@ -477,7 +481,7 @@ private: * a pointer to the memory table to use, if desired. Otherwise, there are * no guarantees about which buffer will be accessed if level_index is -1. */ - inline size_t get_level_record_count(level_index idx, MutableBuffer<R> *buffer=nullptr) { + inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) { assert(idx >= -1); if (idx == -1) { |