summary | refs | log | tree | commit | diff | stats
path: root/include/framework/DynamicExtension.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/DynamicExtension.h')
-rw-r--r-- include/framework/DynamicExtension.h 229
1 files changed, 123 insertions, 106 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 0331353..da2945a 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -36,10 +36,10 @@ class DynamicExtension {
private:
/* convenience typedefs for commonly used types within the class */
typedef typename ShardType::RECORD RecordType;
- typedef MutableBuffer<RecordType> Buffer;
- typedef ExtensionStructure<ShardType, QueryType> Structure;
- typedef Version<ShardType, QueryType> Version;
- typedef BufferView<RecordType> BufView;
+ typedef MutableBuffer<RecordType> BufferType;
+ typedef ExtensionStructure<ShardType, QueryType> StructureType;
+ typedef Version<ShardType, QueryType> VersionType;
+ typedef BufferView<RecordType> BufferViewType;
typedef ReconstructionPolicy<ShardType, QueryType> ReconPolicyType;
typedef DEConfiguration<ShardType, QueryType, D, SchedType> ConfType;
@@ -52,6 +52,11 @@ private:
static constexpr size_t QUERY = 1;
static constexpr size_t RECONSTRUCTION = 2;
+ typedef std::shared_ptr<VersionType> version_ptr;
+ typedef size_t version_id;
+ static constexpr size_t INVALID_VERSION = 0;
+ static constexpr size_t INITIAL_VERSION = 1;
+
public:
/**
* Create a new Dynamized version of a data structure, supporting
@@ -74,12 +79,14 @@ public:
* framework's scheduler for use in answering queries and
* performing compactions and flushes, etc.
*/
- DynamicExtension(ConfType &&config) : m_config(std::move(config)) {
- m_buffer = std::make_unique(m_config.buffer_flush_trigger,
- m_config.buffer_size);
-
- m_sched = SchedType(m_config.maximum_memory_usage, m_config.maximum_threads);
- m_active_version.load(std::make_shared(INITIAL_VERSION, new Structure(), m_buffer.get(), 0));
+ DynamicExtension(ConfType &&config) : m_config(std::move(config)) {
+ m_buffer =
+ std::make_unique<BufferType>(m_config.buffer_flush_trigger, m_config.buffer_size);
+
+ m_sched = std::make_unique<SchedType>(m_config.maximum_memory_usage,
+ m_config.maximum_threads);
+ m_active_version.store(
+ std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0));
}
/**
@@ -90,10 +97,10 @@ public:
*/
~DynamicExtension() {
/* let any in-flight version transitions finish */
- await_newest_version();
+ await_version();
/* shutdown the scheduler */
- m_sched.shutdown();
+ m_sched->shutdown();
}
/**
@@ -274,10 +281,11 @@ public:
* the index. Ownership of this object is transferred to the
* caller.
*/
+ // FIXME: switch this over to std::unique_ptr
ShardType *
create_static_structure(bool await_reconstruction_completion = false) {
if (await_reconstruction_completion) {
- await_newest_version();
+ await_version();
}
auto version = get_active_version();
@@ -317,19 +325,27 @@ public:
}
/*
- * Determines the newest pending version at the time of call, and
- * blocks until that version becomes active.
+ * Blocks until the specified version id becomes active. If no version
+ * id is provided, wait for the newest pending version to be installed.
+ *
+ * NOTE: this method will return once the specified version has been
+ * installed, but does not guarantee that the specified version
+ * is the currently active one when it returns. It is possible
+ * that the active version upon the return of this method is newer
+ * than the one requested.
*/
- void await_newest_version() {
- /*
+ void await_version(version_id vid = INVALID_VERSION) {
+ /*
* versions are assigned by fetch and add on the counter, so the
* newest assigned version number will be one less than the value
* of the counter
*/
- auto newest_pending_version = m_version_counter.load() - 1;
+ if (vid == INVALID_VERSION) {
+ vid = m_version_counter.load() - 1;
+ }
/* versions signal on m_version_advance_cv when they activate */
- while (m_active_version.load() < newest_pending_version) {
+ while (m_active_version.load()->get_id() < vid) {
std::unique_lock lk(m_version_advance_mtx);
m_version_advance_cv.wait(lk);
}
@@ -354,14 +370,13 @@ public:
* Calls SchedType::print_statistics, which should write a report of
* scheduler performance statistics to stdout.
*/
- void print_scheduler_statistics() const { m_sched.print_statistics(); }
+ void print_scheduler_statistics() const { m_sched->print_statistics(); }
private:
ConfType m_config;
-
- SchedType m_sched;
- std::unique_ptr<Buffer> m_buffer;
+ std::unique_ptr<SchedType> m_sched;
+ std::unique_ptr<BufferType> m_buffer;
size_t m_core_cnt;
std::atomic<int> m_next_core;
@@ -370,20 +385,14 @@ private:
/* versioning + concurrency variables */
std::atomic<size_t> m_version_counter;
- typedef std::atomic<std::shared_ptr<Version>> version_ptr;
-
- version_ptr m_active_version;
-
- typedef size_t version_id;
- const size_t INVALID_VERSION = 0;
- const size_t INITIAL_VERSION = 1;
+ std::atomic<std::shared_ptr<VersionType>> m_active_version;
std::condition_variable m_version_advance_cv;
std::mutex m_version_advance_mtx;
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
- void enforce_delete_invariant(Version *version) {
+ void enforce_delete_invariant(VersionType *version) {
auto structure = version->get_structure();
auto compactions = structure->get_compaction_tasks();
@@ -401,7 +410,7 @@ private:
auto wait = args->result.get_future();
- m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
+ m_sched->schedule_job(reconstruction, 0, args, RECONSTRUCTION);
/* wait for compaction completion */
wait.get();
@@ -413,60 +422,61 @@ private:
static void reconstruction(void *arguments) {
auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
- auto extension = (DynamicExtension *) args->extension;
+ auto extension = (DynamicExtension *)args->extension;
extension->SetThreadAffinity();
+ /*
+ * For "normal" flushes, the task vector should be empty, so this is
+ * all that will happen. Piggybacking internal reconstructions off
+ * the flush WILL bottleneck the system, but is left in place to
+ * allow the traditional policies (LEVELING, TIERING, etc.) to be
+ * emulated within the new system.
+ *
+ * Background reconstructions will not have a priority level of
+ * FLUSH, and will already have a structure present. As a result,
+ * this code will be bypassed in that case.
+ */
if (args->priority == ReconstructionPriority::FLUSH) {
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
- auto new_head = buffview.get_tail();
- auto new_shard = Shard(std::move(buffview));
+ args->version->set_next_buffer_head(buffview.get_tail());
+ auto new_shard = std::make_shared<ShardType>(std::move(buffview));
- /* copy the currently active version's structure */
- auto structure = extension->get_active_version()->get_structure()->clone();
-
-
- }
+ /*
+ * Flushes already know their version id. To avoid needing to
+ * do any update reconciliation between structures, they wait
+ * until the version directly preceding them has been installed,
+ * and only then take a copy of the structure.
+ */
+ extension->await_version(args->version->get_id() - 1);
+ StructureType *structure =
+ extension->get_active_version()->get_structure()->copy();
+ /* add the newly created shard to the structure copy */
+ structure->append_l0(new_shard);
- else {
-
+ /* set this version's structure to the newly created one */
+ args->version->set_structure(std::unique_ptr<StructureType>(structure));
}
-
- Structure *vers = args->version->get_mutable_structure();
-
- ReconstructionTask flush_task;
- flush_task.type = ReconstructionType::Invalid;
-
+ /* perform all of the reconstructions */
+ StructureType *structure = args->version->get_mutable_structure();
for (size_t i = 0; i < args->tasks.size(); i++) {
- if (args->tasks[i].sources.size() > 0 &&
- args->tasks[i].sources[0] == buffer_shid) {
- flush_task = args->tasks[i];
- continue;
- }
-
- vers->perform_reconstruction(args->tasks[i]);
+ structure->perform_reconstruction(args->tasks[i]);
}
- if (flush_task.type != ReconstructionType::Invalid) {
- /*
- * we'll grab the buffer AFTER doing the internal reconstruction, so we
- * can flush as many records as possible in one go. The reconstruction
- * was done so as to make room for the full buffer anyway, so there's
- * no real benefit to doing this first.
- */
- auto buffer_view = args->version->get_buffer();
- size_t new_head = buffer_view.get_tail();
-
- vers->perform_flush(flush_task, std::move(buffer_view));
- args->result.set_value(true);
- ((DynamicExtension *)args->extension)->advance_version(new_head);
- } else {
- args->result.set_value(true);
+ /*
+ * if there isn't already a version id on the new version (i.e., the
+ * reconstruction isn't a flush), generate one.
+ */
+ if (args->version->get_id() == INVALID_VERSION) {
+ args->version->set_id(extension->m_version_counter.fetch_add(1));
}
+ /* advance the index to the newly finished version */
+ extension->install_new_version(args->version);
+ /* manually delete the argument object */
delete args;
}
@@ -532,7 +542,7 @@ private:
version_ptr get_active_version() { return m_active_version.load(); }
- /*
+ /*
* Create a new version with an assigned version number, but without
* an assigned copy of the structure. Intended for use in flushing,
* where the structure will be copied from the currently active version
@@ -542,7 +552,8 @@ private:
version_ptr create_version() {
size_t version_id = m_version_counter.fetch_add(1);
auto active_version = get_active_version();
- version_ptr new_version = std::make_shared<Version>(version_id, nullptr, &m_buffer, 0);
+ std::shared_ptr<VersionType> new_version =
+ std::make_shared<VersionType>(version_id, nullptr, m_buffer.get(), active_version->get_buffer().get_head());
return new_version;
}
@@ -551,51 +562,57 @@ private:
* Create a new version without an assigned version number, but with
* a copy of the extension structure. This is for use with background
* reconstructions, where the underlying structure is manipulated, but
- * no version number is claimed until the version is activated, to
+ * no version number is claimed until the version is activated, to
* prevent blocking buffer flushes.
*/
- version_ptr create_version(Structure *structure) {
- version_ptr new_version = std::make_shared<Version>(INVALID_VERSION, structure, &m_buffer, 0);
+ version_ptr create_version(std::unique_ptr<StructureType> structure) {
+ auto active_version = get_active_version();
+ version_ptr new_version =
+ std::make_shared<VersionType>(INVALID_VERSION, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
return new_version;
}
void install_new_version(version_ptr new_version) {
assert(new_version->get_structure());
- assert(new_version->get_version_number() != INVALID_VERSION);
-
- auto old = get_active_version();
- assert(new_version->get_version_number() > old->get_version_number());
+ assert(new_version->get_id() != INVALID_VERSION);
/* wait until our turn to install the new version */
- auto lk = std::unique_lock(m_version_advance_mtx);
- while (new_version.load()->get_verison_number() != old.load()->get_version_number() + 1) {
- m_version_advance_cv.wait(lk);
- old = get_active_version();
- }
+ await_version(new_version->get_id() - 1);
+
+ auto old = get_active_version();
// FIXME: implement this interface
// new_version->merge_changes_from(old.load().get());
- /*
+ /*
* Only one version can have a given number, so we are safe to
* directly assign here--nobody else is going to change it out from
- * under us. We're also protected under the mutex.
+ * under us.
+ */
+ m_active_version.store(new_version);
+
+ /*
+ * My understanding is that you don't *really* need this mutex for
+ * safety in modern C++ when sending the signal. But I'll grab it
+ * anyway out of an abundance of caution. I doubt this will be a
+ * major bottleneck.
*/
- m_active_version.store(new_version);
- m_version_advance_cv.notify_all();
+ auto lk = std::unique_lock(m_version_advance_mtx);
+ m_version_advance_cv.notify_all();
}
- Structure *create_scratch_structure() {
+ StructureType *create_scratch_structure() {
return get_active_version()->get_structure()->copy();
}
-
void begin_reconstruction_scheduling() {
bool cur_val;
do {
cur_val = m_scheduling_reconstruction.load();
- } while(cur_val == true && !m_scheduling_reconstruction.compare_exchange_strong(cur_val, true));
+ } while (
+ cur_val == true &&
+ !m_scheduling_reconstruction.compare_exchange_strong(cur_val, true));
}
void end_reconstruction_scheduling() {
@@ -608,16 +625,17 @@ private:
auto new_version = create_version();
auto *args = new ReconstructionArgs<ShardType, QueryType>();
- args->version.load(new_version);
+ args->version = new_version;
args->tasks = m_recon_policy->get_flush_tasks(args->version.get());
args->extension = this;
args->priority = ReconstructionPriority::FLUSH;
- /*
- * NOTE: args is deleted by the reconstruction job, so shouldn't be
- * freed here
+ /*
+ * NOTE: args is deleted by the reconstruction job, so shouldn't be
+ * freed here
*/
- m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION);
+ m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
+ RECONSTRUCTION);
if (m_config.recon_enable_maint_on_flush) {
schedule_maint_reconstruction(false);
@@ -626,28 +644,27 @@ private:
end_reconstruction_scheduling();
}
-
- void schedule_maint_reconstruction(bool take_reconstruction_lock=true) {
+ void schedule_maint_reconstruction(bool take_reconstruction_lock = true) {
if (take_reconstruction_lock) {
begin_reconstruction_scheduling();
}
// FIXME: memory management issue here?
- auto new_version = create_version(m_active_version.load()->get_structure());
+ auto new_version = create_version(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy()));
auto *args = new ReconstructionArgs<ShardType, QueryType>();
- args->version.load(new_version);
+ args->version = new_version;
args->tasks = m_recon_policy->get_reconstruction_tasks(args->version.get());
args->extension = this;
args->priority = ReconstructionPriority::MAINT;
- /*
- * NOTE: args is deleted by the reconstruction job, so shouldn't be
- * freed here
+ /*
+ * NOTE: args is deleted by the reconstruction job, so shouldn't be
+ * freed here
*/
- m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION);
-
+ m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
+ RECONSTRUCTION);
if (take_reconstruction_lock) {
end_reconstruction_scheduling();
@@ -662,7 +679,7 @@ private:
args->query_parms = std::move(query_parms);
auto result = args->result_set.get_future();
- m_sched.schedule_job(async_query, 0, (void *)args, QUERY);
+ m_sched->schedule_job(async_query, 0, (void *)args, QUERY);
return result;
}