diff options
Diffstat (limited to 'include/framework/structure')
| -rw-r--r-- | include/framework/structure/BufferView.h | 11 | ||||
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 139 | ||||
| -rw-r--r-- | include/framework/structure/InternalLevel.h | 26 | ||||
| -rw-r--r-- | include/framework/structure/MutableBuffer.h | 31 |
4 files changed, 108 insertions, 99 deletions
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index a9fb12d..afe5dbb 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -46,10 +46,9 @@ public: BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail, size_t tombstone_cnt, psudb::BloomFilter<R> *filter) - : m_data(buffer), m_head(head), m_tail(tail), - m_start(m_head % cap), m_stop(m_tail % cap), m_cap(cap), - m_approx_ts_cnt(tombstone_cnt), m_tombstone_filter(filter), - m_active(true) {} + : m_data(buffer), m_head(head), m_tail(tail), m_start(m_head % cap), + m_stop(m_tail % cap), m_cap(cap), m_approx_ts_cnt(tombstone_cnt), + m_tombstone_filter(filter), m_active(true) {} ~BufferView() = default; @@ -104,9 +103,7 @@ public: */ size_t get_tombstone_count() { return m_approx_ts_cnt; } - Wrapped<R> *get(size_t i) { - return m_data + to_idx(i); - } + Wrapped<R> *get(size_t i) { return m_data + to_idx(i); } void copy_to_buffer(psudb::byte *buffer) { /* check if the region to be copied circles back to start. If so, do it in diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index bb8a480..e03c7ad 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -27,12 +27,14 @@ class ExtensionStructure { typedef BufferView<RecordType> BuffView; typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> LevelVector; + public: - ExtensionStructure(bool default_level=true) { + ExtensionStructure(bool default_level = true) { if (default_level) - m_levels.emplace_back(std::make_shared<InternalLevel<ShardType, QueryType>>(0)); + m_levels.emplace_back( + std::make_shared<InternalLevel<ShardType, QueryType>>(0)); } - + ~ExtensionStructure() = default; /* @@ -162,34 +164,37 @@ public: return cnt; } - /* * Perform the reconstruction described by task. If the resulting * reconstruction grows the structure (i.e., adds a level), returns * true. Otherwise, returns false. */ - inline reconstruction_results<ShardType> perform_reconstruction(ReconstructionTask task) const { + inline reconstruction_results<ShardType> + perform_reconstruction(ReconstructionTask task) const { reconstruction_results<ShardType> result; result.target_level = task.target; /* if there is only one source, then we don't need to actually rebuild */ if (task.sources.size() == 1) { auto shid = task.sources[0]; - if (shid.shard_idx == all_shards_idx && m_levels[shid.level_idx]->get_shard_count() > 1) { + if (shid.shard_idx == all_shards_idx && + m_levels[shid.level_idx]->get_shard_count() > 1) { /* there's more than one shard, so we need to do the reconstruction */ } else { - auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx); + auto raw_shard_ptr = + m_levels[shid.level_idx]->get_shard(shid.shard_idx); assert(raw_shard_ptr); result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr); - result.new_shard = m_levels[shid.level_idx]->get_shard_ptr(shid.shard_idx).first; + result.new_shard = + m_levels[shid.level_idx]->get_shard_ptr(shid.shard_idx).first; return result; } } - - std::vector<const ShardType*> shards; + + std::vector<const ShardType *> shards; for (ShardID shid : task.sources) { - assert(shid.level_idx < (level_index) m_levels.size()); + assert(shid.level_idx < (level_index)m_levels.size()); assert(shid.shard_idx >= -1); auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx); @@ -197,12 +202,13 @@ public: result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr); } - auto start = std::chrono::high_resolution_clock::now(); result.new_shard = std::make_shared<ShardType>(shards); auto stop = std::chrono::high_resolution_clock::now(); - result.runtime = std::chrono::duration_cast<std::chrono::nanoseconds>(stop- start).count(); + result.runtime = + std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start) + .count(); result.reccnt = result.new_shard->get_record_count(); return result; @@ -221,11 +227,10 @@ public: return queries; } - size_t l0_size() const { - return m_levels[0]->get_shard_count(); - } + size_t l0_size() const { return m_levels[0]->get_shard_count(); } - bool apply_reconstruction(reconstruction_results<ShardType> &recon, size_t version) { + bool apply_reconstruction(reconstruction_results<ShardType> &recon, + size_t version) { bool res = append_shard(recon.new_shard, version, recon.target_level); m_levels[recon.target_level]->update_reconstruction_model(recon); delete_shards(recon.source_shards); @@ -233,13 +238,15 @@ public: return res; } - bool append_shard(std::shared_ptr<ShardType> shard, size_t version, size_t level) { + bool append_shard(std::shared_ptr<ShardType> shard, size_t version, + size_t level) { assert(level <= m_levels.size()); auto rc = false; if (level == m_levels.size()) { /* grow the structure */ - m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(level)); + m_levels.push_back( + std::make_shared<InternalLevel<ShardType, QueryType>>(level)); rc = true; } @@ -248,12 +255,15 @@ public: return rc; } - void delete_shards(std::vector<std::pair<level_index, const ShardType*>> shards) { - for (size_t i=0; i<shards.size(); i++) { - assert(shards[i].first < (level_index) m_levels.size()); + void + delete_shards(std::vector<std::pair<level_index, const ShardType *>> shards) { + for (size_t i = 0; i < shards.size(); i++) { + assert(shards[i].first < (level_index)m_levels.size()); ssize_t shard_idx = -1; - for (size_t j=0; j<m_levels[shards[i].first]->get_shard_count(); j++) { - if (m_levels[shards[i].first]->get_shard_ptr(j).first.get() == shards[i].second) { + for (size_t j = 0; j < m_levels[shards[i].first]->get_shard_count(); + j++) { + if (m_levels[shards[i].first]->get_shard_ptr(j).first.get() == + shards[i].second) { shard_idx = j; break; } @@ -262,7 +272,8 @@ public: if (shard_idx != -1) { m_levels[shards[i].first]->delete_shard(shard_idx); } else { - fprintf(stderr, "ERROR: failed to delete shard %ld\t%p\n", shards[i].first, shards[i].second); + fprintf(stderr, "ERROR: failed to delete shard %ld\t%p\n", + shards[i].first, shards[i].second); exit(EXIT_FAILURE); } } @@ -270,51 +281,55 @@ public: LevelVector const &get_level_vector() const { return m_levels; } - - /* - * Validate that no level in the structure exceeds its maximum tombstone - * capacity. This is used to trigger preemptive compactions at the end of - * the reconstruction process. - */ - bool validate_tombstone_proportion(double max_delete_prop) const { - long double ts_prop; - for (size_t i = 0; i < m_levels.size(); i++) { - if (m_levels[i]) { - ts_prop = (long double)m_levels[i]->get_tombstone_count() / - (long double)m_levels[i]->get_record_count(); - if (ts_prop > (long double)max_delete_prop) { - return false; - } + /* + * Validate that no level in the structure exceeds its maximum tombstone + * capacity. This is used to trigger preemptive compactions at the end of + * the reconstruction process. + */ + bool validate_tombstone_proportion(double max_delete_prop) const { + long double ts_prop; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + ts_prop = (long double)m_levels[i]->get_tombstone_count() / + (long double)m_levels[i]->get_record_count(); + if (ts_prop > (long double)max_delete_prop) { + return false; } } - - return true; } - bool validate_tombstone_proportion(level_index level, double max_delete_prop) const { - long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) m_levels[level]->get_record_count(); - return ts_prop <= (long double) max_delete_prop; - } + return true; + } - void print_structure(bool debug=false) const { - for (size_t i=0; i<m_levels.size(); i++) { - if (debug) { - fprintf(stdout, "[D] [%ld]:\t", i); - } else { - fprintf(stdout, "[%ld]:\t", i); - } + bool validate_tombstone_proportion(level_index level, + double max_delete_prop) const { + long double ts_prop = (long double)m_levels[level]->get_tombstone_count() / + (long double)m_levels[level]->get_record_count(); + return ts_prop <= (long double)max_delete_prop; + } - if (m_levels[i]) { - for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) { - fprintf(stdout, "(%ld, %ld, %p: %ld) ", j, m_levels[i]->get_shard_ptr(j).second, m_levels[i]->get_shard_ptr(j).first.get(), m_levels[i]->get_shard(j)->get_record_count()); - } - } else { - fprintf(stdout, "[Empty]"); - } + void print_structure(bool debug = false) const { + for (size_t i = 0; i < m_levels.size(); i++) { + if (debug) { + fprintf(stdout, "[D] [%ld]:\t", i); + } else { + fprintf(stdout, "[%ld]:\t", i); + } - fprintf(stdout, "\n"); + if (m_levels[i]) { + for (size_t j = 0; j < m_levels[i]->get_shard_count(); j++) { + fprintf(stdout, "(%ld, %ld, %p: %ld) ", j, + m_levels[i]->get_shard_ptr(j).second, + m_levels[i]->get_shard_ptr(j).first.get(), + m_levels[i]->get_shard(j)->get_record_count()); + } + } else { + fprintf(stdout, "[Empty]"); } - } + + fprintf(stdout, "\n"); + } + } private: LevelVector m_levels; diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index 205dbf9..cf2b16a 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -15,11 +15,11 @@ */ #pragma once +#include <algorithm> +#include <deque> #include <future> #include <memory> #include <vector> -#include <deque> -#include <algorithm> #include "framework/interface/Query.h" #include "framework/interface/Shard.h" @@ -34,7 +34,7 @@ class InternalLevel { typedef std::pair<std::shared_ptr<ShardType>, size_t> shard_ptr; public: - InternalLevel(ssize_t level_no) : m_level_no(level_no) { } + InternalLevel(ssize_t level_no) : m_level_no(level_no) {} ~InternalLevel() = default; @@ -109,7 +109,7 @@ public: idx = 0; } - if (idx >= (ssize_t) m_shards.size()) { + if (idx >= (ssize_t)m_shards.size()) { return nullptr; } @@ -183,7 +183,7 @@ public: } size_t get_nonempty_shard_count() const { - size_t cnt = 0; + size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i].first && m_shards[i].first->get_record_count() > 0) { cnt += 1; @@ -199,7 +199,7 @@ public: new_level->append(m_shards[i].first, m_shards[i].second); } - for (auto itr=m_rt_window.begin(); itr < m_rt_window.end(); itr++) { + for (auto itr = m_rt_window.begin(); itr < m_rt_window.end(); itr++) { new_level->m_rt_window.push_front(*itr); } @@ -208,23 +208,21 @@ public: void truncate() { m_shards.erase(m_shards.begin(), m_shards.end()); } - void delete_shard(shard_index shard, bool log_delete=true) { + void delete_shard(shard_index shard, bool log_delete = true) { size_t before = m_shards.size(); m_shards.erase(m_shards.begin() + shard); size_t after = m_shards.size(); - assert( before > after); + assert(before > after); } - void append(std::shared_ptr<ShardType> shard, size_t version=0) { + void append(std::shared_ptr<ShardType> shard, size_t version = 0) { m_shards.push_back({shard, version}); } - void append(shard_ptr shard) { - m_shards.push_back(shard); - } + void append(shard_ptr shard) { m_shards.push_back(shard); } const shard_ptr get_shard_ptr(ssize_t idx) const { - if (idx >= 0 && idx < (ssize_t) m_shards.size()) { + if (idx >= 0 && idx < (ssize_t)m_shards.size()) { return m_shards[idx]; } else if (idx == all_shards_idx && m_shards.size() == 1) { return m_shards[0]; @@ -247,7 +245,7 @@ public: size_t total = 0; for (auto rt : m_rt_window) { total += rt; - } + } return total / m_rt_window.size(); } diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 7357915..156b718 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -97,7 +97,9 @@ public: return true; } - size_t get_record_count() const { return m_tail.load() - m_head.load().head_idx; } + size_t get_record_count() const { + return m_tail.load() - m_head.load().head_idx; + } size_t get_capacity() const { return m_cap; } @@ -139,14 +141,16 @@ public: m_active_head_advance.store(true); if (m_old_head.load().refcnt > 0) { - //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts [2]\n"); + // fprintf(stderr, "[W]: Refusing to advance head due to remaining + // reference counts [2]\n"); m_active_head_advance.store(false); return false; } - // fprintf(stderr, "[I] Advancing head pointer: %ld %ld %ld\n", m_old_head.load().head_idx, m_head.load().head_idx, new_head); - // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, m_head.load().refcnt); - + // fprintf(stderr, "[I] Advancing head pointer: %ld %ld %ld\n", + // m_old_head.load().head_idx, m_head.load().head_idx, new_head); + // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, + // m_head.load().refcnt); buffer_head new_hd = {new_head, 1}; buffer_head cur_hd; @@ -163,7 +167,6 @@ public: return true; } - void set_low_watermark(size_t lwm) { assert(lwm < m_hwm); m_lwm = lwm; @@ -197,13 +200,9 @@ public: return m_cap - (m_tail.load() - m_old_head.load().head_idx); } - size_t debug_get_old_head() const { - return m_old_head.load().head_idx; - } + size_t debug_get_old_head() const { return m_old_head.load().head_idx; } - size_t debug_get_head() const { - return m_head.load().head_idx; - } + size_t debug_get_head() const { return m_head.load().head_idx; } bool take_head_reference(size_t target_head) { buffer_head cur_hd, new_hd; @@ -225,7 +224,7 @@ public: return head_acquired; } - + bool release_head_reference(size_t head) { buffer_head cur_hd, new_hd; bool head_released = false; @@ -261,8 +260,8 @@ private: buffer_head cur_hd, new_hd; bool head_acquired = false; - - //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx); + // fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, + // m_old_head.load().head_idx, m_head.load().head_idx); do { if (m_old_head.load().head_idx == target_head) { cur_hd = m_old_head.load(); @@ -279,7 +278,7 @@ private: return new_hd.head_idx; } - + ssize_t try_advance_tail() { size_t old_value = m_tail.load(); |