summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--include/framework/DynamicExtension.h24
-rw-r--r--include/framework/ShardRequirements.h9
-rw-r--r--include/framework/interface/Query.h (renamed from include/framework/QueryInterface.h)0
-rw-r--r--include/framework/interface/Record.h (renamed from include/framework/RecordInterface.h)0
-rw-r--r--include/framework/interface/Scheduler.h31
-rw-r--r--include/framework/interface/Shard.h (renamed from include/framework/ShardInterface.h)2
-rw-r--r--include/framework/scheduling/Scheduler.h (renamed from include/framework/Scheduler.h)15
-rw-r--r--include/framework/scheduling/SerialScheduler.h227
-rw-r--r--include/framework/scheduling/Task.h63
-rw-r--r--include/framework/structure/ExtensionStructure.h (renamed from include/framework/ExtensionStructure.h)29
-rw-r--r--include/framework/structure/InternalLevel.h (renamed from include/framework/InternalLevel.h)8
-rw-r--r--include/framework/structure/MutableBuffer.h (renamed from include/framework/MutableBuffer.h)2
-rw-r--r--include/framework/util/Configuration.h (renamed from include/framework/Configuration.h)0
-rw-r--r--include/shard/Alex.h360
-rw-r--r--include/shard/MemISAM.h3
-rw-r--r--include/shard/PGM.h6
-rw-r--r--include/shard/TrieSpline.h5
-rw-r--r--include/shard/VPTree.h6
-rw-r--r--include/shard/WIRS.h6
-rw-r--r--include/shard/WSS.h6
-rw-r--r--include/util/Cursor.h2
-rw-r--r--tests/internal_level_tests.cpp8
-rw-r--r--tests/mutable_buffer_tests.cpp2
-rw-r--r--tests/testing.h4
24 files changed, 384 insertions, 434 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 6965965..3a460aa 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -14,22 +14,24 @@
#include <cstdio>
#include <vector>
-#include "framework/MutableBuffer.h"
-#include "framework/InternalLevel.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
-#include "framework/ExtensionStructure.h"
-
-#include "framework/Configuration.h"
-#include "framework/Scheduler.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/structure/InternalLevel.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Scheduler.h"
+#include "framework/structure/ExtensionStructure.h"
+
+#include "framework/util/Configuration.h"
+#include "framework/scheduling/SerialScheduler.h"
#include "psu-util/timer.h"
#include "psu-ds/Alias.h"
namespace de {
-template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING>
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=SerialScheduler<R, S, Q, L>>
class DynamicExtension {
typedef S Shard;
typedef MutableBuffer<R> Buffer;
@@ -219,7 +221,7 @@ public:
}
private:
- Scheduler<R, S, Q, L> m_sched;
+ SCHED m_sched;
std::vector<Buffer *> m_buffers;
std::vector<Structure *> m_versions;
diff --git a/include/framework/ShardRequirements.h b/include/framework/ShardRequirements.h
new file mode 100644
index 0000000..95f7b67
--- /dev/null
+++ b/include/framework/ShardRequirements.h
@@ -0,0 +1,9 @@
+/*
+ *
+ */
+#pragma once
+
+#include "framework/structure/MutableBuffer.h"
+#include "framework/interface/Record.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
diff --git a/include/framework/QueryInterface.h b/include/framework/interface/Query.h
index 46a1ce1..46a1ce1 100644
--- a/include/framework/QueryInterface.h
+++ b/include/framework/interface/Query.h
diff --git a/include/framework/RecordInterface.h b/include/framework/interface/Record.h
index 1ef1984..1ef1984 100644
--- a/include/framework/RecordInterface.h
+++ b/include/framework/interface/Record.h
diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h
new file mode 100644
index 0000000..1445e90
--- /dev/null
+++ b/include/framework/interface/Scheduler.h
@@ -0,0 +1,31 @@
+/*
+ * include/framework/interface/Scheduler.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <vector>
+#include <concepts>
+#include "framework/interface/Record.h"
+#include "util/types.h"
+
+/*
+ * Concept constraining the scheduler type that the framework is
+ * parameterized on. The only requirement currently enforced is that S
+ * can be constructed from two size_t arguments, written here as S(i, i).
+ * NOTE(review): presumably these are the (memory_budget, thread_cnt)
+ * pair taken by SerialScheduler's constructor -- confirm.
+ */
+template <typename S>
+concept SchedulerInterface = requires(S s, size_t i, void *vp) {
+ {S(i, i)};
+ /*
+  * Not yet enforced: a merge-scheduling entry point. The requirement
+  * below is left disabled until it is written against the real
+  * argument types rather than void pointers.
+  */
+// {s.schedule_merge(vp, vp)};
+};
diff --git a/include/framework/ShardInterface.h b/include/framework/interface/Shard.h
index 3aa62df..ea58b2a 100644
--- a/include/framework/ShardInterface.h
+++ b/include/framework/interface/Shard.h
@@ -11,7 +11,7 @@
#include <concepts>
#include "util/types.h"
-#include "framework/RecordInterface.h"
+#include "framework/interface/Record.h"
namespace de {
diff --git a/include/framework/Scheduler.h b/include/framework/scheduling/Scheduler.h
index 6055bef..992cbf9 100644
--- a/include/framework/Scheduler.h
+++ b/include/framework/scheduling/Scheduler.h
@@ -16,12 +16,12 @@
#include <condition_variable>
#include "util/types.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
-#include "framework/MutableBuffer.h"
-#include "framework/Configuration.h"
-#include "framework/ExtensionStructure.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/util/Configuration.h"
+#include "framework/structure/ExtensionStructure.h"
namespace de {
@@ -118,6 +118,7 @@ private:
}
}
+
void run_merge(MergeTask task, Structure *version) {
version->merge_levels(task.m_target_level, task.m_source_level);
@@ -160,7 +161,7 @@ private:
std::unique_lock<std::mutex> cv_lock(m_cv_lock);
m_cv.wait(cv_lock);
- while (m_merge_queue.size() > 0 && m_used_threads < m_thread_cnt) {
+ while (m_merge_queue.size() > 0 && m_used_threads.load() < m_thread_cnt) {
schedule_next_task();
}
cv_lock.unlock();
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
new file mode 100644
index 0000000..5e16bdf
--- /dev/null
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -0,0 +1,227 @@
+/*
+ * include/framework/scheduling/SerialScheduler.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <variant>
+#include <vector>
+
+#include "util/types.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/util/Configuration.h"
+#include "framework/structure/ExtensionStructure.h"
+#include "framework/scheduling/Task.h"
+
+namespace de {
+
+/*
+ * A simple "scheduler" that executes tasks one at a time, in timestamp
+ * order, on a single background thread that is started by the
+ * constructor and joined by the destructor.
+ *
+ * schedule_merge() enqueues the level merges required by a buffer flush
+ * followed by the buffer merge itself, wakes the background thread, and
+ * blocks until the queue has been fully drained (including any
+ * follow-up merges enqueued while the batch was running).
+ *
+ * The pending buffer/version are kept in single, unsynchronized slots,
+ * so schedule_merge() must not be called from several threads at once.
+ */
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
+class SerialScheduler {
+    typedef ExtensionStructure<R, S, Q, L> Structure;
+    typedef MutableBuffer<R> Buffer;
+public:
+    /*
+     * memory_budget is stated in bytes and thread_cnt in threads; for
+     * both, 0 is mapped to UINT64_MAX, i.e. unlimited.
+     *
+     * Neither limit is enforced by this class at present: the budget is
+     * only stored, and the used-thread counter that thread_cnt is
+     * compared against is never incremented.
+     */
+    SerialScheduler(size_t memory_budget, size_t thread_cnt)
+        : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
+        , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX)
+        , pending_buffer(nullptr)
+        , pending_version(nullptr)
+        , m_used_memory(0)
+        , m_used_threads(0)
+        , m_timestamp(0)
+        , m_merge_done(false)
+        , m_shutdown(false)
+    {
+        /* started last, once every member it touches is initialized */
+        m_sched_thrd = std::thread(&SerialScheduler::run_scheduler, this);
+    }
+
+    /*
+     * Requests shutdown and joins the scheduler thread. Tasks that are
+     * still queued are executed before the thread exits.
+     */
+    ~SerialScheduler() {
+        {
+            /*
+             * Publishing the flag while holding the mutex guarantees
+             * the scheduler thread either sees it in its wait predicate
+             * or is already blocked and receives the notification.
+             */
+            std::lock_guard<std::mutex> guard(m_cv_lock);
+            m_shutdown.store(true);
+        }
+
+        m_cv.notify_all();
+        m_sched_thrd.join();
+    }
+
+    /*
+     * Merge `buffer` into `version`, performing whatever level merges
+     * version->get_merge_tasks() reports as necessary first. Blocks
+     * until all of the resulting tasks have run. Always returns true.
+     */
+    bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) {
+        pending_version = version;
+        pending_buffer = buffer;
+
+        /* arm the completion flag before any task becomes visible */
+        {
+            std::lock_guard<std::mutex> guard(m_merge_cv_lock);
+            m_merge_done = false;
+        }
+
+        /*
+         * Get list of individual level reconstructions that are
+         * necessary for completing the overall merge
+         */
+        std::vector<MergeTask> merges = version->get_merge_tasks(buffer->get_record_count());
+
+        MergeTask buffer_merge;
+        buffer_merge.m_source_level = -1;
+        buffer_merge.m_target_level = 0;
+        buffer_merge.m_size = buffer->get_record_count() * sizeof(R) * 2;
+        buffer_merge.m_type = TaskType::MERGE;
+
+        /*
+         * Enqueue the whole batch under one lock so the scheduler
+         * thread can never observe (and report completion of) a
+         * partially submitted batch. The buffer merge gets the largest
+         * timestamp, so it runs after the level merges.
+         */
+        {
+            std::lock_guard<std::mutex> guard(m_merge_queue_lock);
+            for (auto &merge : merges) {
+                merge.m_timestamp = get_timestamp();
+                m_merge_queue.emplace(merge);
+            }
+
+            buffer_merge.m_timestamp = get_timestamp();
+            m_merge_queue.emplace(buffer_merge);
+        }
+
+        /*
+         * Passing through the mutex orders the enqueue above against
+         * the scheduler's predicate check, so the wakeup cannot be lost.
+         */
+        {
+            std::lock_guard<std::mutex> guard(m_cv_lock);
+        }
+        m_cv.notify_all();
+
+        {
+            std::unique_lock<std::mutex> merge_cv_lock(m_merge_cv_lock);
+            m_merge_cv.wait(merge_cv_lock, [this] { return m_merge_done; });
+        }
+
+        assert(version->get_levels()[version->get_levels().size() - 1]->get_shard(0)->get_tombstone_count() == 0);
+
+        return true;
+    }
+
+    /* Placeholder: queries are not scheduled yet. Always returns true. */
+    bool schedule_query() {
+        return true;
+    }
+
+private:
+    /* Returns the next value of the monotonically increasing task clock. */
+    size_t get_timestamp() {
+        auto ts = m_timestamp.fetch_add(1);
+        return ts;
+    }
+
+    /* True if at least one task is queued (takes the queue lock). */
+    bool has_pending_tasks() {
+        std::lock_guard<std::mutex> guard(m_merge_queue_lock);
+        return !m_merge_queue.empty();
+    }
+
+    /* True if a task is queued and the thread limit permits running it. */
+    bool can_schedule() {
+        return has_pending_tasks() && m_used_threads.load() < m_thread_cnt;
+    }
+
+    /*
+     * Stamps and enqueues follow-up merges, walking the list from back
+     * to front so that the last element receives the earliest timestamp.
+     */
+    template <typename Tasks>
+    void enqueue_follow_up_tasks(Tasks &tasks) {
+        std::lock_guard<std::mutex> guard(m_merge_queue_lock);
+        for (size_t i = tasks.size(); i-- > 0;) {
+            tasks[i].m_timestamp = get_timestamp();
+            m_merge_queue.push(tasks[i]);
+        }
+    }
+
+    /*
+     * Runs one merge task. A task from level -1 into level 0 denotes
+     * the buffer merge; anything else is a level-to-level merge.
+     */
+    void schedule_merge(MergeTask task) {
+        if (task.m_source_level == -1 && task.m_target_level == 0) {
+            run_buffer_merge(pending_buffer, pending_version);
+        } else {
+            run_merge(task, pending_version);
+        }
+    }
+
+    /* Placeholder: query tasks are accepted but not executed. */
+    void schedule_query(QueryTask task) {
+
+    }
+
+    /*
+     * Pops the task with the smallest timestamp, runs it to completion,
+     * and signals the waiter in the public schedule_merge() once the
+     * queue is empty after the task (and any follow-ups it enqueued).
+     */
+    void schedule_next_task() {
+        m_merge_queue_lock.lock();
+        auto task = m_merge_queue.top();
+        m_merge_queue.pop();
+        m_merge_queue_lock.unlock();
+
+        auto type = std::visit(GetTaskType{}, task);
+
+        switch (type) {
+            case TaskType::MERGE:
+                schedule_merge(std::get<MergeTask>(task));
+                break;
+            case TaskType::QUERY:
+                schedule_query(std::get<QueryTask>(task));
+                break;
+            default: assert(false);
+        }
+
+        if (!has_pending_tasks()) {
+            {
+                std::lock_guard<std::mutex> guard(m_merge_cv_lock);
+                m_merge_done = true;
+            }
+            m_merge_cv.notify_all();
+        }
+    }
+
+    /*
+     * Merges task.m_source_level into task.m_target_level and, if the
+     * target then fails tombstone-proportion validation, enqueues
+     * further merges to repair it.
+     *
+     * NOTE(review): the follow-ups are requested via get_merge_tasks()
+     * with a level index, whereas run_buffer_merge() uses
+     * get_merge_tasks_from_level(); confirm which one is intended here.
+     */
+    void run_merge(MergeTask task, Structure *version) {
+        version->merge_levels(task.m_target_level, task.m_source_level);
+
+        if (!version->validate_tombstone_proportion(task.m_target_level)) {
+            auto tasks = version->get_merge_tasks(task.m_target_level);
+            enqueue_follow_up_tasks(tasks);
+        }
+    }
+
+    /*
+     * Merges the buffer into the structure and, if level 0 then fails
+     * tombstone-proportion validation, enqueues further merges starting
+     * from level 0.
+     */
+    void run_buffer_merge(Buffer *buffer, Structure *version) {
+        version->merge_buffer(buffer);
+
+        if (!version->validate_tombstone_proportion(0)) {
+            auto tasks = version->get_merge_tasks_from_level(0);
+            enqueue_follow_up_tasks(tasks);
+        }
+    }
+
+    /*
+     * Body of the scheduler thread: sleep until there is runnable work
+     * or shutdown is requested, drain the queue, repeat. The mutex is
+     * released while tasks run so producers are never blocked on it.
+     */
+    void run_scheduler() {
+        do {
+            {
+                std::unique_lock<std::mutex> cv_lock(m_cv_lock);
+                m_cv.wait(cv_lock, [this] { return m_shutdown.load() || can_schedule(); });
+            }
+
+            while (can_schedule()) {
+                schedule_next_task();
+            }
+        } while (!m_shutdown.load());
+    }
+
+    size_t m_memory_budget;
+    size_t m_thread_cnt;
+
+    /* arguments of the schedule_merge() call currently being served */
+    Buffer *pending_buffer;
+    Structure *pending_version;
+
+    alignas(64) std::atomic<size_t> m_used_memory;
+    alignas(64) std::atomic<size_t> m_used_threads;
+    alignas(64) std::atomic<size_t> m_timestamp;
+
+    /* min-heap on task ordering; guarded by m_merge_queue_lock */
+    std::priority_queue<Task, std::vector<Task>, std::greater<Task>> m_merge_queue;
+    std::mutex m_merge_queue_lock;
+
+    /* wakes the scheduler thread when work arrives or on shutdown */
+    std::mutex m_cv_lock;
+    std::condition_variable m_cv;
+
+    /* wakes the schedule_merge() caller; m_merge_done is guarded by m_merge_cv_lock */
+    std::mutex m_merge_cv_lock;
+    std::condition_variable m_merge_cv;
+    bool m_merge_done;
+
+    std::thread m_sched_thrd;
+
+    /* written by the destructor, read by the scheduler thread */
+    std::atomic<bool> m_shutdown;
+};
+
+}
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
new file mode 100644
index 0000000..9e0655a
--- /dev/null
+++ b/include/framework/scheduling/Task.h
@@ -0,0 +1,63 @@
+/*
+ *
+ */
+#pragma once
+
+#include <variant>
+
+#include "framework/util/Configuration.h"
+
+namespace de {
+
+/* Discriminates the kinds of task carried by the Task variant. */
+enum class TaskType {
+ MERGE,
+ QUERY
+};
+
+/*
+ * Describes a single merge from m_source_level into m_target_level.
+ * Tasks are ordered solely by m_timestamp.
+ *
+ * Every member has a default so that a MergeTask declared without an
+ * initializer is fully defined; in particular m_type defaults to MERGE,
+ * so call sites that never assign it still dispatch as a merge.
+ */
+struct MergeTask {
+    level_index m_source_level{};
+    level_index m_target_level{};
+    size_t m_timestamp = 0;
+    size_t m_size = 0;
+    TaskType m_type = TaskType::MERGE;
+
+    TaskType get_type() const {
+        return m_type;
+    }
+
+    friend bool operator<(const MergeTask &self, const MergeTask &other) {
+        return self.m_timestamp < other.m_timestamp;
+    }
+
+    friend bool operator>(const MergeTask &self, const MergeTask &other) {
+        return self.m_timestamp > other.m_timestamp;
+    }
+};
+
+/*
+ * Describes a single query task. Tasks are ordered solely by
+ * m_timestamp.
+ *
+ * Every member has a default so that a QueryTask declared without an
+ * initializer is fully defined; m_type defaults to QUERY.
+ */
+struct QueryTask {
+    size_t m_timestamp = 0;
+    size_t m_size = 0;
+    TaskType m_type = TaskType::QUERY;
+
+    TaskType get_type() const {
+        return m_type;
+    }
+
+    friend bool operator<(const QueryTask &self, const QueryTask &other) {
+        return self.m_timestamp < other.m_timestamp;
+    }
+
+    friend bool operator>(const QueryTask &self, const QueryTask &other) {
+        return self.m_timestamp > other.m_timestamp;
+    }
+};
+
+/*
+ * Visitor for std::visit over a Task: returns the TaskType reported by
+ * whichever alternative is held. The call operators are const so the
+ * visitor can also be invoked through a const object or reference.
+ */
+struct GetTaskType {
+    TaskType operator()(const MergeTask &t) const { return t.get_type(); }
+    TaskType operator()(const QueryTask &t) const { return t.get_type(); }
+};
+
+/* A schedulable unit of work: either a merge or a query. */
+using Task = std::variant<MergeTask, QueryTask>;
+
+}
diff --git a/include/framework/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 892e63b..920e1c3 100644
--- a/include/framework/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -14,34 +14,20 @@
#include <cstdio>
#include <vector>
-#include "framework/MutableBuffer.h"
-#include "framework/InternalLevel.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/structure/InternalLevel.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
-#include "framework/Configuration.h"
+#include "framework/util/Configuration.h"
+#include "framework/scheduling/Task.h"
#include "psu-util/timer.h"
#include "psu-ds/Alias.h"
namespace de {
-struct MergeTask {
- level_index m_source_level;
- level_index m_target_level;
- size_t m_size;
- size_t m_timestamp;
-
- friend bool operator<(const MergeTask &self, const MergeTask &other) {
- return self.m_timestamp < other.m_timestamp;
- }
-
- friend bool operator>(const MergeTask &self, const MergeTask &other) {
- return self.m_timestamp > other.m_timestamp;
- }
-};
-
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING>
class ExtensionStructure {
typedef S Shard;
@@ -233,6 +219,7 @@ public:
MergeTask task;
task.m_source_level = i - 1;
task.m_target_level = i;
+ task.m_type = TaskType::MERGE;
/*
* The amount of storage required for the merge accounts
diff --git a/include/framework/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 6cdac4e..b9230f4 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -13,10 +13,10 @@
#include <memory>
#include "util/types.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
-#include "framework/MutableBuffer.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/MutableBuffer.h"
namespace de {
template <RecordInterface R, ShardInterface S, QueryInterface Q>
diff --git a/include/framework/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 572b656..9f12175 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -22,7 +22,7 @@
#include "psu-ds/BloomFilter.h"
#include "psu-ds/Alias.h"
#include "psu-util/timer.h"
-#include "framework/RecordInterface.h"
+#include "framework/interface/Record.h"
using psudb::CACHELINE_SIZE;
diff --git a/include/framework/Configuration.h b/include/framework/util/Configuration.h
index eb9b93f..eb9b93f 100644
--- a/include/framework/Configuration.h
+++ b/include/framework/util/Configuration.h
diff --git a/include/shard/Alex.h b/include/shard/Alex.h
deleted file mode 100644
index 9f794dc..0000000
--- a/include/shard/Alex.h
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * include/shard/Alex.h
- *
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
- *
- * All rights reserved. Published under the Modified BSD License.
- *
- */
-#pragma once
-
-
-#include <vector>
-#include <cassert>
-#include <queue>
-#include <memory>
-#include <concepts>
-
-#include "alex.h"
-#include "psu-ds/PriorityQueue.h"
-#include "util/Cursor.h"
-#include "psu-ds/BloomFilter.h"
-#include "util/bf_config.h"
-#include "framework/MutableBuffer.h"
-#include "framework/RecordInterface.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-
-using psudb::CACHELINE_SIZE;
-using psudb::BloomFilter;
-using psudb::PriorityQueue;
-using psudb::queue_record;
-using psudb::Alias;
-
-namespace de {
-
-template <RecordInterface R>
-struct alex_range_query_parms {
- decltype(R::key) lower_bound;
- decltype(R::key) upper_bound;
-};
-
-template <RecordInterface R>
-class AlexRangeQuery;
-
-template <RecordInterface R>
-struct AlexState {
- size_t start_idx;
- size_t stop_idx;
-};
-
-template <RecordInterface R>
-struct AlexBufferState {
- size_t cutoff;
- Alias* alias;
-
- ~AlexBufferState() {
- delete alias;
- }
-};
-
-
-template <RecordInterface R, size_t epsilon=128>
-class Alex {
-private:
- typedef decltype(R::key) K;
- typedef decltype(R::value) V;
-
-public:
-
- // FIXME: there has to be a better way to do this
- friend class AlexRangeQuery<R>;
-
- Alex(MutableBuffer<R>* buffer)
- : m_reccnt(0), m_tombstone_cnt(0) {
-
- m_alloc_size = (buffer->get_record_count() * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
- std::vector<std::pair<K, V>> temp_records;
-
- m_bf = new BloomFilter<R>(BF_FPR, buffer->get_tombstone_count(), BF_HASH_FUNCS);
-
- size_t offset = 0;
- m_reccnt = 0;
- auto base = buffer->get_data();
- auto stop = base + buffer->get_record_count();
-
- std::sort(base, stop, std::less<Wrapped<R>>());
-
- K min_key = base->rec.key;
- K max_key = (stop - 1)->rec.key;
-
- while (base < stop) {
- if (!(base->is_tombstone()) && (base + 1) < stop) {
- if (base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
- base += 2;
- continue;
- }
- } else if (base->is_deleted()) {
- base += 1;
- continue;
- }
-
- // FIXME: this shouldn't be necessary, but the tagged record
- // bypass doesn't seem to be working on this code-path, so this
- // ensures that tagged records from the buffer are able to be
- // dropped, eventually. It should only need to be &= 1
- base->header &= 3;
- m_data[m_reccnt++] = *base;
- temp_records.push_back({base->rec.key, base->rec.value});
-
- if (m_bf && base->is_tombstone()) {
- m_tombstone_cnt++;
- m_bf->insert(base->rec);
- }
-
- base++;
- }
-
- if (m_reccnt > 0) {
- m_alex = alex::Alex<K, V>();
- m_alex.set_expected_insert_frac(0);
- m_alex.bulkload(temp_records.data(), temp_records.size());
- }
- }
-
- Alex(Alex** shards, size_t len)
- : m_reccnt(0), m_tombstone_cnt(0) {
- std::vector<Cursor<Wrapped<R>>> cursors;
- cursors.reserve(len);
-
- PriorityQueue<Wrapped<R>> pq(len);
-
- size_t attemp_reccnt = 0;
- size_t tombstone_count = 0;
-
- for (size_t i = 0; i < len; ++i) {
- if (shards[i]) {
- auto base = shards[i]->get_data();
- cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
- attemp_reccnt += shards[i]->get_record_count();
- tombstone_count += shards[i]->get_tombstone_count();
- pq.push(cursors[i].ptr, i);
-
- } else {
- cursors.emplace_back(Cursor<Wrapped<R>>{nullptr, nullptr, 0, 0});
- }
- }
-
- m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
-
- m_alloc_size = (attemp_reccnt * sizeof(Wrapped<R>)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Wrapped<R>)) % CACHELINE_SIZE);
- assert(m_alloc_size % CACHELINE_SIZE == 0);
- m_data = (Wrapped<R>*)std::aligned_alloc(CACHELINE_SIZE, m_alloc_size);
-
- std::vector<std::pair<K, V>> temp_records;
-
- while (pq.size()) {
- auto now = pq.peek();
- auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
- if (!now.data->is_tombstone() && next.data != nullptr &&
- now.data->rec == next.data->rec && next.data->is_tombstone()) {
-
- pq.pop(); pq.pop();
- auto& cursor1 = cursors[now.version];
- auto& cursor2 = cursors[next.version];
- if (advance_cur5sor<Wrapped<R>>(cursor1)) pq.push(cursor1.ptr, now.version);
- if (advance_cursor<Wrapped<R>>(cursor2)) pq.push(cursor2.ptr, next.version);
- } else {
- auto& cursor = cursors[now.version];
- if (!cursor.ptr->is_deleted()) {
- m_data[m_reccnt++] = *cursor.ptr;
- temp_records.pushback({cursor.ptr->rec.key, cursor.ptr->rec.value});
- if (m_bf && cursor.ptr->is_tombstone()) {
- ++m_tombstone_cnt;
- if (m_bf) m_bf->insert(cursor.ptr->rec);
- }
- }
- pq.pop();
-
- if (advance_cursor<Wrapped<R>>(cursor)) pq.push(cursor.ptr, now.version);
- }
- }
-
- if (m_reccnt > 0) {
- m_alex = alex::Alex<K, V>();
- m_alex.set_expected_insert_frac(0);
- m_alex.bulkload(temp_records.data(), temp_records.size());
- }
- }
-
- ~Alex() {
- if (m_data) free(m_data);
- if (m_bf) delete m_bf;
-
- }
-
- Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
- if (filter && !m_bf->lookup(rec)) {
- return nullptr;
- }
-
- size_t idx = get_lower_bound(rec.key);
- if (idx >= m_reccnt) {
- return nullptr;
- }
-
- while (idx < m_reccnt && m_data[idx].rec < rec) ++idx;
-
- if (m_data[idx].rec == rec) {
- return m_data + idx;
- }
-
- return nullptr;
- }
-
- Wrapped<R>* get_data() const {
- return m_data;
- }
-
- size_t get_record_count() const {
- return m_reccnt;
- }
-
- size_t get_tombstone_count() const {
- return m_tombstone_cnt;
- }
-
- const Wrapped<R>* get_record_at(size_t idx) const {
- if (idx >= m_reccnt) return nullptr;
- return m_data + idx;
- }
-
-
- size_t get_memory_usage() {
- return m_alex.size_in_bytes() + m_alloc_size;
- }
-
- alex::Alex<K, V>::Iterator get_lower_bound(const K& key) const {
- auto bound = m_alex.find(key);
- while (bound != m_alex.end() && bound.key() < key) {
- bound++;
- }
-
- return bound;
- }
-
-private:
- Wrapped<R>* m_data;
- size_t m_reccnt;
- size_t m_tombstone_cnt;
- size_t m_alloc_size;
- K m_max_key;
- K m_min_key;
- alex::Alex<K, V> m_alex;
- BloomFilter<R> *m_bf;
-};
-
-
-template <RecordInterface R>
-class AlexRangeQuery {
-public:
- static void *get_query_state(Alex<R> *ts, void *parms) {
- auto res = new AlexState<R>();
- auto p = (alex_range_query_parms<R> *) parms;
-
- res->start_idx = ts->get_lower_bound(p->lower_bound);
- res->stop_idx = ts->get_record_count();
-
- return res;
- }
-
- static void* get_buffer_query_state(MutableBuffer<R> *buffer, void *parms) {
- auto res = new AlexBufferState<R>();
- res->cutoff = buffer->get_record_count();
-
- return res;
- }
-
- static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void *buff_state) {
- return;
- }
-
- static std::vector<Wrapped<R>> query(Alex<R> *ts, void *q_state, void *parms) {
- std::vector<Wrapped<R>> records;
- auto p = (alex_range_query_parms<R> *) parms;
- auto s = (AlexState<R> *) q_state;
-
- // if the returned index is one past the end of the
- // records for the Alex, then there are not records
- // in the index falling into the specified range.
- if (s->start_idx == ts->get_record_count()) {
- return records;
- }
-
- auto ptr = ts->get_record_at(s->start_idx);
-
- // roll the pointer forward to the first record that is
- // greater than or equal to the lower bound.
- while(ptr->rec.key < p->lower_bound) {
- ptr++;
- }
-
- while (ptr->rec.key <= p->upper_bound && ptr < ts->m_data + s->stop_idx) {
- records.emplace_back(*ptr);
- ptr++;
- }
-
- return records;
- }
-
- static std::vector<Wrapped<R>> buffer_query(MutableBuffer<R> *buffer, void *state, void *parms) {
- auto p = (alex_range_query_parms<R> *) parms;
- auto s = (AlexBufferState<R> *) state;
-
- std::vector<Wrapped<R>> records;
- for (size_t i=0; i<s->cutoff; i++) {
- auto rec = buffer->get_data() + i;
- if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound) {
- records.emplace_back(*rec);
- }
- }
-
- return records;
- }
-
- static std::vector<R> merge(std::vector<std::vector<R>> &results, void *parms) {
- size_t total = 0;
- for (size_t i=0; i<results.size(); i++) {
- total += results[i].size();
- }
-
- if (total == 0) {
- return std::vector<R>();
- }
-
- std::vector<R> output;
- output.reserve(total);
-
- for (size_t i=0; i<results.size(); i++) {
- std::move(results[i].begin(), results[i].end(), std::back_inserter(output));
- }
-
- return output;
- }
-
- static void delete_query_state(void *state) {
- auto s = (AlexState<R> *) state;
- delete s;
- }
-
- static void delete_buffer_query_state(void *state) {
- auto s = (AlexBufferState<R> *) state;
- delete s;
- }
-};
-
-;
-
-}
diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h
index a220792..f9c621e 100644
--- a/include/shard/MemISAM.h
+++ b/include/shard/MemISAM.h
@@ -14,7 +14,8 @@
#include <queue>
#include <memory>
-#include "framework/MutableBuffer.h"
+#include "framework/ShardRequirements.h"
+
#include "util/bf_config.h"
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"
diff --git a/include/shard/PGM.h b/include/shard/PGM.h
index 2cd153e..d960e70 100644
--- a/include/shard/PGM.h
+++ b/include/shard/PGM.h
@@ -15,15 +15,13 @@
#include <memory>
#include <concepts>
+#include "framework/ShardRequirements.h"
+
#include "pgm/pgm_index.hpp"
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"
#include "psu-ds/BloomFilter.h"
#include "util/bf_config.h"
-#include "framework/MutableBuffer.h"
-#include "framework/RecordInterface.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h
index 69fcfbc..98153c0 100644
--- a/include/shard/TrieSpline.h
+++ b/include/shard/TrieSpline.h
@@ -15,15 +15,12 @@
#include <memory>
#include <concepts>
+#include "framework/ShardRequirements.h"
#include "ts/builder.h"
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"
#include "psu-ds/BloomFilter.h"
#include "util/bf_config.h"
-#include "framework/MutableBuffer.h"
-#include "framework/RecordInterface.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h
index 8feec84..0e998d9 100644
--- a/include/shard/VPTree.h
+++ b/include/shard/VPTree.h
@@ -15,14 +15,12 @@
#include <concepts>
#include <map>
+#include "framework/ShardRequirements.h"
+
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"
#include "psu-ds/BloomFilter.h"
#include "util/bf_config.h"
-#include "framework/MutableBuffer.h"
-#include "framework/RecordInterface.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h
index 19d3eea..8583cb0 100644
--- a/include/shard/WIRS.h
+++ b/include/shard/WIRS.h
@@ -16,15 +16,13 @@
#include <memory>
#include <concepts>
+#include "framework/ShardRequirements.h"
+
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"
#include "psu-ds/Alias.h"
#include "psu-ds/BloomFilter.h"
#include "util/bf_config.h"
-#include "framework/MutableBuffer.h"
-#include "framework/RecordInterface.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
diff --git a/include/shard/WSS.h b/include/shard/WSS.h
index c0af573..87b016c 100644
--- a/include/shard/WSS.h
+++ b/include/shard/WSS.h
@@ -16,15 +16,13 @@
#include <memory>
#include <concepts>
+#include "framework/ShardRequirements.h"
+
#include "psu-ds/PriorityQueue.h"
#include "util/Cursor.h"
#include "psu-ds/Alias.h"
#include "psu-ds/BloomFilter.h"
#include "util/bf_config.h"
-#include "framework/MutableBuffer.h"
-#include "framework/RecordInterface.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
using psudb::CACHELINE_SIZE;
using psudb::BloomFilter;
diff --git a/include/util/Cursor.h b/include/util/Cursor.h
index 1b0b8ed..1cf20e1 100644
--- a/include/util/Cursor.h
+++ b/include/util/Cursor.h
@@ -9,7 +9,7 @@
*/
#pragma once
-#include "framework/RecordInterface.h"
+#include "framework/ShardRequirements.h"
#include "psu-ds/BloomFilter.h"
#include "psu-ds/PriorityQueue.h"
diff --git a/tests/internal_level_tests.cpp b/tests/internal_level_tests.cpp
index 58369ff..056e458 100644
--- a/tests/internal_level_tests.cpp
+++ b/tests/internal_level_tests.cpp
@@ -10,10 +10,10 @@
*
*/
#include "shard/WIRS.h"
-#include "framework/InternalLevel.h"
-#include "framework/RecordInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/ShardInterface.h"
+#include "framework/structure/InternalLevel.h"
+#include "framework/interface/Record.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Shard.h"
#include "testing.h"
diff --git a/tests/mutable_buffer_tests.cpp b/tests/mutable_buffer_tests.cpp
index 201fddb..a2561c8 100644
--- a/tests/mutable_buffer_tests.cpp
+++ b/tests/mutable_buffer_tests.cpp
@@ -15,7 +15,7 @@
#include <algorithm>
#include "testing.h"
-#include "framework/MutableBuffer.h"
+#include "framework/structure/MutableBuffer.h"
#include <check.h>
diff --git a/tests/testing.h b/tests/testing.h
index bdf4869..023be7f 100644
--- a/tests/testing.h
+++ b/tests/testing.h
@@ -18,8 +18,8 @@
#include "util/types.h"
#include "psu-util/alignment.h"
-#include "framework/MutableBuffer.h"
-#include "framework/RecordInterface.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/interface/Record.h"
typedef de::WeightedRecord<uint64_t, uint32_t, uint64_t> WRec;
typedef de::Record<uint64_t, uint32_t> Rec;