summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-01-31 16:16:28 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2025-01-31 16:16:28 -0500
commit3061bfbf1c4b5092fa4234de3105a615fcef18ea (patch)
tree4c41cdf3b8e3af6e38ba1c8f283d2e35086cc2af
parent30da48151f58803968ca3ef5d42e66a9223d80a4 (diff)
downloaddynamic-extension-3061bfbf1c4b5092fa4234de3105a615fcef18ea.tar.gz
More updates
-rw-r--r--CMakeLists.txt8
-rw-r--r--include/framework/DynamicExtension.h80
-rw-r--r--include/framework/reconstruction/LevelingPolicy.h11
-rw-r--r--include/framework/scheduling/FIFOScheduler.h2
-rw-r--r--include/framework/scheduling/Version.h9
-rw-r--r--include/framework/structure/ExtensionStructure.h6
-rw-r--r--include/framework/util/Configuration.h7
-rw-r--r--tests/de_level_concurrent.cpp5
-rw-r--r--tests/de_level_tag.cpp7
-rw-r--r--tests/include/concurrent_extension.h14
-rw-r--r--tests/include/dynamic_extension.h37
11 files changed, 124 insertions, 62 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d506da2..902889f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -89,10 +89,10 @@ if (tests)
# target_link_options(de_tier_tomb PUBLIC -mcx16)
# target_include_directories(de_tier_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external)
- # add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp)
- # target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread atomic)
- # target_link_options(de_level_tag PUBLIC -mcx16)
- # target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external)
+ add_executable(de_level_tag ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tag.cpp)
+ target_link_libraries(de_level_tag PUBLIC gsl check subunit pthread atomic)
+ target_link_options(de_level_tag PUBLIC -mcx16)
+ target_include_directories(de_level_tag PRIVATE include external/psudb-common/cpp/include external)
# add_executable(de_level_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_tomb.cpp)
# target_link_libraries(de_level_tomb PUBLIC gsl check subunit pthread atomic)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index da2945a..91bed98 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -87,6 +87,8 @@ public:
m_config.maximum_threads);
m_active_version.store(
std::make_shared<VersionType>(INITIAL_VERSION, std::make_unique<StructureType>(), m_buffer.get(), 0));
+
+ m_version_counter = INITIAL_VERSION;
}
/**
@@ -226,7 +228,7 @@ public:
* @return The number of levels within the index
*/
size_t get_height() {
- return get_active_version()->get_structure()->get_height();
+ return get_active_version()->get_structure()->get_height() - 1;
}
/**
@@ -381,8 +383,6 @@ private:
size_t m_core_cnt;
std::atomic<int> m_next_core;
- ReconPolicyType const *m_recon_policy;
-
/* versioning + concurrency variables */
std::atomic<size_t> m_version_counter;
std::atomic<std::shared_ptr<VersionType>> m_active_version;
@@ -390,6 +390,8 @@ private:
std::condition_variable m_version_advance_cv;
std::mutex m_version_advance_mtx;
+ std::atomic<bool> m_flush_in_progress = false;
+
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
void enforce_delete_invariant(VersionType *version) {
@@ -420,6 +422,8 @@ private:
}
}
+ size_t m_flush_cnt = 0;
+
static void reconstruction(void *arguments) {
auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
auto extension = (DynamicExtension *)args->extension;
@@ -437,11 +441,14 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
+ assert(extension->m_flush_in_progress.load());
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
- args->version->set_next_buffer_head(buffview.get_tail());
+ assert(buffview.get_tail() != buffview.get_head());
+
auto new_shard = std::make_shared<ShardType>(std::move(buffview));
+
/*
* Flushes already know their version id. To avoid needing to
* do any update reconciliation between structures, they wait
@@ -449,14 +456,22 @@ private:
* and only then take a copy of the structure.
*/
extension->await_version(args->version->get_id() - 1);
- StructureType *structure =
- extension->get_active_version()->get_structure()->copy();
- /* add the newly created shard to the structure copy */
- structure->append_l0(new_shard);
+ if (extension->m_config.recon_maint_disabled) {
+ assert(args->version->get_mutable_structure());
+ args->version->get_mutable_structure()->append_l0(std::move(new_shard));
+ } else {
+ assert(!args->version->get_mutable_structure());
+ auto structure = std::unique_ptr<StructureType>(extension->get_active_version()->get_structure()->copy());
- /* set this version's structure to the newly created one */
- args->version->set_structure(std::unique_ptr<StructureType>(structure));
+ /* add the newly created shard to the structure copy */
+ structure->append_l0(std::move(new_shard));
+
+ /* set this version's structure to the newly created one */
+ args->version->set_structure(std::move(structure));
+ }
+
+ args->version->advance_buffer_head();
}
/* perform all of the reconstructions */
@@ -476,6 +491,10 @@ private:
/* advance the index to the newly finished version */
extension->install_new_version(args->version);
+ if (args->priority == ReconstructionPriority::FLUSH) {
+ extension->m_flush_in_progress.store(false);
+ }
+
/* manually delete the argument object */
delete args;
}
@@ -549,11 +568,11 @@ private:
* at the time it is activated, but the version number must be claimed
* early to minimize activation blocking.
*/
- version_ptr create_version() {
+ version_ptr create_version_flush(std::unique_ptr<StructureType> structure) {
size_t version_id = m_version_counter.fetch_add(1);
auto active_version = get_active_version();
std::shared_ptr<VersionType> new_version =
- std::make_shared<VersionType>(version_id, nullptr, m_buffer.get(), active_version->get_buffer().get_head());
+ std::make_shared<VersionType>(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
return new_version;
}
@@ -565,7 +584,7 @@ private:
* no version number is claimed until the version is activated, to
* prevent blocking buffer flushes.
*/
- version_ptr create_version(std::unique_ptr<StructureType> structure) {
+ version_ptr create_version_maint(std::unique_ptr<StructureType> structure) {
auto active_version = get_active_version();
version_ptr new_version =
std::make_shared<VersionType>(INVALID_VERSION, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head());
@@ -622,11 +641,34 @@ private:
void schedule_flush() {
begin_reconstruction_scheduling();
- auto new_version = create_version();
+
+ bool old = m_flush_in_progress.load();
+ if (old || !m_flush_in_progress.compare_exchange_strong(old, true)) {
+ end_reconstruction_scheduling();
+ return;
+ }
+
+ /*
+ * for "legacy" policies, without background reconstruction, we need
+ * a valid structure object as part of the version prior to determining
+ * the flush operations. Otherwise, the flush operation should only ever
+ * do the flush itself, and so no structure is needed at this point
+ *
+ * delaying obtaining the structure until later, when maintenance
+ * reconstructions are enabled, removes the need for flushes to reconcile
+ * differences in structure between when they are scheduled and when they
+ * finish
+ */
+ std::unique_ptr<StructureType> structure = nullptr;
+ if (m_config.recon_maint_disabled) {
+ structure = std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy());
+ }
+
+ auto new_version = create_version_flush(std::move(structure));
auto *args = new ReconstructionArgs<ShardType, QueryType>();
args->version = new_version;
- args->tasks = m_recon_policy->get_flush_tasks(args->version.get());
+ args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get());
args->extension = this;
args->priority = ReconstructionPriority::FLUSH;
@@ -646,16 +688,20 @@ private:
void schedule_maint_reconstruction(bool take_reconstruction_lock = true) {
+ if (m_config.recon_maint_disabled) {
+ return;
+ }
+
if (take_reconstruction_lock) {
begin_reconstruction_scheduling();
}
// FIXME: memory management issue here?
- auto new_version = create_version(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy()));
+ auto new_version = create_version_maint(std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy()));
auto *args = new ReconstructionArgs<ShardType, QueryType>();
args->version = new_version;
- args->tasks = m_recon_policy->get_reconstruction_tasks(args->version.get());
+ args->tasks = m_config.recon_policy->get_reconstruction_tasks(args->version.get());
args->extension = this;
args->priority = ReconstructionPriority::MAINT;
diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h
index 1523e74..3a0b73e 100644
--- a/include/framework/reconstruction/LevelingPolicy.h
+++ b/include/framework/reconstruction/LevelingPolicy.h
@@ -36,8 +36,7 @@ public:
auto levels = version->get_structure()->get_level_vector();
level_index target_level = find_reconstruction_target(levels);
- assert(target_level != -1);
- level_index source_level = 1;
+ level_index source_level = 0;
if (target_level == invalid_level_idx) {
/* grow */
@@ -47,7 +46,9 @@ public:
for (level_index i = target_level; i > source_level; i--) {
size_t target_reccnt =
(i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0;
- size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt;
+ size_t total_reccnt =
+ (i == 0) ? m_buffer_size + target_reccnt
+ : levels[i - 1]->get_record_count() + target_reccnt;
reconstructions.add_reconstruction(i - 1, i, total_reccnt,
ReconstructionType::Merge);
@@ -61,7 +62,7 @@ private:
level_index target_level = invalid_level_idx;
size_t incoming_records = m_buffer_size;
- for (level_index i = 0; i < (level_index)levels.size(); i++) {
+ for (level_index i = 1; i < (level_index)levels.size(); i++) {
if (levels[i]->get_record_count() + incoming_records < capacity(i)) {
target_level = i;
break;
@@ -74,7 +75,7 @@ private:
}
inline size_t capacity(level_index level) const {
- return m_buffer_size * pow(m_scale_factor, level + 1);
+ return m_buffer_size * pow(m_scale_factor, level);
}
size_t m_scale_factor;
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 7cb6d20..4c1db8d 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -82,6 +82,7 @@ private:
std::atomic<size_t> m_counter;
std::mutex m_cv_lock;
std::condition_variable m_cv;
+ std::mutex m_queue_lock;
std::thread m_sched_thrd;
std::thread m_sched_wakeup_thrd;
@@ -102,6 +103,7 @@ private:
}
void schedule_next() {
+ auto lk = std::unique_lock<std::mutex>(m_queue_lock);
assert(m_task_queue.size() > 0);
auto t = m_task_queue.pop();
m_stats.job_scheduled(t.m_timestamp);
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index 8d3d038..9c62ea1 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -84,13 +84,10 @@ public:
return version;
}
- void set_next_buffer_head(size_t new_head) {
- m_pending_buffer_head = new_head;
- }
-
bool advance_buffer_head() {
- m_buffer_head = m_pending_buffer_head;
- return m_buffer->advance_head(m_buffer_head);
+ auto new_head = m_buffer->get_buffer_view().get_tail();
+ m_buffer_head = new_head;
+ return m_buffer->advance_head(new_head);
}
private:
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 62c27f5..60fb6c7 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -27,7 +27,10 @@ class ExtensionStructure {
typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
LevelVector;
public:
- ExtensionStructure() = default;
+ ExtensionStructure() {
+ m_levels.emplace_back(std::make_shared<InternalLevel<ShardType, QueryType>>(0));
+ }
+
~ExtensionStructure() = default;
/*
@@ -217,7 +220,6 @@ public:
}
void append_l0(std::shared_ptr<ShardType> shard) {
- // FIXME: ensure that there's always a level 0 in the version
m_levels[0]->append(shard);
}
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index a751a29..81698d2 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -20,9 +20,9 @@ DeletePolicy D, SchedulerInterface SchedType>
class DEConfiguration {
public:
DEConfiguration(std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy)
- : m_recon_policy(std::move(recon_policy)) {}
+ : recon_policy(std::move(recon_policy)) {}
- std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> m_recon_policy;
+ std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy;
/* buffer parameters */
size_t buffer_count = 1;
@@ -31,8 +31,9 @@ class DEConfiguration {
/* reconstruction triggers */
bool recon_enable_seek_trigger = false;
- bool recon_enable_maint_on_flush = true;
+ bool recon_enable_maint_on_flush = false;
bool recon_enable_delete_cmpct = false;
+ bool recon_maint_disabled = true;
size_t recon_l0_capacity = 0; /* 0 for unbounded */
double maximum_delete_proportion = 1;
diff --git a/tests/de_level_concurrent.cpp b/tests/de_level_concurrent.cpp
index a43006e..5415744 100644
--- a/tests/de_level_concurrent.cpp
+++ b/tests/de_level_concurrent.cpp
@@ -16,6 +16,7 @@
#include "framework/scheduling/SerialScheduler.h"
#include "include/testing.h"
#include "framework/DynamicExtension.h"
+#include "framework/scheduling/FIFOScheduler.h"
#include "shard/ISAMTree.h"
#include "query/rangequery.h"
#include "framework/reconstruction/LevelingPolicy.h"
@@ -28,8 +29,8 @@ typedef Rec R;
typedef ISAMTree<R> S;
typedef rq::Query<S> Q;
-typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
-typedef de::DEConfiguration<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> CONF;
+typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
+typedef de::DEConfiguration<S, Q, DeletePolicy::TOMBSTONE, FIFOScheduler> CONF;
#include "include/concurrent_extension.h"
diff --git a/tests/de_level_tag.cpp b/tests/de_level_tag.cpp
index 6fead54..da7cf7c 100644
--- a/tests/de_level_tag.cpp
+++ b/tests/de_level_tag.cpp
@@ -20,14 +20,15 @@
#include "framework/reconstruction/LevelingPolicy.h"
#include <check.h>
-using namespace de;
+
+using namespace de;
typedef Rec R;
typedef ISAMTree<R> S;
typedef rq::Query<S> Q;
-
+typedef LevelingPolicy<S, Q> Policy;
typedef DynamicExtension<S, Q, DeletePolicy::TAGGING, SerialScheduler> DE;
-ReconstructionPolicy<S, Q> *recon = new LevelingPolicy<S, Q>(2, 1000);
+typedef de::DEConfiguration<S, Q, DeletePolicy::TAGGING, SerialScheduler> CONF;
#include "include/dynamic_extension.h"
diff --git a/tests/include/concurrent_extension.h b/tests/include/concurrent_extension.h
index fa669d2..f6d01fd 100644
--- a/tests/include/concurrent_extension.h
+++ b/tests/include/concurrent_extension.h
@@ -37,21 +37,19 @@
-using namespace de;
-typedef Rec R;
-typedef ISAMTree<R> S;
-typedef rq::Query<S> Q;
-typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
-typedef de::DEConfiguration<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> CONF;
+// using namespace de;
+// typedef Rec R;
+// typedef ISAMTree<R> S;
+// typedef rq::Query<S> Q;
+// typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+// typedef de::DEConfiguration<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> CONF;
static CONF create_config(size_t type=1) {
if (type == 1) {
auto recon = std::make_unique<LevelingPolicy<S, Q>>(2, 1000);
-
return CONF(std::move(recon));
} else {
auto recon2 = std::make_unique<LevelingPolicy<S, Q>>(4, 10000);
- CONF configuration2 = CONF(std::move(recon2));
return CONF(std::move(recon2));
}
}
diff --git a/tests/include/dynamic_extension.h b/tests/include/dynamic_extension.h
index a1ab20a..c378ea9 100644
--- a/tests/include/dynamic_extension.h
+++ b/tests/include/dynamic_extension.h
@@ -29,6 +29,7 @@
#include "framework/DynamicExtension.h"
#include "framework/scheduling/SerialScheduler.h"
#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/reconstruction/LevelingPolicy.h"
#include "shard/ISAMTree.h"
#include "query/rangequery.h"
#include <check.h>
@@ -39,12 +40,22 @@
// typedef Rec R;
// typedef ISAMTree<R> S;
// typedef rq::Query<S> Q;
-// typedef DynamicExtension<S, Q, DeletePolicy::TAGGING, SerialScheduler> DE;
-// ReconstructionPolicy<S, Q> *recon = new TieringPolicy<S, Q>(1000, 2);
+// typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+// typedef de::DEConfiguration<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> CONF;
+
+static CONF create_config(size_t type=1) {
+ if (type == 1) {
+ auto recon = std::make_unique<Policy>(2, 100);
+ return CONF(std::move(recon));
+ } else {
+ auto recon2 = std::make_unique<Policy>(4, 1000);
+ return CONF(std::move(recon2));
+ }
+}
START_TEST(t_create)
{
- auto test_de = new DE(recon, 100);
+ auto test_de = new DE(create_config());
ck_assert_ptr_nonnull(test_de);
ck_assert_int_eq(test_de->get_record_count(), 0);
@@ -57,7 +68,7 @@ END_TEST
START_TEST(t_insert)
{
- auto test_de = new DE(recon, 100);
+ auto test_de = new DE(create_config());
uint64_t key = 0;
uint32_t val = 0;
@@ -78,7 +89,7 @@ END_TEST
START_TEST(t_debug_insert)
{
- auto test_de = new DE(recon, 100);
+ auto test_de = new DE(create_config());
uint64_t key = 0;
uint32_t val = 0;
@@ -97,7 +108,7 @@ END_TEST
START_TEST(t_insert_with_mem_merges)
{
- auto test_de = new DE(recon, 100);
+ auto test_de = new DE(create_config());
uint64_t key = 0;
uint32_t val = 0;
@@ -108,7 +119,7 @@ START_TEST(t_insert_with_mem_merges)
val++;
}
- test_de->await_next_epoch();
+ test_de->await_version();
ck_assert_int_eq(test_de->get_record_count(), 300);
@@ -116,11 +127,13 @@ START_TEST(t_insert_with_mem_merges)
* BSM grows on every flush, so the height will be different than
* normal layout policies
*/
+ /*
if (dynamic_cast<const BSMPolicy<S, Q>*>(recon)) {
ck_assert_int_eq(test_de->get_height(), 2);
} else {
ck_assert_int_eq(test_de->get_height(), 1);
}
+ */
delete test_de;
}
@@ -129,7 +142,7 @@ END_TEST
START_TEST(t_range_query)
{
- auto test_de = new DE(recon, 100);
+ auto test_de = new DE(create_config());
size_t n = 10000;
std::vector<uint64_t> keys;
@@ -146,7 +159,7 @@ START_TEST(t_range_query)
ck_assert_int_eq(test_de->insert(r), 1);
}
- test_de->await_next_epoch();
+ test_de->await_version();
std::sort(keys.begin(), keys.end());
@@ -177,7 +190,7 @@ END_TEST
START_TEST(t_tombstone_merging_01)
{
size_t reccnt = 100000;
- auto test_de = new DE(recon, 100);
+ auto test_de = new DE(create_config());
auto rng = gsl_rng_alloc(gsl_rng_mt19937);
@@ -217,7 +230,7 @@ START_TEST(t_tombstone_merging_01)
}
}
- test_de->await_next_epoch();
+ test_de->await_version();
ck_assert(test_de->validate_tombstone_proportion());
@@ -232,7 +245,7 @@ START_TEST(t_static_structure)
auto rng = gsl_rng_alloc(gsl_rng_mt19937);
size_t reccnt = 100000;
- auto test_de = new DE(recon, 100);
+ auto test_de = new DE(create_config());
std::set<Rec> records;
std::set<Rec> to_delete;