author     Douglas B. Rumbaugh <doug@douglasrumbaugh.com>  2024-12-22 15:12:13 -0500
committer  Douglas B. Rumbaugh <doug@douglasrumbaugh.com>  2024-12-22 15:12:13 -0500
commit     ba65c8976f54d4da2467074235a12f5be0bd5ebc (patch)
tree       955d5995f211d8a7a24f7b106912773db5e3a5ba
parent     5617bed5257506d3dfda8537b16f44b3e40f1b42 (diff)
download   dynamic-extension-ba65c8976f54d4da2467074235a12f5be0bd5ebc.tar.gz
Continued development
-rw-r--r--  CMakeLists.txt  9
-rw-r--r--  include/framework/DynamicExtension.h  92
-rw-r--r--  include/framework/interface/Shard.h  2
-rw-r--r--  include/framework/reconstruction/BSMPolicy.h  22
-rw-r--r--  include/framework/reconstruction/LevelingPolicy.h  22
-rw-r--r--  include/framework/reconstruction/ReconstructionPolicy.h  6
-rw-r--r--  include/framework/reconstruction/TieringPolicy.h  18
-rw-r--r--  include/framework/scheduling/Epoch.h  8
-rw-r--r--  include/framework/structure/ExtensionStructure.h  63
-rw-r--r--  include/framework/structure/InternalLevel.h  28
-rw-r--r--  include/framework/structure/MutableBuffer.h  22
-rw-r--r--  include/shard/Alias.h  8
-rw-r--r--  include/shard/FSTrie.h  6
-rw-r--r--  include/shard/ISAMTree.h  4
-rw-r--r--  include/shard/LoudsPatricia.h  6
-rw-r--r--  include/shard/PGM.h  8
-rw-r--r--  include/shard/TrieSpline.h  8
-rw-r--r--  include/shard/VPTree.h  6
-rw-r--r--  include/util/SortedMerge.h  2
-rw-r--r--  tests/de_bsm_tag.cpp  5
-rw-r--r--  tests/de_bsm_tomb.cpp  5
-rw-r--r--  tests/de_level_concurrent.cpp  5
-rw-r--r--  tests/de_level_tag.cpp  4
-rw-r--r--  tests/de_level_tomb.cpp  4
-rw-r--r--  tests/de_tier_concurrent.cpp  5
-rw-r--r--  tests/de_tier_tag.cpp  4
-rw-r--r--  tests/de_tier_tomb.cpp  4
-rw-r--r--  tests/include/concurrent_extension.h  78
-rw-r--r--  tests/include/dynamic_extension.h  81
-rw-r--r--  tests/include/shard_standard.h  4
-rw-r--r--  tests/include/shard_string.h  2
-rw-r--r--  tests/internal_level_tests.cpp  95
-rw-r--r--  tests/vptree_tests.cpp  2
33 files changed, 261 insertions, 377 deletions
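
The main interface change visible in the diff below is that the reconstruction layout is no longer selected by a LayoutPolicy template parameter on DynamicExtension; instead a ReconstructionPolicy object (TieringPolicy, LevelingPolicy, or BSMPolicy) is constructed explicitly and handed to the DynamicExtension constructor together with the buffer watermarks. The following is a minimal usage sketch inferred from the updated tests, not part of the commit itself; the record type Rec, the testing.h header, and the constructor arguments (2, 1000, 100) are taken from the test files and may differ in other configurations.

#include "testing.h"                                  /* test-harness header providing Rec */
#include "framework/DynamicExtension.h"
#include "framework/reconstruction/TieringPolicy.h"
#include "framework/scheduling/SerialScheduler.h"
#include "shard/ISAMTree.h"
#include "query/rangequery.h"

using namespace de;

typedef Rec R;
typedef ISAMTree<R> S;
typedef rq::Query<S> Q;

/* the layout policy is no longer a template parameter of DynamicExtension */
typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;

int main() {
  /* scale factor 2 and buffer size 1000, matching the updated tests */
  ReconstructionPolicy<S, Q> *recon = new TieringPolicy<S, Q>(2, 1000);

  /* low watermark 100, high watermark 1000; per this commit the high
   * watermark defaults to the low watermark when omitted */
  auto extension = new DE(recon, 100, 1000);

  R rec = {1, 1};   /* assuming the two-field key/value test record */
  extension->insert(rec);

  delete extension;
  delete recon;
  return 0;
}

The same pattern appears in each of the tests/de_*_*.cpp files below, with LevelingPolicy and BSMPolicy substituted for the leveling and BSM layouts.
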
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0c1aaa1..e35ae19 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -9,10 +9,10 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
set(namespace "de")
project("Practical Dynamic Extension" VERSION 0.1.0)
-set(debug false)
+set(debug true)
set(tests True)
set(bench false)
-set(vldb_bench true)
+set(vldb_bench false)
# ALEX doesn't build under C++20
set(build_alex false)
@@ -51,11 +51,6 @@ if (tests)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/bin/tests")
file(MAKE_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/tests/data")
- add_executable(internal_level_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/internal_level_tests.cpp)
- target_link_libraries(internal_level_tests PUBLIC gsl check subunit pthread atomic)
- target_link_options(internal_level_tests PUBLIC -mcx16)
- target_include_directories(internal_level_tests PRIVATE include external/psudb-common/cpp/include)
-
add_executable(mutable_buffer_tests ${CMAKE_CURRENT_SOURCE_DIR}/tests/mutable_buffer_tests.cpp)
target_link_libraries(mutable_buffer_tests PUBLIC gsl check subunit pthread atomic)
target_link_options(mutable_buffer_tests PUBLIC -mcx16)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 1886234..c35bb93 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -15,7 +15,6 @@
#include "framework/interface/Scheduler.h"
#include "framework/reconstruction/ReconstructionPolicy.h"
-#include "framework/scheduling/FIFOScheduler.h"
#include "framework/scheduling/SerialScheduler.h"
#include "framework/structure/ExtensionStructure.h"
@@ -28,7 +27,7 @@ namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
DeletePolicy D = DeletePolicy::TAGGING,
- SchedulerInterface SchedType = de::FIFOScheduler>
+ SchedulerInterface SchedType = de::SerialScheduler>
class DynamicExtension {
private:
/* convenience typedefs for commonly used types within the class */
@@ -76,10 +75,12 @@ public:
* performing compactions and flushes, etc.
*/
DynamicExtension(ReconPolicyType *recon_policy, size_t buffer_low_watermark,
- size_t buffer_high_watermark, size_t memory_budget = 0,
+ size_t buffer_high_watermark = 0, size_t memory_budget = 0,
size_t thread_cnt = 16)
: m_max_delete_prop(1), m_sched(memory_budget, thread_cnt),
- m_buffer(new Buffer(buffer_low_watermark, buffer_high_watermark)),
+ m_buffer(new Buffer(buffer_low_watermark, (buffer_high_watermark == 0)
+ ? buffer_low_watermark
+ : buffer_high_watermark)),
m_core_cnt(thread_cnt), m_next_core(0), m_epoch_cnt(0),
m_recon_policy(recon_policy) {
@@ -165,7 +166,7 @@ public:
auto view = m_buffer->get_buffer_view();
auto epoch = get_active_epoch();
- if (epoch->get_structure()->tagged_delete(rec)) {
+ if (epoch->get_mutable_structure()->tagged_delete(rec)) {
end_job(epoch);
return 1;
}
@@ -303,13 +304,14 @@ public:
auto epoch = get_active_epoch();
auto vers = epoch->get_structure();
- std::vector<ShardType *> shards;
-
- if (vers->get_levels().size() > 0) {
- for (int i = vers->get_levels().size() - 1; i >= 0; i--) {
- if (vers->get_levels()[i] &&
- vers->get_levels()[i]->get_record_count() > 0) {
- shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
+ std::vector<const ShardType *> shards;
+
+ if (vers->get_level_vector().size() > 0) {
+ for (int i = vers->get_level_vector().size() - 1; i >= 0; i--) {
+ if (vers->get_level_vector()[i] &&
+ vers->get_level_vector()[i]->get_record_count() > 0) {
+ shards.emplace_back(
+ vers->get_level_vector()[i]->get_combined_shard());
}
}
}
@@ -358,7 +360,8 @@ public:
*/
bool validate_tombstone_proportion() {
auto epoch = get_active_epoch();
- auto t = epoch->get_structure()->validate_tombstone_proportion();
+ auto t = epoch->get_structure()->validate_tombstone_proportion(
+ m_max_delete_prop);
end_job(epoch);
return t;
}
@@ -370,7 +373,6 @@ public:
void print_scheduler_statistics() const { m_sched.print_statistics(); }
private:
- ReconPolicyType const *m_recon_policy;
double m_max_delete_prop;
SchedType m_sched;
@@ -380,6 +382,8 @@ private:
std::atomic<int> m_next_core;
std::atomic<size_t> m_epoch_cnt;
+ ReconPolicyType const *m_recon_policy;
+
alignas(64) std::atomic<bool> m_reconstruction_scheduled;
std::atomic<epoch_ptr> m_next_epoch;
@@ -547,39 +551,36 @@ private:
auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
((DynamicExtension *)args->extension)->SetThreadAffinity();
- Structure *vers = args->epoch->get_structure();
+ Structure *vers = args->epoch->get_mutable_structure();
- for (size_t i=0; i<args->tasks.size(); i++) {
- vers->perform_reconstruction(args->tasks[i]);
- }
+ ReconstructionTask flush_task;
+ flush_task.type = ReconstructionType::Invalid;
- /*
- * we'll grab the buffer AFTER doing the internal reconstruction, so we
- * can flush as many records as possible in one go. The reconstruction
- * was done so as to make room for the full buffer anyway, so there's
- * no real benefit to doing this first.
- */
- auto buffer_view = args->epoch->get_buffer();
- size_t new_head = buffer_view.get_tail();
+ for (size_t i = 0; i < args->tasks.size(); i++) {
+ if (args->tasks[i].sources.size() > 0 &&
+ args->tasks[i].sources[0] == buffer_shid) {
+ flush_task = args->tasks[i];
+ continue;
+ }
- /*
- * if performing a compaction, don't flush the buffer, as
- * there is no guarantee that any necessary reconstructions
- * will free sufficient space in L0 to support a flush
- */
- if (!args->compaction) {
- vers->flush_buffer(std::move(buffer_view));
+ vers->perform_reconstruction(args->tasks[i]);
}
- args->result.set_value(true);
+ if (flush_task.type != ReconstructionType::Invalid) {
+ /*
+ * we'll grab the buffer AFTER doing the internal reconstruction, so we
+ * can flush as many records as possible in one go. The reconstruction
+ * was done so as to make room for the full buffer anyway, so there's
+ * no real benefit to doing this first.
+ */
+ auto buffer_view = args->epoch->get_buffer();
+ size_t new_head = buffer_view.get_tail();
- /*
- * Compactions occur on an epoch _before_ it becomes active,
- * and as a result the active epoch should _not_ be advanced as
- * part of a compaction
- */
- if (!args->compaction) {
+ vers->perform_flush(flush_task, std::move(buffer_view));
+ args->result.set_value(true);
((DynamicExtension *)args->extension)->advance_epoch(new_head);
+ } else {
+ args->result.set_value(true);
}
((DynamicExtension *)args->extension)
@@ -660,10 +661,12 @@ private:
args->tasks = m_recon_policy->get_reconstruction_tasks(
epoch, m_buffer->get_high_watermark());
args->extension = this;
- args->compaction = false;
- /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed
- * here */
+ args->tasks.add_reconstruction(m_recon_policy->get_flush_task(epoch));
+ /*
+ * NOTE: args is deleted by the reconstruction job, so shouldn't be freed
+ * here
+ */
m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
}
@@ -691,7 +694,8 @@ private:
return m_buffer->append(rec, ts);
}
-#ifdef _GNU_SOURCE
+//#ifdef _GNU_SOURCE
+#if 0
void SetThreadAffinity() {
if constexpr (std::same_as<SchedType, SerialScheduler>) {
return;
diff --git a/include/framework/interface/Shard.h b/include/framework/interface/Shard.h
index bd980c0..fb5ce1a 100644
--- a/include/framework/interface/Shard.h
+++ b/include/framework/interface/Shard.h
@@ -14,7 +14,7 @@ namespace de {
template <typename SHARD>
concept ShardInterface = RecordInterface<typename SHARD::RECORD> &&
- requires(SHARD shard, const std::vector<SHARD *> &shard_vector, bool b,
+ requires(SHARD shard, const std::vector<const SHARD *> &shard_vector, bool b,
BufferView<typename SHARD::RECORD> bv,
typename SHARD::RECORD rec) {
/* construct a shard from a vector of shards of the same type */
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h
index 9138bd1..4ceca9a 100644
--- a/include/framework/reconstruction/BSMPolicy.h
+++ b/include/framework/reconstruction/BSMPolicy.h
@@ -16,17 +16,17 @@
namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
-class BSMPolicy : ReconstructionPolicy<ShardType, QueryType> {
+class BSMPolicy : public ReconstructionPolicy<ShardType, QueryType> {
typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
LevelVector;
public:
- BSMPolicy(size_t scale_factor, size_t buffer_size)
- : m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
+ BSMPolicy(size_t buffer_size)
+ : m_scale_factor(2), m_buffer_size(buffer_size) {}
ReconstructionVector
- get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch,
- size_t incoming_reccnt) override {
+ get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch,
+ size_t incoming_reccnt) const override {
ReconstructionVector reconstructions;
auto levels = epoch->get_structure()->get_level_vector();
@@ -44,7 +44,7 @@ public:
task.type = ReconstructionType::Merge;
for (level_index i = target_level; i > source_level; i--) {
- if (i < levels.size()) {
+ if (i < (level_index)levels.size()) {
task.add_shard({i, all_shards_idx}, levels[i]->get_record_count());
}
}
@@ -54,17 +54,17 @@ public:
}
ReconstructionTask
- get_flush_task(Epoch<ShardType, QueryType> *epoch) override {
+ get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override {
return ReconstructionTask{
{{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Flush};
}
private:
- level_index find_reconstruction_target(LevelVector &levels) {
+ level_index find_reconstruction_target(LevelVector &levels) const {
level_index target_level = 0;
- for (size_t i = 0; i < (level_index)levels.size(); i++) {
- if (levels[i].get_record_count() + 1 <= capacity(i)) {
+ for (level_index i = 0; i < (level_index)levels.size(); i++) {
+ if (levels[i]->get_record_count() + 1 <= capacity(i)) {
target_level = i;
break;
}
@@ -73,7 +73,7 @@ private:
return target_level;
}
- inline size_t capacity(level_index level) {
+ inline size_t capacity(level_index level) const {
return m_buffer_size * pow(m_scale_factor, level + 1);
}
diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h
index 00f2cff..8bf5645 100644
--- a/include/framework/reconstruction/LevelingPolicy.h
+++ b/include/framework/reconstruction/LevelingPolicy.h
@@ -16,7 +16,7 @@
namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
-class LevelingPolicy : ReconstructionPolicy<ShardType, QueryType> {
+class LevelingPolicy : public ReconstructionPolicy<ShardType, QueryType> {
typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
LevelVector;
@@ -25,8 +25,8 @@ public:
: m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
ReconstructionVector
- get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch,
- size_t incoming_reccnt) override {
+ get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch,
+ size_t incoming_reccnt) const override {
ReconstructionVector reconstructions;
auto levels = epoch->get_structure()->get_level_vector();
@@ -41,7 +41,7 @@ public:
for (level_index i = target_level; i > source_level; i--) {
size_t target_reccnt =
- (i < levels.size()) ? levels[i]->get_record_count() : 0;
+ (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0;
size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt;
reconstructions.add_reconstruction(i - 1, i, total_reccnt,
@@ -52,29 +52,29 @@ public:
}
ReconstructionTask
- get_flush_task(Epoch<ShardType, QueryType> *epoch) override {
+ get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override {
return ReconstructionTask{
- {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Flush};
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge};
}
private:
- level_index find_reconstruction_target(LevelVector &levels) {
+ level_index find_reconstruction_target(LevelVector &levels) const {
level_index target_level = 0;
size_t incoming_records = m_buffer_size;
- for (size_t i = 0; i < (level_index)levels.size(); i++) {
- if (levels[i].get_record_count() + incoming_records < capacity(i)) {
+ for (level_index i = 0; i < (level_index)levels.size(); i++) {
+ if (levels[i]->get_record_count() + incoming_records < capacity(i)) {
target_level = i;
break;
}
- incoming_records = levels[i].get_record_count();
+ incoming_records = levels[i]->get_record_count();
}
return target_level;
}
- inline size_t capacity(level_index level) {
+ inline size_t capacity(level_index level) const {
return m_buffer_size * pow(m_scale_factor, level + 1);
}
diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h
index 976091e..aa213df 100644
--- a/include/framework/reconstruction/ReconstructionPolicy.h
+++ b/include/framework/reconstruction/ReconstructionPolicy.h
@@ -23,8 +23,8 @@ class ReconstructionPolicy {
public:
ReconstructionPolicy() {}
- virtual ReconstructionVector get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch,
- size_t incoming_reccnt) = 0;
- virtual ReconstructionTask get_flush_task(Epoch<ShardType, QueryType> *epoch) = 0;
+ virtual ReconstructionVector get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch,
+ size_t incoming_reccnt) const = 0;
+ virtual ReconstructionTask get_flush_task(const Epoch<ShardType, QueryType> *epoch) const = 0;
};
}
diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h
index 120bcb5..71fe9ec 100644
--- a/include/framework/reconstruction/TieringPolicy.h
+++ b/include/framework/reconstruction/TieringPolicy.h
@@ -16,15 +16,15 @@
namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
-class TieringPolicy : ReconstructionPolicy<ShardType, QueryType> {
+class TieringPolicy : public ReconstructionPolicy<ShardType, QueryType> {
typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> LevelVector;
public:
TieringPolicy(size_t scale_factor, size_t buffer_size)
: m_scale_factor(scale_factor), m_buffer_size(buffer_size) {}
ReconstructionVector
- get_reconstruction_tasks(Epoch<ShardType, QueryType> *epoch,
- size_t incoming_reccnt) override {
+ get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch,
+ size_t incoming_reccnt) const override {
ReconstructionVector reconstructions;
auto levels = epoch->get_structure()->get_level_vector();
@@ -39,7 +39,7 @@ public:
for (level_index i = target_level; i > source_level; i--) {
size_t target_reccnt =
- (i < levels.size()) ? levels[i]->get_record_count() : 0;
+ (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0;
size_t total_reccnt = levels[i - 1]->get_record_count() + target_reccnt;
reconstructions.add_reconstruction(i - 1, i, total_reccnt,
@@ -50,17 +50,17 @@ public:
}
ReconstructionTask
- get_flush_task(Epoch<ShardType, QueryType> *epoch) override {
+ get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override {
return ReconstructionTask{
{{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Flush};
}
private:
- level_index find_reconstruction_target(LevelVector &levels) {
+ level_index find_reconstruction_target(LevelVector &levels) const {
level_index target_level = 0;
- for (size_t i = 0; i < (level_index) levels.size(); i++) {
- if (levels[i].get_shard_count() + 1 <= capacity()) {
+ for (level_index i = 0; i < (level_index) levels.size(); i++) {
+ if (levels[i]->get_shard_count() + 1 <= capacity()) {
target_level = i;
break;
}
@@ -69,7 +69,7 @@ private:
return target_level;
}
- inline size_t capacity() {
+ inline size_t capacity() const {
return m_scale_factor;
}
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 303ab2f..95c64ea 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -54,11 +54,13 @@ public:
Epoch &operator=(const Epoch &) = delete;
Epoch &operator=(Epoch &&) = delete;
- size_t get_epoch_number() { return m_epoch_number; }
+ size_t get_epoch_number() const { return m_epoch_number; }
- Structure *get_structure() { return m_structure; }
+ const Structure *get_structure() const { return m_structure; }
- BufView get_buffer() { return m_buffer->get_buffer_view(m_buffer_head); }
+ Structure *get_mutable_structure() { return m_structure; }
+
+ BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); }
/*
* Returns a new Epoch object that is a copy of this one. The new object
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 9b7ae87..3bb8a0b 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -147,14 +147,14 @@ public:
inline void perform_reconstruction(ReconstructionTask task) {
/* perform the reconstruction itself */
- std::vector<ShardType *> shards;
+ std::vector<const ShardType *> shards;
for (ShardID shid : task.sources) {
- assert(shid.level_idx < m_levels.size());
+ assert(shid.level_idx < (level_index) m_levels.size());
assert(shid.shard_idx >= -1);
/* if unspecified, push all shards into the vector */
if (shid.shard_idx == all_shards_idx) {
- for (size_t i = 0; i < m_levels[shid.level_idx].get_shard_count();
+ for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count();
i++) {
if (m_levels[shid.level_idx]->get_shard(i)) {
shards.push_back(m_levels[shid.level_idx]->get_shard(i));
@@ -165,7 +165,7 @@ public:
}
}
- auto new_shard = Shard(shards);
+ auto new_shard = new ShardType(shards);
/*
* Remove all of the shards processed by the operation
@@ -181,10 +181,11 @@ public:
/*
* Append the new shard to the target level
*/
- if (task.target < m_levels.size()) {
- m_levels[task.target]->append_shard(new_shard);
- } else {
- m_levels.push_back();
+ if (task.target < (level_index)m_levels.size()) {
+ m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard));
+ } else { /* grow the structure if needed */
+ m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(task.target));
+ m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard));
}
}
@@ -197,12 +198,20 @@ public:
* the buffer itself. Given that we're unlikely to actually use policies
* like that, we'll leave this as low priority.
*/
- ShardType *buffer_shard = new ShardType(buffer);
- if (task.type == ReconstructionType::Append) {
- m_levels[0]->append(std::shared_ptr(buffer_shard));
+
+ /* insert the first level, if needed */
+ if (m_levels.size() == 0) {
+ m_levels.push_back(
+ std::make_shared<InternalLevel<ShardType, QueryType>>(0));
+ }
+
+ ShardType *buffer_shard = new ShardType(std::move(buffer));
+ if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) {
+ m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard));
} else {
- std::vector<ShardType *> shards;
- for (size_t i = 0; i < m_levels[0].size(); i++) {
+ std::vector<const ShardType *> shards;
+ for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count();
+ i++) {
if (m_levels[0]->get_shard(i)) {
shards.push_back(m_levels[0]->get_shard(i));
}
@@ -210,7 +219,7 @@ public:
shards.push_back(buffer_shard);
ShardType *new_shard = new ShardType(shards);
m_levels[0]->truncate();
- m_levels[0]->append(std::shared_ptr(new_shard));
+ m_levels[0]->append(std::shared_ptr<ShardType>(new_shard));
}
}
}
@@ -243,6 +252,32 @@ public:
LevelVector const &get_level_vector() const { return m_levels; }
+
+ /*
+ * Validate that no level in the structure exceeds its maximum tombstone
+ * capacity. This is used to trigger preemptive compactions at the end of
+ * the reconstruction process.
+ */
+ bool validate_tombstone_proportion(double max_delete_prop) const {
+ long double ts_prop;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i]) {
+ ts_prop = (long double)m_levels[i]->get_tombstone_count() /
+ (long double)m_levels[i]->get_record_count();
+ if (ts_prop > (long double)max_delete_prop) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ bool validate_tombstone_proportion(level_index level, double max_delete_prop) const {
+ long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) m_levels[level]->get_record_count();
+ return ts_prop <= (long double) max_delete_prop;
+ }
+
private:
std::atomic<size_t> m_refcnt;
LevelVector m_levels;
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 8cfcd49..c9d1749 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -40,12 +40,12 @@ public:
*
* No changes are made to this level.
*/
- ShardType *get_combined_shard() {
+ ShardType *get_combined_shard() const {
if (m_shards.size() == 0) {
return nullptr;
}
- std::vector<ShardType *> shards;
+ std::vector<const ShardType *> shards;
for (auto shard : m_shards) {
if (shard)
shards.emplace_back(shard.get());
@@ -57,7 +57,7 @@ public:
void get_local_queries(
std::vector<std::pair<ShardID, ShardType *>> &shards,
std::vector<typename QueryType::LocalQuery *> &local_queries,
- typename QueryType::Parameters *query_parms) {
+ typename QueryType::Parameters *query_parms) const {
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
auto local_query =
@@ -68,7 +68,7 @@ public:
}
}
- bool check_tombstone(size_t shard_stop, const RecordType &rec) {
+ bool check_tombstone(size_t shard_stop, const RecordType &rec) const {
if (m_shards.size() == 0)
return false;
@@ -100,7 +100,7 @@ public:
return false;
}
- ShardType *get_shard(size_t idx) {
+ const ShardType *get_shard(size_t idx) const {
if (idx >= m_shards.size()) {
return nullptr;
}
@@ -108,9 +108,9 @@ public:
return m_shards[idx].get();
}
- size_t get_shard_count() { return m_shards.size(); }
+ size_t get_shard_count() const { return m_shards.size(); }
- size_t get_record_count() {
+ size_t get_record_count() const {
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
@@ -121,7 +121,7 @@ public:
return cnt;
}
- size_t get_tombstone_count() {
+ size_t get_tombstone_count() const {
size_t res = 0;
for (size_t i = 0; i < m_shards.size(); ++i) {
if (m_shards[i]) {
@@ -131,7 +131,7 @@ public:
return res;
}
- size_t get_aux_memory_usage() {
+ size_t get_aux_memory_usage() const {
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
@@ -142,7 +142,7 @@ public:
return cnt;
}
- size_t get_memory_usage() {
+ size_t get_memory_usage() const {
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
@@ -153,7 +153,7 @@ public:
return cnt;
}
- double get_tombstone_prop() {
+ double get_tombstone_prop() const {
size_t tscnt = 0;
size_t reccnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
@@ -166,10 +166,10 @@ public:
return (double)tscnt / (double)(tscnt + reccnt);
}
- std::shared_ptr<InternalLevel> clone() {
+ std::shared_ptr<InternalLevel> clone() const {
auto new_level = std::make_shared<InternalLevel>(m_level_no);
for (size_t i = 0; i < m_shards.size(); i++) {
- new_level->m_shards[i] = m_shards[i];
+ new_level->append(m_shards[i]);
}
return new_level;
@@ -181,7 +181,7 @@ public:
m_shards.erase(m_shards.begin() + shard);
}
- bool append(std::shared_ptr<ShardType> shard) {
+ void append(std::shared_ptr<ShardType> shard) {
m_shards.emplace_back(shard);
}
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 625b04b..0eae73d 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -97,15 +97,15 @@ public:
return true;
}
- size_t get_record_count() { return m_tail.load() - m_head.load().head_idx; }
+ size_t get_record_count() const { return m_tail.load() - m_head.load().head_idx; }
- size_t get_capacity() { return m_cap; }
+ size_t get_capacity() const { return m_cap; }
- bool is_full() { return get_record_count() >= m_hwm; }
+ bool is_full() const { return get_record_count() >= m_hwm; }
- bool is_at_low_watermark() { return get_record_count() >= m_lwm; }
+ bool is_at_low_watermark() const { return get_record_count() >= m_lwm; }
- size_t get_tombstone_count() { return m_tscnt.load(); }
+ size_t get_tombstone_count() const { return m_tscnt.load(); }
bool delete_record(const R &rec) {
return get_buffer_view().delete_record(rec);
@@ -115,9 +115,9 @@ public:
return get_buffer_view().check_tombstone(rec);
}
- size_t get_memory_usage() { return m_cap * sizeof(Wrapped<R>); }
+ size_t get_memory_usage() const { return m_cap * sizeof(Wrapped<R>); }
- size_t get_aux_memory_usage() {
+ size_t get_aux_memory_usage() const {
return m_tombstone_filter->get_memory_usage();
}
@@ -200,7 +200,7 @@ public:
m_lwm = lwm;
}
- size_t get_low_watermark() { return m_lwm; }
+ size_t get_low_watermark() const { return m_lwm; }
void set_high_watermark(size_t hwm) {
assert(hwm > m_lwm);
@@ -208,9 +208,9 @@ public:
m_hwm = hwm;
}
- size_t get_high_watermark() { return m_hwm; }
+ size_t get_high_watermark() const { return m_hwm; }
- size_t get_tail() { return m_tail.load(); }
+ size_t get_tail() const { return m_tail.load(); }
/*
* Note: this returns the available physical storage capacity,
@@ -220,7 +220,7 @@ public:
* but a buggy framework implementation may violate the
* assumption.
*/
- size_t get_available_capacity() {
+ size_t get_available_capacity() const {
if (m_old_head.load().refcnt == 0) {
return m_cap - (m_tail.load() - m_head.load().head_idx);
}
diff --git a/include/shard/Alias.h b/include/shard/Alias.h
index 8fe70a5..15b0884 100644
--- a/include/shard/Alias.h
+++ b/include/shard/Alias.h
@@ -70,7 +70,7 @@ public:
}
}
- Alias(std::vector<Alias*> const &shards)
+ Alias(std::vector<const Alias*> const &shards)
: m_data(nullptr)
, m_alias(nullptr)
, m_total_weight(0)
@@ -146,15 +146,15 @@ public:
}
- size_t get_memory_usage() {
+ size_t get_memory_usage() const {
return 0;
}
- size_t get_aux_memory_usage() {
+ size_t get_aux_memory_usage() const {
return (m_bf) ? m_bf->memory_usage() : 0;
}
- W get_total_weight() {
+ W get_total_weight() const {
return m_total_weight;
}
diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h
index 4e51037..d720aad 100644
--- a/include/shard/FSTrie.h
+++ b/include/shard/FSTrie.h
@@ -82,7 +82,7 @@ public:
delete[] temp_buffer;
}
- FSTrie(std::vector<FSTrie*> const &shards)
+ FSTrie(std::vector<const FSTrie*> const &shards)
: m_data(nullptr)
, m_reccnt(0)
, m_alloc_size(0)
@@ -181,11 +181,11 @@ public:
}
- size_t get_memory_usage() {
+ size_t get_memory_usage() const {
return m_fst->getMemoryUsage();
}
- size_t get_aux_memory_usage() {
+ size_t get_aux_memory_usage() const {
return m_alloc_size;
}
diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h
index 64c0b2b..f6b525f 100644
--- a/include/shard/ISAMTree.h
+++ b/include/shard/ISAMTree.h
@@ -65,7 +65,7 @@ public:
}
}
- ISAMTree(std::vector<ISAMTree *> const &shards)
+ ISAMTree(std::vector<const ISAMTree *> const &shards)
: m_bf(nullptr), m_isam_nodes(nullptr), m_root(nullptr), m_reccnt(0),
m_tombstone_cnt(0), m_internal_node_cnt(0), m_deleted_cnt(0),
m_alloc_size(0) {
@@ -93,7 +93,7 @@ public:
delete m_bf;
}
- Wrapped<R> *point_lookup(const R &rec, bool filter = false) {
+ Wrapped<R> *point_lookup(const R &rec, bool filter = false) const {
if (filter && !m_bf->lookup(rec)) {
return nullptr;
}
diff --git a/include/shard/LoudsPatricia.h b/include/shard/LoudsPatricia.h
index 3452839..fe0c30e 100644
--- a/include/shard/LoudsPatricia.h
+++ b/include/shard/LoudsPatricia.h
@@ -82,7 +82,7 @@ public:
delete[] temp_buffer;
}
- LoudsPatricia(std::vector<LoudsPatricia*> &shards)
+ LoudsPatricia(std::vector<const LoudsPatricia*> &shards)
: m_data(nullptr)
, m_reccnt(0)
, m_alloc_size(0)
@@ -178,11 +178,11 @@ public:
}
- size_t get_memory_usage() {
+ size_t get_memory_usage() const {
return m_louds->size();
}
- size_t get_aux_memory_usage() {
+ size_t get_aux_memory_usage() const {
return m_alloc_size;
}
diff --git a/include/shard/PGM.h b/include/shard/PGM.h
index 7d1f492..5b39ab4 100644
--- a/include/shard/PGM.h
+++ b/include/shard/PGM.h
@@ -111,7 +111,7 @@ public:
}
}
- PGM(std::vector<PGM*> const &shards)
+ PGM(std::vector<const PGM*> const &shards)
: m_data(nullptr)
, m_bf(nullptr)
, m_reccnt(0)
@@ -190,7 +190,7 @@ public:
delete m_bf;
}
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
+ Wrapped<R> *point_lookup(const R &rec, bool filter=false) const {
size_t idx = get_lower_bound(rec.key);
if (idx >= m_reccnt) {
return nullptr;
@@ -223,11 +223,11 @@ public:
}
- size_t get_memory_usage() {
+ size_t get_memory_usage() const {
return m_pgm.size_in_bytes();
}
- size_t get_aux_memory_usage() {
+ size_t get_aux_memory_usage() const {
return (m_bf) ? m_bf->memory_usage() : 0;
}
diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h
index 9d8c3bb..7f4d4e5 100644
--- a/include/shard/TrieSpline.h
+++ b/include/shard/TrieSpline.h
@@ -124,7 +124,7 @@ public:
}
}
- TrieSpline(std::vector<TrieSpline*> const &shards)
+ TrieSpline(std::vector<const TrieSpline*> const &shards)
: m_reccnt(0)
, m_tombstone_cnt(0)
, m_alloc_size(0)
@@ -229,7 +229,7 @@ public:
delete m_bf;
}
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
+ Wrapped<R> *point_lookup(const R &rec, bool filter=false) const {
if (filter && m_bf && !m_bf->lookup(rec)) {
return nullptr;
}
@@ -266,11 +266,11 @@ public:
}
- size_t get_memory_usage() {
+ size_t get_memory_usage() const {
return m_ts.GetSize();
}
- size_t get_aux_memory_usage() {
+ size_t get_aux_memory_usage() const {
return (m_bf) ? m_bf->memory_usage() : 0;
}
diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h
index 477db5c..7130efe 100644
--- a/include/shard/VPTree.h
+++ b/include/shard/VPTree.h
@@ -86,7 +86,7 @@ public:
}
}
- VPTree(std::vector<VPTree*> shards)
+ VPTree(std::vector<const VPTree*> shards)
: m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) {
size_t attemp_reccnt = 0;
@@ -174,11 +174,11 @@ public:
return m_data + idx;
}
- size_t get_memory_usage() {
+ size_t get_memory_usage() const {
return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R*);
}
- size_t get_aux_memory_usage() {
+ size_t get_aux_memory_usage() const {
// FIXME: need to return the size of the unordered_map
return 0;
}
diff --git a/include/util/SortedMerge.h b/include/util/SortedMerge.h
index b0a3215..c41a7ae 100644
--- a/include/util/SortedMerge.h
+++ b/include/util/SortedMerge.h
@@ -51,7 +51,7 @@ struct merge_info {
*/
template <RecordInterface R, ShardInterface S>
static std::vector<Cursor<Wrapped<R>>>
-build_cursor_vec(std::vector<S *> const &shards, size_t *reccnt,
+build_cursor_vec(std::vector<const S *> const &shards, size_t *reccnt,
size_t *tscnt) {
std::vector<Cursor<Wrapped<R>>> cursors;
cursors.reserve(shards.size());
diff --git a/tests/de_bsm_tag.cpp b/tests/de_bsm_tag.cpp
index 4063cfe..cc76f05 100644
--- a/tests/de_bsm_tag.cpp
+++ b/tests/de_bsm_tag.cpp
@@ -18,6 +18,8 @@
#include "shard/ISAMTree.h"
#include "query/rangequery.h"
+#include "framework/reconstruction/BSMPolicy.h"
+
#include <check.h>
using namespace de;
@@ -25,7 +27,8 @@ typedef Rec R;
typedef ISAMTree<R> S;
typedef rq::Query<S> Q;
-typedef DynamicExtension<S, Q, LayoutPolicy::BSM, DeletePolicy::TAGGING, SerialScheduler> DE;
+typedef DynamicExtension<S, Q, DeletePolicy::TAGGING, SerialScheduler> DE;
+ReconstructionPolicy<S,Q> *recon = new BSMPolicy<S,Q>(1000);
#include "include/dynamic_extension.h"
diff --git a/tests/de_bsm_tomb.cpp b/tests/de_bsm_tomb.cpp
index 3a24e87..205de4d 100644
--- a/tests/de_bsm_tomb.cpp
+++ b/tests/de_bsm_tomb.cpp
@@ -18,6 +18,8 @@
#include "shard/ISAMTree.h"
#include "query/rangequery.h"
+#include "framework/reconstruction/BSMPolicy.h"
+
#include <check.h>
using namespace de;
@@ -25,7 +27,8 @@ typedef Rec R;
typedef ISAMTree<R> S;
typedef rq::Query<S> Q;
-typedef DynamicExtension<S, Q, LayoutPolicy::BSM, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+ReconstructionPolicy<S, Q> *recon = new BSMPolicy<S, Q>(1000);
#include "include/dynamic_extension.h"
diff --git a/tests/de_level_concurrent.cpp b/tests/de_level_concurrent.cpp
index afd1af2..a76285b 100644
--- a/tests/de_level_concurrent.cpp
+++ b/tests/de_level_concurrent.cpp
@@ -17,6 +17,7 @@
#include "framework/DynamicExtension.h"
#include "shard/ISAMTree.h"
#include "query/rangequery.h"
+#include "framework/reconstruction/LevelingPolicy.h"
#include <check.h>
using namespace de;
@@ -25,7 +26,9 @@ typedef Rec R;
typedef ISAMTree<R> S;
typedef rq::Query<S> Q;
-typedef DynamicExtension<S, Q, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+ReconstructionPolicy<S, Q> *recon = new LevelingPolicy<S, Q>(2, 1000);
+ReconstructionPolicy<S, Q> *recon2 = new LevelingPolicy<S, Q>(4, 10000);
#include "include/concurrent_extension.h"
diff --git a/tests/de_level_tag.cpp b/tests/de_level_tag.cpp
index c175357..6fead54 100644
--- a/tests/de_level_tag.cpp
+++ b/tests/de_level_tag.cpp
@@ -17,6 +17,7 @@
#include "framework/DynamicExtension.h"
#include "shard/ISAMTree.h"
#include "query/rangequery.h"
+#include "framework/reconstruction/LevelingPolicy.h"
#include <check.h>
using namespace de;
@@ -25,7 +26,8 @@ typedef Rec R;
typedef ISAMTree<R> S;
typedef rq::Query<S> Q;
-typedef DynamicExtension<S, Q, LayoutPolicy::LEVELING, DeletePolicy::TAGGING, SerialScheduler> DE;
+typedef DynamicExtension<S, Q, DeletePolicy::TAGGING, SerialScheduler> DE;
+ReconstructionPolicy<S, Q> *recon = new LevelingPolicy<S, Q>(2, 1000);
#include "include/dynamic_extension.h"
diff --git a/tests/de_level_tomb.cpp b/tests/de_level_tomb.cpp
index e587817..31e87a0 100644
--- a/tests/de_level_tomb.cpp
+++ b/tests/de_level_tomb.cpp
@@ -18,6 +18,7 @@
#include "shard/ISAMTree.h"
#include "query/rangequery.h"
#include "shard/TrieSpline.h"
+#include "framework/reconstruction/LevelingPolicy.h"
#include <check.h>
using namespace de;
@@ -26,7 +27,8 @@ typedef Rec R;
typedef ISAMTree<R> S;
typedef rq::Query<S> Q;
-typedef DynamicExtension<S, Q, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+ReconstructionPolicy<S, Q> *recon = new LevelingPolicy<S, Q>(2, 1000);
#include "include/dynamic_extension.h"
diff --git a/tests/de_tier_concurrent.cpp b/tests/de_tier_concurrent.cpp
index ce41dbc..cba84f1 100644
--- a/tests/de_tier_concurrent.cpp
+++ b/tests/de_tier_concurrent.cpp
@@ -18,6 +18,7 @@
#include "shard/ISAMTree.h"
#include "query/rangequery.h"
#include "framework/scheduling//FIFOScheduler.h"
+#include "framework/reconstruction/TieringPolicy.h"
#include <check.h>
using namespace de;
@@ -26,7 +27,9 @@ typedef Rec R;
typedef ISAMTree<R> S;
typedef rq::Query<S> Q;
-typedef DynamicExtension<S, Q, LayoutPolicy::TEIRING, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
+typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
+ReconstructionPolicy<S, Q> *recon = new TieringPolicy<S, Q>(2, 1000);
+ReconstructionPolicy<S, Q> *recon2 = new TieringPolicy<S, Q>(4, 10000);
#include "include/concurrent_extension.h"
diff --git a/tests/de_tier_tag.cpp b/tests/de_tier_tag.cpp
index 97a5299..d640d10 100644
--- a/tests/de_tier_tag.cpp
+++ b/tests/de_tier_tag.cpp
@@ -18,6 +18,7 @@
#include "framework/scheduling/SerialScheduler.h"
#include "shard/ISAMTree.h"
#include "query/rangequery.h"
+#include "framework/reconstruction/TieringPolicy.h"
#include <check.h>
using namespace de;
@@ -26,7 +27,8 @@ typedef Rec R;
typedef ISAMTree<R> S;
typedef rq::Query<S> Q;
-typedef DynamicExtension<S, Q, LayoutPolicy::TEIRING, DeletePolicy::TAGGING, SerialScheduler> DE;
+typedef DynamicExtension<S, Q, DeletePolicy::TAGGING, SerialScheduler> DE;
+ReconstructionPolicy<S, Q> *recon = new TieringPolicy<S, Q>(2, 1000);
#include "include/dynamic_extension.h"
diff --git a/tests/de_tier_tomb.cpp b/tests/de_tier_tomb.cpp
index 930d0d5..42b0625 100644
--- a/tests/de_tier_tomb.cpp
+++ b/tests/de_tier_tomb.cpp
@@ -18,6 +18,7 @@
#include "shard/ISAMTree.h"
#include "shard/TrieSpline.h"
#include "query/rangequery.h"
+#include "framework/reconstruction/TieringPolicy.h"
#include <check.h>
using namespace de;
@@ -25,7 +26,8 @@ using namespace de;
typedef Rec R;
typedef ISAMTree<R> S;
typedef rq::Query<S> Q;
-typedef DynamicExtension<S, Q, LayoutPolicy::TEIRING, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, SerialScheduler> DE;
+ReconstructionPolicy<S, Q> *recon = new TieringPolicy<S, Q>(2, 1000);
#include "include/dynamic_extension.h"
diff --git a/tests/include/concurrent_extension.h b/tests/include/concurrent_extension.h
index d99cd23..84f816d 100644
--- a/tests/include/concurrent_extension.h
+++ b/tests/include/concurrent_extension.h
@@ -22,24 +22,28 @@
* should be included in the source file that includes this one, above the
* include statement.
*/
-// #include "testing.h"
-// #include "framework/DynamicExtension.h"
-// //#include "framework/scheduling/FIFOScheduler.h"
-// #include "shard/ISAMTree.h"
-// #include "query/rangequery.h"
-// #include <check.h>
-// #include <set>
-// #include <random>
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/reconstruction/TieringPolicy.h"
+#include "testing.h"
+#include "framework/DynamicExtension.h"
+#include "framework/scheduling/FIFOScheduler.h"
+#include "shard/ISAMTree.h"
+#include "query/rangequery.h"
+#include <check.h>
+#include <set>
+#include <random>
// using namespace de;
// typedef Rec R;
// typedef ISAMTree<R> S;
// typedef rq::Query<S> Q;
-// typedef DynamicExtension<S, Q, LayoutPolicy::LEVELING, DeletePolicy::TOMBSTONE> DE; //, FIFOScheduler> DE;
+// typedef DynamicExtension<S, Q, DeletePolicy::TOMBSTONE, FIFOScheduler> DE;
+// ReconstructionPolicy<S,Q> *recon = new TieringPolicy<S, Q>(2, 1000);
+// ReconstructionPolicy<S,Q> *recon2 = new TieringPolicy<S, Q>(4, 10000);
START_TEST(t_create)
{
- auto test_de = new DE(100, 1000, 2);
+ auto test_de = new DE(recon, 100, 1000);
ck_assert_ptr_nonnull(test_de);
ck_assert_int_eq(test_de->get_record_count(), 0);
@@ -52,7 +56,7 @@ END_TEST
START_TEST(t_insert)
{
- auto test_de = new DE(100, 1000, 2);
+ auto test_de = new DE(recon, 100, 1000);
uint64_t key = 0;
uint32_t val = 0;
@@ -73,7 +77,7 @@ END_TEST
START_TEST(t_debug_insert)
{
- auto test_de = new DE(100, 1000, 2);
+ auto test_de = new DE(recon, 100, 1000);
uint64_t key = 0;
uint32_t val = 0;
@@ -92,7 +96,7 @@ END_TEST
START_TEST(t_insert_with_mem_merges)
{
- auto test_de = new DE(100, 1000, 2);
+ auto test_de = new DE(recon, 100, 1000);
uint64_t key = 0;
uint32_t val = 0;
@@ -135,7 +139,7 @@ END_TEST
START_TEST(t_range_query)
{
- auto test_de = new DE(1000, 10000, 4);
+ auto test_de = new DE(recon2, 1000, 10000);
size_t n = 10000000;
std::vector<uint64_t> keys;
@@ -189,7 +193,7 @@ END_TEST
START_TEST(t_tombstone_merging_01)
{
size_t reccnt = 100000;
- auto test_de = new DE(100, 1000, 2);
+ auto test_de = new DE(recon, 100, 1000);
auto rng = gsl_rng_alloc(gsl_rng_mt19937);
@@ -242,54 +246,12 @@ START_TEST(t_tombstone_merging_01)
}
END_TEST
-DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
- auto rng = gsl_rng_alloc(gsl_rng_mt19937);
-
- auto test_de = new DE(1000, 10000, 2);
-
- std::set<R> records;
- std::set<R> to_delete;
- std::set<R> deleted;
-
- while (records.size() < reccnt) {
- uint64_t key = rand();
- uint32_t val = rand();
-
- if (records.find({key, val}) != records.end()) continue;
-
- records.insert({key, val});
- }
-
- for (auto rec : records) {
- ck_assert_int_eq(test_de->insert(rec), 1);
-
- if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) {
- std::vector<R> del_vec;
- std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
-
- for (size_t i=0; i<del_vec.size(); i++) {
- test_de->erase(del_vec[i]);
- to_delete.erase(del_vec[i]);
- deleted.insert(del_vec[i]);
- }
- }
-
- if (gsl_rng_uniform(rng) < 0.25 && deleted.find(rec) == deleted.end()) {
- to_delete.insert(rec);
- }
- }
-
- gsl_rng_free(rng);
-
- return test_de;
-}
-
START_TEST(t_static_structure)
{
auto rng = gsl_rng_alloc(gsl_rng_mt19937);
size_t reccnt = 100000;
- auto test_de = new DE(100, 1000, 2);
+ auto test_de = new DE(recon, 100, 1000);
std::set<R> records;
std::set<R> to_delete;
diff --git a/tests/include/dynamic_extension.h b/tests/include/dynamic_extension.h
index 326bb72..a1ab20a 100644
--- a/tests/include/dynamic_extension.h
+++ b/tests/include/dynamic_extension.h
@@ -23,26 +23,28 @@
* include statement.
*/
-// #include "testing.h"
-// #include "framework/DynamicExtension.h"
-// #include "framework/scheduling/SerialScheduler.h"
-// #include "shard/ISAMTree.h"
-// #include "query/rangequery.h"
-// #include <check.h>
-// #include <random>
-// #include <set>
+#include "framework/reconstruction/BSMPolicy.h"
+#include "framework/reconstruction/TieringPolicy.h"
+#include "testing.h"
+#include "framework/DynamicExtension.h"
+#include "framework/scheduling/SerialScheduler.h"
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "shard/ISAMTree.h"
+#include "query/rangequery.h"
+#include <check.h>
+#include <random>
+#include <set>
// using namespace de;
// typedef Rec R;
// typedef ISAMTree<R> S;
// typedef rq::Query<S> Q;
-// typedef DynamicExtension<S, Q, LayoutPolicy::TEIRING, DeletePolicy::TAGGING, SerialScheduler> DE;
+// typedef DynamicExtension<S, Q, DeletePolicy::TAGGING, SerialScheduler> DE;
+// ReconstructionPolicy<S, Q> *recon = new TieringPolicy<S, Q>(1000, 2);
-
-#include "framework/util/Configuration.h"
START_TEST(t_create)
{
- auto test_de = new DE(100, 1000, 2);
+ auto test_de = new DE(recon, 100);
ck_assert_ptr_nonnull(test_de);
ck_assert_int_eq(test_de->get_record_count(), 0);
@@ -55,7 +57,7 @@ END_TEST
START_TEST(t_insert)
{
- auto test_de = new DE(100, 1000, 2);
+ auto test_de = new DE(recon, 100);
uint64_t key = 0;
uint32_t val = 0;
@@ -76,7 +78,7 @@ END_TEST
START_TEST(t_debug_insert)
{
- auto test_de = new DE(100, 1000, 2);
+ auto test_de = new DE(recon, 100);
uint64_t key = 0;
uint32_t val = 0;
@@ -95,7 +97,7 @@ END_TEST
START_TEST(t_insert_with_mem_merges)
{
- auto test_de = new DE(100, 1000, 2);
+ auto test_de = new DE(recon, 100);
uint64_t key = 0;
uint32_t val = 0;
@@ -114,7 +116,7 @@ START_TEST(t_insert_with_mem_merges)
* BSM grows on every flush, so the height will be different than
* normal layout policies
*/
- if (test_de->Layout == de::LayoutPolicy::BSM) {
+ if (dynamic_cast<const BSMPolicy<S, Q>*>(recon)) {
ck_assert_int_eq(test_de->get_height(), 2);
} else {
ck_assert_int_eq(test_de->get_height(), 1);
@@ -127,7 +129,7 @@ END_TEST
START_TEST(t_range_query)
{
- auto test_de = new DE(100, 1000, 2);
+ auto test_de = new DE(recon, 100);
size_t n = 10000;
std::vector<uint64_t> keys;
@@ -175,7 +177,7 @@ END_TEST
START_TEST(t_tombstone_merging_01)
{
size_t reccnt = 100000;
- auto test_de = new DE(100, 1000, 2);
+ auto test_de = new DE(recon, 100);
auto rng = gsl_rng_alloc(gsl_rng_mt19937);
@@ -224,54 +226,13 @@ START_TEST(t_tombstone_merging_01)
}
END_TEST
-[[maybe_unused]] static DE *create_test_tree(size_t reccnt, size_t memlevel_cnt) {
- auto rng = gsl_rng_alloc(gsl_rng_mt19937);
-
- auto test_de = new DE(1000, 10000, 2);
-
- std::set<Rec> records;
- std::set<Rec> to_delete;
- std::set<Rec> deleted;
-
- while (records.size() < reccnt) {
- uint64_t key = rand();
- uint32_t val = rand();
-
- if (records.find({key, val}) != records.end()) continue;
-
- records.insert({key, val});
- }
-
- for (auto rec : records) {
- ck_assert_int_eq(test_de->insert(rec), 1);
-
- if (gsl_rng_uniform(rng) < 0.05 && !to_delete.empty()) {
- std::vector<Rec> del_vec;
- std::sample(to_delete.begin(), to_delete.end(), std::back_inserter(del_vec), 3, std::mt19937{std::random_device{}()});
-
- for (size_t i=0; i<del_vec.size(); i++) {
- test_de->erase(del_vec[i]);
- to_delete.erase(del_vec[i]);
- deleted.insert(del_vec[i]);
- }
- }
-
- if (gsl_rng_uniform(rng) < 0.25 && deleted.find(rec) == deleted.end()) {
- to_delete.insert(rec);
- }
- }
-
- gsl_rng_free(rng);
-
- return test_de;
-}
START_TEST(t_static_structure)
{
auto rng = gsl_rng_alloc(gsl_rng_mt19937);
size_t reccnt = 100000;
- auto test_de = new DE(100, 1000, 2);
+ auto test_de = new DE(recon, 100);
std::set<Rec> records;
std::set<Rec> to_delete;
diff --git a/tests/include/shard_standard.h b/tests/include/shard_standard.h
index de43edc..0b5ab00 100644
--- a/tests/include/shard_standard.h
+++ b/tests/include/shard_standard.h
@@ -77,7 +77,7 @@ START_TEST(t_shard_init)
auto shard2 = new Shard(mbuffer2->get_buffer_view());
auto shard3 = new Shard(mbuffer3->get_buffer_view());
- std::vector<Shard*> shards = {shard1, shard2, shard3};
+ std::vector<const Shard*> shards = {shard1, shard2, shard3};
auto shard4 = new Shard(shards);
ck_assert_int_eq(shard4->get_record_count(), n * 3);
@@ -130,7 +130,7 @@ START_TEST(t_full_cancelation)
ck_assert_int_eq(shard_ts->get_record_count(), n);
ck_assert_int_eq(shard_ts->get_tombstone_count(), n);
- std::vector<Shard *> shards = {shard, shard_ts};
+ std::vector<const Shard *> shards = {shard, shard_ts};
Shard* merged = new Shard(shards);
diff --git a/tests/include/shard_string.h b/tests/include/shard_string.h
index 2ef4cec..7a3d761 100644
--- a/tests/include/shard_string.h
+++ b/tests/include/shard_string.h
@@ -67,7 +67,7 @@ START_TEST(t_shard_init)
auto shard2 = new Shard(mbuffer2->get_buffer_view());
auto shard3 = new Shard(mbuffer3->get_buffer_view());
- std::vector<Shard*> shards = {shard1, shard2, shard3};
+ std::vector<const Shard*> shards = {shard1, shard2, shard3};
auto shard4 = new Shard(shards);
ck_assert_int_eq(shard4->get_record_count(), n * 3);
diff --git a/tests/internal_level_tests.cpp b/tests/internal_level_tests.cpp
deleted file mode 100644
index e11b7c7..0000000
--- a/tests/internal_level_tests.cpp
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * tests/internal_level_tests.cpp
- *
- * Unit tests for InternalLevel
- *
- * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
- *
- * Distributed under the Modified BSD License.
- *
- */
-#include "shard/ISAMTree.h"
-#include "query/rangequery.h"
-#include "framework/structure/InternalLevel.h"
-#include "framework/interface/Record.h"
-#include "framework/interface/Query.h"
-#include "framework/interface/Shard.h"
-
-#include "include/testing.h"
-
-#include <check.h>
-
-using namespace de;
-
-typedef InternalLevel<ISAMTree<Rec>, rq::Query<ISAMTree<Rec>>> ILevel;
-
-START_TEST(t_memlevel_merge)
-{
- auto tbl1 = create_test_mbuffer<Rec>(100);
- auto tbl2 = create_test_mbuffer<Rec>(100);
-
- auto base_level = new ILevel(1, 1);
- base_level->append_buffer(tbl1->get_buffer_view());
- ck_assert_int_eq(base_level->get_record_count(), 100);
-
- auto merging_level = new ILevel(0, 1);
- merging_level->append_buffer(tbl2->get_buffer_view());
- ck_assert_int_eq(merging_level->get_record_count(), 100);
-
- auto new_level = ILevel::reconstruction(base_level, merging_level);
-
- delete merging_level;
- ck_assert_int_eq(new_level->get_record_count(), 200);
-
- delete base_level;
- delete tbl1;
- delete tbl2;
-}
-
-
-ILevel *create_test_memlevel(size_t reccnt) {
- auto tbl1 = create_test_mbuffer<Rec>(reccnt/2);
- auto tbl2 = create_test_mbuffer<Rec>(reccnt/2);
-
- auto base_level = new ILevel(1, 2);
- base_level->append_buffer(tbl1->get_buffer_view());
- base_level->append_buffer(tbl2->get_buffer_view());
-
- delete tbl1;
- delete tbl2;
-
- return base_level;
-}
-
-Suite *unit_testing()
-{
- Suite *unit = suite_create("InternalLevel Unit Testing");
-
- TCase *merge = tcase_create("de::InternalLevel::reconstruction Testing");
- tcase_add_test(merge, t_memlevel_merge);
- suite_add_tcase(unit, merge);
-
- return unit;
-}
-
-int run_unit_tests()
-{
- int failed = 0;
- Suite *unit = unit_testing();
- SRunner *unit_runner = srunner_create(unit);
-
- srunner_run_all(unit_runner, CK_NORMAL);
- failed = srunner_ntests_failed(unit_runner);
- srunner_free(unit_runner);
-
- return failed;
-}
-
-
-int main()
-{
- int unit_failed = run_unit_tests();
-
- return (unit_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
-}
diff --git a/tests/vptree_tests.cpp b/tests/vptree_tests.cpp
index 53bb526..49964e5 100644
--- a/tests/vptree_tests.cpp
+++ b/tests/vptree_tests.cpp
@@ -52,7 +52,7 @@ START_TEST(t_wss_init)
auto shard2 = new Shard(mbuffer2->get_buffer_view());
auto shard3 = new Shard(mbuffer3->get_buffer_view());
- std::vector<Shard *> shards = {shard1, shard2, shard3};
+ std::vector<const Shard *> shards = {shard1, shard2, shard3};
auto shard4 = new Shard(shards);
ck_assert_int_eq(shard4->get_record_count(), n * 3);