summaryrefslogtreecommitdiffstats
path: root/include/framework/structure
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-12-13 12:39:54 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2023-12-13 12:39:54 -0500
commit3c127eda69295cb306739bdd3c5ddccff6026a8d (patch)
tree43632849c7684cab68c43a8eb2c0aeac7adffad7 /include/framework/structure
parentd1f3535404ec2c200dcf2628b8c5c1f92b39e797 (diff)
downloaddynamic-extension-3c127eda69295cb306739bdd3c5ddccff6026a8d.tar.gz
Refactoring: corrected a number of names and added more comments
Diffstat (limited to 'include/framework/structure')
-rw-r--r--include/framework/structure/ExtensionStructure.h142
-rw-r--r--include/framework/structure/InternalLevel.h58
-rw-r--r--include/framework/structure/MutableBuffer.h18
3 files changed, 120 insertions, 98 deletions
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index a174805..3cd55ac 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -45,8 +45,8 @@ public:
/*
* Create a shallow copy of this extension structure. The copy will share references to the
* same levels/shards as the original, but will have its own lists. As all of the shards are
- * immutable (with the exception of deletes), the copy can be restructured with merges, etc.,
- * without affecting the original. The copied structure will be returned with a reference
+ * immutable (with the exception of deletes), the copy can be restructured with reconstructions
+ * and flushes without affecting the original. The copied structure will be returned with a reference
* count of 0; generally you will want to immediately call take_reference() on it.
*
* NOTE: When using tagged deletes, a delete of a record in the original structure will affect
@@ -55,7 +55,7 @@ public:
* need to be forwarded to the appropriate structures manually.
*/
ExtensionStructure<R, S, Q, L> *copy() {
- auto new_struct = new ExtensionStructure<R, S, Q, L>(m_buffer_size, m_scale_factor, m_max_delete_prop);
+ auto new_struct = new ExtensionStructure<R, S, Q, L>(m_buffer_size, m_scale_factor, m_max_delete_prop);
for (size_t i=0; i<m_levels.size(); i++) {
new_struct->m_levels.push_back(m_levels[i]->clone());
}
@@ -90,17 +90,20 @@ public:
}
/*
- * Merge the memory table down into the tree, completing any required other
- * merges to make room for it.
+ * Flush a buffer into the extension structure, performing any necessary
+ * reconstructions to free up room in L0.
+ *
+ * FIXME: arguably, this should be a method attached to the buffer that
+ * takes a structure as input.
*/
- inline bool merge_buffer(Buffer *buffer) {
- assert(can_merge_with(0, buffer->get_record_count()));
+ inline bool flush_buffer(Buffer *buffer) {
+ assert(can_reconstruct_with(0, buffer->get_record_count()));
// FIXME: this step makes an extra copy of the buffer,
// which could be avoided by adjusting the shard
// reconstruction process a bit, possibly.
- buffer->start_merge();
- merge_buffer_into_l0(buffer);
+ buffer->start_flush();
+ flush_buffer_into_l0(buffer);
return true;
}
@@ -123,7 +126,7 @@ public:
* Return the total number of tombstones contained within all of the
* levels of the structure.
*/
- size_t get_tombstone_cnt() {
+ size_t get_tombstone_count() {
size_t cnt = 0;
for (size_t i=0; i<m_levels.size(); i++) {
@@ -172,7 +175,7 @@ public:
/*
* Validate that no level in the structure exceeds its maximum tombstone capacity. This is
- * used to trigger preemptive compactions at the end of the merge process.
+ * used to trigger preemptive compactions at the end of the reconstruction process.
*/
bool validate_tombstone_proportion() {
long double ts_prop;
@@ -201,8 +204,8 @@ public:
return m_levels;
}
- std::vector<MergeTask> get_compaction_tasks() {
- std::vector<MergeTask> tasks;
+ std::vector<ReconstructionTask> get_compaction_tasks() {
+ std::vector<ReconstructionTask> tasks;
/* if the tombstone/delete invariant is satisfied, no need for compactions */
if (validate_tombstone_proportion()) {
@@ -220,16 +223,16 @@ public:
assert(violation_idx != -1);
- level_index merge_base_level = find_mergable_level(violation_idx);
- if (merge_base_level == -1) {
- merge_base_level = grow();
+ level_index base_level = find_reconstruction_target(violation_idx);
+ if (base_level == -1) {
+ base_level = grow();
}
- for (level_index i=merge_base_level; i>0; i--) {
- MergeTask task = {i-1, i};
+ for (level_index i=base_level; i>0; i--) {
+ ReconstructionTask task = {i-1, i};
/*
- * The amount of storage required for the merge accounts
+ * The amount of storage required for the reconstruction accounts
* for the cost of storing the new records, along with the
* cost of retaining the old records during the process
* (hence the 2x multiplier).
@@ -240,7 +243,7 @@ public:
*/
size_t reccnt = m_levels[i-1]->get_record_count();
if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_merge_with(i, reccnt)) {
+ if (can_reconstruct_with(i, reccnt)) {
reccnt += m_levels[i]->get_record_count();
}
}
@@ -255,28 +258,27 @@ public:
/*
*
*/
- std::vector<MergeTask> get_merge_tasks(size_t buffer_reccnt) {
- std::vector<MergeTask> merges;
+ std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt) {
+ std::vector<ReconstructionTask> reconstructions;
/*
- * The buffer -> L0 merge task is not included so if that
- * can be done without any other change, just return an
- * empty list.
+ * The buffer flush is not included so if that can be done without any
+ * other change, just return an empty list.
*/
- if (can_merge_with(0, buffer_reccnt)) {
- return std::move(merges);
+ if (can_reconstruct_with(0, buffer_reccnt)) {
+ return std::move(reconstructions);
}
- level_index merge_base_level = find_mergable_level(0);
- if (merge_base_level == -1) {
- merge_base_level = grow();
+ level_index base_level = find_reconstruction_target(0);
+ if (base_level == -1) {
+ base_level = grow();
}
- for (level_index i=merge_base_level; i>0; i--) {
- MergeTask task = {i-1, i};
+ for (level_index i=base_level; i>0; i--) {
+ ReconstructionTask task = {i-1, i};
/*
- * The amount of storage required for the merge accounts
+ * The amount of storage required for the reconstruction accounts
* for the cost of storing the new records, along with the
* cost of retaining the old records during the process
* (hence the 2x multiplier).
@@ -287,34 +289,34 @@ public:
*/
size_t reccnt = m_levels[i-1]->get_record_count();
if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_merge_with(i, reccnt)) {
+ if (can_reconstruct_with(i, reccnt)) {
reccnt += m_levels[i]->get_record_count();
}
}
//task.m_size = 2* reccnt * sizeof(R);
- merges.push_back(task);
+ reconstructions.push_back(task);
}
- return std::move(merges);
+ return std::move(reconstructions);
}
/*
*
*/
- std::vector<MergeTask> get_merge_tasks_from_level(level_index source_level) {
- std::vector<MergeTask> merges;
+ std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level) {
+ std::vector<ReconstructionTask> reconstructions;
- level_index merge_base_level = find_mergable_level(source_level);
- if (merge_base_level == -1) {
- merge_base_level = grow();
+ level_index base_level = find_reconstruction_target(source_level);
+ if (base_level == -1) {
+ base_level = grow();
}
- for (level_index i=merge_base_level; i>source_level; i--) {
- MergeTask task = {i - 1, i};
+ for (level_index i=base_level; i>source_level; i--) {
+ ReconstructionTask task = {i - 1, i};
/*
- * The amount of storage required for the merge accounts
+ * The amount of storage required for the reconstruction accounts
* for the cost of storing the new records, along with the
* cost of retaining the old records during the process
* (hence the 2x multiplier).
@@ -325,31 +327,30 @@ public:
*/
size_t reccnt = m_levels[i-1]->get_record_count();
if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_merge_with(i, reccnt)) {
+ if (can_reconstruct_with(i, reccnt)) {
reccnt += m_levels[i]->get_record_count();
}
}
// task.m_size = 2* reccnt * sizeof(R);
- merges.push_back(task);
+ reconstructions.push_back(task);
}
- return merges;
+ return reconstructions;
}
/*
- * Merge the level specified by incoming level into the level specified
- * by base level. The two levels should be sequential--i.e. no levels
- * are skipped in the merge process--otherwise the tombstone ordering
- * invariant may be violated by the merge operation.
+ * Combine incoming_level with base_level and reconstruct the shard,
+ * placing it in base_level. The two levels should be sequential--i.e. no
+ * levels are skipped in the reconstruction process--otherwise the
+ * tombstone ordering invariant may be violated.
*/
- inline void merge_levels(level_index base_level, level_index incoming_level) {
- // merging two memory levels
+ inline void reconstruction(level_index base_level, level_index incoming_level) {
if constexpr (L == LayoutPolicy::LEVELING) {
auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get());
+ m_levels[base_level] = InternalLevel<R, Shard, Q>::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get());
} else {
- m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get());
+ m_levels[base_level]->append_level(m_levels[incoming_level].get());
m_levels[base_level]->finalize();
}
@@ -391,9 +392,7 @@ private:
std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;
/*
- * Add a new level to the LSM Tree and return that level's index. Will
- * automatically determine whether the level should be on memory or on disk,
- * and act appropriately.
+ * Add a new level to the structure and return its index.
*/
inline level_index grow() {
level_index new_idx = m_levels.size();
@@ -405,22 +404,18 @@ private:
/*
* Find the first level below the level indicated by idx that
- * is capable of sustaining a merge operation and return its
+ * is capable of sustaining a reconstruction and return its
* level index. If no such level exists, returns -1. Also
- * returns -1 if idx==0, and no such level exists, to skimplify
- * the logic of the first merge.
+ * returns -1 if idx==0, and no such level exists, to simplify
+ * the logic of the first buffer flush.
*/
- inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) {
+ inline level_index find_reconstruction_target(level_index idx, Buffer *buffer=nullptr) {
if (idx == 0 && m_levels.size() == 0) return -1;
- bool level_found = false;
- bool disk_level;
- level_index merge_level_idx;
-
size_t incoming_rec_cnt = get_level_record_count(idx, buffer);
for (level_index i=idx+1; i<m_levels.size(); i++) {
- if (can_merge_with(i, incoming_rec_cnt)) {
+ if (can_reconstruct_with(i, incoming_rec_cnt)) {
return i;
}
@@ -430,14 +425,14 @@ private:
return -1;
}
- inline void merge_buffer_into_l0(Buffer *buffer) {
+ inline void flush_buffer_into_l0(Buffer *buffer) {
assert(m_levels[0]);
if constexpr (L == LayoutPolicy::LEVELING) {
// FIXME: Kludgey implementation due to interface constraints.
auto old_level = m_levels[0].get();
auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
temp_level->append_buffer(buffer);
- auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level);
+ auto new_level = InternalLevel<R, Shard, Q>::reconstruction(old_level, temp_level);
m_levels[0] = new_level;
delete temp_level;
@@ -479,13 +474,10 @@ private:
}
/*
- * Determines if the specific level can merge with another record containing
- * incoming_rec_cnt number of records. The provided level index should be
- * non-negative (i.e., not refer to the buffer) and will be automatically
- * translated into the appropriate index into either the disk or memory level
- * vector.
+ * Determines if a level can sustain a reconstruction with incoming_rec_cnt
+ * additional records without exceeding its capacity.
*/
- inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) {
+ inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) {
if (idx >= m_levels.size() || !m_levels[idx]) {
return false;
}
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index d146b73..e70ed76 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -40,9 +40,14 @@ public:
delete m_pending_shard;
}
- // WARNING: for leveling only.
- // assuming the base level is the level new level is merging into. (base_level is larger.)
- static std::shared_ptr<InternalLevel> merge_levels(InternalLevel* base_level, InternalLevel* new_level) {
+ /*
+ * Create a new shard combining the records from base_level and new_level,
+ * and return a shared_ptr to a new level containing this shard. This is used
+ * for reconstructions under the leveling layout policy.
+ *
+ * No changes are made to the levels provided as arguments.
+ */
+ static std::shared_ptr<InternalLevel> reconstruction(InternalLevel* base_level, InternalLevel* new_level) {
assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
auto res = new InternalLevel(base_level->m_level_no, 1);
res->m_shard_cnt = 1;
@@ -54,18 +59,15 @@ public:
return std::shared_ptr<InternalLevel>(res);
}
- void append_buffer(Buffer* buffer) {
- if (m_shard_cnt == m_shards.size()) {
- assert(m_pending_shard == nullptr);
- m_pending_shard = new S(buffer);
- return;
- }
-
- m_shards[m_shard_cnt] = std::make_shared<S>(buffer);
- ++m_shard_cnt;
- }
-
- void append_merged_shards(InternalLevel* level) {
+ /*
+ * Create a new shard combining the records from all of
+ * the shards in level, and append this new shard into
+ * this level. This is used for reconstructions under
+ * the tiering layout policy.
+ *
+ * No changes are made to the level provided as an argument.
+ */
+ void append_level(InternalLevel* level) {
Shard *shards[level->m_shard_cnt];
for (size_t i=0; i<level->m_shard_cnt; i++) {
shards[i] = level->m_shards[i].get();
@@ -82,6 +84,22 @@ public:
++m_shard_cnt;
}
+ /*
+ * Create a new shard using the records in the
+ * provided buffer, and append this new shard
+ * into this level. This is used for buffer
+ * flushes under the tiering layout policy.
+ */
+ void append_buffer(Buffer* buffer) {
+ if (m_shard_cnt == m_shards.size()) {
+ assert(m_pending_shard == nullptr);
+ m_pending_shard = new S(buffer);
+ return;
+ }
+
+ m_shards[m_shard_cnt] = std::make_shared<S>(buffer);
+ ++m_shard_cnt;
+ }
void finalize() {
if (m_pending_shard) {
@@ -95,7 +113,13 @@ public:
}
}
- Shard *get_merged_shard() {
+ /*
+ * Create a new shard containing the combined records
+ * from all shards on this level and return it.
+ *
+ * No changes are made to this level.
+ */
+ Shard *get_combined_shard() {
if (m_shard_cnt == 0) {
return nullptr;
}
@@ -109,7 +133,7 @@ public:
return new S(shards, m_shard_cnt);
}
- // Append the sample range in-order.....
+ /* Append the sample range in-order */
void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
for (size_t i=0; i<m_shard_cnt; i++) {
if (m_shards[i]) {
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 8b17091..58b5fb4 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -1,11 +1,17 @@
/*
* include/framework/structure/MutableBuffer.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
* Dong Xie <dongx@psu.edu>
*
* Distributed under the Modified BSD License.
*
+ * FIXME: currently, the buffer itself is responsible for managing a
+ * secondary buffer for storing sorted records used during buffer flushes. It
+ * probably makes more sense to make the shard being flushed into responsible
+ * for this instead. This would also facilitate simultaneous flushes of multiple
+ * buffers more easily.
+ *
*/
#pragma once
@@ -35,7 +41,7 @@ public:
: m_cap(capacity), m_tombstone_cap(capacity), m_reccnt(0)
, m_tombstonecnt(0), m_weight(0), m_max_weight(0), m_tail(0) {
m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
- m_merge_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
+ m_sorted_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
m_tombstone_filter = nullptr;
if (max_tombstone_cap > 0) {
m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS);
@@ -49,7 +55,7 @@ public:
if (m_data) free(m_data);
if (m_tombstone_filter) delete m_tombstone_filter;
- if (m_merge_data) free(m_merge_data);
+ if (m_sorted_data) free(m_sorted_data);
}
template <typename R_ = R>
@@ -171,8 +177,8 @@ public:
* to be adjusted). Other threads having read access is perfectly
* acceptable, however.
*/
- bool start_merge() {
- memcpy(m_merge_data, m_data, sizeof(Wrapped<R>) * m_reccnt.load());
+ bool start_flush() {
+ memcpy(m_sorted_data, m_data, sizeof(Wrapped<R>) * m_reccnt.load());
return true;
}
@@ -210,7 +216,7 @@ private:
size_t m_tombstone_cap;
Wrapped<R>* m_data;
- Wrapped<R>* m_merge_data;
+ Wrapped<R>* m_sorted_data;
psudb::BloomFilter<R>* m_tombstone_filter;