Diffstat (limited to 'include')
-rw-r--r--  include/framework/DynamicExtension.h                         69
-rw-r--r--  include/framework/reconstruction/BSMPolicy.h                 19
-rw-r--r--  include/framework/reconstruction/BackgroundTieringPolicy.h   88
-rw-r--r--  include/framework/reconstruction/FixedShardCountPolicy.h     34
-rw-r--r--  include/framework/reconstruction/FloodL0Policy.h              3
-rw-r--r--  include/framework/reconstruction/TieringPolicy.h             15
-rw-r--r--  include/framework/scheduling/Task.h                           1
-rw-r--r--  include/framework/scheduling/Version.h                       40
-rw-r--r--  include/framework/structure/ExtensionStructure.h             72
-rw-r--r--  include/framework/structure/InternalLevel.h                  76
-rw-r--r--  include/util/types.h                                          8
11 files changed, 340 insertions, 85 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index ef36de3..a48f390 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -89,6 +89,8 @@ public:
std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0));
m_version_counter = INITIAL_VERSION;
+ assert(m_config.recon_policy);
+ m_reconstruction_scheduled.store(false);
}
/**
@@ -374,6 +376,14 @@ public:
*/
void print_scheduler_statistics() const { m_sched->print_statistics(); }
+ /**
+ * Writes a schematic view of the currently active structure to
+ * stdout. Each level appears on its own line, with each shard shown
+ * as (index: record count).
+ */
+ void print_structure() {
+ get_active_version()->get_structure()->print_structure();
+ }
+
private:
ConfType m_config;
@@ -390,6 +400,8 @@ private:
std::condition_variable m_version_advance_cv;
std::mutex m_version_advance_mtx;
+ std::atomic<bool> m_reconstruction_scheduled;
+
std::atomic<bool> m_flush_in_progress = false;
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
@@ -441,6 +453,7 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
+ // fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id());
assert(extension->m_flush_in_progress.load());
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
@@ -460,19 +473,21 @@ private:
if (extension->m_config.recon_maint_disabled) {
assert(args->version->get_mutable_structure());
- args->version->get_mutable_structure()->append_l0(std::move(new_shard));
+ args->version->get_mutable_structure()->append_l0(std::move(new_shard), args->version->get_id());
} else {
assert(!args->version->get_mutable_structure());
auto structure = std::unique_ptr<StructureType>(extension->get_active_version()->get_structure()->copy());
/* add the newly created shard to the structure copy */
- structure->append_l0(std::move(new_shard));
+ structure->append_l0(std::move(new_shard), args->version->get_id());
/* set this version's structure to the newly created one */
args->version->set_structure(std::move(structure));
}
args->version->advance_buffer_head(new_head);
+ } else {
+ // fprintf(stderr, "[I] Running background reconstruction\n");
}
/* perform all of the reconstructions */
@@ -487,13 +502,18 @@ private:
*/
if (args->version->get_id() == INVALID_VERSION) {
args->version->set_id(extension->m_version_counter.fetch_add(1));
+ // fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id());
}
/* advance the index to the newly finished version */
- extension->install_new_version(args->version);
+ extension->install_new_version(args->version, args->initial_version);
if (args->priority == ReconstructionPriority::FLUSH) {
extension->m_flush_in_progress.store(false);
+ // fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id());
+ } else {
+ extension->m_reconstruction_scheduled.store(false);
+ // fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id());
}
/* manually delete the argument object */
@@ -593,17 +613,18 @@ private:
return new_version;
}
- void install_new_version(version_ptr new_version) {
+ void install_new_version(version_ptr new_version, size_t old_active_version_id) {
assert(new_version->get_structure());
assert(new_version->get_id() != INVALID_VERSION);
+ // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
/* wait until our turn to install the new version */
await_version(new_version->get_id() - 1);
auto old = get_active_version();
- // FIXME: implement this interface
- // new_version->merge_changes_from(old.load().get());
+ new_version->merge_changes_from(old.get(), old_active_version_id);
+ new_version->update_shard_version(new_version->get_id());
/*
* Only one version can have a given number, so we are safe to
@@ -620,6 +641,8 @@ private:
*/
auto lk = std::unique_lock(m_version_advance_mtx);
m_version_advance_cv.notify_all();
+
+ // fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id());
}
StructureType *create_scratch_structure() {
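The install path above serializes version installation by id: a version numbered v blocks until v-1 is active, merges in concurrent changes, and then publishes itself. A minimal standalone sketch of that ordering protocol (mtx, cv, and active_version are stand-ins for m_version_advance_mtx, m_version_advance_cv, and the active-version pointer; the merge step is elided):

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

std::mutex mtx;               /* stand-in for m_version_advance_mtx */
std::condition_variable cv;   /* stand-in for m_version_advance_cv */
size_t active_version = 0;    /* stand-in for the active version id */

void install(size_t id) {
  std::unique_lock lk(mtx);
  /* await_version(id - 1): wait until it is our turn to install */
  cv.wait(lk, [&] { return active_version == id - 1; });
  active_version = id;        /* ids are unique, so this is safe */
  std::printf("installed version %zu\n", id);
  cv.notify_all();            /* wake the thread waiting on this id */
}

int main() {
  std::thread t3(install, 3), t2(install, 2), t1(install, 1);
  t1.join(); t2.join(); t3.join(); /* prints 1, 2, 3 however scheduled */
}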
@@ -649,6 +672,8 @@ private:
return;
}
+ // fprintf(stderr, "[I] Scheduling flush\n");
+
/*
* for "legacy" policies, without background reconstruction, we need
* a valid structure object as part of the version prior to determining
@@ -672,6 +697,7 @@ private:
args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get());
args->extension = this;
args->priority = ReconstructionPriority::FLUSH;
+ args->initial_version = INVALID_VERSION;
/*
* NOTE: args is deleted by the reconstruction job, so shouldn't be
@@ -689,7 +715,7 @@ private:
void schedule_maint_reconstruction(bool take_reconstruction_lock = true) {
- if (m_config.recon_maint_disabled) {
+ if (m_config.recon_maint_disabled || m_reconstruction_scheduled.load()) {
return;
}
@@ -697,21 +723,38 @@ private:
begin_reconstruction_scheduling();
}
+ if (m_reconstruction_scheduled.load()) {
+ end_reconstruction_scheduling();
+ return;
+ }
+
+ // fprintf(stderr, "[I] Scheduling maintenance\n");
+
+ m_reconstruction_scheduled.store(true);
+
// FIXME: memory management issue here?
- auto new_version = create_version_maint(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy()));
+ auto active_version = m_active_version.load();
+ auto new_version = create_version_maint(std::unique_ptr<StructureType>(active_version->get_structure()->copy()));
auto *args = new ReconstructionArgs<ShardType, QueryType>();
args->version = new_version;
args->tasks = m_config.recon_policy->get_reconstruction_tasks(args->version.get());
args->extension = this;
args->priority = ReconstructionPriority::MAINT;
+ args->initial_version = active_version->get_id();
/*
* NOTE: args is deleted by the reconstruction job, so shouldn't be
* freed here
*/
- m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
- RECONSTRUCTION);
+ if (args->tasks.size() > 0) {
+ m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
+ RECONSTRUCTION);
+ } else {
+ delete args;
+ m_reconstruction_scheduled.store(false);
+ // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n");
+ }
if (take_reconstruction_lock) {
end_reconstruction_scheduling();
@@ -739,6 +782,12 @@ private:
}
}
+
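+ /* with ~0.5% probability, stall the inserting thread one microsecond per L0 shard */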
+ if (rand() % 1000 < 5) {
+ size_t l0_cnt = get_active_version()->get_structure()->get_level_vector()[0]->get_shard_count();
+ usleep(l0_cnt);
+ }
+
/* this will fail if the HWM is reached and return 0 */
return m_buffer->append(rec, ts);
}
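The m_reconstruction_scheduled flag threaded through the hunks above admits at most one outstanding background reconstruction. A minimal sketch of the guard's lifecycle, with Scheduler and Extension as hypothetical stand-ins for the framework's scheduler and DynamicExtension (the real code additionally serializes the check behind begin_reconstruction_scheduling()):

#include <atomic>
#include <cstdio>
#include <functional>
#include <vector>

struct Scheduler { /* stand-in for the framework scheduler */
  std::vector<std::function<void()>> queue;
  void schedule(std::function<void()> job) { queue.push_back(std::move(job)); }
};

struct Extension {
  std::atomic<bool> reconstruction_scheduled{false};
  Scheduler sched;

  void schedule_maint(std::vector<int> tasks) {
    if (reconstruction_scheduled.load()) return; /* one at a time */
    reconstruction_scheduled.store(true);
    if (tasks.empty()) {            /* no work: release the guard */
      reconstruction_scheduled.store(false);
      return;
    }
    sched.schedule([this] {
      /* ... perform the reconstructions ... */
      reconstruction_scheduled.store(false); /* completion releases it */
    });
  }
};

int main() {
  Extension ext;
  ext.schedule_maint({1});
  ext.schedule_maint({2});                  /* rejected: one outstanding */
  std::printf("queued: %zu\n", ext.sched.queue.size()); /* 1 */
  ext.sched.queue.front()();                /* run the job; guard drops */
  ext.schedule_maint({3});                  /* accepted again */
  std::printf("queued: %zu\n", ext.sched.queue.size()); /* 2 */
}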
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h
index eaa374a..9ddd150 100644
--- a/include/framework/reconstruction/BSMPolicy.h
+++ b/include/framework/reconstruction/BSMPolicy.h
@@ -25,8 +25,7 @@ public:
: m_scale_factor(2), m_buffer_size(buffer_size) {}
ReconstructionVector
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
- size_t incoming_reccnt) const override {
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override {
ReconstructionVector reconstructions;
return reconstructions;
}
@@ -49,13 +48,21 @@ public:
task.target = target_level;
task.type = ReconstructionType::Merge;
+ std::vector<ShardID> source_shards;
+ size_t reccnt = 0;
+
+ source_shards.push_back({0, all_shards_idx});
+
for (level_index i = target_level; i > source_level; i--) {
if (i < (level_index)levels.size()) {
- task.add_shard({i, all_shards_idx}, levels[i]->get_record_count());
+ source_shards.push_back({i-1, all_shards_idx});
+ reccnt += levels[i-1]->get_record_count();
}
}
- reconstructions.add_reconstruction(task);
+ assert(source_shards.size() > 0);
+
+ reconstructions.add_reconstruction(source_shards, target_level, reccnt, ReconstructionType::Merge);
return reconstructions;
}
@@ -63,8 +70,8 @@ private:
level_index find_reconstruction_target(LevelVector &levels) const {
level_index target_level = invalid_level_idx;
- for (level_index i = 0; i < (level_index)levels.size(); i++) {
- if (levels[i]->get_record_count() + m_buffer_size <= capacity(i)) {
+ for (level_index i = 1; i < (level_index)levels.size(); i++) {
+ if (levels[i]->get_record_count() == 0) {
target_level = i;
break;
}
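The revised BSMPolicy target selection no longer looks for spare record capacity; it takes the first empty level below L0 and merges everything above it downward. A small sketch of just the selection rule, assuming invalid_level_idx is represented by -1 here:

#include <cstdio>
#include <vector>

/* first empty level (excluding L0) receives the merge; -1 means grow */
int find_target(const std::vector<size_t> &level_reccnts) {
  for (size_t i = 1; i < level_reccnts.size(); i++)
    if (level_reccnts[i] == 0) return (int)i;
  return -1;
}

int main() {
  std::vector<size_t> levels = {100, 200, 0, 800};
  int target = find_target(levels);
  std::printf("merge levels 0..%d into %d\n", target - 1, target); /* 0..1 into 2 */
}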
diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h
new file mode 100644
index 0000000..4c266fd
--- /dev/null
+++ b/include/framework/reconstruction/BackgroundTieringPolicy.h
@@ -0,0 +1,88 @@
+/*
+ * include/framework/reconstruction/BackgroundTieringPolicy.h
+ *
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+#include <cmath>
+
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/Version.h"
+#include "util/types.h"
+
+namespace de {
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+class BackgroundTieringPolicy : public ReconstructionPolicy<ShardType, QueryType> {
+ typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+ LevelVector;
+
+public:
+ BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
+
+ ReconstructionVector get_reconstruction_tasks(
+ const Version<ShardType, QueryType> *version) const override {
+ ReconstructionVector reconstructions;
+
+ auto levels = version->get_structure()->get_level_vector();
+
+ if (levels[0]->get_shard_count() < m_scale_factor) {
+ return reconstructions;
+ }
+
+ level_index target_level = find_reconstruction_target(levels);
+ assert(target_level != -1);
+ level_index source_level = 0;
+
+ if (target_level == invalid_level_idx) {
+ /* grow */
+ target_level = levels.size();
+ }
+
+ for (level_index i = target_level; i > source_level; i--) {
+ size_t target_reccnt =
+ (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0;
+ size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt;
+
+ std::vector<ShardID> shards;
+ for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) {
+ shards.push_back({i-1, j});
+ }
+
+ reconstructions.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact);
+ }
+
+ return reconstructions;
+ }
+
+ ReconstructionVector
+ get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
+ ReconstructionVector reconstructions;
+
+ return reconstructions;
+ }
+
+private:
+ level_index find_reconstruction_target(LevelVector &levels) const {
+ level_index target_level = invalid_level_idx;
+
+ for (level_index i = 0; i < (level_index)levels.size(); i++) {
+ if (levels[i]->get_shard_count() + 1 <= capacity()) {
+ target_level = i;
+ break;
+ }
+ }
+
+ return target_level;
+ }
+
+ inline size_t capacity() const { return m_scale_factor; }
+
+ size_t m_scale_factor;
+ size_t m_buffer_size;
+};
+} // namespace de
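BackgroundTieringPolicy defers all merging to maintenance: flushes produce no tasks, and get_reconstruction_tasks() fires only once L0 holds m_scale_factor shards, then compacts each level into the one below it along a cascade ending at the first level with spare shard capacity (growing the structure if none has any). A compact sketch of that cascade:

#include <cstdio>
#include <vector>

int main() {
  const size_t scale_factor = 4;
  std::vector<size_t> shard_counts = {4, 4, 2}; /* L0 and L1 are full */

  /* find_reconstruction_target(): first level that can take one more shard */
  size_t target = shard_counts.size();          /* default: grow */
  for (size_t i = 0; i < shard_counts.size(); i++)
    if (shard_counts[i] + 1 <= scale_factor) { target = i; break; }

  /* one Compact task per step, walking back down toward L0 */
  for (size_t i = target; i > 0; i--)
    std::printf("compact L%zu -> L%zu\n", i - 1, i); /* L1->L2, then L0->L1 */
}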
diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h
index 0768daa..a181052 100644
--- a/include/framework/reconstruction/FixedShardCountPolicy.h
+++ b/include/framework/reconstruction/FixedShardCountPolicy.h
@@ -25,8 +25,7 @@ public:
: m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {}
ReconstructionVector
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
- size_t incoming_reccnt) const override {
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override {
ReconstructionVector reconstructions;
return reconstructions;
@@ -36,26 +35,25 @@ public:
get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
auto levels = version->get_structure()->get_level_vector();
-
ReconstructionVector v;
- if (levels.size() == 0) {
- v.add_reconstruction(ReconstructionTask{
- {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append});
- return v;
- }
-
- ShardID last_shid = {0, (shard_index) (levels[0]->get_shard_count() - 1)};
-
- if (levels[0]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) {
- v.add_reconstruction(ReconstructionTask{
- {{buffer_shid, last_shid}}, 0, m_buffer_size, ReconstructionType::Merge});
- return v;
- } else {
- v.add_reconstruction(ReconstructionTask{
- {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append});
+ /* if this is the very first flush, there won't be an L1 yet */
+ if (levels.size() > 1 && levels[1]->get_shard_count() > 0) {
+ ShardID last_shid = {1, (shard_index) (levels[1]->get_shard_count() - 1)};
+ if (levels[1]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) {
+ auto task = ReconstructionTask {
+ {{0, 0}, last_shid}, 1, m_buffer_size, ReconstructionType::Merge
+ };
+ v.add_reconstruction(task);
return v;
+ }
}
+
+ auto task = ReconstructionTask {
+ {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append
+ };
+ v.add_reconstruction(task);
+ return v;
}
private:
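The flush rule above now lands buffer flushes on level 1 and reuses the most recent L1 shard while it has room: merge if the combined record count stays within capacity, otherwise append a fresh shard. A sketch of just that decision (capacity here is a plain number standing in for the policy's capacity()):

#include <cstdio>
#include <vector>

int main() {
  const size_t buffer_size = 1000, capacity = 4000;
  std::vector<size_t> l1 = {4000, 3500}; /* record counts of L1 shards */

  if (!l1.empty() && l1.back() + buffer_size <= capacity)
    std::printf("merge buffer into L1 shard %zu\n", l1.size() - 1);
  else /* 3500 + 1000 > 4000, so a new shard is appended */
    std::printf("append buffer as new L1 shard %zu\n", l1.size());
}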
diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h
index 94bed70..b4f453b 100644
--- a/include/framework/reconstruction/FloodL0Policy.h
+++ b/include/framework/reconstruction/FloodL0Policy.h
@@ -24,8 +24,7 @@ public:
FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {}
ReconstructionVector
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
- size_t incoming_reccnt) const override {
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override {
ReconstructionVector reconstructions;
return reconstructions;
}
diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h
index dce5c3c..ae215db 100644
--- a/include/framework/reconstruction/TieringPolicy.h
+++ b/include/framework/reconstruction/TieringPolicy.h
@@ -17,14 +17,15 @@
namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class TieringPolicy : public ReconstructionPolicy<ShardType, QueryType> {
- typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> LevelVector;
+ typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+ LevelVector;
+
public:
TieringPolicy(size_t scale_factor, size_t buffer_size)
: m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
- ReconstructionVector
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
- size_t incoming_reccnt) const override {
+ ReconstructionVector get_reconstruction_tasks(
+ const Version<ShardType, QueryType> *version) const override {
ReconstructionVector reconstructions;
return reconstructions;
}
@@ -59,7 +60,7 @@ private:
level_index find_reconstruction_target(LevelVector &levels) const {
level_index target_level = invalid_level_idx;
- for (level_index i = 0; i < (level_index) levels.size(); i++) {
+ for (level_index i = 0; i < (level_index)levels.size(); i++) {
if (levels[i]->get_shard_count() + 1 <= capacity()) {
target_level = i;
break;
@@ -69,9 +70,7 @@ private:
return target_level;
}
- inline size_t capacity() const {
- return m_scale_factor;
- }
+ inline size_t capacity() const { return m_scale_factor; }
size_t m_scale_factor;
size_t m_buffer_size;
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 1591909..ed40d3d 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -36,6 +36,7 @@ struct ReconstructionArgs {
ReconstructionVector tasks;
void *extension;
ReconstructionPriority priority;
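+ /* active version id when this job was scheduled; INVALID_VERSION for flushes */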
+ size_t initial_version;
};
template <ShardInterface S, QueryInterface<S> Q, typename DE> struct QueryArgs {
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index 3e93202..fa677f2 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -89,16 +89,44 @@ public:
return m_buffer->advance_head(new_head);
}
+ void merge_changes_from(Version *old, size_t version_id) {
+ /*
+ * for a maintenance reconstruction, the buffer head may have
+ * advanced while the reconstruction ran; maintenance versions
+ * don't need to adjust the buffer, so we can simply "catch" the
+ * internal head index up to the current version.
+ */
+ if (old->m_buffer_head > m_buffer_head) {
+ m_buffer_head = old->m_buffer_head;
+ }
+
+ // FIXME: we should also ensure that we don't clobber anything
+ // in the event that multiple concurrent reconstructions affect
+ // the same levels. As it stands, two reconstructions *could* share
+ // source shards, resulting in some records being lost or duplicated.
+ //
+ // For the moment, I'm being careful to avoid this within the
+ // scheduling policy itself, and only forwarding changes to this
+ // version.
+
+ /* using INVALID_VERSION disables shard reconciliation */
+ if (version_id == 0) {
+ return;
+ }
+
+ /* add any shards newer than version_id to this version */
+ auto old_structure = old->get_structure();
+ m_structure->merge_structure(old_structure, version_id);
+ }
+
+ void update_shard_version(size_t version) {
+ m_structure->update_shard_version(version);
+ }
+
private:
BufferType *m_buffer;
std::unique_ptr<StructureType> m_structure;
- /*
- * The number of currently active jobs
- * (queries/merges) operating on this
- * epoch. An epoch can only be retired
- * when this number is 0.
- */
size_t m_id;
size_t m_buffer_head;
ssize_t m_pending_buffer_head;
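Taken together, initial_version and the new merge_changes_from() implement the reconciliation this commit is built around: a background version copies the structure as of version T, and at install time it absorbs any shards that flushes with ids greater than T installed in the meantime, then stamps its own unstamped shards. A standalone sketch of that rule (Shard is a stand-in; version 0 marks shards built by the in-flight version, matching the convention used in InternalLevel):

#include <cstdio>
#include <vector>

struct Shard { int id; size_t version; };

int main() {
  const size_t initial_version = 3; /* structure was copied at version 3 */
  std::vector<Shard> active = {{1, 2}, {2, 3}, {3, 4}, {4, 5}};
  std::vector<Shard> mine = {{9, 0}}; /* built by this reconstruction */

  /* merge_changes_from(): forward shards newer than the copy point */
  for (const auto &s : active)
    if (s.version > initial_version) mine.push_back(s); /* shards 3 and 4 */

  /* update_shard_version(): stamp pending shards with our id */
  const size_t my_version = 6;
  for (auto &s : mine)
    if (s.version == 0) s.version = my_version;

  for (const auto &s : mine)
    std::printf("shard %d @ v%zu\n", s.id, s.version);
}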
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 60fb6c7..2bf7086 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -27,8 +27,9 @@ class ExtensionStructure {
typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
LevelVector;
public:
- ExtensionStructure() {
- m_levels.emplace_back(std::make_shared<InternalLevel<ShardType, QueryType>>(0));
+ ExtensionStructure(bool default_level=true) {
+ if (default_level)
+ m_levels.emplace_back(std::make_shared<InternalLevel<ShardType, QueryType>>(0));
}
~ExtensionStructure() = default;
@@ -49,7 +50,7 @@ public:
* need to be forwarded to the appropriate structures manually.
*/
ExtensionStructure<ShardType, QueryType> *copy() const {
- auto new_struct = new ExtensionStructure<ShardType, QueryType>();
+ auto new_struct = new ExtensionStructure<ShardType, QueryType>(false);
for (size_t i = 0; i < m_levels.size(); i++) {
new_struct->m_levels.push_back(m_levels[i]->clone());
}
@@ -158,13 +159,17 @@ public:
return cnt;
}
- inline void perform_reconstruction(ReconstructionTask task) {
+ inline void perform_reconstruction(ReconstructionTask task, size_t version=0) {
/* perform the reconstruction itself */
std::vector<const ShardType *> shards;
for (ShardID shid : task.sources) {
- assert(shid.level_idx < (level_index) m_levels.size());
+ assert(shid.level_idx <= (level_index) m_levels.size());
assert(shid.shard_idx >= -1);
+ if (shid.level_idx == (level_index) m_levels.size()) {
+ continue;
+ }
+
/* if unspecified, push all shards into the vector */
if (shid.shard_idx == all_shards_idx) {
for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count();
@@ -184,21 +189,27 @@ public:
* Remove all of the shards processed by the operation
*/
for (ShardID shid : task.sources) {
- if (shid.shard_idx == all_shards_idx) {
+ if (shid.level_idx == (level_index) m_levels.size()) {
+ continue;
+ } else if (shid.shard_idx == all_shards_idx) {
m_levels[shid.level_idx]->truncate();
} else if (shid != buffer_shid) {
m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
}
}
+ // fprintf(stderr, "Target: %ld\tLevels:%ld\n", task.target, m_levels.size());
+
/*
* Append the new shard to the target level
*/
if (task.target < (level_index)m_levels.size()) {
- m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard));
+ m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard), version);
+ // fprintf(stderr, "append (no growth)\n");
} else { /* grow the structure if needed */
m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(task.target));
- m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard));
+ m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard), version);
+ // fprintf(stderr, "grow and append\n");
}
}
@@ -219,8 +230,8 @@ public:
return m_levels[0]->get_shard_count();
}
- void append_l0(std::shared_ptr<ShardType> shard) {
- m_levels[0]->append(shard);
+ void append_l0(std::shared_ptr<ShardType> shard, size_t version) {
+ m_levels[0]->append(shard, version);
}
LevelVector const &get_level_vector() const { return m_levels; }
@@ -251,6 +262,47 @@ public:
return ts_prop <= (long double) max_delete_prop;
}
+ void print_structure() const {
+ for (size_t i=0; i<m_levels.size(); i++) {
+ fprintf(stdout, "[%ld]:\t", i);
+
+ if (m_levels[i]) {
+ for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) {
+ fprintf(stdout, "(%ld: %ld) ", j, m_levels[i]->get_shard(j)->get_record_count());
+ }
+ } else {
+ fprintf(stdout, "[Empty]");
+ }
+
+ fprintf(stdout, "\n");
+ }
+ }
+
+
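+ /* pull in any shard of old_structure installed after version_id,
+ reconciling this structure with concurrently installed versions */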
+ void merge_structure(const ExtensionStructure* old_structure, size_t version_id = 0) {
+ assert(version_id > 0);
+
+ for (size_t i=0; i<old_structure->m_levels.size(); i++) {
+ for (size_t j=0; j<old_structure->m_levels[i]->get_shard_count(); j++) {
+ if (old_structure->m_levels[i]->get_shard_version(j) > version_id) {
+ m_levels[i]->append(old_structure->m_levels[i]->get_shard_ptr(j));
+ }
+ }
+ }
+ }
+
+ void update_shard_version(size_t version) {
+ assert(version != 0);
+
+ for (size_t i=0; i<m_levels.size(); i++) {
+ for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) {
+ if (m_levels[i]->get_shard_version(j) == 0) {
+ m_levels[i]->set_shard_version(j, version);
+ }
+ }
+ }
+ }
+
private:
LevelVector m_levels;
};
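perform_reconstruction() now tolerates a source level index one past the end of the structure (the level may not have been created yet when the task was planned) and still expands all_shards_idx to every shard on a level. A sketch of that source-gathering step:

#include <cstdio>
#include <vector>

constexpr int all_shards_idx = -1;

int main() {
  /* two levels; each int stands in for a shard */
  std::vector<std::vector<int>> levels = {{10, 11}, {20}};
  struct Src { int level; int shard; };
  std::vector<Src> sources = {{0, all_shards_idx}, {1, 0}, {2, all_shards_idx}};

  std::vector<int> gathered;
  for (auto s : sources) {
    if (s.level == (int)levels.size()) continue; /* level doesn't exist yet */
    if (s.shard == all_shards_idx)               /* expand to whole level */
      gathered.insert(gathered.end(), levels[s.level].begin(),
                      levels[s.level].end());
    else
      gathered.push_back(levels[s.level][s.shard]);
  }
  for (int g : gathered) std::printf("%d ", g); /* 10 11 20 */
  std::printf("\n");
}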
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 5bc891b..37b2b40 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -15,6 +15,7 @@
*/
#pragma once
+#include <future>
#include <memory>
#include <vector>
@@ -28,6 +29,7 @@ template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class InternalLevel {
typedef typename ShardType::RECORD RecordType;
typedef BufferView<RecordType> BuffView;
+ typedef std::pair<std::shared_ptr<ShardType>, size_t> shard_ptr;
public:
InternalLevel(ssize_t level_no) : m_level_no(level_no) {}
@@ -47,8 +49,8 @@ public:
std::vector<const ShardType *> shards;
for (auto shard : m_shards) {
- if (shard)
- shards.emplace_back(shard.get());
+ if (shard.first)
+ shards.emplace_back(shard.first.get());
}
return new ShardType(shards);
@@ -59,10 +61,10 @@ public:
std::vector<typename QueryType::LocalQuery *> &local_queries,
typename QueryType::Parameters *query_parms) const {
for (size_t i = 0; i < m_shards.size(); i++) {
- if (m_shards[i]) {
+ if (m_shards[i].first) {
auto local_query =
- QueryType::local_preproc(m_shards[i].get(), query_parms);
- shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].get()});
+ QueryType::local_preproc(m_shards[i].first.get(), query_parms);
+ shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].first.get()});
local_queries.emplace_back(local_query);
}
}
@@ -74,7 +76,7 @@ public:
for (int i = m_shards.size() - 1; i >= (ssize_t)shard_stop; i--) {
if (m_shards[i]) {
- auto res = m_shards[i]->point_lookup(rec, true);
+ auto res = m_shards[i].first->point_lookup(rec, true);
if (res && res->is_tombstone()) {
return true;
}
@@ -88,8 +90,8 @@ public:
return false;
for (size_t i = 0; i < m_shards.size(); ++i) {
- if (m_shards[i]) {
- auto res = m_shards[i]->point_lookup(rec);
+ if (m_shards[i].first) {
+ auto res = m_shards[i].first->point_lookup(rec);
if (res) {
res->set_delete();
return true;
@@ -105,7 +107,15 @@ public:
return nullptr;
}
- return m_shards[idx].get();
+ return m_shards[idx].first.get();
+ }
+
+ size_t get_shard_version(size_t idx) const {
+ if (idx >= m_shards.size()) {
+ return 0;
+ }
+
+ return m_shards[idx].second;
}
size_t get_shard_count() const { return m_shards.size(); }
@@ -113,8 +123,8 @@ public:
size_t get_record_count() const {
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
- if (m_shards[i]) {
- cnt += m_shards[i]->get_record_count();
+ if (m_shards[i].first) {
+ cnt += m_shards[i].first->get_record_count();
}
}
@@ -124,8 +134,8 @@ public:
size_t get_tombstone_count() const {
size_t res = 0;
for (size_t i = 0; i < m_shards.size(); ++i) {
- if (m_shards[i]) {
- res += m_shards[i]->get_tombstone_count();
+ if (m_shards[i].first) {
+ res += m_shards[i].first->get_tombstone_count();
}
}
return res;
@@ -134,8 +144,8 @@ public:
size_t get_aux_memory_usage() const {
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
- if (m_shards[i]) {
- cnt += m_shards[i]->get_aux_memory_usage();
+ if (m_shards[i].first) {
+ cnt += m_shards[i].first->get_aux_memory_usage();
}
}
@@ -146,7 +156,7 @@ public:
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
- cnt += m_shards[i]->get_memory_usage();
+ cnt += m_shards[i].first->get_memory_usage();
}
}
@@ -158,8 +168,8 @@ public:
size_t reccnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
- tscnt += m_shards[i]->get_tombstone_count();
- reccnt += m_shards[i]->get_record_count();
+ tscnt += m_shards[i].first->get_tombstone_count();
+ reccnt += m_shards[i].first->get_record_count();
}
}
@@ -169,7 +179,7 @@ public:
size_t get_nonempty_shard_count() const {
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
- if (m_shards[i] && m_shards[i]->get_record_count() > 0) {
+ if (m_shards[i].first && m_shards[i].first->get_record_count() > 0) {
cnt += 1;
}
}
@@ -180,7 +190,7 @@ public:
std::shared_ptr<InternalLevel> clone() const {
auto new_level = std::make_shared<InternalLevel>(m_level_no);
for (size_t i = 0; i < m_shards.size(); i++) {
- new_level->append(m_shards[i]);
+ new_level->append(m_shards[i].first, m_shards[i].second);
}
return new_level;
@@ -192,21 +202,39 @@ public:
m_shards.erase(m_shards.begin() + shard);
}
- void append(std::shared_ptr<ShardType> shard) {
- m_shards.emplace_back(shard);
+ void append(std::shared_ptr<ShardType> shard, size_t version=0) {
+ m_shards.push_back({shard, version});
+ }
+
+ void append(shard_ptr shard) {
+ m_shards.push_back(shard);
}
const ShardType *get_shard(ShardID shid) const {
if (shid < m_shards.size()) {
- return m_shards[shid].get();
+ return m_shards[shid].first.get();
}
return nullptr;
}
+ const shard_ptr get_shard_ptr(size_t shid) const {
+ if (shid < m_shards.size()) {
+ return m_shards[shid];
+ }
+
+ return {nullptr, 0};
+ }
+
+ void set_shard_version(size_t idx, size_t version) {
+ if (idx < m_shards.size()) {
+ m_shards[idx].second = version;
+ }
+ }
+
private:
ssize_t m_level_no;
- std::vector<std::shared_ptr<ShardType>> m_shards;
+ std::vector<shard_ptr> m_shards;
};
} // namespace de
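The representational change in this file is that a level's slot is now a (shard, version) pair rather than a bare shared_ptr, which is what makes the reconciliation above possible. A minimal sketch of the pair-based slot and the stamping pass (Shard is a stand-in type):

#include <cstdio>
#include <memory>
#include <utility>
#include <vector>

struct Shard { size_t reccnt; };
using shard_ptr = std::pair<std::shared_ptr<Shard>, size_t>;

int main() {
  std::vector<shard_ptr> shards;
  shards.push_back({std::make_shared<Shard>(Shard{100}), 0}); /* 0 = pending */
  shards.push_back({std::make_shared<Shard>(Shard{200}), 4}); /* installed @ v4 */

  /* set_shard_version() at install time stamps the pending slots */
  for (auto &s : shards)
    if (s.second == 0) s.second = 7;

  for (const auto &s : shards)
    std::printf("reccnt=%zu version=%zu\n", s.first->reccnt, s.second);
}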
diff --git a/include/util/types.h b/include/util/types.h
index e67c486..084bf4b 100644
--- a/include/util/types.h
+++ b/include/util/types.h
@@ -111,7 +111,13 @@ public:
void add_reconstruction(level_index source, level_index target,
size_t reccnt, ReconstructionType type) {
- m_tasks.push_back({{{source, all_shards_idx}}, target, reccnt});
+
+ if (type == ReconstructionType::Merge) {
+ m_tasks.push_back({{{source, all_shards_idx}, {target, all_shards_idx}}, target, reccnt});
+ } else {
+ m_tasks.push_back({{{source, all_shards_idx}}, target, reccnt});
+ }
+
total_reccnt += reccnt;
}
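With this change, a Merge task explicitly lists the target level among its sources, so the target's resident shards are rebuilt into the output shard instead of being left beside it; an Append still names only the source level. A last sketch of the two task shapes:

#include <cstdio>
#include <vector>

constexpr int all_shards_idx = -1;
struct ShardID { int level; int shard; };

int main() {
  const int source = 0, target = 1;
  const bool merge = true;

  std::vector<ShardID> sources = {{source, all_shards_idx}};
  if (merge) /* Merge also consumes the target level's shards */
    sources.push_back({target, all_shards_idx});

  for (auto s : sources)
    std::printf("source: level %d, all shards\n", s.level);
}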