author    Douglas Rumbaugh <dbr4@psu.edu>    2025-02-11 17:32:10 -0500
committer Douglas Rumbaugh <dbr4@psu.edu>    2025-02-11 17:32:10 -0500
commit    85afe4ef04f327862460570fb0aa4c30afcf7cc7 (patch)
tree      aba55db313b752df4ac073db117900e0128c22fb
parent    c04efb2640421be7a24f851c08e290c89b7b46f2 (diff)
download  dynamic-extension-85afe4ef04f327862460570fb0aa4c30afcf7cc7.tar.gz
Progress: began adding parallel merging and locking of levels
-rw-r--r--  CMakeLists.txt                                               |  2
-rw-r--r--  benchmarks/tail-latency/standard_latency_dist.cpp            |  1
-rw-r--r--  include/framework/DynamicExtension.h                         | 87
-rw-r--r--  include/framework/reconstruction/BSMPolicy.h                 |  7
-rw-r--r--  include/framework/reconstruction/BackgroundTieringPolicy.h   | 33
-rw-r--r--  include/framework/reconstruction/FixedShardCountPolicy.h     |  8
-rw-r--r--  include/framework/reconstruction/FloodL0Policy.h             |  7
-rw-r--r--  include/framework/reconstruction/LevelingPolicy.h            |  7
-rw-r--r--  include/framework/reconstruction/ReconstructionPolicy.h      |  3
-rw-r--r--  include/framework/reconstruction/TieringPolicy.h             |  7
-rw-r--r--  include/framework/scheduling/FIFOScheduler.h                 | 20
-rw-r--r--  include/framework/scheduling/LockManager.h                   | 52
-rw-r--r--  include/framework/scheduling/Task.h                          |  9
-rw-r--r--  include/framework/scheduling/statistics.h                    |  4
-rw-r--r--  include/framework/structure/ExtensionStructure.h             | 19
-rw-r--r--  include/framework/structure/MutableBuffer.h                  |  1
16 files changed, 180 insertions(+), 87 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f0d8a86..106b91a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -9,7 +9,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
set(namespace "de")
project("Practical Dynamic Extension" VERSION 0.1.0)
-set(debug false)
+set(debug true)
set(tests True)
set(bench false)
set(vldb_bench false)
diff --git a/benchmarks/tail-latency/standard_latency_dist.cpp b/benchmarks/tail-latency/standard_latency_dist.cpp
index 2e800fc..7cce48c 100644
--- a/benchmarks/tail-latency/standard_latency_dist.cpp
+++ b/benchmarks/tail-latency/standard_latency_dist.cpp
@@ -80,7 +80,6 @@ int main(int argc, char **argv) {
for (size_t j=warmup; j<data.size(); j++) {
while (!extension->insert(data[j])) {
usleep(1);
- fprintf(stderr, "%ld\n", j);
}
}
TIMER_STOP();
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index a48f390..aa07659 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -17,6 +17,7 @@
#include "framework/interface/Scheduler.h"
#include "framework/reconstruction/ReconstructionPolicy.h"
#include "framework/scheduling/SerialScheduler.h"
+#include "framework/scheduling/LockManager.h"
#include "framework/scheduling/Task.h"
#include "framework/structure/ExtensionStructure.h"
@@ -51,6 +52,7 @@ private:
static constexpr size_t QUERY = 1;
static constexpr size_t RECONSTRUCTION = 2;
+ static constexpr size_t FLUSH = 3;
typedef std::shared_ptr<VersionType> version_ptr;
typedef size_t version_id;
@@ -90,7 +92,6 @@ public:
m_version_counter = INITIAL_VERSION;
assert(m_config.recon_policy);
- m_reconstruction_scheduled.store(false);
}
/**
@@ -400,10 +401,10 @@ private:
std::condition_variable m_version_advance_cv;
std::mutex m_version_advance_mtx;
- std::atomic<bool> m_reconstruction_scheduled;
-
std::atomic<bool> m_flush_in_progress = false;
+ LockManager m_lock_mngr;
+
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
void enforce_delete_invariant(VersionType *version) {
@@ -493,7 +494,9 @@ private:
/* perform all of the reconstructions */
StructureType *structure = args->version->get_mutable_structure();
for (size_t i = 0; i < args->tasks.size(); i++) {
- structure->perform_reconstruction(args->tasks[i]);
+ if (structure->perform_reconstruction(args->tasks[i])) {
+ extension->m_lock_mngr.add_lock();
+ }
}
/*
@@ -508,11 +511,19 @@ private:
/* advance the index to the newly finished version */
extension->install_new_version(args->version, args->initial_version);
+ /* maint reconstructions can now safely release their locks */
+ if (args->priority == ReconstructionPriority::MAINT) {
+ for (size_t i=0; i<args->tasks.size(); i++) {
+ for (auto source : args->tasks[i].sources) {
+ extension->m_lock_mngr.release_lock(source.level_idx);
+ }
+ }
+ }
+
if (args->priority == ReconstructionPriority::FLUSH) {
extension->m_flush_in_progress.store(false);
// fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id());
} else {
- extension->m_reconstruction_scheduled.store(false);
// fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id());
}
@@ -591,6 +602,7 @@ private:
*/
version_ptr create_version_flush(std::unique_ptr<StructureType> structure) {
size_t version_id = m_version_counter.fetch_add(1);
+ // fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id);
auto active_version = get_active_version();
std::shared_ptr<VersionType> new_version =
std::make_shared<VersionType>(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
@@ -617,7 +629,7 @@ private:
assert(new_version->get_structure());
assert(new_version->get_id() != INVALID_VERSION);
- // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
+ // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
/* wait until our turn to install the new version */
await_version(new_version->get_id() - 1);
@@ -669,10 +681,10 @@ private:
bool old = m_flush_in_progress.load();
if (old || !m_flush_in_progress.compare_exchange_strong(old, true)) {
end_reconstruction_scheduling();
+ m_version_advance_cv.notify_all();
return;
}
- // fprintf(stderr, "[I] Scheduling flush\n");
/*
* for "legacy" policies, without background reconstruction, we need
@@ -692,6 +704,7 @@ private:
auto new_version = create_version_flush(std::move(structure));
+
auto *args = new ReconstructionArgs<ShardType, QueryType>();
args->version = new_version;
args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get());
@@ -699,12 +712,14 @@ private:
args->priority = ReconstructionPriority::FLUSH;
args->initial_version = INVALID_VERSION;
+ // fprintf(stderr, "[I] Scheduling flush (%ld)\n", new_version->get_id());
/*
* NOTE: args is deleted by the reconstruction job, so shouldn't be
* freed here
*/
m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
- RECONSTRUCTION);
+ FLUSH);
+ // fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", new_version->get_id());
if (m_config.recon_enable_maint_on_flush) {
schedule_maint_reconstruction(false);
@@ -715,7 +730,7 @@ private:
void schedule_maint_reconstruction(bool take_reconstruction_lock = true) {
- if (m_config.recon_maint_disabled || m_reconstruction_scheduled.load()) {
+ if (m_config.recon_maint_disabled) {
return;
}
@@ -723,37 +738,28 @@ private:
begin_reconstruction_scheduling();
}
- if (m_reconstruction_scheduled.load()) {
- end_reconstruction_scheduling();
- return;
- }
-
// fprintf(stderr, "[I] Scheduling maintenance\n");
- m_reconstruction_scheduled.store(true);
-
- // FIXME: memory management issue here?
auto active_version = m_active_version.load();
- auto new_version = create_version_maint(std::unique_ptr<StructureType>(active_version->get_structure()->copy()));
+ auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr);
- auto *args = new ReconstructionArgs<ShardType, QueryType>();
- args->version = new_version;
- args->tasks = m_config.recon_policy->get_reconstruction_tasks(args->version.get());
- args->extension = this;
- args->priority = ReconstructionPriority::MAINT;
- args->initial_version = active_version->get_id();
+ // if (reconstructions.size() == 0) {
+ // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
+ // }
- /*
- * NOTE: args is deleted by the reconstruction job, so shouldn't be
- * freed here
- */
- if (args->tasks.size() > 0) {
+ for (auto &recon : reconstructions) {
+ /*
+ * NOTE: args is deleted by the reconstruction job, so shouldn't be
+ * freed here
+ */
+ auto *args = new ReconstructionArgs<ShardType, QueryType>();
+ args->version = create_version_maint(std::unique_ptr<StructureType>(active_version->get_structure()->copy()));
+ args->tasks = std::move(recon);
+ args->extension = this;
+ args->priority = ReconstructionPriority::MAINT;
+ args->initial_version = active_version->get_id();
m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
- RECONSTRUCTION);
- } else {
- delete args;
- m_reconstruction_scheduled.store(false);
- // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
+ RECONSTRUCTION);
}
if (take_reconstruction_lock) {
@@ -775,17 +781,20 @@ private:
}
int internal_append(const RecordType &rec, bool ts) {
- if (m_buffer->is_at_low_watermark()) {
+ size_t max_l0 = (log(get_record_count()) / log(8)) + 1;
+ size_t current_l0 = get_active_version()->get_structure()->get_level_vector()[0]->get_shard_count();
+
+ if (m_buffer->is_at_low_watermark() && current_l0 <= max_l0) {
auto old = false;
if (m_scheduling_reconstruction.compare_exchange_strong(old, true)) {
schedule_flush();
}
}
-
- if (rand() % 1000 < 5) {
- size_t l0_cnt = get_active_version()->get_structure()->get_level_vector()[0]->get_shard_count();
- usleep(l0_cnt);
+ if (m_buffer->is_at_high_watermark() && current_l0 > max_l0) {
+ schedule_maint_reconstruction(true);
+ // fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0);
+ return 0;
}
/* this will fail if the HWM is reached and return 0 */
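
[Editor's note: a minimal standalone sketch, not part of the commit, showing how the back-pressure bound used in internal_append() above behaves. It applies the same natural-log formula from the diff, max_l0 = log(n)/log(8) + 1, truncated to an integer.]

#include <cmath>
#include <cstddef>
#include <cstdio>

int main() {
  /* same bound as the new internal_append() check */
  for (double n : {1e3, 1e6, 1e9}) {
    std::size_t max_l0 = static_cast<std::size_t>(std::log(n) / std::log(8)) + 1;
    std::printf("n = %.0f records -> max_l0 = %zu shards\n", n, max_l0);
  }
  return 0;
}

With this bound, one million records allow seven L0 shards; once the buffer hits its high watermark with more shards than that, internal_append() schedules a maintenance reconstruction and returns 0, pushing back on the caller until the background merges drain L0.
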
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h
index 9ddd150..6d55a12 100644
--- a/include/framework/reconstruction/BSMPolicy.h
+++ b/include/framework/reconstruction/BSMPolicy.h
@@ -24,10 +24,9 @@ public:
BSMPolicy(size_t buffer_size)
: m_scale_factor(2), m_buffer_size(buffer_size) {}
- ReconstructionVector
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override {
- ReconstructionVector reconstructions;
- return reconstructions;
+ std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ return {};
}
ReconstructionVector
diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h
index 4c266fd..5a82695 100644
--- a/include/framework/reconstruction/BackgroundTieringPolicy.h
+++ b/include/framework/reconstruction/BackgroundTieringPolicy.h
@@ -24,14 +24,14 @@ public:
BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size)
: m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
- ReconstructionVector get_reconstruction_tasks(
- const Version<ShardType, QueryType> *version) const override {
- ReconstructionVector reconstructions;
+ std::vector<ReconstructionVector> get_reconstruction_tasks(
+ const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ std::vector<ReconstructionVector> reconstructions;
auto levels = version->get_structure()->get_level_vector();
- if (levels[0]->get_shard_count() < m_scale_factor) {
- return reconstructions;
+ if (levels[0]->get_shard_count() == 0) {
+ return {};
}
level_index target_level = find_reconstruction_target(levels);
@@ -44,16 +44,19 @@ public:
}
for (level_index i = target_level; i > source_level; i--) {
- size_t target_reccnt =
- (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0;
- size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt;
-
- std::vector<ShardID> shards;
- for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) {
- shards.push_back({i-1, j});
+ if (lock_mngr.take_lock(i-1)) {
+ ReconstructionVector recon;
+ size_t target_reccnt =
+ (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0;
+ size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt;
+ std::vector<ShardID> shards;
+ for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) {
+ shards.push_back({i-1, j});
+ }
+
+ recon.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact);
+ reconstructions.push_back(recon);
}
-
- reconstructions.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact);
}
return reconstructions;
@@ -70,7 +73,7 @@ private:
level_index find_reconstruction_target(LevelVector &levels) const {
level_index target_level = invalid_level_idx;
- for (level_index i = 0; i < (level_index)levels.size(); i++) {
+ for (level_index i = 1; i < (level_index)levels.size(); i++) {
if (levels[i]->get_shard_count() + 1 <= capacity()) {
target_level = i;
break;
diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h
index a181052..cc8dce4 100644
--- a/include/framework/reconstruction/FixedShardCountPolicy.h
+++ b/include/framework/reconstruction/FixedShardCountPolicy.h
@@ -24,11 +24,9 @@ public:
FixedShardCountPolicy(size_t buffer_size, size_t shard_count, size_t max_record_count)
: m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {}
- ReconstructionVector
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override {
- ReconstructionVector reconstructions;
- return reconstructions;
-
+ std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ return {};
}
ReconstructionVector
diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h
index b4f453b..c0d29fe 100644
--- a/include/framework/reconstruction/FloodL0Policy.h
+++ b/include/framework/reconstruction/FloodL0Policy.h
@@ -23,10 +23,9 @@ class FloodL0Policy : public ReconstructionPolicy<ShardType, QueryType> {
public:
FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {}
- ReconstructionVector
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override {
- ReconstructionVector reconstructions;
- return reconstructions;
+ std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ return {};
}
ReconstructionVector
diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h
index 3a0b73e..f0feb53 100644
--- a/include/framework/reconstruction/LevelingPolicy.h
+++ b/include/framework/reconstruction/LevelingPolicy.h
@@ -24,10 +24,9 @@ public:
LevelingPolicy(size_t scale_factor, size_t buffer_size)
: m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
- ReconstructionVector
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override {
- ReconstructionVector reconstructions;
- return reconstructions;
+ std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ return {};
}
ReconstructionVector
diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h
index 6f99b32..41a2092 100644
--- a/include/framework/reconstruction/ReconstructionPolicy.h
+++ b/include/framework/reconstruction/ReconstructionPolicy.h
@@ -15,6 +15,7 @@
#include "util/types.h"
#include "framework/structure/ExtensionStructure.h"
#include "framework/scheduling/Version.h"
+#include "framework/scheduling/LockManager.h"
namespace de {
template<ShardInterface ShardType, QueryInterface<ShardType> QueryType>
@@ -23,7 +24,7 @@ class ReconstructionPolicy {
public:
ReconstructionPolicy() {}
- virtual ReconstructionVector get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const = 0;
+ virtual std::vector<ReconstructionVector> get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const = 0;
virtual ReconstructionVector get_flush_tasks(const Version<ShardType, QueryType> *version) const = 0;
};
}
diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h
index aa94f7a..d8769f7 100644
--- a/include/framework/reconstruction/TieringPolicy.h
+++ b/include/framework/reconstruction/TieringPolicy.h
@@ -24,10 +24,9 @@ public:
TieringPolicy(size_t scale_factor, size_t buffer_size)
: m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
- ReconstructionVector get_reconstruction_tasks(
- const Version<ShardType, QueryType> *version) const override {
- ReconstructionVector reconstructions;
- return reconstructions;
+ std::vector<ReconstructionVector> get_reconstruction_tasks(
+ const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ return {};
}
ReconstructionVector
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 4c1db8d..8a4cd8d 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -51,13 +51,20 @@ public:
m_sched_thrd.join();
m_sched_wakeup_thrd.join();
+ m_flush_thread.join();
}
void schedule_job(std::function<void(void *)> job, size_t size, void *args,
size_t type = 0) {
- std::unique_lock<std::mutex> lk(m_cv_lock);
+
size_t ts = m_counter.fetch_add(1);
+ if (type == 3) {
+ do_flush(Task(size, ts, job, args, type, &m_stats, &m_flush_lock));
+ return;
+ }
+
+ std::unique_lock<std::mutex> lk(m_cv_lock);
m_stats.job_queued(ts, type, size);
m_task_queue.push(Task(size, ts, job, args, type, &m_stats));
@@ -84,6 +91,9 @@ private:
std::condition_variable m_cv;
std::mutex m_queue_lock;
+ std::mutex m_flush_lock;
+ std::thread m_flush_thread;
+
std::thread m_sched_thrd;
std::thread m_sched_wakeup_thrd;
ctpl::thread_pool m_thrd_pool;
@@ -121,6 +131,14 @@ private:
}
} while (!m_shutdown.load());
}
+
+ void do_flush(Task task) {
+ m_flush_lock.lock();
+ if (m_flush_thread.joinable()) {
+ m_flush_thread.join();
+ }
+ m_flush_thread = std::thread(task, 0);
+ }
};
} // namespace de
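
[Editor's note: an illustrative standalone sketch, using assumed names rather than the framework's API, of the pattern do_flush() introduces above: flushes run on a single dedicated thread, and each new flush first joins the previous one, so at most one flush is in flight while other jobs continue through the normal queue. In the scheduler itself, the Task is additionally handed &m_flush_lock and unlocks it when the job completes.]

#include <chrono>
#include <cstdio>
#include <thread>

int main() {
  std::thread flush_thread;

  auto do_flush = [&](int id) {
    /* wait for any previous flush before starting the next one */
    if (flush_thread.joinable()) {
      flush_thread.join();
    }
    flush_thread = std::thread([id] {
      std::printf("flush %d running\n", id);
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    });
  };

  for (int i = 0; i < 3; i++) {
    do_flush(i);
  }

  if (flush_thread.joinable()) {
    flush_thread.join();
  }
  return 0;
}
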
diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h
new file mode 100644
index 0000000..91ed778
--- /dev/null
+++ b/include/framework/scheduling/LockManager.h
@@ -0,0 +1,52 @@
+/*
+ *
+ */
+
+#pragma once
+#include <deque>
+#include <atomic>
+
+namespace de {
+class LockManager {
+public:
+ LockManager(size_t levels=1) {
+ for (size_t i=0; i < levels; i++) {
+ m_lks.emplace_back(false);
+ }
+ }
+
+ ~LockManager() = default;
+
+ void add_lock() {
+ m_lks.emplace_back(false);
+ }
+
+ void release_lock(size_t idx) {
+ if (idx < m_lks.size()) {
+ m_lks[idx].store(false);
+ }
+ }
+
+ bool is_locked(size_t idx) {
+ if (idx < m_lks.size()) {
+ return m_lks[idx].load();
+ }
+
+ return false;
+ }
+
+ bool take_lock(size_t idx) {
+ if (idx < m_lks.size()) {
+ bool old = m_lks[idx].load();
+ if (!old) {
+ return m_lks[idx].compare_exchange_strong(old, true);
+ }
+ }
+
+ return false;
+ }
+
+private:
+ std::deque<std::atomic<bool>> m_lks;
+};
+}
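
[Editor's note: a brief usage sketch of the new LockManager; the level indices are illustrative, and the call sequence mirrors how BackgroundTieringPolicy and the reconstruction job use it in the diffs above.]

#include "framework/scheduling/LockManager.h"
#include <cstdio>

int main() {
  de::LockManager lock_mngr(3); /* one lock slot per level */

  /* a policy takes the source level's lock before emitting a task for it;
   * a second attempt fails until the lock is released */
  if (lock_mngr.take_lock(1)) {
    std::printf("level 1 locked: %d\n", lock_mngr.is_locked(1));
    std::printf("retry take_lock(1): %d\n", lock_mngr.take_lock(1));

    /* when perform_reconstruction() grows the structure, the caller adds a
     * slot for the new level; the finished job then releases its sources */
    lock_mngr.add_lock();
    lock_mngr.release_lock(1);
  }
  return 0;
}
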
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index ed40d3d..2d68f56 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -49,9 +49,9 @@ typedef std::function<void(void *)> Job;
struct Task {
Task(size_t size, size_t ts, Job job, void *args, size_t type = 0,
- SchedulerStatistics *stats = nullptr)
+ SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr)
: m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type),
- m_stats(stats) {}
+ m_stats(stats), m_lk(lk) {}
Job m_job;
size_t m_size;
@@ -59,6 +59,7 @@ struct Task {
void *m_args;
size_t m_type;
SchedulerStatistics *m_stats;
+ std::mutex *m_lk;
friend bool operator<(const Task &self, const Task &other) {
return self.m_timestamp < other.m_timestamp;
@@ -87,6 +88,10 @@ struct Task {
.count();
m_stats->log_time_data(time, m_type);
}
+
+ if (m_lk) {
+ m_lk->unlock();
+ }
}
};
diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h
index 48c186f..a6d66ab 100644
--- a/include/framework/scheduling/statistics.h
+++ b/include/framework/scheduling/statistics.h
@@ -52,7 +52,7 @@ public:
/* FIXME: This is just a temporary approach */
void log_time_data(size_t length, size_t type) {
- assert(type == 1 || type == 2);
+ assert(type == 1 || type == 2 || type == 3);
if (type == 1) {
m_type_1_cnt.fetch_add(1);
@@ -61,7 +61,7 @@ public:
if (length > m_type_1_largest_time) {
m_type_1_largest_time.store(length);
}
- } else {
+ } else if (type == 2) {
m_type_2_cnt.fetch_add(1);
m_type_2_total_time.fetch_add(length);
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 2bf7086..fa713af 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -159,7 +159,12 @@ public:
return cnt;
}
- inline void perform_reconstruction(ReconstructionTask task, size_t version=0) {
+ /*
+ * Perform the reconstruction described by task. If the resulting
+ * reconstruction grows the structure (i.e., adds a level), returns
+ * true. Otherwise, returns false.
+ */
+ inline bool perform_reconstruction(ReconstructionTask task, size_t version=0) {
/* perform the reconstruction itself */
std::vector<const ShardType *> shards;
for (ShardID shid : task.sources) {
@@ -205,10 +210,12 @@ public:
*/
if (task.target < (level_index)m_levels.size()) {
m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard), version);
+ return false;
// fprintf(stderr, "append (no growth)\n");
} else { /* grow the structure if needed */
m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(task.target));
m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard), version);
+ return true;
// fprintf(stderr, "grow and append\n");
}
}
@@ -283,9 +290,13 @@ public:
assert(version_id > 0);
for (size_t i=0; i<old_structure->m_levels.size(); i++) {
- for (size_t j=0; j<old_structure->m_levels[i]->get_shard_count(); j++) {
- if (old_structure->m_levels[i]->get_shard_version(j) > version_id) {
- m_levels[i]->append(old_structure->m_levels[i]->get_shard_ptr(j));
+ if (m_levels.size() <= i) {
+ m_levels.push_back(old_structure->m_levels[i]);
+ } else {
+ for (size_t j=0; j<old_structure->m_levels[i]->get_shard_count(); j++) {
+ if (old_structure->m_levels[i]->get_shard_version(j) > version_id) {
+ m_levels[i]->append(old_structure->m_levels[i]->get_shard_ptr(j));
+ }
}
}
}
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 0eae73d..0197ecd 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -104,6 +104,7 @@ public:
bool is_full() const { return get_record_count() >= m_hwm; }
bool is_at_low_watermark() const { return get_record_count() >= m_lwm; }
+ bool is_at_high_watermark() const { return get_record_count() >= m_hwm; }
size_t get_tombstone_count() const { return m_tscnt.load(); }