summaryrefslogtreecommitdiffstats
path: root/include/framework/structure
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/structure')
-rw-r--r--include/framework/structure/BufferView.h11
-rw-r--r--include/framework/structure/ExtensionStructure.h139
-rw-r--r--include/framework/structure/InternalLevel.h26
-rw-r--r--include/framework/structure/MutableBuffer.h31
4 files changed, 108 insertions, 99 deletions
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index a9fb12d..afe5dbb 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -46,10 +46,9 @@ public:
BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail,
size_t tombstone_cnt, psudb::BloomFilter<R> *filter)
- : m_data(buffer), m_head(head), m_tail(tail),
- m_start(m_head % cap), m_stop(m_tail % cap), m_cap(cap),
- m_approx_ts_cnt(tombstone_cnt), m_tombstone_filter(filter),
- m_active(true) {}
+ : m_data(buffer), m_head(head), m_tail(tail), m_start(m_head % cap),
+ m_stop(m_tail % cap), m_cap(cap), m_approx_ts_cnt(tombstone_cnt),
+ m_tombstone_filter(filter), m_active(true) {}
~BufferView() = default;
@@ -104,9 +103,7 @@ public:
*/
size_t get_tombstone_count() { return m_approx_ts_cnt; }
- Wrapped<R> *get(size_t i) {
- return m_data + to_idx(i);
- }
+ Wrapped<R> *get(size_t i) { return m_data + to_idx(i); }
void copy_to_buffer(psudb::byte *buffer) {
/* check if the region to be copied circles back to start. If so, do it in
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index bb8a480..e03c7ad 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -27,12 +27,14 @@ class ExtensionStructure {
typedef BufferView<RecordType> BuffView;
typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
LevelVector;
+
public:
- ExtensionStructure(bool default_level=true) {
+ ExtensionStructure(bool default_level = true) {
if (default_level)
- m_levels.emplace_back(std::make_shared<InternalLevel<ShardType, QueryType>>(0));
+ m_levels.emplace_back(
+ std::make_shared<InternalLevel<ShardType, QueryType>>(0));
}
-
+
~ExtensionStructure() = default;
/*
@@ -162,34 +164,37 @@ public:
return cnt;
}
-
/*
* Perform the reconstruction described by task. If the resulting
* reconstruction grows the structure (i.e., adds a level), returns
* true. Otherwise, returns false.
*/
- inline reconstruction_results<ShardType> perform_reconstruction(ReconstructionTask task) const {
+ inline reconstruction_results<ShardType>
+ perform_reconstruction(ReconstructionTask task) const {
reconstruction_results<ShardType> result;
result.target_level = task.target;
/* if there is only one source, then we don't need to actually rebuild */
if (task.sources.size() == 1) {
auto shid = task.sources[0];
- if (shid.shard_idx == all_shards_idx && m_levels[shid.level_idx]->get_shard_count() > 1) {
+ if (shid.shard_idx == all_shards_idx &&
+ m_levels[shid.level_idx]->get_shard_count() > 1) {
/* there's more than one shard, so we need to do the reconstruction */
} else {
- auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx);
+ auto raw_shard_ptr =
+ m_levels[shid.level_idx]->get_shard(shid.shard_idx);
assert(raw_shard_ptr);
result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr);
- result.new_shard = m_levels[shid.level_idx]->get_shard_ptr(shid.shard_idx).first;
+ result.new_shard =
+ m_levels[shid.level_idx]->get_shard_ptr(shid.shard_idx).first;
return result;
}
}
-
- std::vector<const ShardType*> shards;
+
+ std::vector<const ShardType *> shards;
for (ShardID shid : task.sources) {
- assert(shid.level_idx < (level_index) m_levels.size());
+ assert(shid.level_idx < (level_index)m_levels.size());
assert(shid.shard_idx >= -1);
auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx);
@@ -197,12 +202,13 @@ public:
result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr);
}
-
auto start = std::chrono::high_resolution_clock::now();
result.new_shard = std::make_shared<ShardType>(shards);
auto stop = std::chrono::high_resolution_clock::now();
- result.runtime = std::chrono::duration_cast<std::chrono::nanoseconds>(stop- start).count();
+ result.runtime =
+ std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start)
+ .count();
result.reccnt = result.new_shard->get_record_count();
return result;
@@ -221,11 +227,10 @@ public:
return queries;
}
- size_t l0_size() const {
- return m_levels[0]->get_shard_count();
- }
+ size_t l0_size() const { return m_levels[0]->get_shard_count(); }
- bool apply_reconstruction(reconstruction_results<ShardType> &recon, size_t version) {
+ bool apply_reconstruction(reconstruction_results<ShardType> &recon,
+ size_t version) {
bool res = append_shard(recon.new_shard, version, recon.target_level);
m_levels[recon.target_level]->update_reconstruction_model(recon);
delete_shards(recon.source_shards);
@@ -233,13 +238,15 @@ public:
return res;
}
- bool append_shard(std::shared_ptr<ShardType> shard, size_t version, size_t level) {
+ bool append_shard(std::shared_ptr<ShardType> shard, size_t version,
+ size_t level) {
assert(level <= m_levels.size());
auto rc = false;
if (level == m_levels.size()) {
/* grow the structure */
- m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(level));
+ m_levels.push_back(
+ std::make_shared<InternalLevel<ShardType, QueryType>>(level));
rc = true;
}
@@ -248,12 +255,15 @@ public:
return rc;
}
- void delete_shards(std::vector<std::pair<level_index, const ShardType*>> shards) {
- for (size_t i=0; i<shards.size(); i++) {
- assert(shards[i].first < (level_index) m_levels.size());
+ void
+ delete_shards(std::vector<std::pair<level_index, const ShardType *>> shards) {
+ for (size_t i = 0; i < shards.size(); i++) {
+ assert(shards[i].first < (level_index)m_levels.size());
ssize_t shard_idx = -1;
- for (size_t j=0; j<m_levels[shards[i].first]->get_shard_count(); j++) {
- if (m_levels[shards[i].first]->get_shard_ptr(j).first.get() == shards[i].second) {
+ for (size_t j = 0; j < m_levels[shards[i].first]->get_shard_count();
+ j++) {
+ if (m_levels[shards[i].first]->get_shard_ptr(j).first.get() ==
+ shards[i].second) {
shard_idx = j;
break;
}
@@ -262,7 +272,8 @@ public:
if (shard_idx != -1) {
m_levels[shards[i].first]->delete_shard(shard_idx);
} else {
- fprintf(stderr, "ERROR: failed to delete shard %ld\t%p\n", shards[i].first, shards[i].second);
+ fprintf(stderr, "ERROR: failed to delete shard %ld\t%p\n",
+ shards[i].first, shards[i].second);
exit(EXIT_FAILURE);
}
}
@@ -270,51 +281,55 @@ public:
LevelVector const &get_level_vector() const { return m_levels; }
-
- /*
- * Validate that no level in the structure exceeds its maximum tombstone
- * capacity. This is used to trigger preemptive compactions at the end of
- * the reconstruction process.
- */
- bool validate_tombstone_proportion(double max_delete_prop) const {
- long double ts_prop;
- for (size_t i = 0; i < m_levels.size(); i++) {
- if (m_levels[i]) {
- ts_prop = (long double)m_levels[i]->get_tombstone_count() /
- (long double)m_levels[i]->get_record_count();
- if (ts_prop > (long double)max_delete_prop) {
- return false;
- }
+ /*
+ * Validate that no level in the structure exceeds its maximum tombstone
+ * capacity. This is used to trigger preemptive compactions at the end of
+ * the reconstruction process.
+ */
+ bool validate_tombstone_proportion(double max_delete_prop) const {
+ long double ts_prop;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i]) {
+ ts_prop = (long double)m_levels[i]->get_tombstone_count() /
+ (long double)m_levels[i]->get_record_count();
+ if (ts_prop > (long double)max_delete_prop) {
+ return false;
}
}
-
- return true;
}
- bool validate_tombstone_proportion(level_index level, double max_delete_prop) const {
- long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) m_levels[level]->get_record_count();
- return ts_prop <= (long double) max_delete_prop;
- }
+ return true;
+ }
- void print_structure(bool debug=false) const {
- for (size_t i=0; i<m_levels.size(); i++) {
- if (debug) {
- fprintf(stdout, "[D] [%ld]:\t", i);
- } else {
- fprintf(stdout, "[%ld]:\t", i);
- }
+ bool validate_tombstone_proportion(level_index level,
+ double max_delete_prop) const {
+ long double ts_prop = (long double)m_levels[level]->get_tombstone_count() /
+ (long double)m_levels[level]->get_record_count();
+ return ts_prop <= (long double)max_delete_prop;
+ }
- if (m_levels[i]) {
- for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) {
- fprintf(stdout, "(%ld, %ld, %p: %ld) ", j, m_levels[i]->get_shard_ptr(j).second, m_levels[i]->get_shard_ptr(j).first.get(), m_levels[i]->get_shard(j)->get_record_count());
- }
- } else {
- fprintf(stdout, "[Empty]");
- }
+ void print_structure(bool debug = false) const {
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (debug) {
+ fprintf(stdout, "[D] [%ld]:\t", i);
+ } else {
+ fprintf(stdout, "[%ld]:\t", i);
+ }
- fprintf(stdout, "\n");
+ if (m_levels[i]) {
+ for (size_t j = 0; j < m_levels[i]->get_shard_count(); j++) {
+ fprintf(stdout, "(%ld, %ld, %p: %ld) ", j,
+ m_levels[i]->get_shard_ptr(j).second,
+ m_levels[i]->get_shard_ptr(j).first.get(),
+ m_levels[i]->get_shard(j)->get_record_count());
+ }
+ } else {
+ fprintf(stdout, "[Empty]");
}
- }
+
+ fprintf(stdout, "\n");
+ }
+ }
private:
LevelVector m_levels;
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 205dbf9..cf2b16a 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -15,11 +15,11 @@
*/
#pragma once
+#include <algorithm>
+#include <deque>
#include <future>
#include <memory>
#include <vector>
-#include <deque>
-#include <algorithm>
#include "framework/interface/Query.h"
#include "framework/interface/Shard.h"
@@ -34,7 +34,7 @@ class InternalLevel {
typedef std::pair<std::shared_ptr<ShardType>, size_t> shard_ptr;
public:
- InternalLevel(ssize_t level_no) : m_level_no(level_no) { }
+ InternalLevel(ssize_t level_no) : m_level_no(level_no) {}
~InternalLevel() = default;
@@ -109,7 +109,7 @@ public:
idx = 0;
}
- if (idx >= (ssize_t) m_shards.size()) {
+ if (idx >= (ssize_t)m_shards.size()) {
return nullptr;
}
@@ -183,7 +183,7 @@ public:
}
size_t get_nonempty_shard_count() const {
- size_t cnt = 0;
+ size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i].first && m_shards[i].first->get_record_count() > 0) {
cnt += 1;
@@ -199,7 +199,7 @@ public:
new_level->append(m_shards[i].first, m_shards[i].second);
}
- for (auto itr=m_rt_window.begin(); itr < m_rt_window.end(); itr++) {
+ for (auto itr = m_rt_window.begin(); itr < m_rt_window.end(); itr++) {
new_level->m_rt_window.push_front(*itr);
}
@@ -208,23 +208,21 @@ public:
void truncate() { m_shards.erase(m_shards.begin(), m_shards.end()); }
- void delete_shard(shard_index shard, bool log_delete=true) {
+ void delete_shard(shard_index shard, bool log_delete = true) {
size_t before = m_shards.size();
m_shards.erase(m_shards.begin() + shard);
size_t after = m_shards.size();
- assert( before > after);
+ assert(before > after);
}
- void append(std::shared_ptr<ShardType> shard, size_t version=0) {
+ void append(std::shared_ptr<ShardType> shard, size_t version = 0) {
m_shards.push_back({shard, version});
}
- void append(shard_ptr shard) {
- m_shards.push_back(shard);
- }
+ void append(shard_ptr shard) { m_shards.push_back(shard); }
const shard_ptr get_shard_ptr(ssize_t idx) const {
- if (idx >= 0 && idx < (ssize_t) m_shards.size()) {
+ if (idx >= 0 && idx < (ssize_t)m_shards.size()) {
return m_shards[idx];
} else if (idx == all_shards_idx && m_shards.size() == 1) {
return m_shards[0];
@@ -247,7 +245,7 @@ public:
size_t total = 0;
for (auto rt : m_rt_window) {
total += rt;
- }
+ }
return total / m_rt_window.size();
}
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 7357915..156b718 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -97,7 +97,9 @@ public:
return true;
}
- size_t get_record_count() const { return m_tail.load() - m_head.load().head_idx; }
+ size_t get_record_count() const {
+ return m_tail.load() - m_head.load().head_idx;
+ }
size_t get_capacity() const { return m_cap; }
@@ -139,14 +141,16 @@ public:
m_active_head_advance.store(true);
if (m_old_head.load().refcnt > 0) {
- //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts [2]\n");
+ // fprintf(stderr, "[W]: Refusing to advance head due to remaining
+ // reference counts [2]\n");
m_active_head_advance.store(false);
return false;
}
- // fprintf(stderr, "[I] Advancing head pointer: %ld %ld %ld\n", m_old_head.load().head_idx, m_head.load().head_idx, new_head);
- // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, m_head.load().refcnt);
-
+ // fprintf(stderr, "[I] Advancing head pointer: %ld %ld %ld\n",
+ // m_old_head.load().head_idx, m_head.load().head_idx, new_head);
+ // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt,
+ // m_head.load().refcnt);
buffer_head new_hd = {new_head, 1};
buffer_head cur_hd;
@@ -163,7 +167,6 @@ public:
return true;
}
-
void set_low_watermark(size_t lwm) {
assert(lwm < m_hwm);
m_lwm = lwm;
@@ -197,13 +200,9 @@ public:
return m_cap - (m_tail.load() - m_old_head.load().head_idx);
}
- size_t debug_get_old_head() const {
- return m_old_head.load().head_idx;
- }
+ size_t debug_get_old_head() const { return m_old_head.load().head_idx; }
- size_t debug_get_head() const {
- return m_head.load().head_idx;
- }
+ size_t debug_get_head() const { return m_head.load().head_idx; }
bool take_head_reference(size_t target_head) {
buffer_head cur_hd, new_hd;
@@ -225,7 +224,7 @@ public:
return head_acquired;
}
-
+
bool release_head_reference(size_t head) {
buffer_head cur_hd, new_hd;
bool head_released = false;
@@ -261,8 +260,8 @@ private:
buffer_head cur_hd, new_hd;
bool head_acquired = false;
-
- //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx);
+ // fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head,
+ // m_old_head.load().head_idx, m_head.load().head_idx);
do {
if (m_old_head.load().head_idx == target_head) {
cur_hd = m_old_head.load();
@@ -279,7 +278,7 @@ private:
return new_hd.head_idx;
}
-
+
ssize_t try_advance_tail() {
size_t old_value = m_tail.load();