author    Douglas B. Rumbaugh <doug@douglasrumbaugh.com>  2024-12-22 15:12:13 -0500
committer Douglas B. Rumbaugh <doug@douglasrumbaugh.com>  2024-12-22 15:12:13 -0500
commit    ba65c8976f54d4da2467074235a12f5be0bd5ebc (patch)
tree      955d5995f211d8a7a24f7b106912773db5e3a5ba /include/framework/structure
parent    5617bed5257506d3dfda8537b16f44b3e40f1b42 (diff)
download  dynamic-extension-ba65c8976f54d4da2467074235a12f5be0bd5ebc.tar.gz
Continued development
Diffstat (limited to 'include/framework/structure')
-rw-r--r--  include/framework/structure/ExtensionStructure.h  63
-rw-r--r--  include/framework/structure/InternalLevel.h        28
-rw-r--r--  include/framework/structure/MutableBuffer.h        22
3 files changed, 74 insertions(+), 39 deletions(-)
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 9b7ae87..3bb8a0b 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -147,14 +147,14 @@ public:
inline void perform_reconstruction(ReconstructionTask task) {
/* perform the reconstruction itself */
- std::vector<ShardType *> shards;
+ std::vector<const ShardType *> shards;
for (ShardID shid : task.sources) {
- assert(shid.level_idx < m_levels.size());
+ assert(shid.level_idx < (level_index) m_levels.size());
assert(shid.shard_idx >= -1);
/* if unspecified, push all shards into the vector */
if (shid.shard_idx == all_shards_idx) {
- for (size_t i = 0; i < m_levels[shid.level_idx].get_shard_count();
+ for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count();
i++) {
if (m_levels[shid.level_idx]->get_shard(i)) {
shards.push_back(m_levels[shid.level_idx]->get_shard(i));
@@ -165,7 +165,7 @@ public:
}
}
- auto new_shard = Shard(shards);
+ auto new_shard = new ShardType(shards);
/*
* Remove all of the shards processed by the operation
@@ -181,10 +181,11 @@ public:
/*
* Append the new shard to the target level
*/
- if (task.target < m_levels.size()) {
- m_levels[task.target]->append_shard(new_shard);
- } else {
- m_levels.push_back();
+ if (task.target < (level_index)m_levels.size()) {
+ m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard));
+ } else { /* grow the structure if needed */
+ m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(task.target));
+ m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard));
}
}
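
The append-or-grow logic above assumes the target level is at most one past the end of the current level vector, so a single push_back is enough to make room. A minimal sketch of that step in isolation, reusing the InternalLevel and level_index names from this file but with the rest of the task handling stripped away:

    // Sketch only: append a finished shard to its target level, growing the
    // level vector by one if the target does not exist yet. Assumes target is
    // at most one past the last existing level, as in the hunk above.
    template <typename ShardType, typename QueryType>
    void append_or_grow(
        std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> &levels,
        level_index target, ShardType *new_shard) {
      if (target < (level_index)levels.size()) {
        levels[target]->append(std::shared_ptr<ShardType>(new_shard));
      } else {
        levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(target));
        levels[target]->append(std::shared_ptr<ShardType>(new_shard));
      }
    }
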
@@ -197,12 +198,20 @@ public:
* the buffer itself. Given that we're unlikely to actually use policies
* like that, we'll leave this as low priority.
*/
- ShardType *buffer_shard = new ShardType(buffer);
- if (task.type == ReconstructionType::Append) {
- m_levels[0]->append(std::shared_ptr(buffer_shard));
+
+ /* insert the first level, if needed */
+ if (m_levels.size() == 0) {
+ m_levels.push_back(
+ std::make_shared<InternalLevel<ShardType, QueryType>>(0));
+ }
+
+ ShardType *buffer_shard = new ShardType(std::move(buffer));
+ if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) {
+ m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard));
} else {
- std::vector<ShardType *> shards;
- for (size_t i = 0; i < m_levels[0].size(); i++) {
+ std::vector<const ShardType *> shards;
+ for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count();
+ i++) {
if (m_levels[0]->get_shard(i)) {
shards.push_back(m_levels[0]->get_shard(i));
}
@@ -210,7 +219,7 @@ public:
shards.push_back(buffer_shard);
ShardType *new_shard = new ShardType(shards);
m_levels[0]->truncate();
- m_levels[0]->append(std::shared_ptr(new_shard));
+ m_levels[0]->append(std::shared_ptr<ShardType>(new_shard));
}
}
}
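
Taken together, a buffer flush either appends the buffer shard as one more shard on level 0 (when the task is an Append, or level 0 is empty) or folds it into a single shard together with everything already on that level. A condensed sketch of the two paths, where collect_live_shards is a hypothetical helper standing in for the inline loop over level 0 above:

    // Condensed sketch of the flush decision; collect_live_shards is a
    // hypothetical helper returning the non-null shards of a level.
    ShardType *buffer_shard = new ShardType(std::move(buffer));
    if (task.type == ReconstructionType::Append ||
        m_levels[0]->get_shard_count() == 0) {
      /* the buffer becomes one more shard on level 0 */
      m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard));
    } else {
      /* merge the buffer shard with the existing level-0 shards */
      std::vector<const ShardType *> shards = collect_live_shards(*m_levels[0]);
      shards.push_back(buffer_shard);
      m_levels[0]->truncate();
      m_levels[0]->append(std::shared_ptr<ShardType>(new ShardType(shards)));
    }
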
@@ -243,6 +252,32 @@ public:
LevelVector const &get_level_vector() const { return m_levels; }
+
+ /*
+ * Validate that no level in the structure exceeds its maximum tombstone
+ * capacity. This is used to trigger preemptive compactions at the end of
+ * the reconstruction process.
+ */
+ bool validate_tombstone_proportion(double max_delete_prop) const {
+ long double ts_prop;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i]) {
+ ts_prop = (long double)m_levels[i]->get_tombstone_count() /
+ (long double)m_levels[i]->get_record_count();
+ if (ts_prop > (long double)max_delete_prop) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ bool validate_tombstone_proportion(level_index level, double max_delete_prop) const {
+ long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) m_levels[level]->get_record_count();
+ return ts_prop <= (long double) max_delete_prop;
+ }
+
private:
std::atomic<size_t> m_refcnt;
LevelVector m_levels;
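
The new validators are meant to be run once a reconstruction finishes; a caller-side sketch of how they might drive a preemptive compaction, where compact_level is a hypothetical placeholder and the template parameters are assumed to match this class:

    // Hypothetical caller: after a reconstruction, compact any level whose
    // tombstone proportion exceeds the configured bound. compact_level is a
    // placeholder and not part of this commit.
    template <typename ShardType, typename QueryType>
    void enforce_delete_bound(ExtensionStructure<ShardType, QueryType> &structure,
                              double max_delete_prop) {
      auto const &levels = structure.get_level_vector();
      for (level_index i = 0; i < (level_index)levels.size(); i++) {
        if (levels[i] && levels[i]->get_record_count() > 0 &&
            !structure.validate_tombstone_proportion(i, max_delete_prop)) {
          compact_level(structure, i);
        }
      }
    }
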
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 8cfcd49..c9d1749 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -40,12 +40,12 @@ public:
*
* No changes are made to this level.
*/
- ShardType *get_combined_shard() {
+ ShardType *get_combined_shard() const {
if (m_shards.size() == 0) {
return nullptr;
}
- std::vector<ShardType *> shards;
+ std::vector<const ShardType *> shards;
for (auto shard : m_shards) {
if (shard)
shards.emplace_back(shard.get());
@@ -57,7 +57,7 @@ public:
void get_local_queries(
std::vector<std::pair<ShardID, ShardType *>> &shards,
std::vector<typename QueryType::LocalQuery *> &local_queries,
- typename QueryType::Parameters *query_parms) {
+ typename QueryType::Parameters *query_parms) const {
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
auto local_query =
@@ -68,7 +68,7 @@ public:
}
}
- bool check_tombstone(size_t shard_stop, const RecordType &rec) {
+ bool check_tombstone(size_t shard_stop, const RecordType &rec) const {
if (m_shards.size() == 0)
return false;
@@ -100,7 +100,7 @@ public:
return false;
}
- ShardType *get_shard(size_t idx) {
+ const ShardType *get_shard(size_t idx) const {
if (idx >= m_shards.size()) {
return nullptr;
}
@@ -108,9 +108,9 @@ public:
return m_shards[idx].get();
}
- size_t get_shard_count() { return m_shards.size(); }
+ size_t get_shard_count() const { return m_shards.size(); }
- size_t get_record_count() {
+ size_t get_record_count() const {
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
@@ -121,7 +121,7 @@ public:
return cnt;
}
- size_t get_tombstone_count() {
+ size_t get_tombstone_count() const {
size_t res = 0;
for (size_t i = 0; i < m_shards.size(); ++i) {
if (m_shards[i]) {
@@ -131,7 +131,7 @@ public:
return res;
}
- size_t get_aux_memory_usage() {
+ size_t get_aux_memory_usage() const {
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
@@ -142,7 +142,7 @@ public:
return cnt;
}
- size_t get_memory_usage() {
+ size_t get_memory_usage() const {
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
@@ -153,7 +153,7 @@ public:
return cnt;
}
- double get_tombstone_prop() {
+ double get_tombstone_prop() const {
size_t tscnt = 0;
size_t reccnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
@@ -166,10 +166,10 @@ public:
return (double)tscnt / (double)(tscnt + reccnt);
}
- std::shared_ptr<InternalLevel> clone() {
+ std::shared_ptr<InternalLevel> clone() const {
auto new_level = std::make_shared<InternalLevel>(m_level_no);
for (size_t i = 0; i < m_shards.size(); i++) {
- new_level->m_shards[i] = m_shards[i];
+ new_level->append(m_shards[i]);
}
return new_level;
@@ -181,7 +181,7 @@ public:
m_shards.erase(m_shards.begin() + shard);
}
- bool append(std::shared_ptr<ShardType> shard) {
+ void append(std::shared_ptr<ShardType> shard) {
m_shards.emplace_back(shard);
}
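
Most of the InternalLevel changes simply mark the read-only accessors const so that levels can be inspected through const pointers and references, matching the std::vector<const ShardType *> now used by the reconstruction code. A small sketch of a read-only scan that compiles against such a reference:

    // Sketch: count the live shards of a level through a const reference,
    // relying on the const-qualified get_shard_count()/get_shard() above.
    template <typename ShardType, typename QueryType>
    size_t live_shard_count(const InternalLevel<ShardType, QueryType> &level) {
      size_t live = 0;
      for (size_t i = 0; i < level.get_shard_count(); i++) {
        if (level.get_shard(i)) {
          live++;
        }
      }
      return live;
    }
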
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 625b04b..0eae73d 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -97,15 +97,15 @@ public:
return true;
}
- size_t get_record_count() { return m_tail.load() - m_head.load().head_idx; }
+ size_t get_record_count() const { return m_tail.load() - m_head.load().head_idx; }
- size_t get_capacity() { return m_cap; }
+ size_t get_capacity() const { return m_cap; }
- bool is_full() { return get_record_count() >= m_hwm; }
+ bool is_full() const { return get_record_count() >= m_hwm; }
- bool is_at_low_watermark() { return get_record_count() >= m_lwm; }
+ bool is_at_low_watermark() const { return get_record_count() >= m_lwm; }
- size_t get_tombstone_count() { return m_tscnt.load(); }
+ size_t get_tombstone_count() const { return m_tscnt.load(); }
bool delete_record(const R &rec) {
return get_buffer_view().delete_record(rec);
@@ -115,9 +115,9 @@ public:
return get_buffer_view().check_tombstone(rec);
}
- size_t get_memory_usage() { return m_cap * sizeof(Wrapped<R>); }
+ size_t get_memory_usage() const { return m_cap * sizeof(Wrapped<R>); }
- size_t get_aux_memory_usage() {
+ size_t get_aux_memory_usage() const {
return m_tombstone_filter->get_memory_usage();
}
@@ -200,7 +200,7 @@ public:
m_lwm = lwm;
}
- size_t get_low_watermark() { return m_lwm; }
+ size_t get_low_watermark() const { return m_lwm; }
void set_high_watermark(size_t hwm) {
assert(hwm > m_lwm);
@@ -208,9 +208,9 @@ public:
m_hwm = hwm;
}
- size_t get_high_watermark() { return m_hwm; }
+ size_t get_high_watermark() const { return m_hwm; }
- size_t get_tail() { return m_tail.load(); }
+ size_t get_tail() const { return m_tail.load(); }
/*
* Note: this returns the available physical storage capacity,
@@ -220,7 +220,7 @@ public:
* but a buggy framework implementation may violate the
* assumption.
*/
- size_t get_available_capacity() {
+ size_t get_available_capacity() const {
if (m_old_head.load().refcnt == 0) {
return m_cap - (m_tail.load() - m_head.load().head_idx);
}
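
The MutableBuffer changes follow the same pattern, const-qualifying the size, capacity, and watermark accessors. A sketch of a read-only flush check against a const buffer, assuming the low watermark marks a buffer that should be flushed soon and the high watermark one that must be flushed now:

    // Sketch: decide whether a flush should be scheduled using only the
    // const accessors above; assumes MutableBuffer is templated on the
    // record type R, as suggested by delete_record(const R &rec).
    template <typename R>
    bool should_schedule_flush(const MutableBuffer<R> &buffer) {
      if (buffer.is_full()) {
        return true; /* at or past the high watermark */
      }
      return buffer.is_at_low_watermark(); /* proactive flush past the LWM */
    }
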