summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2023-09-25 10:49:36 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2023-09-25 10:49:36 -0400
commit7c03d771475421c1d5a2bbc135242536af1a371c (patch)
tree94856ac950662c564608ad3cdc5b59bfd08b955c
parent754372aeccb74815cbb16f32ceacb04b4c5aaba9 (diff)
downloaddynamic-extension-7c03d771475421c1d5a2bbc135242536af1a371c.tar.gz
Re-structuring Project + scheduling updates
This is a big one--probably should have split it apart, but I'm feeling lazy this morning. * Organized the mess of header files in include/framework by splitting them out into their own subdirectories, and renaming a few files to remove redundancies introduced by the directory structure. * Introduced a new framework/ShardRequirements.h header file for simpler shard development. This header simply contains the necessary includes from framework/* for creating shard files. This should help to remove structural dependencies from the framework file structure and shards, as well as centralizing the necessary framework files to make shard development easier. * Created a (currently dummy) SchedulerInterface, and made the scheduler implementation a template parameter of the dynamic extension for easier testing of various scheduling policies. There's still more work to be done to fully integrate the scheduler (queries, multiple buffers), but some more of the necessary framework code for this has been added as well. * Adjusted the Task interface setup for the scheduler. The task structures have been removed from ExtensionStructure and placed in their own header file. Additionally, I started experimenting with using std::variant, as opposed to inheritance, to implement subtype polymorphism on the Merge and Query tasks. The scheduler now has a general task queue that contains both, and std::variant, std::visit, and std::get are used to manipulate them without virtual functions. * Removed Alex.h, as it can't build anyway. There's a branch out there containing the Alex implementation stripped of the C++20 stuff, so there's no need to keep it here.
-rw-r--r--include/framework/DynamicExtension.h24
-rw-r--r--include/framework/ShardRequirements.h9
-rw-r--r--include/framework/interface/Query.h (renamed from include/framework/QueryInterface.h)0
-rw-r--r--include/framework/interface/Record.h (renamed from include/framework/RecordInterface.h)0
-rw-r--r--include/framework/interface/Scheduler.h31
-rw-r--r--include/framework/interface/Shard.h (renamed from include/framework/ShardInterface.h)2
-rw-r--r--include/framework/scheduling/Scheduler.h (renamed from include/framework/Scheduler.h)15
-rw-r--r--include/framework/scheduling/SerialScheduler.h227
-rw-r--r--include/framework/scheduling/Task.h63
-rw-r--r--include/framework/structure/ExtensionStructure.h (renamed from include/framework/ExtensionStructure.h)29
-rw-r--r--include/framework/structure/InternalLevel.h (renamed from include/framework/InternalLevel.h)8
-rw-r--r--include/framework/structure/MutableBuffer.h (renamed from include/framework/MutableBuffer.h)2
-rw-r--r--include/framework/util/Configuration.h (renamed from include/framework/Configuration.h)0
-rw-r--r--include/shard/Alex.h360
-rw-r--r--include/shard/MemISAM.h3
-rw-r--r--include/shard/PGM.h6
-rw-r--r--include/shard/TrieSpline.h5
-rw-r--r--include/shard/VPTree.h6
-rw-r--r--include/shard/WIRS.h6
-rw-r--r--include/shard/WSS.h6
-rw-r--r--include/util/Cursor.h2
-rw-r--r--tests/internal_level_tests.cpp8
-rw-r--r--tests/mutable_buffer_tests.cpp2
-rw-r--r--tests/testing.h4
24 files changed, 384 insertions, 434 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 6965965..3a460aa 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -14,22 +14,24 @@
#include <cstdio>
#include <vector>
-#include "framework/MutableBuffer.h"
-#include "framework/InternalLevel.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
-#include "framework/ExtensionStructure.h"
-
-#include "framework/Configuration.h"
-#include "framework/Scheduler.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/structure/InternalLevel.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Scheduler.h"
+#include "framework/structure/ExtensionStructure.h"
+
+#include "framework/util/Configuration.h"
+#include "framework/scheduling/SerialScheduler.h"
#include "psu-util/timer.h"
#include "psu-ds/Alias.h"
namespace de {
-template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING>
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler<R, S, Q, L>>
class DynamicExtension {
typedef S Shard;
typedef MutableBuffer<R> Buffer;
@@ -219,7 +221,7 @@ public:
}
private:
- Scheduler<R, S, Q, L> m_sched;
+ SCHED m_sched;
std::vector<Buffer *> m_buffers;
std::vector<Structure *> m_versions;
diff --git a/include/framework/ShardRequirements.h b/include/framework/ShardRequirements.h
new file mode 100644
index 0000000..95f7b67
--- /dev/null
+++ b/include/framework/ShardRequirements.h
@@ -0,0 +1,9 @@
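+/*
+ * include/framework/ShardRequirements.h
+ *
+ * Convenience header that pulls in the framework headers needed to
+ * implement a shard.
+ */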
+#pragma once
+
+#include "framework/structure/MutableBuffer.h"
+#include "framework/interface/Record.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
diff --git a/include/framework/QueryInterface.h b/include/framework/interface/Query.h
index 46a1ce1..46a1ce1 100644
--- a/include/framework/QueryInterface.h
+++ b/include/framework/interface/Query.h
diff --git a/include/framework/RecordInterface.h b/include/framework/interface/Record.h
index 1ef1984..1ef1984 100644
--- a/include/framework/RecordInterface.h
+++ b/include/framework/interface/Record.h
diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h
new file mode 100644
index 0000000..1445e90
--- /dev/null
+++ b/include/framework/interface/Scheduler.h
@@ -0,0 +1,31 @@
+/*
+ * include/framework/interface/Scheduler.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <vector>
+#include <concepts>
+#include "framework/interface/Record.h"
+#include "util/types.h"
+
+template <typename S>
+concept SchedulerInterface = requires(S s, size_t i, void *vp) { // constraints a scheduler type S must satisfy
+ {S(i, i)}; // S must be constructible from two size_t arguments
+// {s.schedule_merge(vp, vp)}; // disabled: a schedule_merge requirement is not enforced yet
+
+/*
+ NOTE(review): the disabled requirements below (and the two `Q::` lines after
+ this comment) refer to `q`, `p` and `Q`, none of which are parameters of this
+ requires-expression. They look like leftovers copied from the query
+ interface -- confirm and remove.
+
+ {q.get_query_state(p, p)} -> std::convertible_to<void*>;
+ {q.get_buffer_query_state(p, p)};
+ {q.query(p, p)};
+ {q.buffer_query(p, p)};
+ {q.merge()};
+ {q.delete_query_state(p)};
+*/
+ //{Q::get_query_state(p, p)} -> std::convertible_to<void*>;
+ //{Q::get_buffer_query_state(p, p)} -> std::convertible_to<void*>;
+};
diff --git a/include/framework/ShardInterface.h b/include/framework/interface/Shard.h
index 3aa62df..ea58b2a 100644
--- a/include/framework/ShardInterface.h
+++ b/include/framework/interface/Shard.h
@@ -11,7 +11,7 @@
#include <concepts>
#include "util/types.h"
-#include "framework/RecordInterface.h"
+#include "framework/interface/Record.h"
namespace de {
diff --git a/include/framework/Scheduler.h b/include/framework/scheduling/Scheduler.h
index 6055bef..992cbf9 100644
--- a/include/framework/Scheduler.h
+++ b/include/framework/scheduling/Scheduler.h
@@ -16,12 +16,12 @@
#include <condition_variable>
#include "util/types.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
-#include "framework/MutableBuffer.h"
-#include "framework/Configuration.h"
-#include "framework/ExtensionStructure.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/util/Configuration.h"
+#include "framework/structure/ExtensionStructure.h"
namespace de {
@@ -118,6 +118,7 @@ private:
}
}
+
void run_merge(MergeTask task, Structure *version) {
version->merge_levels(task.m_target_level, task.m_source_level);
@@ -160,7 +161,7 @@ private:
std::unique_lock<std::mutex> cv_lock(m_cv_lock);
m_cv.wait(cv_lock);
- while (m_merge_queue.size() > 0 && m_used_threads < m_thread_cnt) {
+ while (m_merge_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) {
schedule_next_task();
}
cv_lock.unlock();
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
new file mode 100644
index 0000000..5e16bdf
--- /dev/null
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -0,0 +1,227 @@
+/*
+ * include/framework/scheduling/SerialScheduler.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <variant>
+#include <vector>
+
+#include "util/types.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/util/Configuration.h"
+#include "framework/structure/ExtensionStructure.h"
+#include "framework/scheduling/Task.h"
+
+namespace de {
+
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
+class SerialScheduler {
+    typedef ExtensionStructure<R, S, Q, L> Structure;
+    typedef MutableBuffer<R> Buffer;
+public:
+    /*
+     * A simple "scheduler" that runs tasks serially on one background thread.
+     * Tasks are taken from a priority queue ordered by std::greater<Task>, so
+     * among tasks of the same kind the smallest timestamp runs first (FIFO).
+     * Only one task is active at a time.
+     *
+     * Memory budget is stated in bytes, with 0 meaning unlimited. Likewise,
+     * 0 threads means unlimited. Both values are only recorded by this class;
+     * nothing in it consults the memory budget, and the thread count is only
+     * compared against a usage counter that is never incremented.
+     *
+     * The object is not designed for concurrent callers: schedule_merge()
+     * stores its arguments in single "pending" slots, so overlapping calls
+     * would overwrite each other's inputs.
+     */
+    SerialScheduler(size_t memory_budget, size_t thread_cnt)
+        : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
+        , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX)
+        , pending_buffer(nullptr)
+        , pending_version(nullptr)
+        , m_used_memory(0)
+        , m_used_threads(0)
+        , m_timestamp(0)
+        , m_outstanding(0)
+        , m_shutdown(false)
+    {
+        /* Started last, once every member the thread reads is initialised. */
+        m_sched_thrd = std::thread(&SerialScheduler::run_scheduler, this);
+    }
+
+    /*
+     * Requests shutdown and joins the scheduler thread. Tasks still queued
+     * when shutdown is requested are executed before the thread exits.
+     */
+    ~SerialScheduler() {
+        {
+            /*
+             * Publish the flag under the mutex the scheduler waits with, so
+             * the scheduler cannot test its predicate and then miss the
+             * notification below.
+             */
+            std::lock_guard<std::mutex> cv_guard(m_cv_lock);
+            m_shutdown.store(true);
+        }
+
+        m_cv.notify_all();
+        if (m_sched_thrd.joinable()) {
+            m_sched_thrd.join();
+        }
+    }
+
+    /*
+     * Queues every level merge needed to make room for `buffer` in `version`,
+     * followed by the buffer merge itself, then blocks until all of them
+     * (and any follow-up merges they trigger) have finished.
+     *
+     * Always returns true.
+     */
+    bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) {
+        pending_version = version;
+        pending_buffer = buffer;
+
+        /*
+         * Get list of individual level reconstructions that are necessary
+         * for completing the overall merge
+         */
+        std::vector<MergeTask> merges = version->get_merge_tasks(buffer->get_record_count());
+
+        for (auto &merge : merges) {
+            merge.m_timestamp = get_timestamp();
+            enqueue_task(merge);
+        }
+
+        /* The buffer flush is queued last so that it runs after the level merges. */
+        MergeTask buffer_merge;
+        buffer_merge.m_source_level = -1;
+        buffer_merge.m_target_level = 0;
+        buffer_merge.m_size = buffer->get_record_count() * sizeof(R) * 2;
+        buffer_merge.m_timestamp = get_timestamp();
+        buffer_merge.m_type = TaskType::MERGE;
+        enqueue_task(buffer_merge);
+
+        wake_scheduler();
+
+        {
+            /*
+             * Wait on the count of queued-or-running tasks rather than on the
+             * queue size: the queue is already empty while the last task is
+             * still executing. The predicate also makes the wait immune to a
+             * notification that arrives before this thread blocks.
+             */
+            std::unique_lock<std::mutex> merge_cv_lock(m_merge_cv_lock);
+            m_merge_cv.wait(merge_cv_lock, [this] { return m_outstanding.load() == 0; });
+        }
+
+        assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0);
+
+        return true;
+    }
+
+    /* Placeholder: query scheduling is not implemented. Always returns true. */
+    bool schedule_query() {
+        return true;
+    }
+
+private:
+    /* Returns the next value of the monotonically increasing task timestamp. */
+    size_t get_timestamp() {
+        auto ts = m_timestamp.fetch_add(1);
+        return ts;
+    }
+
+    /* Number of tasks currently sitting in the queue (not yet started). */
+    size_t pending_task_count() {
+        std::lock_guard<std::mutex> queue_guard(m_merge_queue_lock);
+        return m_merge_queue.size();
+    }
+
+    /*
+     * Adds a task to the queue. The outstanding counter is raised first so
+     * that it can never drop to zero while work is still queued.
+     */
+    void enqueue_task(Task task) {
+        m_outstanding.fetch_add(1);
+        std::lock_guard<std::mutex> queue_guard(m_merge_queue_lock);
+        m_merge_queue.push(std::move(task));
+    }
+
+    /*
+     * Wakes the scheduler thread. Passing through m_cv_lock before notifying
+     * guarantees the scheduler is either already blocked in wait() or will
+     * observe the newly queued tasks when it evaluates its predicate.
+     */
+    void wake_scheduler() {
+        {
+            std::lock_guard<std::mutex> cv_guard(m_cv_lock);
+        }
+        m_cv.notify_all();
+    }
+
+    /*
+     * Executes one merge task against the pending version. A task with source
+     * level -1 and target level 0 denotes the buffer flush.
+     */
+    void schedule_merge(MergeTask task) {
+        if (task.m_source_level == -1 && task.m_target_level == 0) {
+            run_buffer_merge(pending_buffer, pending_version);
+        } else {
+            run_merge(task, pending_version);
+        }
+    }
+
+    /* Placeholder: query tasks are accepted but nothing is executed. */
+    void schedule_query(QueryTask task) {
+
+    }
+
+    /*
+     * Pops the highest-priority task, runs it to completion on the calling
+     * thread, and signals schedule_merge() once no work remains.
+     */
+    void schedule_next_task() {
+        Task task;
+        {
+            std::lock_guard<std::mutex> queue_guard(m_merge_queue_lock);
+            task = m_merge_queue.top();
+            m_merge_queue.pop();
+        }
+
+        auto type = std::visit(GetTaskType{}, task);
+
+        switch (type) {
+            case TaskType::MERGE:
+                schedule_merge(std::get<MergeTask>(task));
+                break;
+            case TaskType::QUERY:
+                schedule_query(std::get<QueryTask>(task));
+                break;
+            default: assert(false);
+        }
+
+        /*
+         * Follow-up tasks were counted before this decrement, so reaching
+         * zero means nothing is queued or running any more.
+         */
+        if (m_outstanding.fetch_sub(1) == 1) {
+            {
+                std::lock_guard<std::mutex> merge_guard(m_merge_cv_lock);
+            }
+            m_merge_cv.notify_all();
+        }
+    }
+
+    /*
+     * Merges task.m_source_level into task.m_target_level. If the target level
+     * then violates the tombstone proportion, the resulting follow-up merges
+     * are queued; the last returned task receives the earliest timestamp.
+     */
+    void run_merge(MergeTask task, Structure *version) {
+        version->merge_levels(task.m_target_level, task.m_source_level);
+
+        if (!version->validate_tombstone_proportion(task.m_target_level)) {
+            /*
+             * NOTE(review): schedule_merge() passes a record count to
+             * get_merge_tasks(), whereas a level index is passed here, and
+             * run_buffer_merge() uses get_merge_tasks_from_level(). This may
+             * have been meant to call get_merge_tasks_from_level() -- confirm.
+             */
+            auto tasks = version->get_merge_tasks(task.m_target_level);
+
+            for (size_t i = tasks.size(); i-- > 0; ) {
+                tasks[i].m_timestamp = get_timestamp();
+                enqueue_task(tasks[i]);
+            }
+        }
+    }
+
+    /*
+     * Flushes the buffer into the structure. If level 0 then violates the
+     * tombstone proportion, the follow-up merges starting from level 0 are
+     * queued; the last returned task receives the earliest timestamp.
+     */
+    void run_buffer_merge(Buffer *buffer, Structure *version) {
+        version->merge_buffer(buffer);
+        if (!version->validate_tombstone_proportion(0)) {
+            auto tasks = version->get_merge_tasks_from_level(0);
+
+            for (size_t i = tasks.size(); i-- > 0; ) {
+                tasks[i].m_timestamp = get_timestamp();
+                enqueue_task(tasks[i]);
+            }
+        }
+    }
+
+    /*
+     * Body of the scheduler thread: sleep until there is work or shutdown is
+     * requested, drain the queue, and repeat until shutdown.
+     */
+    void run_scheduler() {
+        do {
+            {
+                std::unique_lock<std::mutex> cv_lock(m_cv_lock);
+                m_cv.wait(cv_lock, [this] {
+                    return m_shutdown.load() || pending_task_count() > 0;
+                });
+            }
+
+            /* Tasks run without m_cv_lock held so producers are never blocked by a merge. */
+            while (pending_task_count() > 0 && m_used_threads.load() < m_thread_cnt) {
+                schedule_next_task();
+            }
+        } while (!m_shutdown.load());
+    }
+
+    size_t m_memory_budget;
+    size_t m_thread_cnt;
+
+    /* Inputs of the most recent schedule_merge() call, read by the scheduler thread. */
+    Buffer *pending_buffer;
+    Structure *pending_version;
+
+    alignas(64) std::atomic<size_t> m_used_memory;
+    alignas(64) std::atomic<size_t> m_used_threads;
+    alignas(64) std::atomic<size_t> m_timestamp;
+
+    /* Number of tasks that are queued or currently executing. */
+    std::atomic<size_t> m_outstanding;
+
+    /*
+     * NOTE: std::variant's operator> compares the alternative index before the
+     * contained value, so every MergeTask is ordered ahead of every QueryTask
+     * regardless of timestamp.
+     */
+    std::priority_queue<Task, std::vector<Task>, std::greater<Task>> m_merge_queue;
+    std::mutex m_merge_queue_lock;
+
+    /* Wakes the scheduler thread (new work or shutdown). */
+    std::mutex m_cv_lock;
+    std::condition_variable m_cv;
+
+    /* Wakes schedule_merge() callers once all outstanding work is done. */
+    std::mutex m_merge_cv_lock;
+    std::condition_variable m_merge_cv;
+
+    std::thread m_sched_thrd;
+
+    std::atomic<bool> m_shutdown;
+};
+
+}
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
new file mode 100644
index 0000000..9e0655a
--- /dev/null
+++ b/include/framework/scheduling/Task.h
@@ -0,0 +1,63 @@
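+/*
+ * include/framework/scheduling/Task.h
+ *
+ * Merge and query task descriptors used by the schedulers.
+ */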
+#pragma once
+
+#include <variant>
+
+#include "framework/util/Configuration.h"
+
+namespace de {
+
+/* Discriminator recording which kind of work a task describes. */
+enum class TaskType {
+    MERGE,
+    QUERY
+};
+
+/*
+ * Describes a merge from m_source_level into m_target_level. Tasks compare
+ * by timestamp only. Every field has a default so that a default-constructed
+ * task never carries an indeterminate type tag or level.
+ */
+struct MergeTask {
+    level_index m_source_level = 0;
+    level_index m_target_level = 0;
+    size_t m_timestamp = 0;
+    size_t m_size = 0;
+    TaskType m_type = TaskType::MERGE;
+
+    TaskType get_type() const {
+        return m_type;
+    }
+
+    friend bool operator<(const MergeTask &self, const MergeTask &other) {
+        return self.m_timestamp < other.m_timestamp;
+    }
+
+    friend bool operator>(const MergeTask &self, const MergeTask &other) {
+        return self.m_timestamp > other.m_timestamp;
+    }
+
+};
+
+/*
+ * Describes a query. Tasks compare by timestamp only. Every field has a
+ * default so that a default-constructed task is tagged as a query.
+ */
+struct QueryTask {
+    size_t m_timestamp = 0;
+    size_t m_size = 0;
+    TaskType m_type = TaskType::QUERY;
+
+    TaskType get_type() const {
+        return m_type;
+    }
+
+    friend bool operator<(const QueryTask &self, const QueryTask &other) {
+        return self.m_timestamp < other.m_timestamp;
+    }
+
+    friend bool operator>(const QueryTask &self, const QueryTask &other) {
+        return self.m_timestamp > other.m_timestamp;
+    }
+};
+
+/*
+ * Visitor for std::visit that returns the type tag of whichever task a Task
+ * currently holds. The call operators are const so the visitor can also be
+ * used through a const object.
+ */
+struct GetTaskType {
+    TaskType operator()(const MergeTask &t) const { return t.get_type(); }
+    TaskType operator()(const QueryTask &t) const { return t.get_type(); }
+};
+
+/* A task of either kind, handled without virtual dispatch. */
+typedef std::variant<MergeTask, QueryTask> Task;
+
+}
diff --git a/include/framework/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 892e63b..920e1c3 100644
--- a/include/framework/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -14,34 +14,20 @@
#include <cstdio>
#include <vector>
-#include "framework/MutableBuffer.h"
-#include "framework/InternalLevel.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/structure/InternalLevel.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
-#include "framework/Configuration.h"
+#include "framework/util/Configuration.h"
+#include "framework/scheduling/Task.h"
#include "psu-util/timer.h"
#include "psu-ds/Alias.h"
namespace de {
-struct MergeTask {
- level_index m_source_level;
- level_index m_target_level;
- size_t m_size;
- size_t m_timestamp;
-
- friend bool operator<(const MergeTask &self, const MergeTask &other) {
- return self.m_timestamp < other.m_timestamp;
- }
-
- friend bool operator>(const MergeTask &self, const MergeTask &other) {
- return self.m_timestamp > other.m_timestamp;
- }
-};
-
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING>
class ExtensionStructure {
typedef S Shard;
@@ -233,6 +219,7 @@ public:
MergeTask task;
task.m_source_level = i - 1;
task.m_target_level = i;
+ task.m_type = TaskType::MERGE;
/*
* The amount of storage required for the merge accounts
diff --git a/include/framework/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 6cdac4e..b9230f4 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -13,10 +13,10 @@
#include <memory>
#include "util/types.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
-#include "framework/MutableBuffer.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/MutableBuffer.h"
namespace de {
template <RecordInterface R, ShardInterface S, QueryInterface Q>
diff --git a/include/framework/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 572b656..9f12175 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -22,7 +22,7 @@
#include "psu-ds/BloomFilter.h"
#include "psu-ds/Alias.h"
#include "psu-util/timer.h"
-#include "framework/RecordInterface.h"
+#include "framework/interface/Record.h"
using psudb::CACHELINE_SIZE;
diff --git a/include/framework/Configuration.h b/include/framework/util/Configuration.h
index eb9b93f..eb9b93f 100644
--- a/include/framework/Configuration.h
+++ b/include/framework/util/Configuration.h
diff --git a/include/shard/Alex.h b/include/shard/Alex.h
deleted file mode 100644
index 9f794dc..0000000
--- a/include/shard/Alex.h
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * include/shard/Alex.h
- *
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
- *
- * All rights reserved. Published under the Modified BSD License.
- *
- */
-#pragma once
-
-
-#include <vector>
-#include <cassert>
-#include <queue>
-#include <memory>
-#include <concepts>
-
-#include "alex.h"
-#include "psu-ds/PriorityQueue.h"
-#include "util/Cursor.h"
-#include "psu-ds/BloomFilter.h"
-#include "util/bf_config.h"
-#include "framework/MutableBuffer.h"
-#include "framework/RecordInterface.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-
-using psudb::CACHELINE_SIZE;
-using psudb::BloomFilter;
-using psudb::PriorityQueue;
-using psudb::queue_record;
-using psudb::Alias;
-
-namespace de {
-
-template <RecordInterface R>
-struct alex_range_query_parms {
- decltype(R::key) lower_bound;
- decltype(R::key) upper_bound;
-};
-
-template <RecordInterface R>
-class AlexRangeQuery;
-
-template <RecordInterface R>
-struct AlexState {
- size_t start_idx;
- size_t stop_idx;
-};
-
-template <RecordInterface R>
-struct AlexBufferState {
- size_t cutoff;
- Alias* alias;
-
- ~AlexBufferState() {
- delete alias;
- }
-};
-
-
-template <RecordInterface R, size_t epsilon=128>
-class Alex {
-private:
- typedef decltype(R::key) K;
- typedef decltype(R::value) V;
-
-public:
-
- // FIXME: there has to be a better way to do this
- friend class AlexRangeQuery<R>;
-
- Alex(MutableBuffer<R>* buffer)
- : m_reccnt(0), m_tombstone_cnt(0) {
-
- m_alloc_size = (buffer->get_record_count() * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
- std::vector<std::pair<K, V>> temp_records;
-
- m_bf = new BloomFilter<R>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS);
-
- size_t offset = 0;
- m_reccnt = 0;
- auto base = buffer->get_data();
- auto stop = base + buffer->get_record_count();
-
- std::sort(base, stop, std::less<Wrapped<R>>());
-
- K min_key = base->rec.key;
- K max_key = (stop - 1)->rec.key;
-
- while (base < stop) {
- if (!(base->is_tombstone()) && (base + 1) < stop) {
- if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
- base += 2;
- continue;
- }
- } else if (base->is_deleted()) {
- base += 1;
- continue;
- }
-
- // FIXME: this shouldn't be necessary, but the tagged record
- // bypass doesn't seem to be working on this code-path, so this
- // ensures that tagged records from the buffer are able to be
- // dropped, eventually. It should only need to be &= 1
- base->header &= 3;
- m_data[m_reccnt++] = *base;
- temp_records.push_back({base->rec.key, base->rec.value});
-
- if (m_bf && base->is_tombstone()) {
- m_tombstone_cnt++;
- m_bf->insert(base->rec);
- }
-
- base++;
- }
-
- if (m_reccnt > 0) {
- m_alex = alex::Alex<K, V>();
- m_alex.set_expected_insert_frac(0);
- m_alex.bulkload(temp_records.data(), temp_records.size());
- }
- }
-
- Alex(Alex** shards, size_t len)
- : m_reccnt(0), m_tombstone_cnt(0) {
- std::vector<Cursor<Wrapped<R>>> cursors;
- cursors.reserve(len);
-
- PriorityQueue<Wrapped<R>> pq(len);
-
- size_t attemp_reccnt = 0;
- size_t tombstone_count = 0;
-
- for (size_t i = 0; i < len; ++i) {
- if (shards[i]) {
- auto base = shards[i]->get_data();
- cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
- attemp_reccnt += shards[i]->get_record_count();
- tombstone_count += shards[i]->get_tombstone_count();
- pq.push(cursors[i].ptr, i);
-
- } else {
- cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
- }
- }
-
- m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
-
- m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
-
- std::vector<std::pair<K, V>> temp_records;
-
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cur5sor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- if (!cursor.ptr->is_deleted()) {
- m_data[m_reccnt++] = *cursor.ptr;
- temp_records.pushback({cursor.ptr->rec.key, cursor.ptr->rec.value});
- if (m_bf && cursor.ptr->is_tombstone()) {
- ++m_tombstone_cnt;
- if (m_bf) m_bf->insert(cursor.ptr->rec);
- }
- }
- pq.pop();
-
- if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
-
- if (m_reccnt > 0) {
- m_alex = alex::Alex<K, V>();
- m_alex.set_expected_insert_frac(0);
- m_alex.bulkload(temp_records.data(), temp_records.size());
- }
- }
-
- ~Alex() {
- if (m_data) free(m_data);
- if (m_bf) delete m_bf;
-
- }
-
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
- if (filter && !m_bf->lookup(rec)) {
- return nullptr;
- }
-
- size_t idx = get_lower_bound(rec.key);
- if (idx >= m_reccnt) {
- return nullptr;
- }
-
- while (idx < m_reccnt && m_data[idx].rec < rec) ++idx;
-
- if (m_data[idx].rec == rec) {
- return m_data + idx;
- }
-
- return nullptr;
- }
-
- Wrapped<R>* get_data() const {
- return m_data;
- }
-
- size_t get_record_count() const {
- return m_reccnt;
- }
-
- size_t get_tombstone_count() const {
- return m_tombstone_cnt;
- }
-
- const Wrapped<R>* get_record_at(size_t idx) const {
- if (idx >= m_reccnt) return nullptr;
- return m_data + idx;
- }
-
-
- size_t get_memory_usage() {
- return m_alex.size_in_bytes() + m_alloc_size;
- }
-
- alex::Alex<K, V>::Iterator get_lower_bound(const K& key) const {
- auto bound = m_alex.find(key);
- while (bound != m_alex.end() && bound.key() < key) {
- bound++;
- }
-
- return bound;
- }
-
-private:
- Wrapped<R>* m_data;
- size_t m_reccnt;
- size_t m_tombstone_cnt;
- size_t m_alloc_size;
- K m_max_key;
- K m_min_key;
- alex::Alex<K, V> m_alex;
- BloomFilter<R> *m_bf;
-};
-
-
-template <RecordInterface R>
-class AlexRangeQuery {
-public:
- static void *get_query_state(Alex<R> *ts, void *parms) {
- auto res = new AlexState<R>();
- auto p = (alex_range_query_parms<R> *) parms;
-
- res->start_idx = ts->get_lower_bound(p->lower_bound);
- res->stop_idx = ts->get_record_count();
-
- return res;
- }
-
- static void* get_buffer_query_state(MutableBuffer<R> *buffer, void *parms) {
- auto res = new AlexBufferState<R>();
- res->cutoff = buffer->get_record_count();
-
- return res;
- }
-
- static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void *buff_state) {
- return;
- }
-
- static std::vector<Wrapped<R>> query(Alex<R> *ts, void *q_state, void *parms) {
- std::vector<Wrapped<R>> records;
- auto p = (alex_range_query_parms<R> *) parms;
- auto s = (AlexState<R> *) q_state;
-
- // if the returned index is one past the end of the
- // records for the Alex, then there are not records
- // in the index falling into the specified range.
- if (s->start_idx == ts->get_record_count()) {
- return records;
- }
-
- auto ptr = ts->get_record_at(s->start_idx);
-
- // roll the pointer forward to the first record that is
- // greater than or equal to the lower bound.
- while(ptr->rec.key < p->lower_bound) {
- ptr++;
- }
-
- while (ptr->rec.key <= p->upper_bound && ptr < ts->m_data + s->stop_idx) {
- records.emplace_back(*ptr);
- ptr++;
- }
-
- return records;
- }
-
- static std::vector<Wrapped<R>> buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) {
- auto p = (alex_range_query_parms<R> *) parms;
- auto s = (AlexBufferState<R> *) state;
-
- std::vector<Wrapped<R>> records;
- for (size_t i=0; i<s->cutoff; i++) {
- auto rec = buffer->get_data() + i;
- if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) {
- records.emplace_back(*rec);
- }
- }
-
- return records;
- }
-
- static std::vector<R> merge(std::vector<std::vector<R>> &results, void *parms) {
- size_t total = 0;
- for (size_t i=0; i<results.size(); i++) {
- total += results[i].size();
- }
-
- if (total == 0) {
- return std::vector<R>();
- }
-
- std::vector<R> output;
- output.reserve(total);
-
- for (size_t i=0; i<results.size(); i++) {
- std::move(results[i].begin(), results[i].end(), std::back_inserter(output));
- }
-
- return output;
- }
-
- static void delete_query_state(void *state) {
- auto s = (AlexState<R> *) state;
- delete s;
- }
-
- static void delete_buffer_query_state(void *state) {
- auto s = (AlexBufferState<R> *) state;
- delete s;
- }
-};
-
-;
-
-}
diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h
index a220792..f9c621e 100644
--- a/include/shard/MemISAM.h
+++ b/include/shard/MemISAM.h
@@ -14,7 +14,8 @@
#include <queue>
#include <memory>
-#include "framework/MutableBuffer.h"
+#include "framework/ShardRequirements.h"
+
#include "util/bf_config.h"
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"
diff --git a/include/shard/PGM.h b/include/shard/PGM.h
index 2cd153e..d960e70 100644
--- a/include/shard/PGM.h
+++ b/include/shard/PGM.h
@@ -15,15 +15,13 @@
#include <memory>
#include <concepts>
+#include "framework/ShardRequirements.h"
+
#include "pgm/pgm_index.hpp"
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"
#include "psu-ds/BloomFilter.h"
#include "util/bf_config.h"
-#include "framework/MutableBuffer.h"
-#include "framework/RecordInterface.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h
index 69fcfbc..98153c0 100644
--- a/include/shard/TrieSpline.h
+++ b/include/shard/TrieSpline.h
@@ -15,15 +15,12 @@
#include <memory>
#include <concepts>
+#include "framework/ShardRequirements.h"
#include "ts/builder.h"
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"
#include "psu-ds/BloomFilter.h"
#include "util/bf_config.h"
-#include "framework/MutableBuffer.h"
-#include "framework/RecordInterface.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h
index 8feec84..0e998d9 100644
--- a/include/shard/VPTree.h
+++ b/include/shard/VPTree.h
@@ -15,14 +15,12 @@
#include <concepts>
#include <map>
+#include "framework/ShardRequirements.h"
+
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"
#include "psu-ds/BloomFilter.h"
#include "util/bf_config.h"
-#include "framework/MutableBuffer.h"
-#include "framework/RecordInterface.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index 19d3eea..8583cb0 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -16,15 +16,13 @@
#include <memory>
#include <concepts>
+#include "framework/ShardRequirements.h"
+
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"
#include "psu-ds/Alias.h"
#include "psu-ds/BloomFilter.h"
#include "util/bf_config.h"
-#include "framework/MutableBuffer.h"
-#include "framework/RecordInterface.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
diff --git a/include/shard/WSS.h b/include/shard/WSS.h
index c0af573..87b016c 100644
--- a/include/shard/WSS.h
+++ b/include/shard/WSS.h
@@ -16,15 +16,13 @@
#include <memory>
#include <concepts>
+#include "framework/ShardRequirements.h"
+
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"
#include "psu-ds/Alias.h"
#include "psu-ds/BloomFilter.h"
#include "util/bf_config.h"
-#include "framework/MutableBuffer.h"
-#include "framework/RecordInterface.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
diff --git a/include/util/Cursor.h b/include/util/Cursor.h
index 1b0b8ed..1cf20e1 100644
--- a/include/util/Cursor.h
+++ b/include/util/Cursor.h
@@ -9,7 +9,7 @@
*/
#pragma once
-#include "framework/RecordInterface.h"
+#include "framework/ShardRequirements.h"
#include "psu-ds/BloomFilter.h"
#include "psu-ds/PriorityQueue.h"
diff --git a/tests/internal_level_tests.cpp b/tests/internal_level_tests.cpp
index 58369ff..056e458 100644
--- a/tests/internal_level_tests.cpp
+++ b/tests/internal_level_tests.cpp
@@ -10,10 +10,10 @@
*
*/
#include "shard/WIRS.h"
-#include "framework/InternalLevel.h"
-#include "framework/RecordInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/ShardInterface.h"
+#include "framework/structure/InternalLevel.h"
+#include "framework/interface/Record.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Shard.h"
#include "testing.h"
diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp
index 201fddb..a2561c8 100644
--- a/tests/mutable_buffer_tests.cpp
+++ b/tests/mutable_buffer_tests.cpp
@@ -15,7 +15,7 @@
#include <algorithm>
#include "testing.h"
-#include "framework/MutableBuffer.h"
+#include "framework/structure/MutableBuffer.h"
#include <check.h>
diff --git a/tests/testing.h b/tests/testing.h
index bdf4869..023be7f 100644
--- a/tests/testing.h
+++ b/tests/testing.h
@@ -18,8 +18,8 @@
#include "util/types.h"
#include "psu-util/alignment.h"
-#include "framework/MutableBuffer.h"
-#include "framework/RecordInterface.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/interface/Record.h"
typedef de::WeightedRecord<uint64_t, uint32_t, uint64_t> WRec;
typedef de::Record<uint64_t, uint32_t> Rec;