summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2024-05-01 18:51:41 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2024-05-01 18:51:41 -0400
commit96faedaeb92776fd9cc2ed8d8b0878ebc9300cbe (patch)
treea681788b4074b97dedd45778aa79749f6d4204be
parentef2ec17c21cb331c37f25501394b009282604fcf (diff)
downloaddynamic-extension-96faedaeb92776fd9cc2ed8d8b0878ebc9300cbe.tar.gz
Added a Bentley-Saxe layout policy
-rw-r--r--CMakeLists.txt5
-rw-r--r--include/framework/DynamicExtension.h21
-rw-r--r--include/framework/structure/ExtensionStructure.h60
-rw-r--r--include/framework/structure/InternalLevel.h19
-rw-r--r--include/framework/util/Configuration.h3
-rw-r--r--include/util/types.h12
-rw-r--r--tests/de_bsm_tomb.cpp58
7 files changed, 167 insertions, 11 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d88db24..cdc114f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -96,6 +96,11 @@ if (tests)
target_link_options(de_level_tomb PUBLIC -mcx16)
target_include_directories(de_level_tomb PRIVATE include external/ctpl external/PLEX/include external/psudb-common/cpp/include external)
+ add_executable(de_bsm_tomb ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_bsm_tomb.cpp)
+ target_link_libraries(de_bsm_tomb PUBLIC gsl check subunit pthread atomic)
+ target_link_options(de_bsm_tomb PUBLIC -mcx16)
+ target_include_directories(de_bsm_tomb PRIVATE include external/PLEX/include external/psudb-common/cpp/include external)
+
add_executable(de_level_concurrent ${CMAKE_CURRENT_SOURCE_DIR}/tests/de_level_concurrent.cpp)
target_link_libraries(de_level_concurrent PUBLIC gsl check subunit pthread atomic)
target_link_options(de_level_concurrent PUBLIC -mcx16)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 6fd95c6..538ff25 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -54,6 +54,10 @@ public:
, m_next_core(0)
, m_epoch_cnt(0)
{
+ if constexpr (L == LayoutPolicy::BSM) {
+ assert(scale_factor == 2);
+ }
+
auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
m_previous_epoch.store({nullptr, 0});
@@ -487,10 +491,17 @@ private:
((DynamicExtension *) args->extension)->SetThreadAffinity();
Structure *vers = args->epoch->get_structure();
- for (ssize_t i=0; i<args->merges.size(); i++) {
- vers->reconstruction(args->merges[i].target, args->merges[i].source);
+ if constexpr (L == LayoutPolicy::BSM) {
+ if (args->merges.size() > 0) {
+ vers->reconstruction(args->merges[0]);
+ }
+ } else {
+ for (ssize_t i=0; i<args->merges.size(); i++) {
+ vers->reconstruction(args->merges[i].target, args->merges[i].sources[0]);
+ }
}
+
/*
* we'll grab the buffer AFTER doing the internal reconstruction, so we
* can flush as many records as possible in one go. The reconstruction
@@ -628,7 +639,7 @@ private:
static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers, BufView *bview) {
if constexpr (Q::SKIP_DELETE_FILTER) {
- return records;
+ return std::move(records);
}
std::vector<Wrapped<R>> processed_records;
@@ -691,6 +702,10 @@ private:
#ifdef _GNU_SOURCE
void SetThreadAffinity() {
+ if constexpr (std::same_as<SCHED, SerialScheduler>) {
+ return;
+ }
+
int core = m_next_core.fetch_add(1) % m_core_cnt;
cpu_set_t mask;
CPU_ZERO(&mask);
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index ffba1c1..b83674b 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -343,6 +343,30 @@ public:
base_level = grow(scratch_state);
}
+ if constexpr (L == LayoutPolicy::BSM) {
+ if (base_level == 0) {
+ return std::move(reconstructions);
+ }
+
+ ReconstructionTask task;
+ task.target = base_level;
+
+ size_t base_reccnt = 0;
+ for (level_index i=base_level; i>source_level; i--) {
+ auto recon_reccnt = scratch_state[i-1].reccnt;
+ base_reccnt += recon_reccnt;
+ scratch_state[i-1].reccnt = 0;
+ scratch_state[i-1].shardcnt = 0;
+ task.add_source(i-1, recon_reccnt);
+ }
+
+ reconstructions.add_reconstruction(task);
+ scratch_state[base_level].reccnt = base_reccnt;
+ scratch_state[base_level].shardcnt = 1;
+
+ return std::move(reconstructions);
+ }
+
/*
* Determine the full set of reconstructions necessary to open up
* space in the source level.
@@ -384,6 +408,33 @@ public:
return std::move(reconstructions);
}
+ inline void reconstruction(ReconstructionTask task) {
+ static_assert(L == LayoutPolicy::BSM);
+ std::vector<InternalLevel<R, Shard, Q>*> levels(task.sources.size());
+ for (size_t i=0; i<task.sources.size(); i++) {
+ levels[i] = m_levels[task.sources[i]].get();
+ }
+
+ auto new_level = InternalLevel<R, Shard, Q>::reconstruction(levels, task.target);
+ if (task.target >= m_levels.size()) {
+ m_current_state.push_back({new_level->get_record_count(), calc_level_record_capacity(task.target),
+ 1, 1});
+ m_levels.emplace_back(new_level);
+ } else {
+ m_current_state[task.target] = {new_level->get_record_count(), calc_level_record_capacity(task.target),
+ 1, 1};
+ m_levels[task.target] = new_level;
+ }
+
+ /* remove all of the levels that have been flattened */
+ for (size_t i=0; i<task.sources.size(); i++) {
+ m_levels[task.sources[i]] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(task.sources[i], 1));
+ m_current_state[task.sources[i]] = {0, calc_level_record_capacity(task.target), 0, 1};
+ }
+
+ return;
+ }
+
/*
* Combine incoming_level with base_level and reconstruct the shard,
* placing it in base_level. The two levels should be sequential--i.e. no
@@ -395,7 +446,6 @@ public:
if (base_level >= m_levels.size()) {
m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(base_level, shard_capacity)));
-
m_current_state.push_back({0, calc_level_record_capacity(base_level),
0, shard_capacity});
}
@@ -418,7 +468,7 @@ public:
m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
/*
- * Update the state vector to match the *real* stage following
+ * Update the state vector to match the *real* state following
* the reconstruction
*/
m_current_state[base_level] = {m_levels[base_level]->get_record_count(),
@@ -576,9 +626,11 @@ private:
return false;
}
- if (L == LayoutPolicy::LEVELING) {
+ if constexpr (L == LayoutPolicy::LEVELING) {
return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap;
- } else {
+ } else if constexpr (L == LayoutPolicy::BSM) {
+ return state[idx].reccnt == 0;
+ } else {
return state[idx].shardcnt < state[idx].shardcap;
}
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index db38946..b962dcc 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -64,6 +64,21 @@ public:
return std::shared_ptr<InternalLevel>(res);
}
+ static std::shared_ptr<InternalLevel> reconstruction(std::vector<InternalLevel*> levels, size_t level_idx) {
+ std::vector<Shard *> shards;
+ for (auto level : levels) {
+ for (auto shard : level->m_shards) {
+ if (shard) shards.emplace_back(shard.get());
+ }
+ }
+
+ auto res = new InternalLevel(level_idx, 1);
+ res->m_shard_cnt = 1;
+ res->m_shards[0] = std::make_shared<S>(shards);
+
+ return std::shared_ptr<InternalLevel>(res);
+ }
+
/*
* Create a new shard combining the records from all of
* the shards in level, and append this new shard into
@@ -185,6 +200,10 @@ public:
}
Shard* get_shard(size_t idx) {
+ if (idx >= m_shard_cnt) {
+ return nullptr;
+ }
+
return m_shards[idx].get();
}
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 55cc682..4a4524a 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -35,7 +35,8 @@ static thread_local size_t sampling_bailouts = 0;
enum class LayoutPolicy {
LEVELING,
- TEIRING
+ TEIRING,
+ BSM
};
enum class DeletePolicy {
diff --git a/include/util/types.h b/include/util/types.h
index bac0246..cf61412 100644
--- a/include/util/types.h
+++ b/include/util/types.h
@@ -73,10 +73,16 @@ const ShardID INVALID_SHID = {-1, -1};
typedef ssize_t level_index;
-typedef struct {
- level_index source;
+typedef struct ReconstructionTask {
+ std::vector<level_index> sources;
level_index target;
size_t reccnt;
+
+ void add_source(level_index source, size_t cnt) {
+ sources.push_back(source);
+ reccnt += cnt;
+ }
+
} ReconstructionTask;
class ReconstructionVector {
@@ -91,7 +97,7 @@ public:
}
void add_reconstruction(level_index source, level_index target, size_t reccnt) {
- m_tasks.push_back({source, target, reccnt});
+ m_tasks.push_back({{source}, target, reccnt});
total_reccnt += reccnt;
}
diff --git a/tests/de_bsm_tomb.cpp b/tests/de_bsm_tomb.cpp
new file mode 100644
index 0000000..493440e
--- /dev/null
+++ b/tests/de_bsm_tomb.cpp
@@ -0,0 +1,58 @@
+/*
+ * tests/de_bsm_tomb.cpp
+ *
+ * Unit tests for Dynamic Extension Framework
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#include <set>
+#include <random>
+#include <algorithm>
+
+#include "include/testing.h"
+#include "framework/DynamicExtension.h"
+#include "shard/ISAMTree.h"
+#include "query/rangequery.h"
+
+#include <check.h>
+using namespace de;
+
+typedef Rec R;
+typedef DynamicExtension<R, ISAMTree<R>, rq::Query<R, ISAMTree<R>>, LayoutPolicy::BSM, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+
+#include "include/dynamic_extension.h"
+
+
+Suite *unit_testing()
+{
+ Suite *unit = suite_create("DynamicExtension: Tombstone BSM Testing");
+ inject_dynamic_extension_tests(unit);
+
+ return unit;
+}
+
+
+int shard_unit_tests()
+{
+ int failed = 0;
+ Suite *unit = unit_testing();
+ SRunner *unit_shardner = srunner_create(unit);
+
+ srunner_run_all(unit_shardner, CK_NORMAL);
+ failed = srunner_ntests_failed(unit_shardner);
+ srunner_free(unit_shardner);
+
+ return failed;
+}
+
+
+int main()
+{
+ int unit_failed = shard_unit_tests();
+
+ return (unit_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}