summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-09-18 13:05:44 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-09-18 13:05:44 -0400
commitabc8605a51537fc7b35bb0d9b1da6c724c5c6973 (patch)
treeb24509db6192b8e02e79237be986c5fcbb52667f /include/framework
parent7f56949bc847b56da69c9eb3ebe081d6cf9f61c6 (diff)
downloaddynamic-extension-abc8605a51537fc7b35bb0d9b1da6c724c5c6973.tar.gz
Moved individual merge task execution into the scheduler
This change is made in anticipation of scheduling each task using a specific thread, and required some modification to the interface of ExtensionStructure. Namely, 1. ExtensionStructure now supports a get_merge_tasks() interface, which returns a list of the individual level merges that would need to be performed to complete a buffer flush of specified size. 2. merge_levels and merge_buffer have been promoted to the public interface, to allow their use within the scheduler. 3. merge_buffer has been modified to assume that the structure already can support a direct flush of the buffer into L0, it is now the responsibility of the caller to ensure that the necessary merges have already been completed prior to calling this method. Currently, preemptive tombstone compactions are non-functional, so some unit tests are failing. This will be fixed when the thread scheduling system is set up.
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/ExtensionStructure.h101
-rw-r--r--include/framework/InternalLevel.h4
-rw-r--r--include/framework/Scheduler.h28
3 files changed, 98 insertions, 35 deletions
diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h
index 1e756db..9a5f6b3 100644
--- a/include/framework/ExtensionStructure.h
+++ b/include/framework/ExtensionStructure.h
@@ -27,6 +27,17 @@
namespace de {
+struct MergeTask {
+ level_index m_source_level;
+ level_index m_target_level;
+ size_t m_size;
+ size_t m_timestamp;
+
+ bool operator<(MergeTask &other) {
+ return m_timestamp < other.m_timestamp;
+ }
+};
+
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING>
class ExtensionStructure {
typedef S Shard;
@@ -90,9 +101,7 @@ public:
* merges to make room for it.
*/
inline bool merge_buffer(Buffer *buffer) {
- if (!can_merge_with(0, buffer->get_record_count())) {
- merge_down(0);
- }
+ assert(can_merge_with(0, buffer->get_record_count()));
merge_buffer_into_l0(buffer);
enforce_delete_maximum(0);
@@ -192,6 +201,74 @@ public:
return m_levels;
}
+ /*
+ *
+ */
+ std::vector<MergeTask> get_merge_tasks(size_t buffer_reccnt) {
+ std::vector<MergeTask> merges;
+
+ /*
+ * The buffer -> L0 merge task is not included so if that
+ * can be done without any other change, just return an
+ * empty list.
+ */
+ if (can_merge_with(0, buffer_reccnt)) {
+ return std::move(merges);
+ }
+
+ level_index merge_base_level = find_mergable_level(0);
+ if (merge_base_level == -1) {
+ merge_base_level = grow();
+ }
+
+ for (level_index i=merge_base_level; i>0; i--) {
+ MergeTask task;
+ task.m_source_level = i - 1;
+ task.m_target_level = i;
+
+ /*
+ * The amount of storage required for the merge accounts
+ * for the cost of storing the new records, along with the
+ * cost of retaining the old records during the process
+ * (hence the 2x multiplier).
+ *
+ * FIXME: currently does not account for the *actual* size
+ * of the shards, only the storage for the records
+ * themselves.
+ */
+ size_t reccnt = m_levels[i-1]->get_record_count();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_merge_with(i, reccnt)) {
+ reccnt += m_levels[i]->get_record_count();
+ }
+ }
+ task.m_size = 2* reccnt * sizeof(R);
+
+ merges.push_back(task);
+ }
+
+ return std::move(merges);
+ }
+
+ /*
+ * Merge the level specified by incoming level into the level specified
+ * by base level. The two levels should be sequential--i.e. no levels
+ * are skipped in the merge process--otherwise the tombstone ordering
+ * invariant may be violated by the merge operation.
+ */
+ inline void merge_levels(level_index base_level, level_index incoming_level) {
+ // merging two memory levels
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ auto tmp = m_levels[base_level];
+ m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get());
+ } else {
+ m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get());
+ }
+
+ m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
+ }
+
+
private:
size_t m_scale_factor;
double m_max_delete_prop;
@@ -265,24 +342,6 @@ private:
return -1;
}
- /*
- * Merge the level specified by incoming level into the level specified
- * by base level. The two levels should be sequential--i.e. no levels
- * are skipped in the merge process--otherwise the tombstone ordering
- * invariant may be violated by the merge operation.
- */
- inline void merge_levels(level_index base_level, level_index incoming_level) {
- // merging two memory levels
- if constexpr (L == LayoutPolicy::LEVELING) {
- auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get());
- } else {
- m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get());
- }
-
- m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
- }
-
inline void merge_buffer_into_l0(Buffer *buffer) {
assert(m_levels[0]);
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index 983ec6a..b9866b8 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -87,6 +87,10 @@ public:
}
Shard *get_merged_shard() {
+ if (m_shard_cnt == 0) {
+ return nullptr;
+ }
+
Shard *shards[m_shard_cnt];
for (size_t i=0; i<m_shard_cnt; i++) {
diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h
index cd3f430..28ed8a9 100644
--- a/include/framework/Scheduler.h
+++ b/include/framework/Scheduler.h
@@ -23,19 +23,6 @@
namespace de {
-
-struct MergeTask {
- level_index m_source_level;
- level_index m_target_level;
- size_t m_size;
- size_t m_timestamp;
-
- bool operator<(MergeTask &other) {
- return m_timestamp < other.m_timestamp;
- }
-};
-
-
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
class Scheduler {
typedef ExtensionStructure<R, S, Q, L> Structure;
@@ -52,7 +39,20 @@ public:
{}
bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) {
- // FIXME: this is a non-concurrent implementation
+ /*
+ * Get list of individual level reconstructions that are necessary
+ * for completing the overall merge
+ */
+ std::vector<MergeTask> merges = version->get_merge_tasks(buffer->get_record_count());
+
+ /*
+ * Schedule the merge tasks (FIXME: currently this just
+ * executes them sequentially in a blocking fashion)
+ */
+ for (ssize_t i=merges.size()-1; i>=0; i--) {
+ version->merge_levels(merges[i].m_target_level, merges[i].m_source_level);
+ }
+
return version->merge_buffer(buffer);
}