summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-01-27 18:17:21 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2025-01-27 18:17:21 -0500
commit30da48151f58803968ca3ef5d42e66a9223d80a4 (patch)
tree23cad3718bca116016caf5aba375a3eb3a490328
parentf149a2459cfc2007f755d792b3c4e567d30c132f (diff)
downloaddynamic-extension-30da48151f58803968ca3ef5d42e66a9223d80a4.tar.gz
progress
-rw-r--r--CMakeLists.txt76
-rw-r--r--include/framework/DynamicExtension.h229
-rw-r--r--include/framework/reconstruction/BSMPolicy.h14
-rw-r--r--include/framework/reconstruction/FixedShardCountPolicy.h2
-rw-r--r--include/framework/reconstruction/FloodL0Policy.h9
-rw-r--r--include/framework/reconstruction/LevelingPolicy.h21
-rw-r--r--include/framework/reconstruction/ReconstructionPolicy.h3
-rw-r--r--include/framework/reconstruction/TieringPolicy.h17
-rw-r--r--include/framework/scheduling/Task.h2
-rw-r--r--include/framework/scheduling/Version.h102
-rw-r--r--include/framework/structure/ExtensionStructure.h8
-rw-r--r--include/framework/util/Configuration.h4
-rw-r--r--tests/de_level_concurrent.cpp6
-rw-r--r--tests/include/concurrent_extension.h55
14 files changed, 273 insertions, 275 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5e62641..d506da2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.22)
-set(CMAKE_C_COMPILER clang)
-set(CMAKE_CXX_COMPILER clang++)
+set(CMAKE_C_COMPILER gcc-12)
+set(CMAKE_CXX_COMPILER g++-12)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)
@@ -9,11 +9,11 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
set(namespace "de")
project("Practical Dynamic Extension" VERSION 0.1.0)
-set(debug false)
+set(debug true)
set(tests True)
set(bench false)
-set(vldb_bench true)
-set(tail_bench true)
+set(vldb_bench false)
+set(tail_bench false)
# ALEX doesn't build under C++20
set(build_alex false)
@@ -79,45 +79,45 @@ if (tests)
target_link_options(vptree_tests PUBLIC -mcx16)
target_include_directories(vptree_tests PRIVATE include external/vptree external/psudb-common/cpp/include)
- add_executable(de_tier_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tag.cpp)
- target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread atomic)
- target_link_options(de_tier_tag PUBLIC -mcx16)
- target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include external)
-
- add_executable(de_tier_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tomb.cpp)
- target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread atomic)
- target_link_options(de_tier_tomb PUBLIC -mcx16)
- target_include_directories(de_tier_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external)
-
- add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp)
- target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread atomic)
- target_link_options(de_level_tag PUBLIC -mcx16)
- target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external)
-
- add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp)
- target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread atomic)
- target_link_options(de_level_tomb PUBLIC -mcx16)
- target_include_directories(de_level_tomb PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external)
-
- add_executable(de_bsm_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_bsm_tomb.cpp)
- target_link_libraries(de_bsm_tomb PUBLIC gsl check subunit pthread atomic)
- target_link_options(de_bsm_tomb PUBLIC -mcx16)
- target_include_directories(de_bsm_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external)
-
- add_executable(de_bsm_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_bsm_tag.cpp)
- target_link_libraries(de_bsm_tag PUBLIC gsl check subunit pthread atomic)
- target_link_options(de_bsm_tag PUBLIC -mcx16)
- target_include_directories(de_bsm_tag PRIVATE include external/PLEX/include external/psudb-common/cpp/include external)
+ # add_executable(de_tier_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tag.cpp)
+ # target_link_libraries(de_tier_tag PUBLIC gsl check subunit pthread atomic)
+ # target_link_options(de_tier_tag PUBLIC -mcx16)
+ # target_include_directories(de_tier_tag PRIVATE include external/psudb-common/cpp/include external)
+
+ # add_executable(de_tier_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_tomb.cpp)
+ # target_link_libraries(de_tier_tomb PUBLIC gsl check subunit pthread atomic)
+ # target_link_options(de_tier_tomb PUBLIC -mcx16)
+ # target_include_directories(de_tier_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external)
+
+ # add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp)
+ # target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread atomic)
+ # target_link_options(de_level_tag PUBLIC -mcx16)
+ # target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external)
+
+ # add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp)
+ # target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread atomic)
+ # target_link_options(de_level_tomb PUBLIC -mcx16)
+ # target_include_directories(de_level_tomb PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external)
+
+ # add_executable(de_bsm_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_bsm_tomb.cpp)
+ # target_link_libraries(de_bsm_tomb PUBLIC gsl check subunit pthread atomic)
+ # target_link_options(de_bsm_tomb PUBLIC -mcx16)
+ # target_include_directories(de_bsm_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external)
+
+ # add_executable(de_bsm_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_bsm_tag.cpp)
+ # target_link_libraries(de_bsm_tag PUBLIC gsl check subunit pthread atomic)
+ # target_link_options(de_bsm_tag PUBLIC -mcx16)
+ # target_include_directories(de_bsm_tag PRIVATE include external/PLEX/include external/psudb-common/cpp/include external)
add_executable(de_level_concurrent ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_concurrent.cpp)
target_link_libraries(de_level_concurrent PUBLIC gsl check subunit pthread atomic)
target_link_options(de_level_concurrent PUBLIC -mcx16)
target_include_directories(de_level_concurrent PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external)
- add_executable(de_tier_concurrent ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_concurrent.cpp)
- target_link_libraries(de_tier_concurrent PUBLIC gsl check subunit pthread atomic)
- target_link_options(de_tier_concurrent PUBLIC -mcx16)
- target_include_directories(de_tier_concurrent PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external)
+ # add_executable(de_tier_concurrent ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_tier_concurrent.cpp)
+ # target_link_libraries(de_tier_concurrent PUBLIC gsl check subunit pthread atomic)
+ # target_link_options(de_tier_concurrent PUBLIC -mcx16)
+ # target_include_directories(de_tier_concurrent PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external)
add_executable(memisam_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/memisam_tests.cpp)
target_link_libraries(memisam_tests PUBLIC gsl check subunit pthread atomic)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 0331353..da2945a 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -36,10 +36,10 @@ class DynamicExtension {
private:
/* convenience typedefs for commonly used types within the class */
typedef typename ShardType::RECORD RecordType;
- typedef MutableBuffer<RecordType> Buffer;
- typedef ExtensionStructure<ShardType, QueryType> Structure;
- typedef Version<ShardType, QueryType> Version;
- typedef BufferView<RecordType> BufView;
+ typedef MutableBuffer<RecordType> BufferType;
+ typedef ExtensionStructure<ShardType, QueryType> StructureType;
+ typedef Version<ShardType, QueryType> VersionType;
+ typedef BufferView<RecordType> BufferViewType;
typedef ReconstructionPolicy<ShardType, QueryType> ReconPolicyType;
typedef DEConfiguration<ShardType, QueryType, D, SchedType> ConfType;
@@ -52,6 +52,11 @@ private:
static constexpr size_t QUERY = 1;
static constexpr size_t RECONSTRUCTION = 2;
+ typedef std::shared_ptr<VersionType> version_ptr;
+ typedef size_t version_id;
+ static constexpr size_t INVALID_VERSION = 0;
+ static constexpr size_t INITIAL_VERSION = 1;
+
public:
/**
* Create a new Dynamized version of a data structure, supporting
@@ -74,12 +79,14 @@ public:
* framework's scheduler for use in answering queries and
* performing compactions and flushes, etc.
*/
- DynamicExtension(ConfType &&config) : m_config(std::move(config)) {
- m_buffer = std::make_unique(m_config.buffer_flush_trigger,
- m_config.buffer_size);
-
- m_sched = SchedType(m_config.maximum_memory_usage, m_config.maximum_threads);
- m_active_version.load(std::make_shared(INITIAL_VERSION, new Structure(), m_buffer.get(), 0));
+ DynamicExtension(ConfType &&config) : m_config(std::move(config)) {
+ m_buffer =
+ std::make_unique<BufferType>(m_config.buffer_flush_trigger, m_config.buffer_size);
+
+ m_sched = std::make_unique<SchedType>(m_config.maximum_memory_usage,
+ m_config.maximum_threads);
+ m_active_version.store(
+ std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0));
}
/**
@@ -90,10 +97,10 @@ public:
*/
~DynamicExtension() {
/* let any in-flight version transitions finish */
- await_newest_version();
+ await_version();
/* shutdown the scheduler */
- m_sched.shutdown();
+ m_sched->shutdown();
}
/**
@@ -274,10 +281,11 @@ public:
* the index. Ownership of this object is transferred to the
* caller.
*/
+ // FIXME: switch this over to std::unique_ptr
ShardType *
create_static_structure(bool await_reconstruction_completion = false) {
if (await_reconstruction_completion) {
- await_newest_version();
+ await_version();
}
auto version = get_active_version();
@@ -317,19 +325,27 @@ public:
}
/*
- * Determines the newest pending version at the time of call, and
- * blocks until that version becomes active.
+ * Blocks until the specified version id becomes active. If no version
+ * id is provided, wait for the newest pending version to be installed.
+ *
+ * NOTE: this method will return once the specified version has been
+ * installed, but does not guarantee that the specified version
+ * is the currently active one when it returns. It is possible
+ * that the active version upon the return of this method is newer
+ * than the one requested.
*/
- void await_newest_version() {
- /*
+ void await_version(version_id vid = INVALID_VERSION) {
+ /*
* versions are assigned by fetch and add on the counter, so the
* newest assigned version number will be one less than the value
* of the counter
*/
- auto newest_pending_version = m_version_counter.load() - 1;
+ if (vid == INVALID_VERSION) {
+ vid = m_version_counter.load() - 1;
+ }
/* versions signal on m_version_advance_cv when they activate */
- while (m_active_version.load() < newest_pending_version) {
+ while (m_active_version.load()->get_id() < vid) {
std::unique_lock lk(m_version_advance_mtx);
m_version_advance_cv.wait(lk);
}
@@ -354,14 +370,13 @@ public:
* Calls SchedType::print_statistics, which should write a report of
* scheduler performance statistics to stdout.
*/
- void print_scheduler_statistics() const { m_sched.print_statistics(); }
+ void print_scheduler_statistics() const { m_sched->print_statistics(); }
private:
ConfType m_config;
-
- SchedType m_sched;
- std::unique_ptr<Buffer> m_buffer;
+ std::unique_ptr<SchedType> m_sched;
+ std::unique_ptr<BufferType> m_buffer;
size_t m_core_cnt;
std::atomic<int> m_next_core;
@@ -370,20 +385,14 @@ private:
/* versioning + concurrency variables */
std::atomic<size_t> m_version_counter;
- typedef std::atomic<std::shared_ptr<Version>> version_ptr;
-
- version_ptr m_active_version;
-
- typedef size_t version_id;
- const size_t INVALID_VERSION = 0;
- const size_t INITIAL_VERSION = 1;
+ std::atomic<std::shared_ptr<VersionType>> m_active_version;
std::condition_variable m_version_advance_cv;
std::mutex m_version_advance_mtx;
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
- void enforce_delete_invariant(Version *version) {
+ void enforce_delete_invariant(VersionType *version) {
auto structure = version->get_structure();
auto compactions = structure->get_compaction_tasks();
@@ -401,7 +410,7 @@ private:
auto wait = args->result.get_future();
- m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
+ m_sched->schedule_job(reconstruction, 0, args, RECONSTRUCTION);
/* wait for compaction completion */
wait.get();
@@ -413,60 +422,61 @@ private:
static void reconstruction(void *arguments) {
auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
- auto extension = (DynamicExtension *) args->extension;
+ auto extension = (DynamicExtension *)args->extension;
extension->SetThreadAffinity();
+ /*
+ * For "normal" flushes, the task vector should be empty, so this is
+ * all that will happen. Piggybacking internal reconstructions off
+ * the flush WILL bottleneck the system, but is left in place to
+ * allow the traditional policies (LEVELING, TIERING, etc.) to be
+ * emulated within the new system.
+ *
+ * Background reconstructions will not have a priority level of
+ * FLUSH, and will already have a structure present. As a result,
+ * this code will be bypassed in that case.
+ */
if (args->priority == ReconstructionPriority::FLUSH) {
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
- auto new_head = buffview.get_tail();
- auto new_shard = Shard(std::move(buffview));
+ args->version->set_next_buffer_head(buffview.get_tail());
+ auto new_shard = std::make_shared<ShardType>(std::move(buffview));
- /* copy the currently active version's structure */
- auto structure = extension->get_active_version()->get_structure()->clone();
-
-
- }
+ /*
+ * Flushes already know their version id. To avoid needing to
+ * do any update reconciliation between structures, they wait
+     * until the version directly preceding them has been installed,
+ * and only then take a copy of the structure.
+ */
+ extension->await_version(args->version->get_id() - 1);
+ StructureType *structure =
+ extension->get_active_version()->get_structure()->copy();
+ /* add the newly created shard to the structure copy */
+ structure->append_l0(new_shard);
- else {
-
+ /* set this version's structure to the newly created one */
+ args->version->set_structure(std::unique_ptr<StructureType>(structure));
}
-
- Structure *vers = args->version->get_mutable_structure();
-
- ReconstructionTask flush_task;
- flush_task.type = ReconstructionType::Invalid;
-
+ /* perform all of the reconstructions */
+ StructureType *structure = args->version->get_mutable_structure();
for (size_t i = 0; i < args->tasks.size(); i++) {
- if (args->tasks[i].sources.size() > 0 &&
- args->tasks[i].sources[0] == buffer_shid) {
- flush_task = args->tasks[i];
- continue;
- }
-
- vers->perform_reconstruction(args->tasks[i]);
+ structure->perform_reconstruction(args->tasks[i]);
}
- if (flush_task.type != ReconstructionType::Invalid) {
- /*
- * we'll grab the buffer AFTER doing the internal reconstruction, so we
- * can flush as many records as possible in one go. The reconstruction
- * was done so as to make room for the full buffer anyway, so there's
- * no real benefit to doing this first.
- */
- auto buffer_view = args->version->get_buffer();
- size_t new_head = buffer_view.get_tail();
-
- vers->perform_flush(flush_task, std::move(buffer_view));
- args->result.set_value(true);
- ((DynamicExtension *)args->extension)->advance_version(new_head);
- } else {
- args->result.set_value(true);
+ /*
+ * if there isn't already a version id on the new version (i.e., the
+ * reconstruction isn't a flush), generate one.
+ */
+ if (args->version->get_id() == INVALID_VERSION) {
+ args->version->set_id(extension->m_version_counter.fetch_add(1));
}
+ /* advance the index to the newly finished version */
+ extension->install_new_version(args->version);
+ /* manually delete the argument object */
delete args;
}
@@ -532,7 +542,7 @@ private:
version_ptr get_active_version() { return m_active_version.load(); }
- /*
+ /*
* Create a new version with an assigned version number, but without
* an assigned copy of the structure. Intended for use in flushing,
* where the structure will be copied from the currently active version
@@ -542,7 +552,8 @@ private:
version_ptr create_version() {
size_t version_id = m_version_counter.fetch_add(1);
auto active_version = get_active_version();
- version_ptr new_version = std::make_shared<Version>(version_id, nullptr, &m_buffer, 0);
+ std::shared_ptr<VersionType> new_version =
+ std::make_shared<VersionType>(version_id, nullptr, m_buffer.get(), active_version->get_buffer().get_head());
return new_version;
}
@@ -551,51 +562,57 @@ private:
* Create a new version without an assigned version number, but with
* a copy of the extension structure. This is for use with background
* reconstructions, where the underlying structure is manipulated, but
- * no version number is claimed until the version is activated, to
+ * no version number is claimed until the version is activated, to
* prevent blocking buffer flushes.
*/
- version_ptr create_version(Structure *structure) {
- version_ptr new_version = std::make_shared<Version>(INVALID_VERSION, structure, &m_buffer, 0);
+ version_ptr create_version(std::unique_ptr<StructureType> structure) {
+ auto active_version = get_active_version();
+ version_ptr new_version =
+ std::make_shared<VersionType>(INVALID_VERSION, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
return new_version;
}
void install_new_version(version_ptr new_version) {
assert(new_version->get_structure());
- assert(new_version->get_version_number() != INVALID_VERSION);
-
- auto old = get_active_version();
- assert(new_version->get_version_number() > old->get_version_number());
+ assert(new_version->get_id() != INVALID_VERSION);
/* wait until our turn to install the new version */
- auto lk = std::unique_lock(m_version_advance_mtx);
- while (new_version.load()->get_verison_number() != old.load()->get_version_number() + 1) {
- m_version_advance_cv.wait(lk);
- old = get_active_version();
- }
+ await_version(new_version->get_id() - 1);
+
+ auto old = get_active_version();
// FIXME: implement this interface
// new_version->merge_changes_from(old.load().get());
- /*
+ /*
* Only one version can have a given number, so we are safe to
* directly assign here--nobody else is going to change it out from
- * under us. We're also protected under the mutex.
+ * under us.
+ */
+ m_active_version.store(new_version);
+
+ /*
+ * My understanding is that you don't *really* need this mutex for
+ * safety in modern C++ when sending the signal. But I'll grab it
+ * anyway out of an abundance of caution. I doubt this will be a
+ * major bottleneck.
*/
- m_active_version.store(new_version);
- m_version_advance_cv.notify_all();
+ auto lk = std::unique_lock(m_version_advance_mtx);
+ m_version_advance_cv.notify_all();
}
- Structure *create_scratch_structure() {
+ StructureType *create_scratch_structure() {
return get_active_version()->get_structure()->copy();
}
-
void begin_reconstruction_scheduling() {
bool cur_val;
do {
cur_val = m_scheduling_reconstruction.load();
- } while(cur_val == true && !m_scheduling_reconstruction.compare_exchange_strong(cur_val, true));
+ } while (
+ cur_val == true &&
+ !m_scheduling_reconstruction.compare_exchange_strong(cur_val, true));
}
void end_reconstruction_scheduling() {
@@ -608,16 +625,17 @@ private:
auto new_version = create_version();
auto *args = new ReconstructionArgs<ShardType, QueryType>();
- args->version.load(new_version);
+ args->version = new_version;
args->tasks = m_recon_policy->get_flush_tasks(args->version.get());
args->extension = this;
args->priority = ReconstructionPriority::FLUSH;
- /*
- * NOTE: args is deleted by the reconstruction job, so shouldn't be
- * freed here
+ /*
+ * NOTE: args is deleted by the reconstruction job, so shouldn't be
+ * freed here
*/
- m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION);
+ m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
+ RECONSTRUCTION);
if (m_config.recon_enable_maint_on_flush) {
schedule_maint_reconstruction(false);
@@ -626,28 +644,27 @@ private:
end_reconstruction_scheduling();
}
-
- void schedule_maint_reconstruction(bool take_reconstruction_lock=true) {
+ void schedule_maint_reconstruction(bool take_reconstruction_lock = true) {
if (take_reconstruction_lock) {
begin_reconstruction_scheduling();
}
// FIXME: memory management issue here?
- auto new_version = create_version(m_active_version.load()->get_structure());
+ auto new_version = create_version(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy()));
auto *args = new ReconstructionArgs<ShardType, QueryType>();
- args->version.load(new_version);
+ args->version = new_version;
args->tasks = m_recon_policy->get_reconstruction_tasks(args->version.get());
args->extension = this;
args->priority = ReconstructionPriority::MAINT;
- /*
- * NOTE: args is deleted by the reconstruction job, so shouldn't be
- * freed here
+ /*
+ * NOTE: args is deleted by the reconstruction job, so shouldn't be
+ * freed here
*/
- m_sched.schedule_job(reconstruction, m_buffer->get_hwm(), args, RECONSTRUCTION);
-
+ m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args,
+ RECONSTRUCTION);
if (take_reconstruction_lock) {
end_reconstruction_scheduling();
@@ -662,7 +679,7 @@ private:
args->query_parms = std::move(query_parms);
auto result = args->result_set.get_future();
- m_sched.schedule_job(async_query, 0, (void *)args, QUERY);
+ m_sched->schedule_job(async_query, 0, (void *)args, QUERY);
return result;
}
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h
index c42b928..eaa374a 100644
--- a/include/framework/reconstruction/BSMPolicy.h
+++ b/include/framework/reconstruction/BSMPolicy.h
@@ -11,7 +11,7 @@
#include <cmath>
#include "framework/reconstruction/ReconstructionPolicy.h"
-#include "framework/scheduling/Epoch.h"
+#include "framework/scheduling/Version.h"
#include "util/types.h"
namespace de {
@@ -28,6 +28,12 @@ public:
get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
size_t incoming_reccnt) const override {
ReconstructionVector reconstructions;
+ return reconstructions;
+ }
+
+ ReconstructionVector
+ get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
+ ReconstructionVector reconstructions;
auto levels = version->get_structure()->get_level_vector();
level_index target_level = find_reconstruction_target(levels);
@@ -53,12 +59,6 @@ public:
return reconstructions;
}
- ReconstructionVector
- get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
- ReconstructionVector v;
- v.add_reconstruction(ReconstructionTask {{buffer_shid}, 0, m_buffer_size, ReconstructionType::Merge});
- }
-
private:
level_index find_reconstruction_target(LevelVector &levels) const {
level_index target_level = invalid_level_idx;
diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h
index 2a3c977..0768daa 100644
--- a/include/framework/reconstruction/FixedShardCountPolicy.h
+++ b/include/framework/reconstruction/FixedShardCountPolicy.h
@@ -11,7 +11,7 @@
#include <cmath>
#include "framework/reconstruction/ReconstructionPolicy.h"
-#include "framework/scheduling/Epoch.h"
+#include "framework/scheduling/Version.h"
#include "util/types.h"
namespace de {
diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h
index 8304d8a..94bed70 100644
--- a/include/framework/reconstruction/FloodL0Policy.h
+++ b/include/framework/reconstruction/FloodL0Policy.h
@@ -11,7 +11,7 @@
#include <cmath>
#include "framework/reconstruction/ReconstructionPolicy.h"
-#include "framework/scheduling/Epoch.h"
+#include "framework/scheduling/Version.h"
#include "util/types.h"
namespace de {
@@ -26,17 +26,14 @@ public:
ReconstructionVector
get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
size_t incoming_reccnt) const override {
-
ReconstructionVector reconstructions;
return reconstructions;
-
}
ReconstructionVector
get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
- ReconstructionVector v;
- v.add_reconstruction(ReconstructionTask{
- {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append});
+ ReconstructionVector reconstructions;
+ return reconstructions;
}
private:
diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h
index 176492e..1523e74 100644
--- a/include/framework/reconstruction/LevelingPolicy.h
+++ b/include/framework/reconstruction/LevelingPolicy.h
@@ -11,7 +11,7 @@
#include <cmath>
#include "framework/reconstruction/ReconstructionPolicy.h"
-#include "framework/scheduling/Epoch.h"
+#include "framework/scheduling/Version.h"
#include "util/types.h"
namespace de {
@@ -25,14 +25,19 @@ public:
: m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
ReconstructionVector
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
- size_t incoming_reccnt) const override {
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const override {
+ ReconstructionVector reconstructions;
+ return reconstructions;
+ }
+
+ ReconstructionVector
+ get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
ReconstructionVector reconstructions;
auto levels = version->get_structure()->get_level_vector();
level_index target_level = find_reconstruction_target(levels);
assert(target_level != -1);
- level_index source_level = 0;
+ level_index source_level = 1;
if (target_level == invalid_level_idx) {
/* grow */
@@ -51,14 +56,6 @@ public:
return reconstructions;
}
- ReconstructionVector
- get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
- ReconstructionVector v;
- v.add_reconstruction(ReconstructionTask{
- {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge});
- return v;
- }
-
private:
level_index find_reconstruction_target(LevelVector &levels) const {
level_index target_level = invalid_level_idx;
diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h
index 48bddcf..6f99b32 100644
--- a/include/framework/reconstruction/ReconstructionPolicy.h
+++ b/include/framework/reconstruction/ReconstructionPolicy.h
@@ -23,8 +23,7 @@ class ReconstructionPolicy {
public:
ReconstructionPolicy() {}
- virtual ReconstructionVector get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
- size_t incoming_reccnt) const = 0;
+ virtual ReconstructionVector get_reconstruction_tasks(const Version<ShardType, QueryType> *version) const = 0;
virtual ReconstructionVector get_flush_tasks(const Version<ShardType, QueryType> *version) const = 0;
};
}
diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h
index 63be5fe..dce5c3c 100644
--- a/include/framework/reconstruction/TieringPolicy.h
+++ b/include/framework/reconstruction/TieringPolicy.h
@@ -11,7 +11,7 @@
#include <cmath>
#include "framework/reconstruction/ReconstructionPolicy.h"
-#include "framework/scheduling/Epoch.h"
+#include "framework/scheduling/Version.h"
#include "util/types.h"
namespace de {
@@ -26,6 +26,12 @@ public:
get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
size_t incoming_reccnt) const override {
ReconstructionVector reconstructions;
+ return reconstructions;
+ }
+
+ ReconstructionVector
+ get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
+ ReconstructionVector reconstructions;
auto levels = version->get_structure()->get_level_vector();
level_index target_level = find_reconstruction_target(levels);
@@ -49,15 +55,6 @@ public:
return reconstructions;
}
- ReconstructionVector
- get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
- ReconstructionVector v;
-
- v.add_reconstruction(ReconstructionTask{
- {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append});
- return v;
- }
-
private:
level_index find_reconstruction_target(LevelVector &levels) const {
level_index target_level = invalid_level_idx;
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 197e8bf..1591909 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -32,7 +32,7 @@ enum class ReconstructionPriority {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
struct ReconstructionArgs {
typedef typename ShardType::RECORD RecordType;
- std::atomic<std::shared_ptr<Version<ShardType, QueryType>>> version;
+ std::shared_ptr<Version<ShardType, QueryType>> version;
ReconstructionVector tasks;
void *extension;
ReconstructionPriority priority;
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index 2b2b5ba..8d3d038 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -20,30 +20,21 @@ template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class Version {
private:
typedef typename ShardType::RECORD RecordType;
- typedef MutableBuffer<RecordType> Buffer;
- typedef ExtensionStructure<ShardType, QueryType> Structure;
- typedef BufferView<RecordType> BufView;
+ typedef MutableBuffer<RecordType> BufferType;
+ typedef ExtensionStructure<ShardType, QueryType> StructureType;
+ typedef BufferView<RecordType> BufferViewType;
public:
- Version(size_t number = 0)
- : m_buffer(nullptr), m_structure(nullptr), m_active_merge(false),
- m_epoch_number(number), m_buffer_head(0) {}
-
- Version(size_t number, Structure *structure, Buffer *buff, size_t head)
- : m_buffer(buff), m_structure(structure), m_active_merge(false),
- m_epoch_number(number), m_buffer_head(head) {
- structure->take_reference();
- }
+ Version(size_t vid = 0)
+ : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0),
+ m_pending_buffer_head(-1) {}
- ~Version() {
- if (m_structure) {
- m_structure->release_reference();
- }
+ Version(size_t number, std::unique_ptr<StructureType> structure, BufferType *buff,
+ size_t head)
+ : m_buffer(buff), m_structure(std::move(structure)), m_id(number),
+ m_buffer_head(head), m_pending_buffer_head(-1) {}
- if (m_structure->get_reference_count() == 0) {
- delete m_structure;
- }
- }
+ ~Version() = default;
/*
* Versions are *not* copyable or movable. Only one can exist, and all users
@@ -54,13 +45,26 @@ public:
Version &operator=(const Version &) = delete;
Version &operator=(Version &&) = delete;
- size_t get_epoch_number() const { return m_epoch_number; }
+ size_t get_id() const { return m_id; }
+
+ void set_id(size_t id) { m_id = id;}
- const Structure *get_structure() const { return m_structure.get(); }
+ const StructureType *get_structure() const { return m_structure.get(); }
- Structure *get_mutable_structure() { return m_structure.get(); }
+ StructureType *get_mutable_structure() { return m_structure.get(); }
- BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); }
+ bool set_structure(std::unique_ptr<StructureType> new_struct) {
+ if (m_structure) {
+ return false;
+ }
+
+ m_structure = std::move(new_struct);
+ return true;
+ }
+
+ BufferViewType get_buffer() const {
+ return m_buffer->get_buffer_view(m_buffer_head);
+ }
/*
* Returns a new Version object that is a copy of this one. The new object
@@ -69,54 +73,29 @@ public:
* provided argument.
*/
Version *clone(size_t number) {
- std::unique_lock<std::mutex> m_buffer_lock;
- auto epoch = new Version(number);
- epoch->m_buffer = m_buffer;
- epoch->m_buffer_head = m_buffer_head;
+ auto version = new Version(number);
+ version->m_buffer = m_buffer;
+ version->m_buffer_head = m_buffer_head;
if (m_structure) {
- epoch->m_structure = m_structure->copy();
- /* the copy routine returns a structure with 0 references */
- epoch->m_structure->take_reference();
+ version->m_structure = std::unique_ptr<StructureType>(m_structure->copy());
}
- return epoch;
+ return version;
}
- /*
- * Check if a merge can be started from this Version. At present, without
- * concurrent merging, this simply checks if there is currently a scheduled
- * merge based on this Version. If there is, returns false. If there isn't,
- * return true and set a flag indicating that there is an active merge.
- */
- bool prepare_reconstruction() {
- auto old = m_active_merge.load();
- if (old) {
- return false;
- }
-
- // FIXME: this needs cleaned up
- while (!m_active_merge.compare_exchange_strong(old, true)) {
- old = m_active_merge.load();
- if (old) {
- return false;
- }
- }
-
- return true;
+ void set_next_buffer_head(size_t new_head) {
+ m_pending_buffer_head = new_head;
}
- bool advance_buffer_head(size_t head) {
- m_buffer_head = head;
+ bool advance_buffer_head() {
+ m_buffer_head = m_pending_buffer_head;
return m_buffer->advance_head(m_buffer_head);
}
private:
- Buffer *m_buffer;
- std::unique_ptr<Structure> m_structure;
-
- std::mutex m_buffer_lock;
- std::atomic<bool> m_active_merge;
+ BufferType *m_buffer;
+ std::unique_ptr<StructureType> m_structure;
/*
* The number of currently active jobs
@@ -124,7 +103,8 @@ private:
* epoch. An epoch can only be retired
* when this number is 0.
*/
- size_t m_epoch_number;
+ size_t m_id;
size_t m_buffer_head;
+ ssize_t m_pending_buffer_head;
};
} // namespace de
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index c304f1c..62c27f5 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -51,7 +51,6 @@ public:
new_struct->m_levels.push_back(m_levels[i]->clone());
}
- new_struct->m_refcnt = 0;
return new_struct;
}
@@ -156,8 +155,7 @@ public:
return cnt;
}
- inline void perform_reconstruction(ReconstructionTask task,
- BuffView *bv=nullptr) {
+ inline void perform_reconstruction(ReconstructionTask task) {
/* perform the reconstruction itself */
std::vector<const ShardType *> shards;
for (ShardID shid : task.sources) {
@@ -172,10 +170,6 @@ public:
shards.push_back(m_levels[shid.level_idx]->get_shard(i));
}
}
- } else if (shid == buffer_shid) {
- assert(bv);
- ShardType *buffer_shard = new ShardType(std::move(bv));
- shards.push_back(buffer_shard);
} else {
shards.push_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx));
}
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 9c1e624..a751a29 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -18,10 +18,10 @@ namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
DeletePolicy D, SchedulerInterface SchedType>
class DEConfiguration {
+ public:
DEConfiguration(std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy)
- : m_recon_policy(recon_policy) {}
+ : m_recon_policy(std::move(recon_policy)) {}
- public:
std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> m_recon_policy;
/* buffer parameters */
diff --git a/tests/de_level_concurrent.cpp b/tests/de_level_concurrent.cpp
index a76285b..a43006e 100644
--- a/tests/de_level_concurrent.cpp
+++ b/tests/de_level_concurrent.cpp
@@ -13,11 +13,13 @@
#include <random>
#include <algorithm>
+#include "framework/scheduling/SerialScheduler.h"
#include "include/testing.h"
#include "framework/DynamicExtension.h"
#include "shard/ISAMTree.h"
#include "query/rangequery.h"
#include "framework/reconstruction/LevelingPolicy.h"
+#include "framework/util/Configuration.h"
#include <check.h>
using namespace de;
@@ -27,12 +29,10 @@ typedef ISAMTree<R> S;
typedef rq::Query<S> Q;
typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
-ReconstructionPolicy<S, Q> *recon = new LevelingPolicy<S, Q>(2, 1000);
-ReconstructionPolicy<S, Q> *recon2 = new LevelingPolicy<S, Q>(4, 10000);
+typedef de::DEConfiguration<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> CONF;
#include "include/concurrent_extension.h"
-
Suite *unit_testing()
{
Suite *unit = suite_create("DynamicExtension: Concurrent Leveling Testing");
diff --git a/tests/include/concurrent_extension.h b/tests/include/concurrent_extension.h
index 84f816d..fa669d2 100644
--- a/tests/include/concurrent_extension.h
+++ b/tests/include/concurrent_extension.h
@@ -26,24 +26,41 @@
#include "framework/reconstruction/TieringPolicy.h"
#include "testing.h"
#include "framework/DynamicExtension.h"
-#include "framework/scheduling/FIFOScheduler.h"
+#include "framework/scheduling/SerialScheduler.h"
+#include "framework/reconstruction/LevelingPolicy.h"
#include "shard/ISAMTree.h"
#include "query/rangequery.h"
#include <check.h>
+#include <memory>
#include <set>
#include <random>
-// using namespace de;
-// typedef Rec R;
-// typedef ISAMTree<R> S;
-// typedef rq::Query<S> Q;
-// typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
-// ReconstructionPolicy<S,Q> *recon = new TieringPolicy<S, Q>(2, 1000);
-// ReconstructionPolicy<S,Q> *recon2 = new TieringPolicy<S, Q>(4, 10000);
+
+
+using namespace de;
+typedef Rec R;
+typedef ISAMTree<R> S;
+typedef rq::Query<S> Q;
+typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+typedef de::DEConfiguration<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> CONF;
+
+static CONF create_config(size_t type=1) {
+ if (type == 1) {
+ auto recon = std::make_unique<LevelingPolicy<S, Q>>(2, 1000);
+
+ return CONF(std::move(recon));
+ } else {
+ auto recon2 = std::make_unique<LevelingPolicy<S, Q>>(4, 10000);
+ CONF configuration2 = CONF(std::move(recon2));
+ return configuration2;
+ }
+}
+
+
START_TEST(t_create)
{
- auto test_de = new DE(recon, 100, 1000);
+ auto test_de = new DE(create_config());
ck_assert_ptr_nonnull(test_de);
ck_assert_int_eq(test_de->get_record_count(), 0);
@@ -56,7 +73,7 @@ END_TEST
START_TEST(t_insert)
{
- auto test_de = new DE(recon, 100, 1000);
+ auto test_de = new DE(create_config());
uint64_t key = 0;
uint32_t val = 0;
@@ -77,7 +94,7 @@ END_TEST
START_TEST(t_debug_insert)
{
- auto test_de = new DE(recon, 100, 1000);
+ auto test_de = new DE(create_config());
uint64_t key = 0;
uint32_t val = 0;
@@ -96,7 +113,7 @@ END_TEST
START_TEST(t_insert_with_mem_merges)
{
- auto test_de = new DE(recon, 100, 1000);
+ auto test_de = new DE(create_config());
uint64_t key = 0;
uint32_t val = 0;
@@ -109,7 +126,7 @@ START_TEST(t_insert_with_mem_merges)
ck_assert_int_eq(test_de->get_record_count(), 1000);
- test_de->await_next_epoch();
+ test_de->await_version();
ck_assert_int_eq(test_de->get_record_count(), 1000);
@@ -128,7 +145,7 @@ START_TEST(t_insert_with_mem_merges)
}
} while (cnt < 100000);
- test_de->await_next_epoch();
+ test_de->await_version();
ck_assert_int_eq(test_de->get_record_count(), 101000);
@@ -139,7 +156,7 @@ END_TEST
START_TEST(t_range_query)
{
- auto test_de = new DE(recon2, 1000, 10000);
+ auto test_de = new DE(create_config(2));
size_t n = 10000000;
std::vector<uint64_t> keys;
@@ -162,7 +179,7 @@ START_TEST(t_range_query)
}
- test_de->await_next_epoch();
+ test_de->await_version();
std::sort(keys.begin(), keys.end());
@@ -193,7 +210,7 @@ END_TEST
START_TEST(t_tombstone_merging_01)
{
size_t reccnt = 100000;
- auto test_de = new DE(recon, 100, 1000);
+ auto test_de = new DE(create_config());
auto rng = gsl_rng_alloc(gsl_rng_mt19937);
@@ -237,7 +254,7 @@ START_TEST(t_tombstone_merging_01)
}
}
- test_de->await_next_epoch();
+ test_de->await_version();
ck_assert(test_de->validate_tombstone_proportion());
@@ -251,7 +268,7 @@ START_TEST(t_static_structure)
auto rng = gsl_rng_alloc(gsl_rng_mt19937);
size_t reccnt = 100000;
- auto test_de = new DE(recon, 100, 1000);
+ auto test_de = new DE(create_config());
std::set<R> records;
std::set<R> to_delete;