Diffstat (limited to 'include/framework')
-rw-r--r--  include/framework/DynamicExtension.h               |  960
-rw-r--r--  include/framework/InternalLevel.h                  |  213
-rw-r--r--  include/framework/MutableBuffer.h                  |  180
-rw-r--r--  include/framework/QueryInterface.h                 |   35
-rw-r--r--  include/framework/QueryRequirements.h              |   17
-rw-r--r--  include/framework/ShardInterface.h                 |   26
-rw-r--r--  include/framework/ShardRequirements.h              |   17
-rw-r--r--  include/framework/interface/Query.h                |   30
-rw-r--r--  include/framework/interface/Record.h (renamed from include/framework/RecordInterface.h) | 29
-rw-r--r--  include/framework/interface/Scheduler.h            |   19
-rw-r--r--  include/framework/interface/Shard.h                |   36
-rw-r--r--  include/framework/scheduling/Epoch.h               |  143
-rw-r--r--  include/framework/scheduling/FIFOScheduler.h       |  129
-rw-r--r--  include/framework/scheduling/SerialScheduler.h     |   62
-rw-r--r--  include/framework/scheduling/Task.h                |   89
-rw-r--r--  include/framework/scheduling/statistics.h          |  118
-rw-r--r--  include/framework/structure/BufferView.h           |  170
-rw-r--r--  include/framework/structure/ExtensionStructure.h   |  495
-rw-r--r--  include/framework/structure/InternalLevel.h        |  271
-rw-r--r--  include/framework/structure/MutableBuffer.h        |  313
-rw-r--r--  include/framework/util/Configuration.h             |   49
21 files changed, 2571 insertions(+), 830 deletions(-)
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 524024b..7ea5370 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -1,302 +1,639 @@
/*
* include/framework/DynamicExtension.h
*
- * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
* Dong Xie <dongx@psu.edu>
*
- * All rights reserved. Published under the Modified BSD License.
+ * Distributed under the Modified BSD License.
*
*/
#pragma once
#include <atomic>
-#include <numeric>
#include <cstdio>
#include <vector>
-#include "framework/MutableBuffer.h"
-#include "framework/InternalLevel.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
+#include "framework/interface/Scheduler.h"
+#include "framework/scheduling/FIFOScheduler.h"
+#include "framework/scheduling/SerialScheduler.h"
-#include "shard/WIRS.h"
-#include "psu-util/timer.h"
-#include "psu-ds/Alias.h"
+#include "framework/structure/MutableBuffer.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/ExtensionStructure.h"
-namespace de {
-
-thread_local size_t sampling_attempts = 0;
-thread_local size_t sampling_rejections = 0;
-thread_local size_t deletion_rejections = 0;
-thread_local size_t bounds_rejections = 0;
-thread_local size_t tombstone_rejections = 0;
-thread_local size_t buffer_rejections = 0;
-
-/*
- * thread_local size_t various_sampling_times go here.
- */
-thread_local size_t sample_range_time = 0;
-thread_local size_t alias_time = 0;
-thread_local size_t alias_query_time = 0;
-thread_local size_t rejection_check_time = 0;
-thread_local size_t buffer_sample_time = 0;
-thread_local size_t memlevel_sample_time = 0;
-thread_local size_t disklevel_sample_time = 0;
-thread_local size_t sampling_bailouts = 0;
-
-
-enum class LayoutPolicy {
- LEVELING,
- TEIRING
-};
-
-enum class DeletePolicy {
- TOMBSTONE,
- TAGGING
-};
+#include "framework/util/Configuration.h"
+#include "framework/scheduling/Epoch.h"
-typedef ssize_t level_index;
+namespace de {
-template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING>
+template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L=LayoutPolicy::TEIRING,
+ DeletePolicy D=DeletePolicy::TAGGING, SchedulerInterface SCHED=FIFOScheduler>
class DynamicExtension {
- //typedef typename S<R> Shard;
typedef S Shard;
typedef MutableBuffer<R> Buffer;
+ typedef ExtensionStructure<R, S, Q, L> Structure;
+ typedef Epoch<R, S, Q, L> _Epoch;
+ typedef BufferView<R> BufView;
+
+ static constexpr size_t QUERY = 1;
+ static constexpr size_t RECONSTRUCTION = 2;
+
+ struct epoch_ptr {
+ _Epoch *epoch;
+ size_t refcnt;
+ };
public:
- DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop)
- : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop),
- m_buffer(new Buffer(buffer_cap, buffer_cap * max_delete_prop))
- { }
+ DynamicExtension(size_t buffer_lwm, size_t buffer_hwm, size_t scale_factor, size_t memory_budget=0,
+ size_t thread_cnt=16)
+ : m_scale_factor(scale_factor)
+ , m_max_delete_prop(1)
+ , m_sched(memory_budget, thread_cnt)
+ , m_buffer(new Buffer(buffer_lwm, buffer_hwm))
+ , m_core_cnt(thread_cnt)
+ , m_next_core(0)
+ , m_epoch_cnt(0)
+ {
+ auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
+ m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
+ m_previous_epoch.store({nullptr, 0});
+ m_next_epoch.store({nullptr, 0});
+ }
~DynamicExtension() {
- delete m_buffer;
- for (size_t i=0; i<m_levels.size(); i++) {
- delete m_levels[i];
- }
+ /* let any in-flight epoch transition finish */
+ await_next_epoch();
+
+ /* shutdown the scheduler */
+ m_sched.shutdown();
+
+ /* delete all held resources */
+ delete m_next_epoch.load().epoch;
+ delete m_current_epoch.load().epoch;
+ delete m_previous_epoch.load().epoch;
+
+ delete m_buffer;
}
+ /*
+ * Insert the record `rec` into the index. If the buffer is full and
+ * the framework is blocking on an epoch transition, this call may fail
+ * and return 0. In this case, retry the call later. If
+ * successful, 1 will be returned. The record will be immediately
+ * visible in the buffer upon the successful return of this function.
+ */
int insert(const R &rec) {
return internal_append(rec, false);
}
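A minimal usage sketch of this non-blocking insert path, assuming a hypothetical record type `Rec` and framework instance `extension` (std::this_thread::yield from <thread> is used only as an arbitrary backoff):

    Rec r { /* key, value, ... */ };
    while (!extension.insert(r)) {
        /* the buffer is at its high watermark and an epoch
         * transition is underway; back off and retry */
        std::this_thread::yield();
    }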
+ /*
+ * Erase the record `rec` from the index. It is assumed that `rec`
+ * currently exists--no special checks are made for correctness here.
+ * The behavior of this function will differ depending on whether
+ * tombstone or tagging deletes are used.
+ *
+ * Tombstone deletes - inserts a tombstone record for `rec`. This *may*
+ * return 0 and fail if the buffer is full and the framework is
+ * blocking on an epoch transition. In this case, repeat the call
+ * later. 1 will be returned when the tombstone is successfully
+ * inserted.
+ *
+ * Tagging deletes - Does a point lookup for the record across the
+ * entire structure, and sets its delete bit when found. Returns 1 if
+ * the record is found and marked, and 0 if it was not (i.e., if it
+ * isn't present in the index).
+ */
int erase(const R &rec) {
- Buffer *buffer;
-
+ // FIXME: delete tagging will require a lot of extra work to get
+ // operating "correctly" in a concurrent environment.
+
+ /*
+ * Get a view on the buffer *first*. This will ensure a stronger
+ * ordering than simply accessing the buffer directly, but is
+ * not *strictly* necessary.
+ */
if constexpr (D == DeletePolicy::TAGGING) {
- auto buffer = get_buffer();
+ static_assert(std::same_as<SCHED, SerialScheduler>, "Tagging is only supported in single-threaded operation");
- // Check the levels first. This assumes there aren't
- // any undeleted duplicate records.
- for (auto level : m_levels) {
- if (level && level->delete_record(rec)) {
- return 1;
- }
+ auto view = m_buffer->get_buffer_view();
+
+ auto epoch = get_active_epoch();
+ if (epoch->get_structure()->tagged_delete(rec)) {
+ end_job(epoch);
+ return 1;
}
- // the buffer will take the longest amount of time, and
- // probably has the lowest probability of having the record,
- // so we'll check it last.
- return buffer->delete_record(rec);
+ end_job(epoch);
+
+ /*
+ * the buffer will take the longest amount of time, and
+ * probably has the lowest probability of having the record,
+ * so we'll check it last.
+ */
+ return view.delete_record(rec);
}
+ /*
+ * If tagging isn't used, then delete using a tombstone
+ */
return internal_append(rec, true);
}
- std::vector<R> query(void *parms) {
- auto buffer = get_buffer();
+ /*
+ * Execute the query with parameters `parms` and return a future. This
+ * future can be used to access a vector containing the results of the
+ * query.
+ *
+ * The behavior of this function is undefined if `parms` is not a
+ * pointer to a valid query parameter object for the query type used as
+ * a template parameter to construct the framework.
+ */
+ std::future<std::vector<R>> query(void *parms) {
+ return schedule_query(parms);
+ }
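A sketch of issuing a query and collecting its result through the returned future, again with the hypothetical `extension` and `Rec`, plus a parameter struct `QueryParms` assumed to match the framework's query type:

    QueryParms parms { /* query-specific parameters */ };
    std::future<std::vector<Rec>> result = extension.query(&parms);
    std::vector<Rec> records = result.get(); /* blocks until the scheduled query job finishes */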
+
+ /*
+ * Returns the number of records (including tagged records and
+ * tombstones) currently within the framework.
+ */
+ size_t get_record_count() {
+ auto epoch = get_active_epoch();
+ auto t = epoch->get_buffer().get_record_count() + epoch->get_structure()->get_record_count();
+ end_job(epoch);
+
+ return t;
+ }
+
+ /*
+ * Returns the number of tombstone records currently within the
+ * framework. This function can be called when tagged deletes are used,
+ * but will always return 0 in that case.
+ */
+ size_t get_tombstone_count() {
+ auto epoch = get_active_epoch();
+ auto t = epoch->get_buffer().get_tombstone_count() + epoch->get_structure()->get_tombstone_count();
+ end_job(epoch);
- // Get the buffer query state
- auto buffer_state = Q::get_buffer_query_state(buffer, parms);
+ return t;
+ }
- // Get the shard query states
- std::vector<std::pair<ShardID, Shard*>> shards;
- std::vector<void*> states;
+ /*
+ * Get the number of levels within the framework. This count will
+ * include any empty levels, but will not include the buffer. Note that
+ * this is *not* the same as the number of shards when tiering is used,
+ * as each level can contain multiple shards in that case.
+ */
+ size_t get_height() {
+ auto epoch = get_active_epoch();
+ auto t = epoch->get_structure()->get_height();
+ end_job(epoch);
- for (auto &level : m_levels) {
- level->get_query_states(shards, states, parms);
- }
+ return t;
+ }
- Q::process_query_states(parms, states, buffer_state);
+ /*
+ * Get the number of bytes of memory allocated across the framework for
+ * storing records and associated index information (i.e., internal
+ * ISAM tree nodes). This includes memory that is allocated but
+ * currently unused in the buffer, or in shards themselves
+ * (overallocation due to delete cancellation, etc.).
+ */
+ size_t get_memory_usage() {
+ auto epoch = get_active_epoch();
+ auto t = epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage();
+ end_job(epoch);
- std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
+ return t;
+ }
- // Execute the query for the buffer
- auto buffer_results = Q::buffer_query(buffer, buffer_state, parms);
- query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer));
- if constexpr (Q::EARLY_ABORT) {
- if (query_results[0].size() > 0) {
- auto result = Q::merge(query_results, parms);
- for (size_t i=0; i<states.size(); i++) {
- Q::delete_query_state(states[i]);
- }
+ /*
+ * Get the number of bytes of memory allocated across the framework for
+ * auxiliary structures. This can include bloom filters, aux
+ * hashtables, etc.
+ */
+ size_t get_aux_memory_usage() {
+ auto epoch = get_active_epoch();
+ auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage();
+ end_job(epoch);
- Q::delete_buffer_query_state(buffer_state);
- return result;
- }
+ return t;
+ }
+
+ /*
+ * Returns the maximum physical capacity of the buffer, measured in
+ * records.
+ */
+ size_t get_buffer_capacity() {
+ return m_buffer->get_capacity();
+ }
+
+ /*
+ * Create a new single Shard object containing all of the records
+ * within the framework (buffer and shards). The optional parameter can
+ * be used to specify whether the Shard should be constructed with the
+ * currently active state of the framework (false), or if shard
+ * construction should wait until any ongoing reconstructions have
+ * finished and use that new version (true).
+ */
+ Shard *create_static_structure(bool await_reconstruction_completion=false) {
+ if (await_reconstruction_completion) {
+ await_next_epoch();
}
- // Execute the query for each shard
- for (size_t i=0; i<shards.size(); i++) {
- auto shard_results = Q::query(shards[i].second, states[i], parms);
- query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer));
- if constexpr (Q::EARLY_ABORT) {
- if (query_results[i].size() > 0) {
- auto result = Q::merge(query_results, parms);
- for (size_t i=0; i<states.size(); i++) {
- Q::delete_query_state(states[i]);
- }
+ auto epoch = get_active_epoch();
+ auto vers = epoch->get_structure();
+ std::vector<Shard *> shards;
- Q::delete_buffer_query_state(buffer_state);
- return result;
+ if (vers->get_levels().size() > 0) {
+ for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
+ if (vers->get_levels()[i] && vers->get_levels()[i]->get_record_count() > 0) {
+ shards.emplace_back(vers->get_levels()[i]->get_combined_shard());
}
}
}
-
- // Merge the results together
- auto result = Q::merge(query_results, parms);
- for (size_t i=0; i<states.size(); i++) {
- Q::delete_query_state(states[i]);
+ /*
+ * construct a shard from the buffer view. We'll hold the view
+ * for as short a time as possible: once the records are exfiltrated
+ * from the buffer, there's no reason to retain a hold on the view's
+ * head pointer any longer
+ */
+ {
+ auto bv = epoch->get_buffer();
+ if (bv.get_record_count() > 0) {
+ shards.emplace_back(new S(std::move(bv)));
+ }
}
- Q::delete_buffer_query_state(buffer_state);
+ Shard *flattened = new S(shards);
- return result;
- }
+ for (auto shard : shards) {
+ delete shard;
+ }
- size_t get_record_count() {
- size_t cnt = get_buffer()->get_record_count();
+ end_job(epoch);
+ return flattened;
+ }
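A sketch of flattening the index into a single shard, using the same hypothetical `extension` instance; passing `true` waits out any in-flight reconstruction first, and since the framework does not retain the returned shard, the caller presumably owns it:

    auto *flat = extension.create_static_structure(true);
    /* ... use the single combined shard ... */
    delete flat;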
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_record_count();
+ /*
+ * If the current epoch is *not* the newest one, then wait for
+ * the newest one to become available. Otherwise, returns immediately.
+ */
+ void await_next_epoch() {
+ while (m_next_epoch.load().epoch != nullptr) {
+ std::unique_lock<std::mutex> lk(m_epoch_cv_lk);
+ m_epoch_cv.wait(lk);
}
+ }
- return cnt;
+ /*
+ * Mostly exposed for unit-testing purposes. Verifies that the current
+ * active version of the ExtensionStructure doesn't violate the maximum
+ * tombstone proportion invariant.
+ */
+ bool validate_tombstone_proportion() {
+ auto epoch = get_active_epoch();
+ auto t = epoch->get_structure()->validate_tombstone_proportion();
+ end_job(epoch);
+ return t;
}
- size_t get_tombstone_cnt() {
- size_t cnt = get_buffer()->get_tombstone_count();
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count();
- }
+ void print_scheduler_statistics() {
+ m_sched.print_statistics();
+ }
+
+private:
+ SCHED m_sched;
- return cnt;
+ Buffer *m_buffer;
+
+ //std::mutex m_struct_lock;
+ //std::set<Structure *> m_versions;
+
+ alignas(64) std::atomic<bool> m_reconstruction_scheduled;
+
+ std::atomic<epoch_ptr> m_next_epoch;
+ std::atomic<epoch_ptr> m_current_epoch;
+ std::atomic<epoch_ptr> m_previous_epoch;
+
+ std::condition_variable m_epoch_cv;
+ std::mutex m_epoch_cv_lk;
+
+ std::atomic<size_t> m_epoch_cnt;
+
+ size_t m_scale_factor;
+ double m_max_delete_prop;
+
+ std::atomic<int> m_next_core;
+ size_t m_core_cnt;
+
+ void enforce_delete_invariant(_Epoch *epoch) {
+ auto structure = epoch->get_structure();
+ auto compactions = structure->get_compaction_tasks();
+
+ while (compactions.size() > 0) {
+
+ /* schedule a compaction */
+ ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>();
+ args->epoch = epoch;
+ args->merges = compactions;
+ args->extension = this;
+ args->compaction = true;
+ /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */
+
+ auto wait = args->result.get_future();
+
+ m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
+
+ /* wait for compaction completion */
+ wait.get();
+
+ /* get a new batch of compactions to perform, if needed */
+ compactions = structure->get_compaction_tasks();
+ }
}
- size_t get_height() {
- return m_levels.size();
+ _Epoch *get_active_epoch() {
+ epoch_ptr old, new_ptr;
+
+ do {
+ /*
+ * during an epoch transition, a nullptr will be installed in the
+ * current_epoch. At this moment, the "new" current epoch will
+ * soon be installed, but the "current" current epoch has been
+ * moved back to m_previous_epoch.
+ */
+ if (m_current_epoch.load().epoch == nullptr) {
+ old = m_previous_epoch;
+ new_ptr = {old.epoch, old.refcnt+1};
+ if (old.epoch != nullptr && m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
+ break;
+ }
+ } else {
+ old = m_current_epoch;
+ new_ptr = {old.epoch, old.refcnt+1};
+ if (old.epoch != nullptr && m_current_epoch.compare_exchange_strong(old, new_ptr)) {
+ break;
+ }
+ }
+ } while (true);
+
+ assert(new_ptr.refcnt > 0);
+
+ return new_ptr.epoch;
}
- size_t get_memory_usage() {
- size_t cnt = m_buffer->get_memory_usage();
+ void advance_epoch(size_t buffer_head) {
+
+ retire_epoch(m_previous_epoch.load().epoch);
+
+ epoch_ptr tmp = {nullptr, 0};
+ epoch_ptr cur;
+ do {
+ cur = m_current_epoch;
+ } while(!m_current_epoch.compare_exchange_strong(cur, tmp));
+
+ m_previous_epoch.store(cur);
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_memory_usage();
+ // FIXME: this may currently block because there isn't any
+ // query preemption yet. At this point, we'd need to either
+ // 1) wait for all queries on the old_head to finish
+ // 2) kill all queries on the old_head
+ // 3) somehow migrate all queries on the old_head to the new
+ // version
+ while (!m_next_epoch.load().epoch->advance_buffer_head(buffer_head)) {
+ _mm_pause();
}
- return cnt;
+
+ m_current_epoch.store(m_next_epoch);
+ m_next_epoch.store({nullptr, 0});
+
+
+ /* notify any blocking threads that the new epoch is available */
+ m_epoch_cv_lk.lock();
+ m_epoch_cv.notify_all();
+ m_epoch_cv_lk.unlock();
}
- size_t get_aux_memory_usage() {
- size_t cnt = m_buffer->get_aux_memory_usage();
+ /*
+ * Creates a new epoch by copying the currently active one. The new epoch's
+ * structure will be a shallow copy of the old one's.
+ */
+ _Epoch *create_new_epoch() {
+ /*
+ * This epoch access is _not_ protected under the assumption that
+ * only one reconstruction will be able to trigger at a time. If that condition
+ * is violated, it is possible that this code will clone a retired
+ * epoch.
+ */
+ assert(m_next_epoch.load().epoch == nullptr);
+ auto current_epoch = get_active_epoch();
+
+ m_epoch_cnt.fetch_add(1);
+ m_next_epoch.store({current_epoch->clone(m_epoch_cnt.load()), 0});
+
+ end_job(current_epoch);
+
+ return m_next_epoch.load().epoch;
+ }
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) {
- cnt += m_levels[i]->get_aux_memory_usage();
- }
+ void retire_epoch(_Epoch *epoch) {
+ /*
+ * Epochs with currently active jobs cannot
+ * be retired. By the time retire_epoch is called,
+ * it is assumed that a new epoch is active, meaning
+ * that the epoch to be retired should no longer
+ * accumulate new active jobs. Eventually, its reference
+ * count will hit zero and the function will
+ * proceed.
+ */
+
+ if (epoch == nullptr) {
+ return;
}
- return cnt;
- }
+ epoch_ptr old, new_ptr;
+ new_ptr = {nullptr, 0};
+ do {
+ old = m_previous_epoch.load();
+
+ /*
+ * If running in single threaded mode, the failure to retire
+ * an Epoch will result in the thread of execution blocking
+ * indefinitely.
+ */
+ if constexpr (std::same_as<SCHED, SerialScheduler>) {
+ if (old.epoch == epoch) assert(old.refcnt == 0);
+ }
- bool validate_tombstone_proportion() {
- long double ts_prop;
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) {
- ts_prop = (long double) m_levels[i]->get_tombstone_count() / (long double) calc_level_record_capacity(i);
- if (ts_prop > (long double) m_max_delete_prop) {
- return false;
- }
+ if (old.epoch == epoch && old.refcnt == 0 &&
+ m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
+ break;
}
- }
+ usleep(1);
+
+ } while(true);
- return true;
+ delete epoch;
}
- size_t get_buffer_capacity() {
- return m_buffer->get_capacity();
+ static void reconstruction(void *arguments) {
+ auto args = (ReconstructionArgs<R, S, Q, L> *) arguments;
+
+ ((DynamicExtension *) args->extension)->SetThreadAffinity();
+ Structure *vers = args->epoch->get_structure();
+
+ for (ssize_t i=0; i<args->merges.size(); i++) {
+ vers->reconstruction(args->merges[i].second, args->merges[i].first);
+ }
+
+ /*
+ * we'll grab the buffer AFTER doing the internal reconstruction, so we
+ * can flush as many records as possible in one go. The reconstruction
+ * was done so as to make room for the full buffer anyway, so there's
+ * no real benefit to doing this first.
+ */
+ auto buffer_view = args->epoch->get_buffer();
+ size_t new_head = buffer_view.get_tail();
+
+ /*
+ * if performing a compaction, don't flush the buffer, as
+ * there is no guarantee that any necessary reconstructions
+ * will free sufficient space in L0 to support a flush
+ */
+ if (!args->compaction) {
+ vers->flush_buffer(std::move(buffer_view));
+ }
+
+ args->result.set_value(true);
+
+ /*
+ * Compactions occur on an epoch _before_ it becomes active,
+ * and as a result the active epoch should _not_ be advanced as
+ * part of a compaction
+ */
+ if (!args->compaction) {
+ ((DynamicExtension *) args->extension)->advance_epoch(new_head);
+ }
+
+ ((DynamicExtension *) args->extension)->m_reconstruction_scheduled.store(false);
+
+ delete args;
}
- Shard *create_static_structure() {
- std::vector<Shard *> shards;
+ static void async_query(void *arguments) {
+ QueryArgs<R, S, Q, L> *args = (QueryArgs<R, S, Q, L> *) arguments;
- if (m_levels.size() > 0) {
- for (int i=m_levels.size() - 1; i>= 0; i--) {
- if (m_levels[i]) {
- shards.emplace_back(m_levels[i]->get_merged_shard());
- }
- }
- }
+ auto epoch = ((DynamicExtension *) args->extension)->get_active_epoch();
+
+ auto ptr1 = ((DynamicExtension *) args->extension)->m_previous_epoch.load().epoch;
+ auto ptr2 = ((DynamicExtension *) args->extension)->m_current_epoch.load().epoch;
+ auto ptr3 = ((DynamicExtension *) args->extension)->m_next_epoch.load().epoch;
+
+
+ auto buffer = epoch->get_buffer();
+ auto vers = epoch->get_structure();
+ void *parms = args->query_parms;
+
+ /* Get the buffer query states */
+ void *buffer_state = Q::get_buffer_query_state(&buffer, parms);
+
+ /* Get the shard query states */
+ std::vector<std::pair<ShardID, Shard*>> shards;
+ std::vector<void *> states = vers->get_query_states(shards, parms);
+
+ Q::process_query_states(parms, states, buffer_state);
- shards.emplace_back(new S(get_buffer()));
+ std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
+ for (size_t i=0; i<query_results.size(); i++) {
+ std::vector<Wrapped<R>> local_results;
+ ShardID shid;
+
+ if (i == 0) { /* process the buffer first */
+ local_results = Q::buffer_query(buffer_state, parms);
+ shid = INVALID_SHID;
+ } else {
+ local_results = Q::query(shards[i - 1].second, states[i - 1], parms);
+ shid = shards[i - 1].first;
+ }
- Shard *shards_array[shards.size()];
+ query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer));
- size_t j = 0;
- for (size_t i=0; i<shards.size(); i++) {
- if (shards[i]) {
- shards_array[j++] = shards[i];
+ if constexpr (Q::EARLY_ABORT) {
+ if (query_results[i].size() > 0) break;
}
}
- Shard *flattened = new S(shards_array, j);
+ auto result = Q::merge(query_results, parms);
+ args->result_set.set_value(std::move(result));
- for (auto shard : shards) {
- delete shard;
+ ((DynamicExtension *) args->extension)->end_job(epoch);
+
+ Q::delete_buffer_query_state(buffer_state);
+ for (size_t i=0; i<states.size(); i++) {
+ Q::delete_query_state(states[i]);
}
- return flattened;
+ delete args;
}
-private:
- Buffer *m_buffer;
+ void schedule_reconstruction() {
+ auto epoch = create_new_epoch();
+ /*
+ * the reconstruction process calls end_job(),
+ * so we must start one before calling it
+ */
+
+ ReconstructionArgs<R, S, Q, L> *args = new ReconstructionArgs<R, S, Q, L>();
+ args->epoch = epoch;
+ args->merges = epoch->get_structure()->get_reconstruction_tasks(m_buffer->get_high_watermark());
+ args->extension = this;
+ args->compaction = false;
+ /* NOTE: args is deleted by the reconstruction job, so shouldn't be freed here */
+
+ m_sched.schedule_job(reconstruction, 0, args, RECONSTRUCTION);
+ }
- size_t m_scale_factor;
- double m_max_delete_prop;
+ std::future<std::vector<R>> schedule_query(void *query_parms) {
+ QueryArgs<R, S, Q, L> *args = new QueryArgs<R, S, Q, L>();
+ args->extension = this;
+ args->query_parms = query_parms;
+ auto result = args->result_set.get_future();
- std::vector<InternalLevel<R, S, Q> *> m_levels;
+ m_sched.schedule_job(async_query, 0, args, QUERY);
- Buffer *get_buffer() {
- return m_buffer;
+ return result;
}
int internal_append(const R &rec, bool ts) {
- Buffer *buffer;
- while (!(buffer = get_buffer()))
- ;
-
- if (buffer->is_full()) {
- merge_buffer();
+ if (m_buffer->is_at_low_watermark()) {
+ auto old = false;
+
+ if (m_reconstruction_scheduled.compare_exchange_strong(old, true)) {
+ schedule_reconstruction();
+ }
}
- return buffer->append(rec, ts);
+ /* this will fail if the HWM is reached and return 0 */
+ return m_buffer->append(rec, ts);
}
- std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer) {
- if constexpr (!Q::SKIP_DELETE_FILTER) {
+ static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers, BufView *bview) {
+ if constexpr (Q::SKIP_DELETE_FILTER) {
return records;
}
std::vector<Wrapped<R>> processed_records;
processed_records.reserve(records.size());
- // For delete tagging, we just need to check the delete bit on each
- // record.
+ /*
+ * For delete tagging, we just need to check the delete bit
+ * on each record.
+ */
if constexpr (D == DeletePolicy::TAGGING) {
for (auto &rec : records) {
if (rec.is_deleted()) {
@@ -309,25 +646,35 @@ private:
return processed_records;
}
- // For tombstone deletes, we need to search for the corresponding
- // tombstone for each record.
+ /*
+ * For tombstone deletes, we need to search for the corresponding
+ * tombstone for each record.
+ */
for (auto &rec : records) {
if (rec.is_tombstone()) {
continue;
}
- if (buffer->check_tombstone(rec.rec)) {
- continue;
+ // FIXME: need to figure out how best to re-enable the buffer tombstone
+ // check in the correct manner.
+ //if (buffview.check_tombstone(rec.rec)) {
+ //continue;
+ //}
+
+ for (size_t i=0; i<bview->get_record_count(); i++) {
+ if (bview->get(i)->is_tombstone() && bview->get(i)->rec == rec.rec) {
+ continue;
+ }
}
if (shid != INVALID_SHID) {
for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {
- if (m_levels[lvl]->check_tombstone(0, rec.rec)) {
+ if (vers->get_levels()[lvl]->check_tombstone(0, rec.rec)) {
continue;
}
}
- if (m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) {
+ if (vers->get_levels()[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) {
continue;
}
}
@@ -338,197 +685,70 @@ private:
return processed_records;
}
- /*
- * Add a new level to the LSM Tree and return that level's index. Will
- * automatically determine whether the level should be on memory or on disk,
- * and act appropriately.
- */
- inline level_index grow() {
- level_index new_idx;
-
- size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
- new_idx = m_levels.size();
- if (new_idx > 0) {
- assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
- }
- m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt));
-
- return new_idx;
- }
-
-
- // Merge the memory table down into the tree, completing any required other
- // merges to make room for it.
- inline void merge_buffer() {
- auto buffer = get_buffer();
-
- if (!can_merge_with(0, buffer->get_record_count())) {
- merge_down(0);
+ void SetThreadAffinity() {
+ int core = m_next_core.fetch_add(1) % m_core_cnt;
+ cpu_set_t mask;
+ CPU_ZERO(&mask);
+
+ switch (core % 2) {
+ case 0:
+ // 0 |-> 0
+ // 2 |-> 2
+ // 4 |-> 4
+ core = core;
+ break;
+ case 1:
+ // 1 |-> 28
+ // 3 |-> 30
+ // 5 |-> 32
+ core = (core - 1) + m_core_cnt;
+ break;
}
-
- merge_buffer_into_l0(buffer);
- enforce_delete_maximum(0);
-
- buffer->truncate();
- return;
+ CPU_SET(core, &mask);
+ ::sched_setaffinity(0, sizeof(mask), &mask);
}
- /*
- * Merge the specified level down into the tree. The level index must be
- * non-negative (i.e., this function cannot be used to merge the buffer). This
- * routine will recursively perform any necessary merges to make room for the
- * specified level.
- */
- inline void merge_down(level_index idx) {
- level_index merge_base_level = find_mergable_level(idx);
- if (merge_base_level == -1) {
- merge_base_level = grow();
- }
- for (level_index i=merge_base_level; i>idx; i--) {
- merge_levels(i, i-1);
- enforce_delete_maximum(i);
- }
+ void end_job(_Epoch *epoch) {
+ epoch_ptr old, new_ptr;
- return;
- }
+ do {
+ if (m_previous_epoch.load().epoch == epoch) {
+ old = m_previous_epoch;
+ /*
+ * This could happen if we get into the system during a
+ * transition. In this case, we can just back out and retry
+ */
+ if (old.epoch == nullptr) {
+ continue;
+ }
- /*
- * Find the first level below the level indicated by idx that
- * is capable of sustaining a merge operation and return its
- * level index. If no such level exists, returns -1. Also
- * returns -1 if idx==0, and no such level exists, to simplify
- * the logic of the first merge.
- */
- inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) {
+ assert(old.refcnt > 0);
- if (idx == 0 && m_levels.size() == 0) return -1;
+ new_ptr = {old.epoch, old.refcnt - 1};
+ if (m_previous_epoch.compare_exchange_strong(old, new_ptr)) {
+ break;
+ }
+ } else {
+ old = m_current_epoch;
+ /*
+ * This could happen if we get into the system during a
+ * transition. In this case, we can just back out and retry
+ */
+ if (old.epoch == nullptr) {
+ continue;
+ }
- bool level_found = false;
- bool disk_level;
- level_index merge_level_idx;
+ assert(old.refcnt > 0);
- size_t incoming_rec_cnt = get_level_record_count(idx, buffer);
- for (level_index i=idx+1; i<m_levels.size(); i++) {
- if (can_merge_with(i, incoming_rec_cnt)) {
- return i;
+ new_ptr = {old.epoch, old.refcnt - 1};
+ if (m_current_epoch.compare_exchange_strong(old, new_ptr)) {
+ break;
+ }
}
-
- incoming_rec_cnt = get_level_record_count(i);
- }
-
- return -1;
- }
-
- /*
- * Merge the level specified by incoming level into the level specified
- * by base level. The two levels should be sequential--i.e. no levels
- * are skipped in the merge process--otherwise the tombstone ordering
- * invariant may be violated by the merge operation.
- */
- inline void merge_levels(level_index base_level, level_index incoming_level) {
- // merging two memory levels
- if constexpr (L == LayoutPolicy::LEVELING) {
- auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
- mark_as_unused(tmp);
- } else {
- m_levels[base_level]->append_merged_shards(m_levels[incoming_level]);
- }
-
- mark_as_unused(m_levels[incoming_level]);
- m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor);
- }
-
-
- inline void merge_buffer_into_l0(Buffer *buffer) {
- assert(m_levels[0]);
- if constexpr (L == LayoutPolicy::LEVELING) {
- // FIXME: Kludgey implementation due to interface constraints.
- auto old_level = m_levels[0];
- auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
- temp_level->append_buffer(buffer);
- auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level);
-
- m_levels[0] = new_level;
- delete temp_level;
- mark_as_unused(old_level);
- } else {
- m_levels[0]->append_buffer(buffer);
- }
- }
-
- /*
- * Mark a given memory level as no-longer in use by the tree. For now this
- * will just free the level. In future, this will be more complex as the
- * level may not be able to immediately be deleted, depending upon who
- * else is using it.
- */
- inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) {
- delete level;
- }
-
- /*
- * Check the tombstone proportion for the specified level and
- * if the limit is exceeded, forcibly merge levels until all
- * levels below idx are below the limit.
- */
- inline void enforce_delete_maximum(level_index idx) {
- long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx);
-
- if (ts_prop > (long double) m_max_delete_prop) {
- merge_down(idx);
- }
-
- return;
- }
-
- /*
- * Assume that level "0" should be larger than the buffer. The buffer
- * itself is index -1, which should return simply the buffer capacity.
- */
- inline size_t calc_level_record_capacity(level_index idx) {
- return get_buffer()->get_capacity() * pow(m_scale_factor, idx+1);
+ } while (true);
}
- /*
- * Returns the actual number of records present on a specified level. An
- * index value of -1 indicates the memory table. Can optionally pass in
- * a pointer to the memory table to use, if desired. Otherwise, there are
- * no guarantees about which buffer will be accessed if level_index is -1.
- */
- inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) {
-
- assert(idx >= -1);
- if (idx == -1) {
- return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count();
- }
-
- return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
- }
-
- /*
- * Determines if the specific level can merge with another record containing
- * incoming_rec_cnt number of records. The provided level index should be
- * non-negative (i.e., not refer to the buffer) and will be automatically
- * translated into the appropriate index into either the disk or memory level
- * vector.
- */
- inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) {
- if (idx>= m_levels.size() || !m_levels[idx]) {
- return false;
- }
-
- if (L == LayoutPolicy::LEVELING) {
- return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
- } else {
- return m_levels[idx]->get_shard_count() < m_scale_factor;
- }
-
- // unreachable
- assert(true);
- }
};
-
}
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
deleted file mode 100644
index ec8ffc4..0000000
--- a/include/framework/InternalLevel.h
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * include/framework/InternalLevel.h
- *
- * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
- *
- * All rights reserved. Published under the Modified BSD License.
- *
- */
-#pragma once
-
-#include <vector>
-#include <memory>
-
-#include "util/types.h"
-#include "framework/ShardInterface.h"
-#include "framework/QueryInterface.h"
-#include "framework/RecordInterface.h"
-#include "framework/MutableBuffer.h"
-
-namespace de {
-
-template <RecordInterface R, ShardInterface S, QueryInterface Q>
-class InternalLevel {
- typedef S Shard;
- typedef MutableBuffer<R> Buffer;
-public:
- InternalLevel(ssize_t level_no, size_t shard_cap)
- : m_level_no(level_no)
- , m_shard_cnt(0)
- , m_shards(shard_cap, nullptr)
- , m_owns(shard_cap, true)
- {}
-
- // Create a new memory level sharing the shards and repurposing it as previous level_no + 1
- // WARNING: for leveling only.
- InternalLevel(InternalLevel* level)
- : m_level_no(level->m_level_no + 1)
- , m_shard_cnt(level->m_shard_cnt)
- , m_shards(level->m_shards.size(), nullptr)
- , m_owns(level->m_owns.size(), true) {
- assert(m_shard_cnt == 1 && m_shards.size() == 1);
-
- for (size_t i=0; i<m_shards.size(); i++) {
- level->m_owns[i] = false;
- m_shards[i] = level->m_shards[i];
- }
- }
-
- ~InternalLevel() {
- for (size_t i=0; i<m_shards.size(); i++) {
- if (m_owns[i]) delete m_shards[i];
- }
- }
-
- // WARNING: for leveling only.
- // assuming the base level is the level new level is merging into. (base_level is larger.)
- static InternalLevel* merge_levels(InternalLevel* base_level, InternalLevel* new_level) {
- assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
- auto res = new InternalLevel(base_level->m_level_no, 1);
- res->m_shard_cnt = 1;
- Shard* shards[2];
- shards[0] = base_level->m_shards[0];
- shards[1] = new_level->m_shards[0];
-
- res->m_shards[0] = new S(shards, 2);
- return res;
- }
-
- void append_buffer(Buffer* buffer) {
- assert(m_shard_cnt < m_shards.size());
- m_shards[m_shard_cnt] = new S(buffer);
- m_owns[m_shard_cnt] = true;
- ++m_shard_cnt;
- }
-
- void append_merged_shards(InternalLevel* level) {
- assert(m_shard_cnt < m_shards.size());
- m_shards[m_shard_cnt] = new S(level->m_shards.data(), level->m_shard_cnt);
- m_owns[m_shard_cnt] = true;
-
- ++m_shard_cnt;
- }
-
- Shard *get_merged_shard() {
- Shard *shards[m_shard_cnt];
-
- for (size_t i=0; i<m_shard_cnt; i++) {
- shards[i] = m_shards[i];
- }
-
- return new S(shards, m_shard_cnt);
- }
-
- // Append the sample range in-order.....
- void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
- for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_shards[i]) {
- auto shard_state = Q::get_query_state(m_shards[i], query_parms);
- shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i]});
- shard_states.emplace_back(shard_state);
- }
- }
- }
-
- bool check_tombstone(size_t shard_stop, const R& rec) {
- if (m_shard_cnt == 0) return false;
-
- for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
- if (m_shards[i]) {
- auto res = m_shards[i]->point_lookup(rec, true);
- if (res && res->is_tombstone()) {
- return true;
- }
- }
- }
- return false;
- }
-
- bool delete_record(const R &rec) {
- if (m_shard_cnt == 0) return false;
-
- for (size_t i = 0; i < m_shards.size(); ++i) {
- if (m_shards[i]) {
- auto res = m_shards[i]->point_lookup(rec);
- if (res) {
- res->set_delete();
- return true;
- }
- }
- }
-
- return false;
- }
-
- Shard* get_shard(size_t idx) {
- return m_shards[idx];
- }
-
- size_t get_shard_count() {
- return m_shard_cnt;
- }
-
- size_t get_record_count() {
- size_t cnt = 0;
- for (size_t i=0; i<m_shard_cnt; i++) {
- cnt += m_shards[i]->get_record_count();
- }
-
- return cnt;
- }
-
- size_t get_tombstone_count() {
- size_t res = 0;
- for (size_t i = 0; i < m_shard_cnt; ++i) {
- res += m_shards[i]->get_tombstone_count();
- }
- return res;
- }
-
- size_t get_aux_memory_usage() {
- size_t cnt = 0;
- for (size_t i=0; i<m_shard_cnt; i++) {
- cnt += m_shards[i]->get_aux_memory_usage();
- }
-
- return cnt;
- }
-
- size_t get_memory_usage() {
- size_t cnt = 0;
- for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_shards[i]) {
- cnt += m_shards[i]->get_memory_usage();
- }
- }
-
- return cnt;
- }
-
- double get_tombstone_prop() {
- size_t tscnt = 0;
- size_t reccnt = 0;
- for (size_t i=0; i<m_shard_cnt; i++) {
- if (m_shards[i]) {
- tscnt += m_shards[i]->get_tombstone_count();
- reccnt += (*m_shards[i])->get_record_count();
- }
- }
-
- return (double) tscnt / (double) (tscnt + reccnt);
- }
-
-private:
- ssize_t m_level_no;
-
- size_t m_shard_cnt;
- size_t m_shard_size_cap;
-
- std::vector<Shard*> m_shards;
- std::vector<bool> m_owns;
-
- InternalLevel *clone() {
- auto new_level = new InternalLevel(m_level_no, m_shards.size());
- for (size_t i=0; i<m_shard_cnt; i++) {
- new_level->m_shards[i] = m_shards[i];
- new_level->m_owns[i] = true;
- m_owns[i] = false;
- }
- }
-};
-
-}
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
deleted file mode 100644
index b79fc02..0000000
--- a/include/framework/MutableBuffer.h
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * include/framework/MutableBuffer.h
- *
- * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
- *
- * All rights reserved. Published under the Modified BSD License.
- *
- */
-#pragma once
-
-#include <cstdlib>
-#include <atomic>
-#include <cassert>
-#include <numeric>
-#include <algorithm>
-#include <type_traits>
-
-#include "psu-util/alignment.h"
-#include "util/bf_config.h"
-#include "psu-ds/BloomFilter.h"
-#include "psu-ds/Alias.h"
-#include "psu-util/timer.h"
-#include "framework/RecordInterface.h"
-
-using psudb::CACHELINE_SIZE;
-
-namespace de {
-
-template <RecordInterface R>
-class MutableBuffer {
-public:
- MutableBuffer(size_t capacity, size_t max_tombstone_cap)
- : m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
- , m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
- auto len = capacity * sizeof(Wrapped<R>);
- size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE));
- m_data = (Wrapped<R>*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
- m_tombstone_filter = nullptr;
- if (max_tombstone_cap > 0) {
- m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS);
- }
- }
-
- ~MutableBuffer() {
- if (m_data) free(m_data);
- if (m_tombstone_filter) delete m_tombstone_filter;
- }
-
- template <typename R_ = R>
- int append(const R &rec, bool tombstone=false) {
- if (tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0;
-
- int32_t pos = 0;
- if ((pos = try_advance_tail()) == -1) return 0;
-
- Wrapped<R> wrec;
- wrec.rec = rec;
- wrec.header = 0;
- if (tombstone) wrec.set_tombstone();
-
- m_data[pos] = wrec;
- m_data[pos].header |= (pos << 2);
-
- if (tombstone) {
- m_tombstonecnt.fetch_add(1);
- if (m_tombstone_filter) m_tombstone_filter->insert(rec);
- }
-
- if constexpr (WeightedRecordInterface<R_>) {
- m_weight.fetch_add(rec.weight);
- double old = m_max_weight.load();
- while (old < rec.weight) {
- m_max_weight.compare_exchange_strong(old, rec.weight);
- old = m_max_weight.load();
- }
- } else {
- m_weight.fetch_add(1);
- }
-
- return 1;
- }
-
- bool truncate() {
- m_tombstonecnt.store(0);
- m_reccnt.store(0);
- m_weight.store(0);
- m_max_weight.store(0);
- if (m_tombstone_filter) m_tombstone_filter->clear();
-
- return true;
- }
-
- size_t get_record_count() {
- return m_reccnt;
- }
-
- size_t get_capacity() {
- return m_cap;
- }
-
- bool is_full() {
- return m_reccnt == m_cap;
- }
-
- size_t get_tombstone_count() {
- return m_tombstonecnt.load();
- }
-
- bool delete_record(const R& rec) {
- auto offset = 0;
- while (offset < m_reccnt.load()) {
- if (m_data[offset].rec == rec) {
- m_data[offset].set_delete();
- return true;
- }
- offset++;
- }
-
- return false;
- }
-
- bool check_tombstone(const R& rec) {
- if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
-
- auto offset = 0;
- while (offset < m_reccnt.load()) {
- if (m_data[offset].rec == rec && m_data[offset].is_tombstone()) {
- return true;
- }
- offset++;;
- }
- return false;
- }
-
- size_t get_memory_usage() {
- return m_cap * sizeof(R);
- }
-
- size_t get_aux_memory_usage() {
- return m_tombstone_filter->get_memory_usage();
- }
-
- size_t get_tombstone_capacity() {
- return m_tombstone_cap;
- }
-
- double get_total_weight() {
- return m_weight.load();
- }
-
- Wrapped<R> *get_data() {
- return m_data;
- }
-
- double get_max_weight() {
- return m_max_weight;
- }
-
-private:
- int32_t try_advance_tail() {
- size_t new_tail = m_reccnt.fetch_add(1);
-
- if (new_tail < m_cap) return new_tail;
- else return -1;
- }
-
- size_t m_cap;
- size_t m_tombstone_cap;
-
- Wrapped<R>* m_data;
- psudb::BloomFilter<R>* m_tombstone_filter;
-
- alignas(64) std::atomic<size_t> m_tombstonecnt;
- alignas(64) std::atomic<uint32_t> m_reccnt;
- alignas(64) std::atomic<double> m_weight;
- alignas(64) std::atomic<double> m_max_weight;
-};
-
-}
diff --git a/include/framework/QueryInterface.h b/include/framework/QueryInterface.h
deleted file mode 100644
index 46a1ce1..0000000
--- a/include/framework/QueryInterface.h
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * include/framework/QueryInterface.h
- *
- * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
- *
- * All rights reserved. Published under the Modified BSD License.
- *
- */
-#pragma once
-
-#include <vector>
-#include <concepts>
-#include "util/types.h"
-
-template <typename Q>
-concept QueryInterface = requires(Q q, void *p, std::vector<void*> &s) {
-
-/*
- {q.get_query_state(p, p)} -> std::convertible_to<void*>;
- {q.get_buffer_query_state(p, p)};
- {q.query(p, p)};
- {q.buffer_query(p, p)};
- {q.merge()};
- {q.delete_query_state(p)};
-*/
- {Q::EARLY_ABORT} -> std::convertible_to<bool>;
- {Q::SKIP_DELETE_FILTER} -> std::convertible_to<bool>;
- //{Q::get_query_state(p, p)} -> std::convertible_to<void*>;
- //{Q::get_buffer_query_state(p, p)} -> std::convertible_to<void*>;
- {Q::process_query_states(p, s, p)};
-
- {Q::delete_query_state(std::declval<void*>())} -> std::same_as<void>;
- {Q::delete_buffer_query_state(p)};
-
-};
diff --git a/include/framework/QueryRequirements.h b/include/framework/QueryRequirements.h
new file mode 100644
index 0000000..dcba67e
--- /dev/null
+++ b/include/framework/QueryRequirements.h
@@ -0,0 +1,17 @@
+/*
+ * include/framework/QueryRequirements.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * A header file containing the necessary includes for Query
+ * development.
+ *
+ */
+#pragma once
+
+#include "framework/structure/BufferView.h"
+#include "framework/interface/Record.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
diff --git a/include/framework/ShardInterface.h b/include/framework/ShardInterface.h
deleted file mode 100644
index 3aa62df..0000000
--- a/include/framework/ShardInterface.h
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * include/framework/ShardInterface.h
- *
- * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
- *
- * All rights reserved. Published under the Modified BSD License.
- *
- */
-#pragma once
-
-#include <concepts>
-
-#include "util/types.h"
-#include "framework/RecordInterface.h"
-
-namespace de {
-
-//template <template<typename> typename S, typename R>
-template <typename S>
-concept ShardInterface = requires(S s, void *p, bool b) {
- //{s.point_lookup(r, b) } -> std::same_as<R*>;
- {s.get_record_count()} -> std::convertible_to<size_t>;
- {s.get_memory_usage()} -> std::convertible_to<size_t>;
-};
-
-}
diff --git a/include/framework/ShardRequirements.h b/include/framework/ShardRequirements.h
new file mode 100644
index 0000000..d054030
--- /dev/null
+++ b/include/framework/ShardRequirements.h
@@ -0,0 +1,17 @@
+/*
+ * include/framework/ShardRequirements.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * A header file containing the necessary includes for Shard
+ * development.
+ *
+ */
+#pragma once
+
+#include "framework/structure/BufferView.h"
+#include "framework/interface/Record.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h
new file mode 100644
index 0000000..3d487f0
--- /dev/null
+++ b/include/framework/interface/Query.h
@@ -0,0 +1,30 @@
+/*
+ * include/framework/interface/Query.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include "framework/QueryRequirements.h"
+
+namespace de{
+
+template <typename Q, typename R, typename S>
+concept QueryInterface = requires(void *p, S *sh, std::vector<void*> &s, std::vector<std::vector<Wrapped<R>>> &rv, BufferView<R> *bv) {
+ {Q::get_query_state(sh, p)} -> std::convertible_to<void*>;
+ {Q::get_buffer_query_state(bv, p)} -> std::convertible_to<void *>;
+ {Q::process_query_states(p, s, p)};
+ {Q::query(sh, p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>;
+ {Q::buffer_query(p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>;
+ {Q::merge(rv, p)} -> std::convertible_to<std::vector<R>>;
+
+ {Q::delete_query_state(p)} -> std::same_as<void>;
+ {Q::delete_buffer_query_state(p)} -> std::same_as<void>;
+
+ {Q::EARLY_ABORT} -> std::convertible_to<bool>;
+ {Q::SKIP_DELETE_FILTER} -> std::convertible_to<bool>;
+};
+}
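A stub class satisfying this concept can make the required interface easier to see at a glance; this sketch is not from the repository, and the template parameters, parameter conventions, and `de::` qualifications are assumptions:

    template <de::RecordInterface R, typename S>
    class TrivialQuery {
    public:
        constexpr static bool EARLY_ABORT = false;
        constexpr static bool SKIP_DELETE_FILTER = false;

        static void *get_query_state(S *shard, void *parms) { return nullptr; }
        static void *get_buffer_query_state(de::BufferView<R> *buffer, void *parms) { return nullptr; }
        static void process_query_states(void *parms, std::vector<void*> &shard_states, void *buffer_state) {}

        static std::vector<de::Wrapped<R>> query(S *shard, void *state, void *parms) { return {}; }
        static std::vector<de::Wrapped<R>> buffer_query(void *state, void *parms) { return {}; }
        static std::vector<R> merge(std::vector<std::vector<de::Wrapped<R>>> &results, void *parms) { return {}; }

        static void delete_query_state(void *state) {}
        static void delete_buffer_query_state(void *state) {}
    };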
diff --git a/include/framework/RecordInterface.h b/include/framework/interface/Record.h
index f78918c..5b9f307 100644
--- a/include/framework/RecordInterface.h
+++ b/include/framework/interface/Record.h
@@ -1,11 +1,12 @@
/*
- * include/framework/RecordInterface.h
+ * include/framework/interface/Record.h
*
* Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
*
- * All rights reserved. Published under the Modified BSD License.
+ * Distributed under the Modified BSD License.
*
+ * FIXME: the record implementations could probably be broken out into
+ * different files, leaving only the interface here
*/
#pragma once
@@ -137,7 +138,7 @@ struct CosinePoint{
return true;
}
- // lexicographic order
+ /* lexicographic order */
inline bool operator<(const CosinePoint& other) const {
for (size_t i=0; i<D; i++) {
if (data[i] < other.data[i]) {
@@ -181,7 +182,7 @@ struct EuclidPoint{
return true;
}
- // lexicographic order
+ /* lexicographic order */
inline bool operator<(const EuclidPoint& other) const {
for (size_t i=0; i<D; i++) {
if (data[i] < other.data[i]) {
@@ -207,8 +208,24 @@ struct EuclidPoint{
template<RecordInterface R>
struct RecordHash {
size_t operator()(R const &rec) const {
- return psudb::hash_bytes((char *) &rec, sizeof(R));
+ return psudb::hash_bytes((std::byte *) &rec, sizeof(R));
}
};
+template <typename R>
+class DistCmpMax {
+public:
+ DistCmpMax(R *baseline) : P(baseline) {}
+
+ inline bool operator()(const R *a, const R *b) requires WrappedInterface<R> {
+ return a->rec.calc_distance(P->rec) > b->rec.calc_distance(P->rec);
+ }
+
+ inline bool operator()(const R *a, const R *b) requires (!WrappedInterface<R>){
+ return a->calc_distance(*P) > b->calc_distance(*P);
+ }
+
+private:
+ R *P;
+};
}
diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h
new file mode 100644
index 0000000..451ddd2
--- /dev/null
+++ b/include/framework/interface/Scheduler.h
@@ -0,0 +1,19 @@
+/*
+ * include/framework/interface/Scheduler.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include "framework/scheduling/Task.h"
+
+template <typename S>
+concept SchedulerInterface = requires(S s, size_t i, void *vp, de::Job j) {
+ {S(i, i)};
+ {s.schedule_job(j, i, vp, i)} -> std::convertible_to<void>;
+ {s.shutdown()};
+ {s.print_statistics()};
+};
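Any scheduler satisfying this concept can be supplied as the framework's final template parameter; a sketch using the two schedulers from this changeset, with hypothetical record, shard, and query types (`Rec`, `RecShard`, `RecQuery`) and constructor arguments of buffer low watermark, high watermark, and scale factor:

    /* concurrent operation with the FIFO scheduler (the default) */
    de::DynamicExtension<Rec, RecShard, RecQuery,
                         de::LayoutPolicy::TEIRING,
                         de::DeletePolicy::TOMBSTONE,
                         de::FIFOScheduler> concurrent_index(1000, 12000, 8);

    /* strictly single-threaded operation */
    de::DynamicExtension<Rec, RecShard, RecQuery,
                         de::LayoutPolicy::TEIRING,
                         de::DeletePolicy::TAGGING,
                         de::SerialScheduler> serial_index(1000, 12000, 8);

The SerialScheduler variant is the configuration required by the static_assert in erase() when tagging deletes are selected.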
diff --git a/include/framework/interface/Shard.h b/include/framework/interface/Shard.h
new file mode 100644
index 0000000..c4a9180
--- /dev/null
+++ b/include/framework/interface/Shard.h
@@ -0,0 +1,36 @@
+/*
+ * include/framework/interface/Shard.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include "framework/ShardRequirements.h"
+
+namespace de {
+
+template <typename S, typename R>
+concept ShardInterface = RecordInterface<R> && requires(S s, std::vector<S*> spp, void *p, bool b, size_t i, BufferView<R> bv, R r) {
+ {S(spp)};
+ {S(std::move(bv))};
+
+ {s.point_lookup(r, b) } -> std::same_as<Wrapped<R>*>;
+ {s.get_data()} -> std::same_as<Wrapped<R>*>;
+
+ {s.get_record_count()} -> std::convertible_to<size_t>;
+ {s.get_tombstone_count()} -> std::convertible_to<size_t>;
+ {s.get_memory_usage()} -> std::convertible_to<size_t>;
+ {s.get_aux_memory_usage()} -> std::convertible_to<size_t>;
+};
+
+template <typename S, typename R>
+concept SortedShardInterface = ShardInterface<S, R> && requires(S s, R r, R *rp, size_t i) {
+ {s.lower_bound(r)} -> std::convertible_to<size_t>;
+ {s.upper_bound(r)} -> std::convertible_to<size_t>;
+ {s.get_record_at(i)} -> std::same_as<Wrapped<R>*>;
+};
+
+}
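As with the query concept, a stub that satisfies ShardInterface makes the required surface concrete; this sketch is not from the repository, and the exact point_lookup parameter convention is an assumption:

    template <de::RecordInterface R>
    class TrivialShard {
    public:
        TrivialShard(std::vector<TrivialShard*> shards) {}  /* combine existing shards */
        TrivialShard(de::BufferView<R> buffer) {}           /* build a shard from buffered records */

        de::Wrapped<R> *point_lookup(const R &rec, bool filter_ts=false) { return nullptr; }
        de::Wrapped<R> *get_data() { return nullptr; }

        size_t get_record_count() { return 0; }
        size_t get_tombstone_count() { return 0; }
        size_t get_memory_usage() { return 0; }
        size_t get_aux_memory_usage() { return 0; }
    };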
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
new file mode 100644
index 0000000..9377fb0
--- /dev/null
+++ b/include/framework/scheduling/Epoch.h
@@ -0,0 +1,143 @@
+/*
+ * include/framework/scheduling/Epoch.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+
+#include "framework/structure/MutableBuffer.h"
+#include "framework/structure/ExtensionStructure.h"
+#include "framework/structure/BufferView.h"
+
+namespace de {
+
+
+template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L>
+class Epoch {
+private:
+ typedef MutableBuffer<R> Buffer;
+ typedef ExtensionStructure<R, S, Q, L> Structure;
+ typedef BufferView<R> BufView;
+public:
+ Epoch(size_t number=0)
+ : m_buffer(nullptr)
+ , m_structure(nullptr)
+ , m_active_merge(false)
+ , m_epoch_number(number)
+ , m_buffer_head(0)
+ {}
+
+ Epoch(size_t number, Structure *structure, Buffer *buff, size_t head)
+ : m_buffer(buff)
+ , m_structure(structure)
+ , m_active_merge(false)
+ , m_epoch_number(number)
+ , m_buffer_head(head)
+ {
+ structure->take_reference();
+ }
+
+ ~Epoch() {
+ if (m_structure) {
+ m_structure->release_reference();
+ }
+
+ if (m_structure->get_reference_count() == 0) {
+ delete m_structure;
+ }
+
+ }
+
+ /*
+ * Epochs are *not* copyable or movable. Only one can exist, and all users
+ * of it work with pointers
+ */
+ Epoch(const Epoch&) = delete;
+ Epoch(Epoch&&) = delete;
+ Epoch &operator=(const Epoch&) = delete;
+ Epoch &operator=(Epoch&&) = delete;
+
+ size_t get_epoch_number() {
+ return m_epoch_number;
+ }
+
+ Structure *get_structure() {
+ return m_structure;
+ }
+
+ BufView get_buffer() {
+ return m_buffer->get_buffer_view(m_buffer_head);
+ }
+
+ /*
+ * Returns a new Epoch object that is a copy of this one. The new object
+ * will also contain a copy of the m_structure, rather than a reference to
+ * the same one. The epoch number of the new epoch will be set to the
+ * provided argument.
+ */
+ Epoch *clone(size_t number) {
+ std::unique_lock<std::mutex> m_buffer_lock;
+ auto epoch = new Epoch(number);
+ epoch->m_buffer = m_buffer;
+ epoch->m_buffer_head = m_buffer_head;
+
+ if (m_structure) {
+ epoch->m_structure = m_structure->copy();
+ /* the copy routine returns a structure with 0 references */
+ epoch->m_structure->take_reference();
+ }
+
+ return epoch;
+ }
+
+ /*
+ * Check if a merge can be started from this Epoch. At present, without
+ * concurrent merging, this simply checks if there is currently a scheduled
+ * merge based on this Epoch. If there is, returns false. If there isn't,
+ * returns true and sets a flag indicating that there is an active merge.
+ */
+ bool prepare_reconstruction() {
+ auto old = m_active_merge.load();
+ if (old) {
+ return false;
+ }
+
+ // FIXME: this needs to be cleaned up
+ while (!m_active_merge.compare_exchange_strong(old, true)) {
+ old = m_active_merge.load();
+ if (old) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ bool advance_buffer_head(size_t head) {
+ m_buffer_head = head;
+ return m_buffer->advance_head(m_buffer_head);
+ }
+
+private:
+ Structure *m_structure;
+ Buffer *m_buffer;
+
+ std::mutex m_buffer_lock;
+ std::atomic<bool> m_active_merge;
+
+ /*
+ * The number of currently active jobs
+ * (queries/merges) operating on this
+ * epoch. An epoch can only be retired
+ * when this number is 0.
+ */
+ size_t m_epoch_number;
+ size_t m_buffer_head;
+};
+}
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
new file mode 100644
index 0000000..3ed4f49
--- /dev/null
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -0,0 +1,129 @@
+/*
+ * include/framework/scheduling/FIFOScheduler.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * This scheduler runs jobs concurrently, using a standard FIFO queue to
+ * determine which job to run next. If more jobs are scheduled than there
+ * are available threads, the excess will stall until a thread becomes
+ * available and then run in the order they were received by the scheduler.
+ *
+ * TODO: We need to set up a custom threadpool based on jthreads to support
+ * thread preemption for a later phase of this project. That will allow us
+ * to avoid blocking epoch transitions on long-running queries, or to pause
+ * reconstructions on demand.
+ */
+#pragma once
+
+#include <thread>
+#include <condition_variable>
+#include <chrono>
+#include "framework/scheduling/Task.h"
+#include "framework/scheduling/statistics.h"
+
+#include "ctpl/ctpl.h"
+#include "psu-ds/LockedPriorityQueue.h"
+
+namespace de {
+
+using namespace std::literals::chrono_literals;
+
+
+class FIFOScheduler {
+private:
+ static const size_t DEFAULT_MAX_THREADS = 8;
+
+public:
+ FIFOScheduler(size_t memory_budget, size_t thread_cnt)
+ : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
+ , m_thrd_cnt((thread_cnt) ? thread_cnt: DEFAULT_MAX_THREADS)
+ , m_used_memory(0)
+ , m_used_thrds(0)
+ , m_shutdown(false)
+ {
+ m_sched_thrd = std::thread(&FIFOScheduler::run, this);
+ m_sched_wakeup_thrd = std::thread(&FIFOScheduler::periodic_wakeup, this);
+ m_thrd_pool.resize(m_thrd_cnt);
+ }
+
+ ~FIFOScheduler() {
+ if (!m_shutdown.load()) {
+ shutdown();
+ }
+
+ m_sched_thrd.join();
+ m_sched_wakeup_thrd.join();
+ }
+
+ void schedule_job(std::function<void(void*)> job, size_t size, void *args, size_t type=0) {
+ std::unique_lock<std::mutex> lk(m_cv_lock);
+ size_t ts = m_counter.fetch_add(1);
+
+ m_stats.job_queued(ts, type, size);
+ m_task_queue.push(Task(size, ts, job, args, type, &m_stats));
+
+ m_cv.notify_all();
+ }
+
+ void shutdown() {
+ m_shutdown.store(true);
+ m_thrd_pool.stop(true);
+ m_cv.notify_all();
+ }
+
+ void print_statistics() {
+ m_stats.print_statistics();
+ }
+
+private:
+ psudb::LockedPriorityQueue<Task> m_task_queue;
+
+ size_t m_memory_budget;
+ size_t m_thrd_cnt;
+
+ std::atomic<bool> m_shutdown;
+
+    std::atomic<size_t> m_counter = 0;
+ std::mutex m_cv_lock;
+ std::condition_variable m_cv;
+
+ std::thread m_sched_thrd;
+ std::thread m_sched_wakeup_thrd;
+ ctpl::thread_pool m_thrd_pool;
+
+ std::atomic<size_t> m_used_thrds;
+ std::atomic<size_t> m_used_memory;
+
+ SchedulerStatistics m_stats;
+
+ void periodic_wakeup() {
+ do {
+ std::this_thread::sleep_for(10us);
+ m_cv.notify_all();
+ } while (!m_shutdown.load());
+ }
+
+ void schedule_next() {
+ assert(m_task_queue.size() > 0);
+ auto t = m_task_queue.pop();
+ m_stats.job_scheduled(t.m_timestamp);
+
+ m_thrd_pool.push(t);
+ }
+
+ void run() {
+ do {
+ std::unique_lock<std::mutex> cv_lock(m_cv_lock);
+ m_cv.wait(cv_lock);
+
+ while (m_task_queue.size() > 0 && m_thrd_pool.n_idle() > 0) {
+ schedule_next();
+ }
+ } while(!m_shutdown.load());
+ }
+
+};
+
+}
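The dispatch pattern the header comment describes, a FIFO queue drained by a condition-variable-driven loop, can be reduced to the following standalone sketch. It is not part of the patch, runs jobs on a single worker thread rather than handing them to a ctpl thread pool, and all names are illustrative:

#include <condition_variable>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

/* Minimal FIFO dispatcher: jobs are queued under a lock, a condition variable
 * wakes the worker, and jobs run in arrival order. */
class TinyFIFO {
public:
    TinyFIFO() : m_shutdown(false), m_worker(&TinyFIFO::run, this) {}

    ~TinyFIFO() {
        {
            std::unique_lock<std::mutex> lk(m_lock);
            m_shutdown = true;
        }
        m_cv.notify_all();
        m_worker.join();
    }

    void schedule_job(std::function<void()> job) {
        {
            std::unique_lock<std::mutex> lk(m_lock);
            m_queue.push(std::move(job));
        }
        m_cv.notify_all();
    }

private:
    void run() {
        std::unique_lock<std::mutex> lk(m_lock);
        while (true) {
            m_cv.wait(lk, [&] { return m_shutdown || !m_queue.empty(); });
            while (!m_queue.empty()) {
                auto job = std::move(m_queue.front());
                m_queue.pop();
                lk.unlock();
                job();          /* run the job outside the lock */
                lk.lock();
            }
            if (m_shutdown) {
                return;         /* queue drained and shutdown requested */
            }
        }
    }

    std::mutex m_lock;
    std::condition_variable m_cv;
    std::queue<std::function<void()>> m_queue;
    bool m_shutdown;
    std::thread m_worker;
};

int main() {
    TinyFIFO sched;
    for (int i = 0; i < 3; i++) {
        sched.schedule_job([i] { std::printf("job %d ran\n", i); });
    }
    return 0;   /* the destructor drains any remaining jobs before joining */
}

The real scheduler additionally uses a periodic wakeup thread so the dispatcher cannot miss a notification, and records per-job queue and latency statistics.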
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
new file mode 100644
index 0000000..ac59301
--- /dev/null
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -0,0 +1,62 @@
+/*
+ * include/framework/scheduling/SerialScheduler.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * IMPORTANT: This "scheduler" is a shim implementation for allowing
+ * strictly serial, single-threaded operation of the framework. It should
+ * never be used in multi-threaded contexts. A call to the schedule_job
+ * function will immediately run the job and block on its completion before
+ * returning.
+ *
+ */
+#pragma once
+
+#include "framework/scheduling/Task.h"
+#include "framework/scheduling/statistics.h"
+
+namespace de {
+
+class SerialScheduler {
+public:
+ SerialScheduler(size_t memory_budget, size_t thread_cnt)
+ : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
+ , m_thrd_cnt((thread_cnt) ? thread_cnt: UINT64_MAX)
+ , m_used_memory(0)
+ , m_used_thrds(0)
+ , m_counter(0)
+ {}
+
+ ~SerialScheduler() = default;
+
+ void schedule_job(std::function<void(void*)> job, size_t size, void *args, size_t type=0) {
+ size_t ts = m_counter++;
+ m_stats.job_queued(ts, type, size);
+ m_stats.job_scheduled(ts);
+ auto t = Task(size, ts, job, args, type, &m_stats);
+ t(0);
+ }
+
+ void shutdown() {
+ /* intentionally left blank */
+ }
+
+ void print_statistics() {
+ m_stats.print_statistics();
+ }
+
+private:
+ size_t m_memory_budget;
+ size_t m_thrd_cnt;
+
+ size_t m_used_thrds;
+ size_t m_used_memory;
+
+ size_t m_counter;
+
+ SchedulerStatistics m_stats;
+};
+
+}
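A standalone sketch of the behavior described above: "scheduling" a job simply runs it on the calling thread, so any promise it fulfills is already satisfied by the time the call returns. This is illustrative only, not part of the patch, and the helper name is made up:

#include <cstdio>
#include <functional>
#include <future>

/* Stand-in for the serial shim: no queue, no threads, just run and block. */
static void schedule_job_inline(std::function<void(void*)> job, void *args) {
    job(args);
}

int main() {
    std::promise<int> result;
    auto future = result.get_future();

    /* the job fulfills the promise before schedule_job_inline() returns */
    schedule_job_inline([](void *p) {
        static_cast<std::promise<int>*>(p)->set_value(42);
    }, &result);

    std::printf("%d\n", future.get());   /* prints 42 without ever waiting */
    return 0;
}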
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
new file mode 100644
index 0000000..d5d4266
--- /dev/null
+++ b/include/framework/scheduling/Task.h
@@ -0,0 +1,89 @@
+/*
+ * include/framework/scheduling/Task.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * An abstraction to represent a job to be scheduled. Currently the
+ * supported task types are queries and merges. Based on the current plan,
+ * simple buffer inserts will likely also be made into a task at some
+ * point.
+ *
+ */
+#pragma once
+
+#include <future>
+#include <functional>
+#include <chrono>
+
+#include "framework/util/Configuration.h"
+#include "framework/scheduling/Epoch.h"
+#include "framework/scheduling/statistics.h"
+
+namespace de {
+
+template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L>
+struct ReconstructionArgs {
+ Epoch<R, S, Q, L> *epoch;
+ std::vector<ReconstructionTask> merges;
+ std::promise<bool> result;
+ bool compaction;
+ void *extension;
+};
+
+template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L>
+struct QueryArgs {
+ std::promise<std::vector<R>> result_set;
+ void *query_parms;
+ void *extension;
+};
+
+typedef std::function<void(void*)> Job;
+
+struct Task {
+ Task(size_t size, size_t ts, Job job, void *args, size_t type=0, SchedulerStatistics *stats=nullptr)
+ : m_job(job)
+ , m_size(size)
+ , m_timestamp(ts)
+ , m_args(args)
+ , m_type(type)
+ , m_stats(stats)
+ {}
+
+ Job m_job;
+ size_t m_size;
+ size_t m_timestamp;
+ void *m_args;
+ size_t m_type;
+ SchedulerStatistics *m_stats;
+
+ friend bool operator<(const Task &self, const Task &other) {
+ return self.m_timestamp < other.m_timestamp;
+ }
+
+ friend bool operator>(const Task &self, const Task &other) {
+ return self.m_timestamp > other.m_timestamp;
+ }
+
+ void operator()(size_t thrd_id) {
+ auto start = std::chrono::high_resolution_clock::now();
+ if (m_stats) {
+ m_stats->job_begin(m_timestamp);
+ }
+
+ m_job(m_args);
+
+ if (m_stats) {
+ m_stats->job_complete(m_timestamp);
+ }
+ auto stop = std::chrono::high_resolution_clock::now();
+
+ if (m_stats) {
+ auto time = std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start).count();
+ m_stats->log_time_data(time, m_type);
+ }
+ }
+};
+
+}
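Task::operator() follows a common instrumentation pattern: time the wrapped callable and hand the measured latency to a statistics hook. A standalone, illustrative sketch of that pattern (not part of the patch; the names are made up):

#include <chrono>
#include <cstdio>
#include <functional>

/* A callable wrapper that times the wrapped job and reports the latency. */
struct TimedJob {
    std::function<void()> job;
    std::function<void(long long)> report;   /* stand-in for a stats object */

    void operator()() {
        auto start = std::chrono::high_resolution_clock::now();
        job();
        auto stop = std::chrono::high_resolution_clock::now();
        report(std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start).count());
    }
};

int main() {
    TimedJob t{
        [] { volatile long x = 0; for (long i = 0; i < 1000000; i++) x = x + i; },
        [](long long ns) { std::printf("job took %lld ns\n", ns); }
    };
    t();   /* runs the job and reports its wall-clock latency */
    return 0;
}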
diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h
new file mode 100644
index 0000000..6c479cd
--- /dev/null
+++ b/include/framework/scheduling/statistics.h
@@ -0,0 +1,118 @@
+/*
+ * include/framework/scheduling/statistics.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * This is a stub for a statistics tracker to be used in scheduling. It
+ * currently only tracks simple aggregated statistics, but should be
+ * updated in the future for more fine-grained statistics. These will be
+ * used for making scheduling decisions and predicting the runtime of a
+ * given job.
+ */
+#pragma once
+
+#include <cstdlib>
+#include <cassert>
+#include <unordered_map>
+#include <vector>
+#include <mutex>
+#include <chrono>
+#include <atomic>
+
+namespace de {
+
+class SchedulerStatistics {
+private:
+ enum class EventType {
+ QUEUED,
+ SCHEDULED,
+ STARTED,
+ FINISHED
+ };
+
+ struct Event {
+ size_t id;
+ EventType type;
+ };
+
+ struct JobInfo {
+ size_t id;
+ size_t size;
+ size_t type;
+ };
+
+
+public:
+ SchedulerStatistics() = default;
+ ~SchedulerStatistics() = default;
+
+    void job_queued(size_t id, size_t type, size_t size) {
+        /* stub: per-job event logging is not yet implemented */
+    }
+
+ void job_scheduled(size_t id) {
+ std::unique_lock<std::mutex> lk(m_mutex);
+
+ }
+
+ void job_begin(size_t id) {
+
+ }
+
+ void job_complete(size_t id) {
+
+ }
+
+ /* FIXME: This is just a temporary approach */
+ void log_time_data(size_t length, size_t type) {
+ assert(type == 1 || type == 2);
+
+ if (type == 1) {
+ m_type_1_cnt.fetch_add(1);
+ m_type_1_total_time.fetch_add(length);
+
+ if (length > m_type_1_largest_time) {
+ m_type_1_largest_time.store(length);
+ }
+ } else {
+ m_type_2_cnt.fetch_add(1);
+ m_type_2_total_time.fetch_add(length);
+
+ if (length > m_type_2_largest_time) {
+ m_type_2_largest_time.store(length);
+ }
+ }
+ }
+
+    void print_statistics() {
+        if (m_type_1_cnt > 0) {
+            fprintf(stdout, "Query Count: %zu\tQuery Avg. Latency: %zu\tMax Query Latency: %zu\n",
+                    m_type_1_cnt.load(),
+                    m_type_1_total_time.load() / m_type_1_cnt.load(),
+                    m_type_1_largest_time.load());
+        }
+        if (m_type_2_cnt > 0) {
+            fprintf(stdout, "Reconstruction Count: %zu\tReconstruction Avg. Latency: %zu\tMax Recon. Latency: %zu\n",
+                    m_type_2_cnt.load(),
+                    m_type_2_total_time.load() / m_type_2_cnt.load(),
+                    m_type_2_largest_time.load());
+        }
+    }
+
+private:
+ std::mutex m_mutex;
+ std::unordered_map<size_t, JobInfo> m_jobs;
+ std::vector<Event> m_event_log;
+
+    std::atomic<size_t> m_type_1_cnt = 0;
+    std::atomic<size_t> m_type_1_total_time = 0;
+
+    std::atomic<size_t> m_type_2_cnt = 0;
+    std::atomic<size_t> m_type_2_total_time = 0;
+
+    std::atomic<size_t> m_type_1_largest_time = 0;
+    std::atomic<size_t> m_type_2_largest_time = 0;
+};
+}
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
new file mode 100644
index 0000000..9e0872b
--- /dev/null
+++ b/include/framework/structure/BufferView.h
@@ -0,0 +1,170 @@
+/*
+ * include/framework/structure/BufferView.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * TODO: This file is very poorly commented.
+ */
+#pragma once
+
+#include <cstdlib>
+#include <cassert>
+#include <functional>
+#include <utility>
+
+#include "psu-util/alignment.h"
+#include "psu-ds/BloomFilter.h"
+#include "framework/interface/Record.h"
+
+namespace de {
+
+typedef std::function<void()> ReleaseFunction;
+
+template <RecordInterface R>
+class BufferView {
+public:
+ BufferView() = default;
+
+ /*
+ * the BufferView's lifetime is tightly linked to buffer versioning, and so
+ * copying and assignment are disabled.
+ */
+ BufferView(const BufferView&) = delete;
+ BufferView &operator=(BufferView &) = delete;
+
+ BufferView(BufferView &&other)
+ : m_data(std::exchange(other.m_data, nullptr))
+ , m_release(std::move(other.m_release))
+ , m_head(std::exchange(other.m_head, 0))
+ , m_tail(std::exchange(other.m_tail, 0))
+ , m_start(std::exchange(other.m_start, 0))
+ , m_stop(std::exchange(other.m_stop, 0))
+ , m_cap(std::exchange(other.m_cap, 0))
+ , m_approx_ts_cnt(std::exchange(other.m_approx_ts_cnt, 0))
+ , m_tombstone_filter(std::exchange(other.m_tombstone_filter, nullptr))
+ , m_active(std::exchange(other.m_active, false)) {}
+
+ BufferView &operator=(BufferView &&other) = delete;
+
+
+ BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail, size_t tombstone_cnt, psudb::BloomFilter<R> *filter,
+ ReleaseFunction release)
+ : m_data(buffer)
+ , m_release(release)
+ , m_head(head)
+ , m_tail(tail)
+ , m_start(m_head % cap)
+ , m_stop(m_tail % cap)
+ , m_cap(cap)
+ , m_approx_ts_cnt(tombstone_cnt)
+ , m_tombstone_filter(filter)
+ , m_active(true) {}
+
+ ~BufferView() {
+ if (m_active) {
+ m_release();
+ }
+ }
+
+ bool check_tombstone(const R& rec) {
+ if (m_tombstone_filter && !m_tombstone_filter->lookup(rec)) return false;
+
+ for (size_t i=0; i<get_record_count(); i++) {
+ if (m_data[to_idx(i)].rec == rec && m_data[to_idx(i)].is_tombstone()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ bool delete_record(const R& rec) {
+ if (m_start < m_stop) {
+ for (size_t i=m_start; i<m_stop; i++) {
+ if (m_data[i].rec == rec) {
+ m_data[i].set_delete();
+ return true;
+ }
+ }
+ } else {
+ for (size_t i=m_start; i<m_cap; i++) {
+ if (m_data[i].rec == rec) {
+ m_data[i].set_delete();
+ return true;
+ }
+ }
+
+ for (size_t i=0; i<m_stop; i++) {
+ if (m_data[i].rec == rec) {
+ m_data[i].set_delete();
+ return true;
+ }
+
+ }
+
+ }
+
+ return false;
+ }
+
+ size_t get_record_count() {
+ return m_tail - m_head;
+ }
+
+ /*
+ * NOTE: This function returns an upper bound on the number
+ * of tombstones within the view. There may be less than
+ * this, due to synchronization issues during view creation.
+ */
+ size_t get_tombstone_count() {
+ return m_approx_ts_cnt;
+ }
+
+ Wrapped<R> *get(size_t i) {
+ assert(i < get_record_count());
+ return m_data + to_idx(i);
+ }
+
+ void copy_to_buffer(psudb::byte *buffer) {
+ /* check if the region to be copied circles back to start. If so, do it in two steps */
+ if (m_start > m_stop) {
+ size_t split_idx = m_cap - m_start;
+
+ memcpy(buffer, (std::byte*) (m_data + m_start), split_idx* sizeof(Wrapped<R>));
+ memcpy(buffer + (split_idx * sizeof(Wrapped<R>)), (std::byte*) m_data, m_stop * sizeof(Wrapped<R>));
+ } else {
+ memcpy(buffer, (std::byte*) (m_data + m_start), get_record_count() * sizeof(Wrapped<R>));
+ }
+ }
+
+ size_t get_tail() {
+ return m_tail;
+ }
+
+ size_t get_head() {
+ return m_head;
+ }
+
+private:
+ Wrapped<R>* m_data;
+ ReleaseFunction m_release;
+ size_t m_head;
+ size_t m_tail;
+ size_t m_start;
+ size_t m_stop;
+ size_t m_cap;
+ size_t m_approx_ts_cnt;
+ psudb::BloomFilter<R> *m_tombstone_filter;
+ bool m_active;
+
+    size_t to_idx(size_t i) {
+        size_t idx = (m_start + i >= m_cap) ? i - (m_cap - m_start)
+                                            : m_start + i;
+        assert(idx < m_cap);
+        return idx;
+    }
+};
+
+}
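The index translation and two-step copy used by to_idx() and copy_to_buffer() can be checked in isolation. The following standalone sketch, which is not part of the patch and uses made-up values, walks a view that wraps past the end of the physical array:

#include <cassert>
#include <cstdio>
#include <cstring>

/* Translate a logical offset within the view into a physical slot index. */
static size_t to_idx(size_t start, size_t i, size_t cap) {
    return (start + i >= cap) ? i - (cap - start) : start + i;
}

int main() {
    const size_t cap = 8;
    int data[cap] = {0, 1, 2, 3, 4, 5, 6, 7};

    size_t start = 6, stop = 3;            /* the view covers slots 6,7,0,1,2 */
    assert(to_idx(start, 0, cap) == 6);
    assert(to_idx(start, 2, cap) == 0);    /* third element wraps to slot 0 */

    /* the copy crosses the end of the array, so it is done in two pieces */
    int out[5];
    size_t split = cap - start;            /* elements before the wrap point */
    std::memcpy(out, data + start, split * sizeof(int));
    std::memcpy(out + split, data, stop * sizeof(int));

    for (int v : out) std::printf("%d ", v);   /* prints: 6 7 0 1 2 */
    std::printf("\n");
    return 0;
}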
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
new file mode 100644
index 0000000..4802bc1
--- /dev/null
+++ b/include/framework/structure/ExtensionStructure.h
@@ -0,0 +1,495 @@
+/*
+ * include/framework/structure/ExtensionStructure.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <atomic>
+#include <cstdio>
+#include <vector>
+
+#include "framework/structure/BufferView.h"
+#include "framework/structure/InternalLevel.h"
+
+#include "framework/util/Configuration.h"
+
+#include "psu-util/timer.h"
+
+namespace de {
+
+template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L=LayoutPolicy::TEIRING>
+class ExtensionStructure {
+ typedef S Shard;
+ typedef BufferView<R> BuffView;
+
+public:
+ ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
+ : m_scale_factor(scale_factor)
+ , m_max_delete_prop(max_delete_prop)
+ , m_buffer_size(buffer_size)
+ {}
+
+ ~ExtensionStructure() = default;
+
+ /*
+ * Create a shallow copy of this extension structure. The copy will share
+ * references to the same levels/shards as the original, but will have its
+ * own lists. As all of the shards are immutable (with the exception of
+ * deletes), the copy can be restructured with reconstructions and flushes
+ * without affecting the original. The copied structure will be returned
+ * with a reference count of 0; generally you will want to immediately call
+ * take_reference() on it.
+ *
+ * NOTE: When using tagged deletes, a delete of a record in the original
+ * structure will affect the copy, so long as the copy retains a reference
+ * to the same shard as the original. This could cause synchronization
+ * problems under tagging with concurrency. Any deletes in this context will
+ * need to be forwarded to the appropriate structures manually.
+ */
+ ExtensionStructure<R, S, Q, L> *copy() {
+ auto new_struct = new ExtensionStructure<R, S, Q, L>(m_buffer_size, m_scale_factor,
+ m_max_delete_prop);
+ for (size_t i=0; i<m_levels.size(); i++) {
+ new_struct->m_levels.push_back(m_levels[i]->clone());
+ }
+
+ new_struct->m_refcnt = 0;
+
+ return new_struct;
+ }
+
+ /*
+ * Search for a record matching the argument and mark it deleted by
+ * setting the delete bit in its wrapped header. Returns 1 if a matching
+ * record was found and deleted, and 0 if a matching record was not found.
+ *
+ * This function will stop after finding the first matching record. It is
+ * assumed that no duplicate records exist. In the case of duplicates, this
+ * function will still "work", but in the sense of "delete first match".
+ */
+ int tagged_delete(const R &rec) {
+ for (auto level : m_levels) {
+ if (level && level->delete_record(rec)) {
+ return 1;
+ }
+ }
+
+ /*
+ * If the record to be erased wasn't found, return 0. The
+ * DynamicExtension itself will then search the active
+ * Buffers.
+ */
+ return 0;
+ }
+
+ /*
+ * Flush a buffer into the extension structure, performing any necessary
+ * reconstructions to free up room in L0.
+ *
+ * FIXME: arguably, this should be a method attached to the buffer that
+ * takes a structure as input.
+ */
+ inline bool flush_buffer(BuffView buffer) {
+ assert(can_reconstruct_with(0, buffer.get_record_count()));
+
+ flush_buffer_into_l0(std::move(buffer));
+
+ return true;
+ }
+
+ /*
+ * Return the total number of records (including tombstones) within all
+ * of the levels of the structure.
+ */
+ size_t get_record_count() {
+ size_t cnt = 0;
+
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) cnt += m_levels[i]->get_record_count();
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Return the total number of tombstones contained within all of the
+ * levels of the structure.
+ */
+ size_t get_tombstone_count() {
+ size_t cnt = 0;
+
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count();
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Return the number of levels within the structure. Note that not
+ * all of these levels are necessarily populated.
+ */
+ size_t get_height() {
+ return m_levels.size();
+ }
+
+ /*
+ * Return the amount of memory (in bytes) used by the shards within the
+ * structure for storing the primary data structure and raw data.
+ */
+ size_t get_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) cnt += m_levels[i]->get_memory_usage();
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Return the amount of memory (in bytes) used by the shards within the
+ * structure for storing auxiliary data structures. This total does not
+ * include memory used for the main data structure, or raw data.
+ */
+ size_t get_aux_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) {
+ cnt += m_levels[i]->get_aux_memory_usage();
+ }
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Validate that no level in the structure exceeds its maximum tombstone
+ * capacity. This is used to trigger preemptive compactions at the end of
+ * the reconstruction process.
+ */
+ bool validate_tombstone_proportion() {
+ long double ts_prop;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i]) {
+ ts_prop = (long double)m_levels[i]->get_tombstone_count() /
+ (long double)calc_level_record_capacity(i);
+ if (ts_prop > (long double)m_max_delete_prop) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ bool validate_tombstone_proportion(level_index level) {
+ long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level);
+ return ts_prop <= (long double) m_max_delete_prop;
+ }
+
+ /*
+ * Return a reference to the underlying vector of levels within the
+ * structure.
+ */
+ std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> &get_levels() {
+ return m_levels;
+ }
+
+ std::vector<ReconstructionTask> get_compaction_tasks() {
+ std::vector<ReconstructionTask> tasks;
+
+ /* if the tombstone/delete invariant is satisfied, no need for compactions */
+ if (validate_tombstone_proportion()) {
+ return tasks;
+ }
+
+ /* locate the first level to violate the invariant */
+ level_index violation_idx = -1;
+ for (level_index i=0; i<m_levels.size(); i++) {
+ if (!validate_tombstone_proportion(i)) {
+ violation_idx = i;
+ break;
+ }
+ }
+
+ assert(violation_idx != -1);
+
+ level_index base_level = find_reconstruction_target(violation_idx);
+ if (base_level == -1) {
+ base_level = grow();
+ }
+
+ for (level_index i=base_level; i>0; i--) {
+ ReconstructionTask task = {i-1, i};
+
+ /*
+ * The amount of storage required for the reconstruction accounts
+ * for the cost of storing the new records, along with the
+ * cost of retaining the old records during the process
+ * (hence the 2x multiplier).
+ *
+ * FIXME: currently does not account for the *actual* size
+ * of the shards, only the storage for the records
+ * themselves.
+ */
+ size_t reccnt = m_levels[i - 1]->get_record_count();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_reconstruct_with(i, reccnt)) {
+ reccnt += m_levels[i]->get_record_count();
+ }
+ }
+ //task.m_size = 2* reccnt * sizeof(R);
+
+ tasks.push_back(task);
+ }
+
+ return tasks;
+ }
+
+    /*
+     * Return a list of reconstruction tasks that need to be executed (in
+     * order) to make room in L0 for a buffer flush of buffer_reccnt
+     * records. If the flush can be done without any reconstructions, the
+     * returned list is empty.
+     */
+ std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt) {
+ std::vector<ReconstructionTask> reconstructions;
+
+ /*
+ * The buffer flush is not included so if that can be done without any
+ * other change, just return an empty list.
+ */
+ if (can_reconstruct_with(0, buffer_reccnt)) {
+            return reconstructions;
+ }
+
+ level_index base_level = find_reconstruction_target(0);
+ if (base_level == -1) {
+ base_level = grow();
+ }
+
+ for (level_index i=base_level; i>0; i--) {
+ ReconstructionTask task = {i-1, i};
+
+ /*
+ * The amount of storage required for the reconstruction accounts
+ * for the cost of storing the new records, along with the
+ * cost of retaining the old records during the process
+ * (hence the 2x multiplier).
+ *
+ * FIXME: currently does not account for the *actual* size
+ * of the shards, only the storage for the records
+ * themselves.
+ */
+ size_t reccnt = m_levels[i-1]->get_record_count();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_reconstruct_with(i, reccnt)) {
+ reccnt += m_levels[i]->get_record_count();
+ }
+ }
+ //task.m_size = 2* reccnt * sizeof(R);
+
+ reconstructions.push_back(task);
+ }
+
+        return reconstructions;
+ }
+
+
+    /*
+     * Return a list of reconstruction tasks that need to be executed (in
+     * order) to free up room in the specified source level.
+     */
+ std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level) {
+ std::vector<ReconstructionTask> reconstructions;
+
+ level_index base_level = find_reconstruction_target(source_level);
+ if (base_level == -1) {
+ base_level = grow();
+ }
+
+ for (level_index i=base_level; i>source_level; i--) {
+ ReconstructionTask task = {i - 1, i};
+ /*
+ * The amount of storage required for the reconstruction accounts
+ * for the cost of storing the new records, along with the
+ * cost of retaining the old records during the process
+ * (hence the 2x multiplier).
+ *
+ * FIXME: currently does not account for the *actual* size
+ * of the shards, only the storage for the records
+ * themselves.
+ */
+ size_t reccnt = m_levels[i-1]->get_record_count();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_reconstruct_with(i, reccnt)) {
+ reccnt += m_levels[i]->get_record_count();
+ }
+ }
+// task.m_size = 2* reccnt * sizeof(R);
+
+ reconstructions.push_back(task);
+ }
+
+ return reconstructions;
+ }
+
+ /*
+ * Combine incoming_level with base_level and reconstruct the shard,
+ * placing it in base_level. The two levels should be sequential--i.e. no
+ * levels are skipped in the reconstruction process--otherwise the
+ * tombstone ordering invariant may be violated.
+ */
+ inline void reconstruction(level_index base_level, level_index incoming_level) {
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ /* if the base level has a shard, merge the base and incoming together to make a new one */
+ if (m_levels[base_level]->get_shard_count() > 0) {
+ m_levels[base_level] = InternalLevel<R, Shard, Q>::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get());
+ /* otherwise, we can just move the incoming to the base */
+ } else {
+ m_levels[base_level] = m_levels[incoming_level];
+ }
+ } else {
+ m_levels[base_level]->append_level(m_levels[incoming_level].get());
+ m_levels[base_level]->finalize();
+ }
+
+ /* place a new, empty level where the incoming level used to be */
+ m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
+ }
+
+ bool take_reference() {
+ m_refcnt.fetch_add(1);
+ return true;
+ }
+
+ bool release_reference() {
+ assert(m_refcnt.load() > 0);
+ m_refcnt.fetch_add(-1);
+ return true;
+ }
+
+ size_t get_reference_count() {
+ return m_refcnt.load();
+ }
+
+ std::vector<void *> get_query_states(std::vector<std::pair<ShardID, Shard*>> &shards, void *parms) {
+ std::vector<void*> states;
+
+ for (auto &level : m_levels) {
+ level->get_query_states(shards, states, parms);
+ }
+
+ return states;
+ }
+
+private:
+ size_t m_scale_factor;
+ double m_max_delete_prop;
+ size_t m_buffer_size;
+
+ std::atomic<size_t> m_refcnt;
+
+ std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;
+
+ /*
+ * Add a new level to the structure and return its index.
+ */
+ inline level_index grow() {
+ level_index new_idx = m_levels.size();
+ size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+ m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));
+ return new_idx;
+ }
+
+    /*
+     * Find the first level below the level indicated by idx that
+     * is capable of sustaining a reconstruction and return its
+     * level index. If no such level exists, return -1. Also
+     * return -1 if idx == 0 and the structure currently has no
+     * levels, to simplify the logic of the first buffer flush.
+     */
+ inline level_index find_reconstruction_target(level_index idx) {
+
+ if (idx == 0 && m_levels.size() == 0) return -1;
+
+ size_t incoming_rec_cnt = get_level_record_count(idx);
+ for (level_index i=idx+1; i<m_levels.size(); i++) {
+ if (can_reconstruct_with(i, incoming_rec_cnt)) {
+ return i;
+ }
+
+ incoming_rec_cnt = get_level_record_count(i);
+ }
+
+ return -1;
+ }
+
+ inline void flush_buffer_into_l0(BuffView buffer) {
+ assert(m_levels[0]);
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ // FIXME: Kludgey implementation due to interface constraints.
+ auto old_level = m_levels[0].get();
+ auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
+ temp_level->append_buffer(std::move(buffer));
+
+ if (old_level->get_shard_count() > 0) {
+ m_levels[0] = InternalLevel<R, Shard, Q>::reconstruction(old_level, temp_level);
+ delete temp_level;
+ } else {
+ m_levels[0] = std::shared_ptr<InternalLevel<R, Shard, Q>>(temp_level);
+ }
+ } else {
+ m_levels[0]->append_buffer(std::move(buffer));
+ }
+ }
+
+ /*
+ * Mark a given memory level as no-longer in use by the tree. For now this
+ * will just free the level. In future, this will be more complex as the
+ * level may not be able to immediately be deleted, depending upon who
+ * else is using it.
+ */
+ inline void mark_as_unused(std::shared_ptr<InternalLevel<R, Shard, Q>> level) {
+ level.reset();
+ }
+
+    /*
+     * Determine the record capacity of the level with index idx. Level 0
+     * is one scale factor larger than the buffer; the buffer itself can be
+     * thought of as index -1, for which this simply yields the buffer
+     * capacity.
+     */
+ inline size_t calc_level_record_capacity(level_index idx) {
+ return m_buffer_size * pow(m_scale_factor, idx+1);
+ }
+
+ /*
+ * Returns the number of records present on a specified level.
+ */
+ inline size_t get_level_record_count(level_index idx) {
+ return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
+ }
+
+ /*
+ * Determines if a level can sustain a reconstruction with incoming_rec_cnt
+ * additional records without exceeding its capacity.
+ */
+ inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) {
+ if (idx >= m_levels.size() || !m_levels[idx]) {
+ return false;
+ }
+
+        if constexpr (L == LayoutPolicy::LEVELING) {
+ return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
+ } else {
+ return m_levels[idx]->get_shard_count() < m_scale_factor;
+ }
+
+        /* unreachable */
+        assert(false);
+ }
+};
+
+}
+
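The capacity math used when planning reconstructions is geometric: under LEVELING, level i holds at most buffer_size * scale_factor^(i+1) records, and the planner cascades down the structure until it finds a level that can absorb the records sitting above it. A standalone sketch of that arithmetic, not part of the patch and using hypothetical fill levels:

#include <cmath>
#include <cstdio>
#include <cstdlib>
#include <vector>

/* Record capacity of level idx: buffer_size * scale_factor^(idx + 1). */
static size_t level_capacity(size_t buffer_size, size_t scale_factor, size_t idx) {
    return buffer_size * static_cast<size_t>(std::pow(scale_factor, idx + 1));
}

int main() {
    const size_t buffer_size = 1000, scale_factor = 4;
    std::vector<size_t> level_reccnt = {3900, 15000, 20000};   /* hypothetical */

    for (size_t i = 0; i < level_reccnt.size(); i++) {
        std::printf("level %zu: %zu / %zu records\n", i, level_reccnt[i],
                    level_capacity(buffer_size, scale_factor, i));
    }

    /* find the first level that can absorb the records from the level above it */
    size_t incoming = buffer_size;   /* a full buffer flush arriving at level 0 */
    for (size_t i = 0; i < level_reccnt.size(); i++) {
        if (level_reccnt[i] + incoming <= level_capacity(buffer_size, scale_factor, i)) {
            std::printf("reconstruction target: level %zu\n", i);
            break;
        }
        incoming = level_reccnt[i];   /* this level must spill downward first */
    }
    return 0;
}

With these numbers, levels 0 and 1 are both over capacity once the incoming records are added, so the cascade bottoms out at level 2, mirroring the loop in get_reconstruction_tasks() that emits one task per level from the target back up to level 0.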
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
new file mode 100644
index 0000000..db38946
--- /dev/null
+++ b/include/framework/structure/InternalLevel.h
@@ -0,0 +1,271 @@
+/*
+ * include/framework/structure/InternalLevel.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * The word `Internal` in this class's name refers to memory. The current
+ * model, inherited from the framework in Practical Dynamic Extension for
+ * Sampling Indexes, would use a different ExternalLevel for shards stored
+ * on external storage. This is a distinction that can probably be avoided
+ * with some more thought being put into interface design.
+ *
+ */
+#pragma once
+
+#include <vector>
+#include <memory>
+
+#include "util/types.h"
+#include "framework/interface/Shard.h"
+#include "framework/interface/Query.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/BufferView.h"
+
+namespace de {
+
+template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q>
+class InternalLevel {
+ typedef S Shard;
+ typedef BufferView<R> BuffView;
+public:
+ InternalLevel(ssize_t level_no, size_t shard_cap)
+ : m_level_no(level_no)
+ , m_shard_cnt(0)
+ , m_shards(shard_cap, nullptr)
+ , m_pending_shard(nullptr)
+ {}
+
+ ~InternalLevel() {
+ delete m_pending_shard;
+ }
+
+ /*
+ * Create a new shard combining the records from base_level and new_level,
+ * and return a shared_ptr to a new level containing this shard. This is used
+ * for reconstructions under the leveling layout policy.
+ *
+ * No changes are made to the levels provided as arguments.
+ */
+ static std::shared_ptr<InternalLevel> reconstruction(InternalLevel* base_level, InternalLevel* new_level) {
+ assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
+ auto res = new InternalLevel(base_level->m_level_no, 1);
+ res->m_shard_cnt = 1;
+ std::vector<Shard *> shards = {base_level->m_shards[0].get(),
+ new_level->m_shards[0].get()};
+
+ res->m_shards[0] = std::make_shared<S>(shards);
+ return std::shared_ptr<InternalLevel>(res);
+ }
+
+ /*
+ * Create a new shard combining the records from all of
+ * the shards in level, and append this new shard into
+ * this level. This is used for reconstructions under
+ * the tiering layout policy.
+ *
+ * No changes are made to the level provided as an argument.
+ */
+ void append_level(InternalLevel* level) {
+        // FIXME: if this can happen at all, something has probably
+        // gone wrong earlier in the reconstruction logic.
+ if (level->get_shard_count() == 0) {
+ return;
+ }
+
+ std::vector<S*> shards;
+ for (auto shard : level->m_shards) {
+ if (shard) shards.emplace_back(shard.get());
+ }
+
+ if (m_shard_cnt == m_shards.size()) {
+ m_pending_shard = new S(shards);
+ return;
+ }
+
+ auto tmp = new S(shards);
+ m_shards[m_shard_cnt] = std::shared_ptr<S>(tmp);
+
+ ++m_shard_cnt;
+ }
+
+ /*
+ * Create a new shard using the records in the
+ * provided buffer, and append this new shard
+ * into this level. This is used for buffer
+ * flushes under the tiering layout policy.
+ */
+ void append_buffer(BuffView buffer) {
+ if (m_shard_cnt == m_shards.size()) {
+ assert(m_pending_shard == nullptr);
+ m_pending_shard = new S(std::move(buffer));
+ return;
+ }
+
+ m_shards[m_shard_cnt] = std::make_shared<S>(std::move(buffer));
+ ++m_shard_cnt;
+ }
+
+ void finalize() {
+ if (m_pending_shard) {
+ for (size_t i=0; i<m_shards.size(); i++) {
+ m_shards[i] = nullptr;
+ }
+
+ m_shards[0] = std::shared_ptr<S>(m_pending_shard);
+ m_pending_shard = nullptr;
+ m_shard_cnt = 1;
+ }
+ }
+
+ /*
+ * Create a new shard containing the combined records
+ * from all shards on this level and return it.
+ *
+ * No changes are made to this level.
+ */
+ Shard *get_combined_shard() {
+ if (m_shard_cnt == 0) {
+ return nullptr;
+ }
+
+ std::vector<Shard *> shards;
+ for (auto shard : m_shards) {
+ if (shard) shards.emplace_back(shard.get());
+ }
+
+ return new S(shards);
+ }
+
+ void get_query_states(std::vector<std::pair<ShardID, Shard *>> &shards, std::vector<void*>& shard_states, void *query_parms) {
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ auto shard_state = Q::get_query_state(m_shards[i].get(), query_parms);
+ shards.push_back({{m_level_no, (ssize_t) i}, m_shards[i].get()});
+ shard_states.emplace_back(shard_state);
+ }
+ }
+ }
+
+ bool check_tombstone(size_t shard_stop, const R& rec) {
+ if (m_shard_cnt == 0) return false;
+
+ for (int i = m_shard_cnt - 1; i >= (ssize_t) shard_stop; i--) {
+ if (m_shards[i]) {
+ auto res = m_shards[i]->point_lookup(rec, true);
+ if (res && res->is_tombstone()) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ bool delete_record(const R &rec) {
+ if (m_shard_cnt == 0) return false;
+
+ for (size_t i = 0; i < m_shards.size(); ++i) {
+ if (m_shards[i]) {
+ auto res = m_shards[i]->point_lookup(rec);
+ if (res) {
+ res->set_delete();
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ Shard* get_shard(size_t idx) {
+ return m_shards[idx].get();
+ }
+
+ size_t get_shard_count() {
+ return m_shard_cnt;
+ }
+
+ size_t get_record_count() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ cnt += m_shards[i]->get_record_count();
+ }
+ }
+
+ return cnt;
+ }
+
+ size_t get_tombstone_count() {
+ size_t res = 0;
+ for (size_t i = 0; i < m_shard_cnt; ++i) {
+ if (m_shards[i]) {
+ res += m_shards[i]->get_tombstone_count();
+ }
+ }
+ return res;
+ }
+
+ size_t get_aux_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_shards[i]){
+ cnt += m_shards[i]->get_aux_memory_usage();
+ }
+ }
+
+ return cnt;
+ }
+
+ size_t get_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ cnt += m_shards[i]->get_memory_usage();
+ }
+ }
+
+ return cnt;
+ }
+
+ double get_tombstone_prop() {
+ size_t tscnt = 0;
+ size_t reccnt = 0;
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ if (m_shards[i]) {
+ tscnt += m_shards[i]->get_tombstone_count();
+ reccnt += m_shards[i]->get_record_count();
+ }
+ }
+
+ return (double) tscnt / (double) (tscnt + reccnt);
+ }
+
+ std::shared_ptr<InternalLevel> clone() {
+ auto new_level = std::make_shared<InternalLevel>(m_level_no, m_shards.size());
+ for (size_t i=0; i<m_shard_cnt; i++) {
+ new_level->m_shards[i] = m_shards[i];
+ }
+ new_level->m_shard_cnt = m_shard_cnt;
+
+ return new_level;
+ }
+
+private:
+ ssize_t m_level_no;
+
+ size_t m_shard_cnt;
+ size_t m_shard_size_cap;
+
+ std::vector<std::shared_ptr<Shard>> m_shards;
+ Shard *m_pending_shard;
+};
+
+}
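Under TIERING, a level accumulates one new shard per flush until it holds scale_factor shards, at which point the entire level is combined into a single shard for the level below it (append_level() followed by finalize() above). A standalone sketch of that accumulation pattern, not part of the patch and using hypothetical record counts:

#include <cstdio>
#include <cstdlib>
#include <numeric>
#include <vector>

int main() {
    const size_t scale_factor = 4;
    std::vector<size_t> shard_sizes;   /* record counts of the shards in one level */

    for (size_t flush = 1; flush <= 6; flush++) {
        if (shard_sizes.size() == scale_factor) {
            /* level is full: combine its shards into one and push it down */
            size_t combined = std::accumulate(shard_sizes.begin(), shard_sizes.end(),
                                              size_t{0});
            std::printf("flush %zu: level full, emitting a combined shard of %zu records\n",
                        flush, combined);
            shard_sizes.clear();
        }
        shard_sizes.push_back(1000);   /* each flush contributes ~1000 records */
        std::printf("flush %zu: level now holds %zu shard(s)\n", flush, shard_sizes.size());
    }
    return 0;
}

In the real structure the combined shard is built by the target level's append_level() call; if the target level is itself already full, the result is parked in m_pending_shard and swapped in by finalize().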
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
new file mode 100644
index 0000000..415c95a
--- /dev/null
+++ b/include/framework/structure/MutableBuffer.h
@@ -0,0 +1,313 @@
+/*
+ * include/framework/structure/MutableBuffer.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * NOTE: Concerning the tombstone count. One possible approach
+ * would be to track the number of tombstones below and above the
+ * low water mark--this would be straightforward to do. Then, if we
+ * *require* that the head only advance up to the LWM, we can get a
+ * correct view on the number of tombstones in the active buffer at
+ * any point in time, and the BufferView will have a pretty good
+ * approximation as well (potentially counting a few extra tombstones if
+ * new inserts land between when the tail pointer and the tombstone count
+ * are fetched).
+ *
+ */
+#pragma once
+
+#include <cstdlib>
+#include <atomic>
+#include <cassert>
+#include <immintrin.h>
+
+#include "psu-util/alignment.h"
+#include "util/bf_config.h"
+#include "psu-ds/BloomFilter.h"
+#include "framework/interface/Record.h"
+#include "framework/structure/BufferView.h"
+
+using psudb::CACHELINE_SIZE;
+
+namespace de {
+
+template <RecordInterface R>
+class MutableBuffer {
+ friend class BufferView<R>;
+
+ struct buffer_head {
+ size_t head_idx;
+ size_t refcnt;
+ };
+
+public:
+ MutableBuffer(size_t low_watermark, size_t high_watermark, size_t capacity=0)
+ : m_lwm(low_watermark)
+ , m_hwm(high_watermark)
+ , m_cap((capacity == 0) ? 2 * high_watermark : capacity)
+ , m_tail(0)
+ , m_head({0, 0})
+ , m_old_head({high_watermark, 0})
+ , m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
+ , m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
+ , m_tscnt(0)
+ , m_old_tscnt(0)
+ , m_active_head_advance(false)
+ {
+ assert(m_cap > m_hwm);
+ assert(m_hwm > m_lwm);
+ }
+
+ ~MutableBuffer() {
+ free(m_data);
+ delete m_tombstone_filter;
+ }
+
+ int append(const R &rec, bool tombstone=false) {
+ int32_t tail = 0;
+ if ((tail = try_advance_tail()) == -1) {
+ return 0;
+ }
+
+ Wrapped<R> wrec;
+ wrec.rec = rec;
+ wrec.header = 0;
+ if (tombstone) wrec.set_tombstone();
+
+ size_t pos = tail % m_cap;
+
+ m_data[pos] = wrec;
+ m_data[pos].header |= (pos << 2);
+
+ if (tombstone) {
+ m_tscnt.fetch_add(1);
+ if (m_tombstone_filter) m_tombstone_filter->insert(rec);
+ }
+
+ return 1;
+ }
+
+ bool truncate() {
+ m_tscnt.store(0);
+ m_tail.store(0);
+ if (m_tombstone_filter) m_tombstone_filter->clear();
+
+ return true;
+ }
+
+ size_t get_record_count() {
+ return m_tail.load() - m_head.load().head_idx;
+ }
+
+ size_t get_capacity() {
+ return m_cap;
+ }
+
+ bool is_full() {
+ return get_record_count() >= m_hwm;
+ }
+
+ bool is_at_low_watermark() {
+ return get_record_count() >= m_lwm;
+ }
+
+ size_t get_tombstone_count() {
+ return m_tscnt.load();
+ }
+
+ bool delete_record(const R& rec) {
+ return get_buffer_view().delete_record(rec);
+ }
+
+ bool check_tombstone(const R& rec) {
+ return get_buffer_view().check_tombstone(rec);
+ }
+
+ size_t get_memory_usage() {
+ return m_cap * sizeof(Wrapped<R>);
+ }
+
+ size_t get_aux_memory_usage() {
+ return m_tombstone_filter->get_memory_usage();
+ }
+
+ BufferView<R> get_buffer_view(size_t target_head) {
+ size_t head = get_head(target_head);
+ auto f = std::bind(release_head_reference, (void *) this, head);
+
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
+ }
+
+ BufferView<R> get_buffer_view() {
+ size_t head = get_head(m_head.load().head_idx);
+ auto f = std::bind(release_head_reference, (void *) this, head);
+
+ return BufferView<R>(m_data, m_cap, head, m_tail.load(), m_tscnt.load(), m_tombstone_filter, f);
+ }
+
+ /*
+ * Advance the buffer following a reconstruction. Move current
+ * head and head_refcnt into old_head and old_head_refcnt, then
+ * assign new_head to old_head.
+ */
+ bool advance_head(size_t new_head) {
+ assert(new_head > m_head.load().head_idx);
+ assert(new_head <= m_tail.load());
+
+        /* refuse to advance the head while the old head still has outstanding references */
+ if (m_old_head.load().refcnt > 0) {
+ //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n");
+ return false;
+ }
+
+ m_active_head_advance.store(true);
+
+ buffer_head new_hd = {new_head, 0};
+ buffer_head cur_hd;
+
+ /* replace current head with new head */
+ do {
+ cur_hd = m_head.load();
+ } while(!m_head.compare_exchange_strong(cur_hd, new_hd));
+
+ /* move the current head into the old head */
+ m_old_head.store(cur_hd);
+
+ m_active_head_advance.store(false);
+ return true;
+ }
+
+ /*
+ * FIXME: If target_head does not match *either* the old_head or the
+ * current_head, this routine will loop infinitely.
+ */
+ size_t get_head(size_t target_head) {
+ buffer_head cur_hd, new_hd;
+ bool head_acquired = false;
+
+ do {
+ if (m_old_head.load().head_idx == target_head) {
+ cur_hd = m_old_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_old_head.compare_exchange_strong(cur_hd, new_hd);
+ } else if (m_head.load().head_idx == target_head){
+ cur_hd = m_head.load();
+ cur_hd.head_idx = target_head;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt + 1};
+ head_acquired = m_head.compare_exchange_strong(cur_hd, new_hd);
+ }
+ } while(!head_acquired);
+
+ return new_hd.head_idx;
+ }
+
+ void set_low_watermark(size_t lwm) {
+ assert(lwm < m_hwm);
+ m_lwm = lwm;
+ }
+
+ size_t get_low_watermark() {
+ return m_lwm;
+ }
+
+ void set_high_watermark(size_t hwm) {
+ assert(hwm > m_lwm);
+ assert(hwm < m_cap);
+ m_hwm = hwm;
+ }
+
+ size_t get_high_watermark() {
+ return m_hwm;
+ }
+
+ size_t get_tail() {
+ return m_tail.load();
+ }
+
+ /*
+ * Note: this returns the available physical storage capacity,
+     * *not* how many more records can be inserted before the
+ * HWM is reached. It considers the old_head to be "free"
+ * when it has no remaining references. This should be true,
+ * but a buggy framework implementation may violate the
+ * assumption.
+ */
+ size_t get_available_capacity() {
+ if (m_old_head.load().refcnt == 0) {
+ return m_cap - (m_tail.load() - m_head.load().head_idx);
+ }
+
+ return m_cap - (m_tail.load() - m_old_head.load().head_idx);
+ }
+
+private:
+ int64_t try_advance_tail() {
+ size_t old_value = m_tail.load();
+
+ /* if full, fail to advance the tail */
+ if (old_value - m_head.load().head_idx >= m_hwm) {
+ return -1;
+ }
+
+ while (!m_tail.compare_exchange_strong(old_value, old_value+1)) {
+            /* if full, stop trying and fail to advance the tail
+             * (old_value was refreshed by the failed CAS) */
+            if (old_value - m_head.load().head_idx >= m_hwm) {
+                return -1;
+            }
+
+ _mm_pause();
+ }
+
+ return old_value;
+ }
+
+ size_t to_idx(size_t i, size_t head) {
+ return (head + i) % m_cap;
+ }
+
+ static void release_head_reference(void *buff, size_t head) {
+ MutableBuffer<R> *buffer = (MutableBuffer<R> *) buff;
+
+ buffer_head cur_hd, new_hd;
+ do {
+ if (buffer->m_old_head.load().head_idx == head) {
+ cur_hd = buffer->m_old_head;
+ if (cur_hd.refcnt == 0) continue;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
+ if (buffer->m_old_head.compare_exchange_strong(cur_hd, new_hd)) {
+ break;
+ }
+ } else {
+ cur_hd = buffer->m_head;
+ if (cur_hd.refcnt == 0) continue;
+ new_hd = {cur_hd.head_idx, cur_hd.refcnt-1};
+
+ if (buffer->m_head.compare_exchange_strong(cur_hd, new_hd)) {
+ break;
+ }
+ }
+ _mm_pause();
+ } while(true);
+ }
+
+ size_t m_lwm;
+ size_t m_hwm;
+ size_t m_cap;
+
+ alignas(64) std::atomic<size_t> m_tail;
+
+ alignas(64) std::atomic<buffer_head> m_head;
+ alignas(64) std::atomic<buffer_head> m_old_head;
+
+ Wrapped<R>* m_data;
+ psudb::BloomFilter<R>* m_tombstone_filter;
+ alignas(64) std::atomic<size_t> m_tscnt;
+ size_t m_old_tscnt;
+
+ alignas(64) std::atomic<bool> m_active_head_advance;
+};
+
+}
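The lock-free tail advancement in append()/try_advance_tail() amounts to a bounded fetch-and-increment: claim the next slot with a compare-and-swap, but refuse to move the tail past the high watermark relative to the current head. A standalone sketch of that pattern, not part of the patch and with made-up names:

#include <atomic>
#include <cstdio>
#include <cstdlib>
#include <immintrin.h>

/* Minimal bounded ring: the tail may only advance while fewer than hwm
 * records sit between the head and the tail. */
struct Ring {
    std::atomic<size_t> tail{0};
    std::atomic<size_t> head{0};
    size_t hwm{0};

    /* returns the claimed slot counter, or -1 if the buffer is full */
    long long try_advance_tail() {
        size_t old_value = tail.load();
        while (true) {
            if (old_value - head.load() >= hwm) {
                return -1;                        /* full: refuse to advance */
            }
            if (tail.compare_exchange_strong(old_value, old_value + 1)) {
                return static_cast<long long>(old_value);
            }
            _mm_pause();    /* CAS failed; old_value now holds the fresh tail */
        }
    }
};

int main() {
    Ring r;
    r.hwm = 2;
    std::printf("%lld\n", r.try_advance_tail());   /* 0 */
    std::printf("%lld\n", r.try_advance_tail());   /* 1 */
    std::printf("%lld\n", r.try_advance_tail());   /* -1: at the high watermark */
    return 0;
}

The counters are monotonic; the physical slot is obtained by taking the claimed value modulo the capacity, which is why the head can later advance without any data movement.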
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
new file mode 100644
index 0000000..65ca181
--- /dev/null
+++ b/include/framework/util/Configuration.h
@@ -0,0 +1,49 @@
+/*
+ * include/framework/util/Configuration.h
+ *
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <cstdlib>
+#include <utility>
+
+namespace de {
+
+static thread_local size_t sampling_attempts = 0;
+static thread_local size_t sampling_rejections = 0;
+static thread_local size_t deletion_rejections = 0;
+static thread_local size_t bounds_rejections = 0;
+static thread_local size_t tombstone_rejections = 0;
+static thread_local size_t buffer_rejections = 0;
+
+/*
+ * thread_local size_t various_sampling_times go here.
+ */
+static thread_local size_t sample_range_time = 0;
+static thread_local size_t alias_time = 0;
+static thread_local size_t alias_query_time = 0;
+static thread_local size_t rejection_check_time = 0;
+static thread_local size_t buffer_sample_time = 0;
+static thread_local size_t memlevel_sample_time = 0;
+static thread_local size_t disklevel_sample_time = 0;
+static thread_local size_t sampling_bailouts = 0;
+
+
+enum class LayoutPolicy {
+ LEVELING,
+ TEIRING
+};
+
+enum class DeletePolicy {
+ TOMBSTONE,
+ TAGGING
+};
+
+typedef ssize_t level_index;
+typedef std::pair<level_index, level_index> ReconstructionTask;
+
+}