author     Douglas Rumbaugh <dbr4@psu.edu>  2025-01-17 17:28:50 -0500
committer  Douglas Rumbaugh <dbr4@psu.edu>  2025-01-17 17:28:50 -0500
commit     6c906c94e1eea6d4356b8c99b93da39029e8d95d (patch)
tree       8c5cd0d5bc5186cf6f19b29a94d424224a1cde8e
parent     77589d4cc82b766d2cf16294fab98a57f6579cb4 (diff)
download   dynamic-extension-6c906c94e1eea6d4356b8c99b93da39029e8d95d.tar.gz
Progress
-rw-r--r--  CMakeLists.txt                                             4
-rw-r--r--  include/framework/DynamicExtension.h                     446
-rw-r--r--  include/framework/reconstruction/BSMPolicy.h              12
-rw-r--r--  include/framework/reconstruction/FixedShardCountPolicy.h  25
-rw-r--r--  include/framework/reconstruction/FloodL0Policy.h          11
-rw-r--r--  include/framework/reconstruction/LevelingPolicy.h         14
-rw-r--r--  include/framework/reconstruction/ReconstructionPolicy.h    4
-rw-r--r--  include/framework/reconstruction/TieringPolicy.h          15
-rw-r--r--  include/framework/scheduling/Epoch.h                      20
-rw-r--r--  include/framework/scheduling/Task.h                       11
-rw-r--r--  include/framework/structure/ExtensionStructure.h          21
-rw-r--r--  include/framework/util/Configuration.h                    20
12 files changed, 256 insertions, 347 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c332448..5e62641 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.22)
-set(CMAKE_C_COMPILER gcc)
-set(CMAKE_CXX_COMPILER g++)
+set(CMAKE_C_COMPILER clang)
+set(CMAKE_CXX_COMPILER clang++)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 5a64243..ea3ef4d 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -11,15 +11,19 @@
#include <atomic>
#include <cstdio>
+#include <memory>
+#include <mutex>
#include <vector>
#include "framework/interface/Scheduler.h"
#include "framework/reconstruction/ReconstructionPolicy.h"
#include "framework/scheduling/SerialScheduler.h"
+#include "framework/scheduling/Task.h"
#include "framework/structure/ExtensionStructure.h"
#include "framework/structure/MutableBuffer.h"
+#include "framework/util/Configuration.h"
+
#include "framework/scheduling/Epoch.h"
#include "util/types.h"
@@ -34,9 +38,10 @@ private:
typedef typename ShardType::RECORD RecordType;
typedef MutableBuffer<RecordType> Buffer;
typedef ExtensionStructure<ShardType, QueryType> Structure;
- typedef Epoch<ShardType, QueryType> _Epoch;
+ typedef Version<ShardType, QueryType> Version;
typedef BufferView<RecordType> BufView;
typedef ReconstructionPolicy<ShardType, QueryType> ReconPolicyType;
+ typedef DEConfiguration<ShardType, QueryType, D, SchedType> ConfType;
typedef typename QueryType::Parameters Parameters;
typedef typename QueryType::LocalQuery LocalQuery;
@@ -47,11 +52,6 @@ private:
static constexpr size_t QUERY = 1;
static constexpr size_t RECONSTRUCTION = 2;
- struct epoch_ptr {
- _Epoch *epoch;
- size_t refcnt;
- };
-
public:
/**
* Create a new Dynamized version of a data structure, supporting
@@ -74,42 +74,26 @@ public:
* framework's scheduler for use in answering queries and
* performing compactions and flushes, etc.
*/
- DynamicExtension(ReconPolicyType *recon_policy, size_t buffer_low_watermark,
- size_t buffer_high_watermark = 0, size_t memory_budget = 0,
- size_t thread_cnt = 16)
- : m_max_delete_prop(1), m_sched(memory_budget, thread_cnt),
- m_buffer(new Buffer(buffer_low_watermark, (buffer_high_watermark == 0)
- ? buffer_low_watermark
- : buffer_high_watermark)),
- m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0),
- m_recon_policy(recon_policy) {
-
- auto vers = new Structure();
- m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
- m_previous_epoch.store({nullptr, 0});
- m_next_epoch.store({nullptr, 0});
+ DynamicExtension(ConfType &&config) : m_config(std::move(config)) {
+ m_buffer = std::make_unique<Buffer>(m_config.buffer_flush_trigger,
+ m_config.buffer_size);
+
+ m_sched = SchedType(m_config.maximum_memory_usage, m_config.maximum_threads);
+ m_recon_policy = m_config.m_recon_policy.get();
+ m_core_cnt = m_config.maximum_threads;
+ m_next_core.store(0);
+ m_version_counter.store(INITIAL_VERSION + 1);
+ m_active_version.store(std::make_shared<Version>(INITIAL_VERSION, new Structure(), m_buffer.get(), 0));
}
/**
* Destructor for DynamicExtension. Will block until the completion of
- * any outstanding epoch transition, shut down the scheduler, and free
+ * any outstanding version transition, shut down the scheduler, and free
* all currently allocated shards, buffers, etc., by calling their
* destructors.
*/
~DynamicExtension() {
-
- /* let any in-flight epoch transition finish */
- await_next_epoch();
+ /* let any in-flight version transitions finish */
+ await_newest_version();
/* shutdown the scheduler */
m_sched.shutdown();
-
- /* delete all held resources */
- delete m_next_epoch.load().epoch;
- delete m_current_epoch.load().epoch;
- delete m_previous_epoch.load().epoch;
-
- delete m_buffer;
}
/**
@@ -164,15 +148,11 @@ public:
"Tagging is only supported in single-threaded operation");
auto view = m_buffer->get_buffer_view();
-
- auto epoch = get_active_epoch();
- if (epoch->get_mutable_structure()->tagged_delete(rec)) {
- end_job(epoch);
+ auto version = get_active_version();
+ if (version->get_mutable_structure()->tagged_delete(rec)) {
return 1;
}
- end_job(epoch);
-
/*
* the buffer will take the longest amount of time, and
* probably has the lowest probability of having the record,
@@ -208,10 +188,9 @@ public:
* @return The number of records within the index
*/
size_t get_record_count() {
- auto epoch = get_active_epoch();
- auto t = epoch->get_buffer().get_record_count() +
- epoch->get_structure()->get_record_count();
- end_job(epoch);
+ auto version = get_active_version();
+ auto t = version->get_buffer().get_record_count() +
+ version->get_structure()->get_record_count();
return t;
}
@@ -224,10 +203,9 @@ public:
* @return The number of tombstone records within the index
*/
size_t get_tombstone_count() {
- auto epoch = get_active_epoch();
- auto t = epoch->get_buffer().get_tombstone_count() +
- epoch->get_structure()->get_tombstone_count();
- end_job(epoch);
+ auto version = get_active_version();
+ auto t = version->get_buffer().get_tombstone_count() +
+ version->get_structure()->get_tombstone_count();
return t;
}
@@ -241,11 +219,7 @@ public:
* @return The number of levels within the index
*/
size_t get_height() {
- auto epoch = get_active_epoch();
- auto t = epoch->get_structure()->get_height();
- end_job(epoch);
-
- return t;
+ return get_active_version()->get_structure()->get_height();
}
/**
@@ -254,11 +228,7 @@ public:
* @return The number of non-empty shards within the index
*/
size_t get_shard_count() {
- auto epoch = get_active_epoch();
- auto s = epoch->get_structure()->get_shard_count();
- end_job(epoch);
-
- return s;
+ return get_active_version()->get_structure()->get_shard_count();
}
/**
@@ -272,10 +242,9 @@ public:
* ShardType::get_memory_usage) and the buffer by the index.
*/
size_t get_memory_usage() {
- auto epoch = get_active_epoch();
+ auto version = get_active_version();
auto t = m_buffer->get_memory_usage() +
- epoch->get_structure()->get_memory_usage();
- end_job(epoch);
+ version->get_structure()->get_memory_usage();
return t;
}
@@ -289,11 +258,7 @@ public:
* (as reported by ShardType::get_aux_memory_usage) by the index.
*/
size_t get_aux_memory_usage() {
- auto epoch = get_active_epoch();
- auto t = epoch->get_structure()->get_aux_memory_usage();
- end_job(epoch);
-
- return t;
+ return get_active_version()->get_structure()->get_aux_memory_usage();
}
/**
@@ -312,19 +277,19 @@ public:
ShardType *
create_static_structure(bool await_reconstruction_completion = false) {
if (await_reconstruction_completion) {
- await_next_epoch();
+ await_newest_version();
}
- auto epoch = get_active_epoch();
- auto vers = epoch->get_structure();
+ auto version = get_active_version();
+ auto structure = version->get_structure();
std::vector<const ShardType *> shards;
- if (vers->get_level_vector().size() > 0) {
- for (int i = vers->get_level_vector().size() - 1; i >= 0; i--) {
- if (vers->get_level_vector()[i] &&
- vers->get_level_vector()[i]->get_record_count() > 0) {
+ if (structure->get_level_vector().size() > 0) {
+ for (int i = structure->get_level_vector().size() - 1; i >= 0; i--) {
+ if (structure->get_level_vector()[i] &&
+ structure->get_level_vector()[i]->get_record_count() > 0) {
shards.emplace_back(
- vers->get_level_vector()[i]->get_combined_shard());
+ structure->get_level_vector()[i]->get_combined_shard());
}
}
}
@@ -336,7 +301,7 @@ public:
* head pointer any longer
*/
{
- auto bv = epoch->get_buffer();
+ auto bv = version->get_buffer();
if (bv.get_record_count() > 0) {
shards.emplace_back(new ShardType(std::move(bv)));
}
@@ -348,19 +313,28 @@ public:
delete shard;
}
- end_job(epoch);
return flattened;
}
/*
- * If the current epoch is *not* the newest one, then wait for
- * the newest one to become available. Otherwise, returns immediately.
+ * Determines the newest pending version at the time of call, and
+ * blocks until that version becomes active.
*/
- void await_next_epoch() {
- while (m_next_epoch.load().epoch != nullptr) {
- std::unique_lock<std::mutex> lk(m_epoch_cv_lk);
- m_epoch_cv.wait(lk);
+ void await_newest_version() {
+ /*
+ * versions are assigned by fetch and add on the counter, so the
+ * newest assigned version number will be one less than the value
+ * of the counter
+ */
+ auto newest_pending_version = m_version_counter.load() - 1;
+
+ /* versions signal on m_version_advance_cv when they activate */
+ std::unique_lock lk(m_version_advance_mtx);
+ while (m_active_version.load()->get_version_number() < newest_pending_version) {
+ m_version_advance_cv.wait(lk);
+ }
}
/**
@@ -372,11 +346,8 @@ public:
* satisfied, and false if it is not.
*/
bool validate_tombstone_proportion() {
- auto epoch = get_active_epoch();
- auto t = epoch->get_structure()->validate_tombstone_proportion(
- m_max_delete_prop);
- end_job(epoch);
- return t;
+ return get_active_version()->get_structure()->validate_tombstone_proportion(
+ m_config.maximum_delete_proportion);
}
/**
@@ -386,28 +357,34 @@ public:
void print_scheduler_statistics() const { m_sched.print_statistics(); }
private:
- double m_max_delete_prop;
+ ConfType m_config;
+
SchedType m_sched;
- Buffer *m_buffer;
+ std::unique_ptr<Buffer> m_buffer;
size_t m_core_cnt;
std::atomic<int> m_next_core;
- std::atomic<size_t> m_epoch_cnt;
ReconPolicyType const *m_recon_policy;
- alignas(64) std::atomic<bool> m_reconstruction_scheduled;
+ /* versioning + concurrency variables */
+ std::atomic<size_t> m_version_counter;
+ typedef std::shared_ptr<Version> version_ptr;
+
+ std::atomic<version_ptr> m_active_version;
- std::atomic<epoch_ptr> m_next_epoch;
- std::atomic<epoch_ptr> m_current_epoch;
- std::atomic<epoch_ptr> m_previous_epoch;
+ typedef size_t version_id;
+ static constexpr size_t INVALID_VERSION = 0;
+ static constexpr size_t INITIAL_VERSION = 1;
- std::condition_variable m_epoch_cv;
- std::mutex m_epoch_cv_lk;
+ std::condition_variable m_version_advance_cv;
+ std::mutex m_version_advance_mtx;
- void enforce_delete_invariant(_Epoch *epoch) {
- auto structure = epoch->get_structure();
+ alignas(64) std::atomic<bool> m_scheduling_reconstruction = false;
+
+ void enforce_delete_invariant(Version *version) {
+ auto structure = version->get_structure();
auto compactions = structure->get_compaction_tasks();
while (compactions.size() > 0) {
@@ -415,7 +392,7 @@ private:
/* schedule a compaction */
ReconstructionArgs<ShardType, QueryType> *args =
new ReconstructionArgs<ShardType, QueryType>();
- args->epoch = epoch;
+ args->version = version;
args->merges = compactions;
args->extension = this;
args->compaction = true;
@@ -434,137 +411,11 @@ private:
}
}
- _Epoch *get_active_epoch() {
- epoch_ptr old, new_ptr;
-
- do {
- /*
- * during an epoch transition, a nullptr will installed in the
- * current_epoch. At this moment, the "new" current epoch will
- * soon be installed, but the "current" current epoch has been
- * moved back to m_previous_epoch.
- */
- if (m_current_epoch.load().epoch == nullptr) {
- old = m_previous_epoch;
- new_ptr = {old.epoch, old.refcnt + 1};
- if (old.epoch != nullptr &&
- m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
- break;
- }
- } else {
- old = m_current_epoch;
- new_ptr = {old.epoch, old.refcnt + 1};
- if (old.epoch != nullptr &&
- m_current_epoch.compare_exchange_strong(old, new_ptr)) {
- break;
- }
- }
- } while (true);
-
- assert(new_ptr.refcnt > 0);
-
- return new_ptr.epoch;
- }
-
- void advance_epoch(size_t buffer_head) {
-
- retire_epoch(m_previous_epoch.load().epoch);
-
- epoch_ptr tmp = {nullptr, 0};
- epoch_ptr cur;
- do {
- cur = m_current_epoch;
- } while (!m_current_epoch.compare_exchange_strong(cur, tmp));
-
- m_previous_epoch.store(cur);
-
- // FIXME: this may currently block because there isn't any
- // query preemption yet. At this point, we'd need to either
- // 1) wait for all queries on the old_head to finish
- // 2) kill all queries on the old_head
- // 3) somehow migrate all queries on the old_head to the new
- // version
- while (!m_next_epoch.load().epoch->advance_buffer_head(buffer_head)) {
- _mm_pause();
- }
-
- m_current_epoch.store(m_next_epoch);
- m_next_epoch.store({nullptr, 0});
-
- /* notify any blocking threads that the new epoch is available */
- m_epoch_cv_lk.lock();
- m_epoch_cv.notify_all();
- m_epoch_cv_lk.unlock();
- }
-
- /*
- * Creates a new epoch by copying the currently active one. The new epoch's
- * structure will be a shallow copy of the old one's.
- */
- _Epoch *create_new_epoch() {
- /*
- * This epoch access is _not_ protected under the assumption that
- * only one reconstruction will be able to trigger at a time. If that
- * condition is violated, it is possible that this code will clone a retired
- * epoch.
- */
- assert(m_next_epoch.load().epoch == nullptr);
- auto current_epoch = get_active_epoch();
-
- m_epoch_cnt.fetch_add(1);
- m_next_epoch.store({current_epoch->clone(m_epoch_cnt.load()), 0});
-
- end_job(current_epoch);
-
- return m_next_epoch.load().epoch;
- }
-
- void retire_epoch(_Epoch *epoch) {
- /*
- * Epochs with currently active jobs cannot
- * be retired. By the time retire_epoch is called,
- * it is assumed that a new epoch is active, meaning
- * that the epoch to be retired should no longer
- * accumulate new active jobs. Eventually, this
- * number will hit zero and the function will
- * proceed.
- */
-
- if (epoch == nullptr) {
- return;
- }
-
- epoch_ptr old, new_ptr;
- new_ptr = {nullptr, 0};
- do {
- old = m_previous_epoch.load();
-
- /*
- * If running in single threaded mode, the failure to retire
- * an Epoch will result in the thread of execution blocking
- * indefinitely.
- */
- if constexpr (std::same_as<SchedType, SerialScheduler>) {
- if (old.epoch == epoch)
- assert(old.refcnt == 0);
- }
-
- if (old.epoch == epoch && old.refcnt == 0 &&
- m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
- break;
- }
- usleep(1);
-
- } while (true);
-
- delete epoch;
- }
-
static void reconstruction(void *arguments) {
auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
((DynamicExtension *)args->extension)->SetThreadAffinity();
- Structure *vers = args->epoch->get_mutable_structure();
+ Structure *vers = args->version->get_mutable_structure();
ReconstructionTask flush_task;
flush_task.type = ReconstructionType::Invalid;
@@ -586,18 +437,18 @@ private:
* was done so as to make room for the full buffer anyway, so there's
* no real benefit to doing this first.
*/
- auto buffer_view = args->epoch->get_buffer();
+ auto buffer_view = args->version->get_buffer();
size_t new_head = buffer_view.get_tail();
vers->perform_flush(flush_task, std::move(buffer_view));
args->result.set_value(true);
- ((DynamicExtension *)args->extension)->advance_epoch(new_head);
+ ((DynamicExtension *)args->extension)->advance_version(new_head);
} else {
args->result.set_value(true);
}
((DynamicExtension *)args->extension)
- ->m_reconstruction_scheduled.store(false);
+ ->m_scheduling_reconstruction.store(false);
delete args;
}
@@ -605,10 +456,10 @@ private:
static void async_query(void *arguments) {
auto *args = (QueryArgs<ShardType, QueryType, DynamicExtension> *)arguments;
- auto epoch = args->extension->get_active_epoch();
+ auto version = args->extension->get_active_version();
- auto buffer = epoch->get_buffer();
- auto vers = epoch->get_structure();
+ auto buffer = version->get_buffer();
+ auto vers = version->get_structure();
auto *parms = &(args->query_parms);
/* create initial buffer query */
@@ -653,9 +504,6 @@ private:
/* return the output vector to caller via the future */
args->result_set.set_value(std::move(output));
- /* officially end the query job, releasing the pin on the epoch */
- args->extension->end_job(epoch);
-
/* clean up memory allocated for temporary query objects */
delete buffer_query;
for (size_t i = 0; i < local_queries.size(); i++) {
@@ -665,22 +513,89 @@ private:
delete args;
}
- void schedule_reconstruction() {
- auto epoch = create_new_epoch();
+ version_ptr get_active_version() { return m_active_version.load(); }
+
+ /*
+ * Create a new version with an assigned version number, but without
+ * an assigned copy of the structure. Intended for use in flushing,
+ * where the structure will be copied from the currently active version
+ * at the time it is activated, but the version number must be claimed
+ * early to minimize activation blocking.
+ */
+ version_ptr create_version() {
+ size_t version_id = m_version_counter.fetch_add(1);
+ version_ptr new_version = std::make_shared<Version>(version_id, nullptr, m_buffer.get(), 0);
+
+ return new_version;
+ }
+
+ /*
+ * Create a new version without an assigned version number, but with
+ * a copy of the extension structure. This is for use with background
+ * reconstructions, where the underlying structure is manipulated, but
+ * no version number is claimed until the version is activated, to
+ * prevent blocking buffer flushes.
+ */
+ version_ptr create_version(Structure *structure) {
+ version_ptr new_version = std::make_shared<Version>(INVALID_VERSION, structure, m_buffer.get(), 0);
+
+ return new_version;
+ }
+
+ void install_new_version(version_ptr new_version) {
+ assert(new_version->get_structure());
+ assert(new_version->get_version_number() != INVALID_VERSION);
+
+ auto old = get_active_version();
+ assert(new_version->get_version_number() > old->get_version_number());
+
+ /* wait until our turn to install the new version */
+ auto lk = std::unique_lock(m_version_advance_mtx);
+ while (new_version->get_version_number() != old->get_version_number() + 1) {
+ m_version_advance_cv.wait(lk);
+ old = get_active_version();
+ }
+
+ // FIXME: implement this interface
+ // new_version->merge_changes_from(old.get());
+
+ /*
+ * Only one version can have a given number, so we are safe to
+ * directly assign here--nobody else is going to change it out from
+ * under us. We're also protected under the mutex.
+ */
+ m_active_version.store(new_version);
+ m_version_advance_cv.notify_all();
+ }
+
+ Structure *create_scratch_structure() {
+ return get_active_version()->get_structure()->copy();
+ }
+
+ void schedule_flush() {
+ auto new_version = create_version();
- ReconstructionArgs<ShardType, QueryType> *args =
- new ReconstructionArgs<ShardType, QueryType>();
- args->epoch = epoch;
- args->tasks = m_recon_policy->get_reconstruction_tasks(
- epoch, m_buffer->get_high_watermark());
+ auto *args = new ReconstructionArgs<ShardType, QueryType>();
+ args->version = new_version;
+ args->tasks = m_recon_policy->get_flush_tasks(args->version.get());
args->extension = this;
- args->tasks.add_reconstruction(m_recon_policy->get_flush_task(epoch));
+ args->priority = ReconstructionPriority::FLUSH;
/*
- * NOTE: args is deleted by the reconstruction job, so shouldn't be freed
- * here
+ * NOTE: args is deleted by the reconstruction job, so shouldn't be
+ * freed here
*/
- m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
+ m_sched.schedule_job(reconstruction, m_buffer->get_high_watermark(), args, RECONSTRUCTION);
+
+ if (m_config.recon_enable_maint_on_flush) {
+ schedule_maint_reconstruction();
+ }
+ }
+
+
+ void schedule_maint_reconstruction() {
+ return;
}
std::future<QueryResult> schedule_query(Parameters &&query_parms) {
@@ -697,9 +612,8 @@ private:
int internal_append(const RecordType &rec, bool ts) {
if (m_buffer->is_at_low_watermark()) {
auto old = false;
-
- if (m_reconstruction_scheduled.compare_exchange_strong(old, true)) {
- schedule_reconstruction();
+ if (m_scheduling_reconstruction.compare_exchange_strong(old, true)) {
+ schedule_flush();
}
}
@@ -738,45 +652,5 @@ private:
#else
void SetThreadAffinity() {}
#endif
-
- void end_job(_Epoch *epoch) {
- epoch_ptr old, new_ptr;
-
- do {
- if (m_previous_epoch.load().epoch == epoch) {
- old = m_previous_epoch;
- /*
- * This could happen if we get into the system during a
- * transition. In this case, we can just back out and retry
- */
- if (old.epoch == nullptr) {
- continue;
- }
-
- assert(old.refcnt > 0);
-
- new_ptr = {old.epoch, old.refcnt - 1};
- if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
- break;
- }
- } else {
- old = m_current_epoch;
- /*
- * This could happen if we get into the system during a
- * transition. In this case, we can just back out and retry
- */
- if (old.epoch == nullptr) {
- continue;
- }
-
- assert(old.refcnt > 0);
-
- new_ptr = {old.epoch, old.refcnt - 1};
- if (m_current_epoch.compare_exchange_strong(old, new_ptr)) {
- break;
- }
- }
- } while (true);
- }
};
} // namespace de
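For orientation, a minimal construction sketch against the new config-driven interface. ShardType, QueryType, D, and SchedType stand for whatever instantiation a caller already uses; only DEConfiguration, the move-constructor, and the field names (from Configuration.h below) are taken from this patch:

    using ConfType = de::DEConfiguration<ShardType, QueryType, D, SchedType>;

    ConfType cfg;
    cfg.buffer_size = 16000;
    cfg.buffer_flush_trigger = cfg.buffer_size / 4; /* re-derive after resizing */
    cfg.maximum_threads = 8;

    de::DynamicExtension<ShardType, QueryType, D, SchedType> ext(std::move(cfg));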
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h
index ab8c6e4..c42b928 100644
--- a/include/framework/reconstruction/BSMPolicy.h
+++ b/include/framework/reconstruction/BSMPolicy.h
@@ -25,10 +25,10 @@ public:
: m_scale_factor(2), m_buffer_size(buffer_size) {}
ReconstructionVector
- get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch,
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
size_t incoming_reccnt) const override {
ReconstructionVector reconstructions;
- auto levels = epoch->get_structure()->get_level_vector();
+ auto levels = version->get_structure()->get_level_vector();
level_index target_level = find_reconstruction_target(levels);
assert(target_level != -1);
@@ -53,10 +53,10 @@ public:
return reconstructions;
}
- ReconstructionTask
- get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override {
- return ReconstructionTask{
- {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge};
+ ReconstructionVector
+ get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
+ ReconstructionVector v;
+ v.add_reconstruction(ReconstructionTask{
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge});
+ return v;
}
private:
diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h
index ec8e4e6..2a3c977 100644
--- a/include/framework/reconstruction/FixedShardCountPolicy.h
+++ b/include/framework/reconstruction/FixedShardCountPolicy.h
@@ -25,31 +25,36 @@ public:
: m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {}
ReconstructionVector
- get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch,
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
size_t incoming_reccnt) const override {
ReconstructionVector reconstructions;
return reconstructions;
}
- ReconstructionTask
- get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override {
+ ReconstructionVector
+ get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
- auto levels = epoch->get_structure()->get_level_vector();
+ auto levels = version->get_structure()->get_level_vector();
+ ReconstructionVector v;
if (levels.size() == 0) {
- return ReconstructionTask{
- {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append};
+ v.add_reconstruction(ReconstructionTask{
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append});
+ return v;
}
ShardID last_shid = {0, (shard_index) (levels[0]->get_shard_count() - 1)};
if (levels[0]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) {
- return ReconstructionTask{
- {{buffer_shid, last_shid}}, 0, m_buffer_size, ReconstructionType::Merge};
+ v.add_reconstruction(ReconstructionTask{
+ {{buffer_shid, last_shid}}, 0, m_buffer_size, ReconstructionType::Merge});
+ return v;
} else {
- return ReconstructionTask{
- {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append};
+ v.add_reconstruction(ReconstructionTask{
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append});
+ return v;
}
}
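The merge-or-append branch above is easiest to see with numbers. A worked sketch, assuming capacity() = m_max_reccnt / m_shard_count (its definition is outside this hunk):

    /* With max_record_count = 1,000,000 and shard_count = 100, capacity() is
     * 10,000. For buffer_size = 8,000, a last shard holding at most 2,000
     * records satisfies 2,000 + 8,000 <= 10,000, so the flush merges into it;
     * a fuller last shard forces an Append that opens a new shard. */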
diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h
index da4c297..8304d8a 100644
--- a/include/framework/reconstruction/FloodL0Policy.h
+++ b/include/framework/reconstruction/FloodL0Policy.h
@@ -24,7 +24,7 @@ public:
FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {}
ReconstructionVector
- get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch,
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
size_t incoming_reccnt) const override {
ReconstructionVector reconstructions;
@@ -32,10 +32,11 @@ public:
}
- ReconstructionTask
- get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override {
- return ReconstructionTask{
- {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append};
+ ReconstructionVector
+ get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
+ ReconstructionVector v;
+ v.add_reconstruction(ReconstructionTask{
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append});
+ return v;
}
private:
diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h
index add28ba..176492e 100644
--- a/include/framework/reconstruction/LevelingPolicy.h
+++ b/include/framework/reconstruction/LevelingPolicy.h
@@ -25,10 +25,10 @@ public:
: m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
ReconstructionVector
- get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch,
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
size_t incoming_reccnt) const override {
ReconstructionVector reconstructions;
- auto levels = epoch->get_structure()->get_level_vector();
+ auto levels = version->get_structure()->get_level_vector();
level_index target_level = find_reconstruction_target(levels);
assert(target_level != -1);
@@ -51,10 +51,12 @@ public:
return reconstructions;
}
- ReconstructionTask
- get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override {
- return ReconstructionTask{
- {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge};
+ ReconstructionVector
+ get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
+ ReconstructionVector v;
+ v.add_reconstruction(ReconstructionTask{
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge});
+ return v;
}
private:
diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h
index aa213df..2c737de 100644
--- a/include/framework/reconstruction/ReconstructionPolicy.h
+++ b/include/framework/reconstruction/ReconstructionPolicy.h
@@ -23,8 +23,8 @@ class ReconstructionPolicy {
public:
ReconstructionPolicy() {}
- virtual ReconstructionVector get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch,
+ virtual ReconstructionVector get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
size_t incoming_reccnt) const = 0;
- virtual ReconstructionTask get_flush_task(const Epoch<ShardType, QueryType> *epoch) const = 0;
+ virtual ReconstructionVector get_flush_tasks(const Version<ShardType, QueryType> *version) const = 0;
};
}
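A policy written against the revised interface returns a task vector from the flush hook rather than a single task. A minimal sketch (SingleAppendPolicy is a hypothetical name; the types and the buffer_shid constant are from the framework):

    #include "framework/reconstruction/ReconstructionPolicy.h"

    template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
    class SingleAppendPolicy : public ReconstructionPolicy<ShardType, QueryType> {
    public:
      SingleAppendPolicy(size_t buffer_size) : m_buffer_size(buffer_size) {}

      ReconstructionVector
      get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
                               size_t incoming_reccnt) const override {
        /* no background reconstructions; flushes do all the work */
        ReconstructionVector reconstructions;
        return reconstructions;
      }

      ReconstructionVector
      get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
        ReconstructionVector v;
        v.add_reconstruction(ReconstructionTask{
            {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append});
        return v;
      }

    private:
      size_t m_buffer_size;
    };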
diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h
index 1443309..63be5fe 100644
--- a/include/framework/reconstruction/TieringPolicy.h
+++ b/include/framework/reconstruction/TieringPolicy.h
@@ -23,10 +23,10 @@ public:
: m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
ReconstructionVector
- get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch,
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
size_t incoming_reccnt) const override {
ReconstructionVector reconstructions;
- auto levels = epoch->get_structure()->get_level_vector();
+ auto levels = version->get_structure()->get_level_vector();
level_index target_level = find_reconstruction_target(levels);
assert(target_level != -1);
@@ -49,10 +49,13 @@ public:
return reconstructions;
}
- ReconstructionTask
- get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override {
- return ReconstructionTask{
- {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append};
+ ReconstructionVector
+ get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
+ ReconstructionVector v;
+
+ v.add_reconstruction(ReconstructionTask{
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append});
+ return v;
}
private:
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 95c64ea..7583727 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -17,7 +17,7 @@
namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
-class Epoch {
+class Version {
private:
typedef typename ShardType::RECORD RecordType;
typedef MutableBuffer<RecordType> Buffer;
@@ -25,17 +25,17 @@ private:
typedef BufferView<RecordType> BufView;
public:
- Epoch(size_t number = 0)
+ Version(size_t number = 0)
: m_buffer(nullptr), m_structure(nullptr), m_active_merge(false),
m_epoch_number(number), m_buffer_head(0) {}
- Epoch(size_t number, Structure *structure, Buffer *buff, size_t head)
+ Version(size_t number, Structure *structure, Buffer *buff, size_t head)
: m_buffer(buff), m_structure(structure), m_active_merge(false),
m_epoch_number(number), m_buffer_head(head) {
structure->take_reference();
}
- ~Epoch() {
+ ~Version() {
if (m_structure) {
m_structure->release_reference();
}
@@ -49,10 +49,10 @@ public:
- * Epochs are *not* copyable or movable. Only one can exist, and all users
+ * Versions are *not* copyable or movable. Only one can exist, and all users
* of it work with pointers
*/
- Epoch(const Epoch &) = delete;
- Epoch(Epoch &&) = delete;
- Epoch &operator=(const Epoch &) = delete;
- Epoch &operator=(Epoch &&) = delete;
+ Version(const Version &) = delete;
+ Version(Version &&) = delete;
+ Version &operator=(const Version &) = delete;
+ Version &operator=(Version &&) = delete;
size_t get_epoch_number() const { return m_epoch_number; }
+ size_t get_version_number() const { return m_epoch_number; }
@@ -68,9 +68,9 @@ public:
* the same one. The epoch number of the new epoch will be set to the
* provided argument.
*/
- Epoch *clone(size_t number) {
+ Version *clone(size_t number) {
std::unique_lock<std::mutex> m_buffer_lock;
- auto epoch = new Epoch(number);
+ auto epoch = new Version(number);
epoch->m_buffer = m_buffer;
epoch->m_buffer_head = m_buffer_head;
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 7242bef..1ab35d2 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -23,14 +23,19 @@
namespace de {
+enum class ReconstructionPriority {
+ FLUSH = 0,
+ CMPCT = 1,
+ MAINT = 2
+};
+
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
struct ReconstructionArgs {
typedef typename ShardType::RECORD RecordType;
- Epoch<ShardType, QueryType> *epoch;
+ std::shared_ptr<Version<ShardType, QueryType>> version;
ReconstructionVector tasks;
std::promise<bool> result;
- bool compaction;
void *extension;
+ ReconstructionPriority priority;
};
template <ShardInterface S, QueryInterface<S> Q, typename DE> struct QueryArgs {
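Since ReconstructionArgs now carries a priority tag instead of the compaction flag, a background job would fill the block the same way schedule_flush does. A sketch of what the still-stubbed schedule_maint_reconstruction might look like, using only names from this patch (the wiring itself is an assumption):

    void schedule_maint_reconstruction() {
      auto *args = new ReconstructionArgs<ShardType, QueryType>();
      args->version = create_version(create_scratch_structure());
      args->tasks = m_recon_policy->get_reconstruction_tasks(
          args->version.get(), m_buffer->get_high_watermark());
      args->extension = this;
      args->priority = ReconstructionPriority::MAINT;
      /* args is deleted by the reconstruction job */
      m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
    }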
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 078c4a9..da91509 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -164,17 +164,18 @@ public:
assert(shid.level_idx < (level_index) m_levels.size());
assert(shid.shard_idx >= -1);
- if (shid == buffer_shid) {
- assert(bv);
- ShardType *buffer_shard = new ShardType(std::move(*bv));
- shards.push_back(buffer_shard);
- } else if (shid.shard_idx == all_shards_idx) {
- /* if unspecified, push all shards into the vector */
- for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); i++) {
+ /* if unspecified, push all shards into the vector */
+ if (shid.shard_idx == all_shards_idx) {
+ for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count();
+ i++) {
if (m_levels[shid.level_idx]->get_shard(i)) {
shards.push_back(m_levels[shid.level_idx]->get_shard(i));
}
}
+ } else if (shid == buffer_shid) {
+ assert(bv);
+ ShardType *buffer_shard = new ShardType(std::move(*bv));
+ shards.push_back(buffer_shard);
} else {
shards.push_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx));
}
@@ -186,11 +187,9 @@ public:
* Remove all of the shards processed by the operation
*/
for (ShardID shid : task.sources) {
- if (shid == buffer_shid) {
- continue;
- } else if (shid.shard_idx == all_shards_idx) {
+ if (shid.shard_idx == all_shards_idx) {
m_levels[shid.level_idx]->truncate();
- } else {
+ } else if (shid != buffer_shid) {
m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
}
}
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 0477095..9c1e624 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -23,6 +23,26 @@ class DEConfiguration {
public:
std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> m_recon_policy;
+
+ /* buffer parameters */
+ size_t buffer_count = 1;
+ size_t buffer_size = 8000;
+ size_t buffer_flush_trigger = buffer_size / 4;
+
+ /* reconstruction triggers */
+ bool recon_enable_seek_trigger = false;
+ bool recon_enable_maint_on_flush = true;
+ bool recon_enable_delete_cmpct = false;
+
+ size_t recon_l0_capacity = 0; /* 0 for unbounded */
+ double maximum_delete_proportion = 1;
+
+ /* resource management */
+ size_t maximum_threads = 16;
+ size_t minimum_recon_threads = 1;
+ size_t minimum_query_threads = 4;
+ size_t maximum_memory_usage = 0; /* 0 for unbounded */
+
};
} // namespace de
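One detail worth flagging: buffer_flush_trigger is derived from buffer_size once, when the object is initialized, so assigning a new buffer_size afterward does not rescale the trigger. A sketch:

    de::DEConfiguration<ShardType, QueryType, D, SchedType> cfg;
    /* defaults: buffer_size = 8000, buffer_flush_trigger = 2000 */
    cfg.buffer_size = 32000;
    /* buffer_flush_trigger is still 2000; re-derive it if the 1:4 ratio matters */
    cfg.buffer_flush_trigger = cfg.buffer_size / 4;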