author    Douglas Rumbaugh <dbr4@psu.edu>  2023-09-13 16:22:03 -0400
committer Douglas Rumbaugh <dbr4@psu.edu>  2023-09-13 16:22:03 -0400
commit    eb8dbaa770a57557d67c817c2839c64f536a6ce4 (patch)
tree      77bbbb79fb70f79965e7f6fd75bb5f4799a6f120
parent    076e104b8672924c3d80cd1da2fdb5ebee1766ac (diff)
download  dynamic-extension-eb8dbaa770a57557d67c817c2839c64f536a6ce4.tar.gz
Began re-architecting the project for concurrency support
The project is now in a state where it builds, but it probably has a lot of bugs still.
m--------- external/psudb-common | 0
-rw-r--r-- include/framework/Configuration.h | 54
-rw-r--r-- include/framework/DynamicExtension.h | 372
-rw-r--r-- include/framework/ExtensionStructure.h | 374
-rw-r--r-- include/framework/InternalLevel.h | 8
-rw-r--r-- include/framework/MutableBuffer.h | 67
-rw-r--r-- include/framework/RecordInterface.h | 2
-rw-r--r-- include/framework/Scheduler.h | 76
-rw-r--r-- tests/internal_level_tests.cpp | 4
9 files changed, 647 insertions, 310 deletions
diff --git a/external/psudb-common b/external/psudb-common
-Subproject e5a10e888d248638e48bf82da70fa356eef47ba
+Subproject b436420bf4c9a574b3a8e54c1ab46f46e82240a
diff --git a/include/framework/Configuration.h b/include/framework/Configuration.h
new file mode 100644
index 0000000..eb9b93f
--- /dev/null
+++ b/include/framework/Configuration.h
@@ -0,0 +1,54 @@
+/*
+ * include/framework/Configuration.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <atomic>
+#include <numeric>
+#include <cstdio>
+#include <vector>
+
+#include "psu-util/timer.h"
+#include "psu-ds/Alias.h"
+
+namespace de {
+
+thread_local size_t sampling_attempts = 0;
+thread_local size_t sampling_rejections = 0;
+thread_local size_t deletion_rejections = 0;
+thread_local size_t bounds_rejections = 0;
+thread_local size_t tombstone_rejections = 0;
+thread_local size_t buffer_rejections = 0;
+
+/*
+ * thread_local size_t various_sampling_times go here.
+ */
+thread_local size_t sample_range_time = 0;
+thread_local size_t alias_time = 0;
+thread_local size_t alias_query_time = 0;
+thread_local size_t rejection_check_time = 0;
+thread_local size_t buffer_sample_time = 0;
+thread_local size_t memlevel_sample_time = 0;
+thread_local size_t disklevel_sample_time = 0;
+thread_local size_t sampling_bailouts = 0;
+
+
+enum class LayoutPolicy {
+ LEVELING,
+ TEIRING
+};
+
+enum class DeletePolicy {
+ TOMBSTONE,
+ TAGGING
+};
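+
+/*
+ * Illustrative only: these policies are selected as template parameters of
+ * the DynamicExtension. The constructor arguments below are example values,
+ * not defaults.
+ *
+ *     de::DynamicExtension<Rec, Shard, Query,
+ *                          de::LayoutPolicy::LEVELING,
+ *                          de::DeletePolicy::TOMBSTONE>
+ *         ext(1000, 8, 0.05);
+ */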
+
+typedef ssize_t level_index;
+
+}
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 524024b..5e9bcee 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -19,62 +19,36 @@
#include "framework/ShardInterface.h"
#include "framework/QueryInterface.h"
#include "framework/RecordInterface.h"
+#include "framework/ExtensionStructure.h"
+
+#include "framework/Configuration.h"
+#include "framework/Scheduler.h"
-#include "shard/WIRS.h"
#include "psu-util/timer.h"
#include "psu-ds/Alias.h"
namespace de {
-thread_local size_t sampling_attempts = 0;
-thread_local size_t sampling_rejections = 0;
-thread_local size_t deletion_rejections = 0;
-thread_local size_t bounds_rejections = 0;
-thread_local size_t tombstone_rejections = 0;
-thread_local size_t buffer_rejections = 0;
-
-/*
- * thread_local size_t various_sampling_times go here.
- */
-thread_local size_t sample_range_time = 0;
-thread_local size_t alias_time = 0;
-thread_local size_t alias_query_time = 0;
-thread_local size_t rejection_check_time = 0;
-thread_local size_t buffer_sample_time = 0;
-thread_local size_t memlevel_sample_time = 0;
-thread_local size_t disklevel_sample_time = 0;
-thread_local size_t sampling_bailouts = 0;
-
-
-enum class LayoutPolicy {
- LEVELING,
- TEIRING
-};
-
-enum class DeletePolicy {
- TOMBSTONE,
- TAGGING
-};
-
-typedef ssize_t level_index;
-
template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING, DeletePolicy D=DeletePolicy::TAGGING>
class DynamicExtension {
- //typedef typename S<R> Shard;
typedef S Shard;
typedef MutableBuffer<R> Buffer;
-
+ typedef ExtensionStructure<R, S, Q, L> Structure;
public:
- DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop)
- : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop),
- m_buffer(new Buffer(buffer_cap, buffer_cap * max_delete_prop))
+ DynamicExtension(size_t buffer_cap, size_t scale_factor, double max_delete_prop, size_t memory_budget=0,
+ size_t thread_cnt=16)
+ : m_scale_factor(scale_factor)
+ , m_max_delete_prop(max_delete_prop)
+ , m_sched(memory_budget, thread_cnt)
{ }
~DynamicExtension() {
- delete m_buffer;
+ for (size_t i=0; i<m_buffers.size(); i++) {
+ delete m_buffers[i];
+ }
- for (size_t i=0; i<m_levels.size(); i++) {
- delete m_levels[i];
+ for (size_t i=0; i<m_versions.size(); i++) {
+ delete m_versions[i];
}
}
@@ -83,30 +57,30 @@ public:
}
int erase(const R &rec) {
- Buffer *buffer;
+ Buffer *buffer = get_buffer();
if constexpr (D == DeletePolicy::TAGGING) {
- auto buffer = get_buffer();
-
- // Check the levels first. This assumes there aren't
- // any undeleted duplicate records.
- for (auto level : m_levels) {
- if (level && level->delete_record(rec)) {
- return 1;
- }
+ if (get_active_version()->tagged_delete(rec)) {
+ return 1;
}
- // the buffer will take the longest amount of time, and
- // probably has the lowest probability of having the record,
- // so we'll check it last.
+ /*
+ * the buffer will take the longest amount of time, and
+ * probably has the lowest probability of having the record,
+ * so we'll check it last.
+ */
return buffer->delete_record(rec);
}
+ /*
+ * If tagging isn't used, then delete using a tombstone
+ */
return internal_append(rec, true);
}
std::vector<R> query(void *parms) {
auto buffer = get_buffer();
+ auto vers = get_active_version();
// Get the buffer query state
auto buffer_state = Q::get_buffer_query_state(buffer, parms);
@@ -115,7 +89,7 @@ public:
std::vector<std::pair<ShardID, Shard*>> shards;
std::vector<void*> states;
- for (auto &level : m_levels) {
+ for (auto &level : vers->get_levels()) {
level->get_query_states(shards, states, parms);
}
@@ -125,7 +99,7 @@ public:
// Execute the query for the buffer
auto buffer_results = Q::buffer_query(buffer, buffer_state, parms);
- query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer));
+ query_results[0] = std::move(filter_deletes(buffer_results, {-1, -1}, buffer, vers));
if constexpr (Q::EARLY_ABORT) {
if (query_results[0].size() > 0) {
auto result = Q::merge(query_results, parms);
@@ -141,7 +115,7 @@ public:
// Execute the query for each shard
for (size_t i=0; i<shards.size(); i++) {
auto shard_results = Q::query(shards[i].second, states[i], parms);
- query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer));
+ query_results[i+1] = std::move(filter_deletes(shard_results, shards[i].first, buffer, vers));
if constexpr (Q::EARLY_ABORT) {
if (query_results[i].size() > 0) {
auto result = Q::merge(query_results, parms);
@@ -170,75 +144,44 @@ public:
size_t get_record_count() {
size_t cnt = get_buffer()->get_record_count();
-
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_record_count();
- }
-
- return cnt;
+ return cnt + get_active_version()->get_record_count();
}
size_t get_tombstone_cnt() {
size_t cnt = get_buffer()->get_tombstone_count();
-
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count();
- }
-
- return cnt;
+ return cnt + get_active_version()->get_tombstone_cnt();
}
size_t get_height() {
- return m_levels.size();
+ return get_active_version()->get_height();
}
size_t get_memory_usage() {
- size_t cnt = m_buffer->get_memory_usage();
-
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_memory_usage();
- }
+ auto vers = get_active_version();
+ auto buffer = get_buffer();
- return cnt;
+ return vers->get_memory_usage() + buffer->get_memory_usage();
}
size_t get_aux_memory_usage() {
- size_t cnt = m_buffer->get_aux_memory_usage();
-
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) {
- cnt += m_levels[i]->get_aux_memory_usage();
- }
- }
-
- return cnt;
- }
-
- bool validate_tombstone_proportion() {
- long double ts_prop;
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) {
- ts_prop = (long double) m_levels[i]->get_tombstone_count() / (long double) calc_level_record_capacity(i);
- if (ts_prop > (long double) m_max_delete_prop) {
- return false;
- }
- }
- }
+ auto vers = get_active_version();
+ auto buffer = get_buffer();
- return true;
+ return vers->get_aux_memory_usage() + buffer->get_aux_memory_usage();
}
size_t get_buffer_capacity() {
- return m_buffer->get_capacity();
+ return get_buffer()->get_capacity();
}
Shard *create_static_structure() {
+ auto vers = get_active_version();
std::vector<Shard *> shards;
- if (m_levels.size() > 0) {
- for (int i=m_levels.size() - 1; i>= 0; i--) {
- if (m_levels[i]) {
- shards.emplace_back(m_levels[i]->get_merged_shard());
+ if (vers->get_levels().size() > 0) {
+ for (int i=vers->get_levels().size() - 1; i>= 0; i--) {
+ if (vers->get_levels()[i]) {
+ shards.emplace_back(vers->get_levels()[i]->get_merged_shard());
}
}
}
@@ -263,16 +206,32 @@ public:
return flattened;
}
+ /*
+ * Mostly exposed for unit-testing purposes. Verifies that the current
+ * active version of the ExtensionStructure doesn't violate the maximum
+ * tombstone proportion invariant.
+ */
+ bool validate_tombstone_proportion() {
+ return get_active_version()->validate_tombstone_proportion();
+ }
+
private:
- Buffer *m_buffer;
+ Scheduler<R, S, Q, L> m_sched;
+
+ std::vector<Buffer *> m_buffers;
+ std::vector<Structure *> m_versions;
+
+ std::atomic<size_t> m_current_epoch;
size_t m_scale_factor;
double m_max_delete_prop;
- std::vector<InternalLevel<R, S, Q> *> m_levels;
-
Buffer *get_buffer() {
- return m_buffer;
+ return m_buffers[0];
+ }
+
+ Structure *get_active_version() {
+ return m_versions[0];
}
int internal_append(const R &rec, bool ts) {
@@ -281,13 +240,14 @@ private:
;
if (buffer->is_full()) {
- merge_buffer();
+ auto vers = get_active_version();
+ m_sched.schedule_merge(vers, buffer);
}
return buffer->append(rec, ts);
}
- std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer) {
+ std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Buffer *buffer, Structure *vers) {
if constexpr (!Q::SKIP_DELETE_FILTER) {
return records;
}
@@ -322,12 +282,12 @@ private:
if (shid != INVALID_SHID) {
for (size_t lvl=0; lvl<=shid.level_idx; lvl++) {
- if (m_levels[lvl]->check_tombstone(0, rec.rec)) {
+ if (vers->get_levels()[lvl]->check_tombstone(0, rec.rec)) {
continue;
}
}
- if (m_levels[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) {
+ if (vers->get_levels()[shid.level_idx]->check_tombstone(shid.shard_idx + 1, rec.rec)) {
continue;
}
}
@@ -337,198 +297,6 @@ private:
return processed_records;
}
-
- /*
- * Add a new level to the LSM Tree and return that level's index. Will
- * automatically determine whether the level should be on memory or on disk,
- * and act appropriately.
- */
- inline level_index grow() {
- level_index new_idx;
-
- size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
- new_idx = m_levels.size();
- if (new_idx > 0) {
- assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
- }
- m_levels.emplace_back(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt));
-
- return new_idx;
- }
-
-
- // Merge the memory table down into the tree, completing any required other
- // merges to make room for it.
- inline void merge_buffer() {
- auto buffer = get_buffer();
-
- if (!can_merge_with(0, buffer->get_record_count())) {
- merge_down(0);
- }
-
- merge_buffer_into_l0(buffer);
- enforce_delete_maximum(0);
-
- buffer->truncate();
- return;
- }
-
- /*
- * Merge the specified level down into the tree. The level index must be
- * non-negative (i.e., this function cannot be used to merge the buffer). This
- * routine will recursively perform any necessary merges to make room for the
- * specified level.
- */
- inline void merge_down(level_index idx) {
- level_index merge_base_level = find_mergable_level(idx);
- if (merge_base_level == -1) {
- merge_base_level = grow();
- }
-
- for (level_index i=merge_base_level; i>idx; i--) {
- merge_levels(i, i-1);
- enforce_delete_maximum(i);
- }
-
- return;
- }
-
- /*
- * Find the first level below the level indicated by idx that
- * is capable of sustaining a merge operation and return its
- * level index. If no such level exists, returns -1. Also
- * returns -1 if idx==0, and no such level exists, to simplify
- * the logic of the first merge.
- */
- inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) {
-
- if (idx == 0 && m_levels.size() == 0) return -1;
-
- bool level_found = false;
- bool disk_level;
- level_index merge_level_idx;
-
- size_t incoming_rec_cnt = get_level_record_count(idx, buffer);
- for (level_index i=idx+1; i<m_levels.size(); i++) {
- if (can_merge_with(i, incoming_rec_cnt)) {
- return i;
- }
-
- incoming_rec_cnt = get_level_record_count(i);
- }
-
- return -1;
- }
-
- /*
- * Merge the level specified by incoming level into the level specified
- * by base level. The two levels should be sequential--i.e. no levels
- * are skipped in the merge process--otherwise the tombstone ordering
- * invariant may be violated by the merge operation.
- */
- inline void merge_levels(level_index base_level, level_index incoming_level) {
- // merging two memory levels
- if constexpr (L == LayoutPolicy::LEVELING) {
- auto tmp = m_levels[base_level];
- m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level], m_levels[incoming_level]);
- mark_as_unused(tmp);
- } else {
- m_levels[base_level]->append_merged_shards(m_levels[incoming_level]);
- }
-
- mark_as_unused(m_levels[incoming_level]);
- m_levels[incoming_level] = new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor);
- }
-
-
- inline void merge_buffer_into_l0(Buffer *buffer) {
- assert(m_levels[0]);
- if constexpr (L == LayoutPolicy::LEVELING) {
- // FIXME: Kludgey implementation due to interface constraints.
- auto old_level = m_levels[0];
- auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
- temp_level->append_buffer(buffer);
- auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level);
-
- m_levels[0] = new_level;
- delete temp_level;
- mark_as_unused(old_level);
- } else {
- m_levels[0]->append_buffer(buffer);
- }
- }
-
- /*
- * Mark a given memory level as no-longer in use by the tree. For now this
- * will just free the level. In future, this will be more complex as the
- * level may not be able to immediately be deleted, depending upon who
- * else is using it.
- */
- inline void mark_as_unused(InternalLevel<R, Shard, Q> *level) {
- delete level;
- }
-
- /*
- * Check the tombstone proportion for the specified level and
- * if the limit is exceeded, forcibly merge levels until all
- * levels below idx are below the limit.
- */
- inline void enforce_delete_maximum(level_index idx) {
- long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx);
-
- if (ts_prop > (long double) m_max_delete_prop) {
- merge_down(idx);
- }
-
- return;
- }
-
- /*
- * Assume that level "0" should be larger than the buffer. The buffer
- * itself is index -1, which should return simply the buffer capacity.
- */
- inline size_t calc_level_record_capacity(level_index idx) {
- return get_buffer()->get_capacity() * pow(m_scale_factor, idx+1);
- }
-
- /*
- * Returns the actual number of records present on a specified level. An
- * index value of -1 indicates the memory table. Can optionally pass in
- * a pointer to the memory table to use, if desired. Otherwise, there are
- * no guarantees about which buffer will be accessed if level_index is -1.
- */
- inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) {
-
- assert(idx >= -1);
- if (idx == -1) {
- return (buffer) ? buffer->get_record_count() : get_buffer()->get_record_count();
- }
-
- return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
- }
-
- /*
- * Determines if the specific level can merge with another record containing
- * incoming_rec_cnt number of records. The provided level index should be
- * non-negative (i.e., not refer to the buffer) and will be automatically
- * translated into the appropriate index into either the disk or memory level
- * vector.
- */
- inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) {
- if (idx>= m_levels.size() || !m_levels[idx]) {
- return false;
- }
-
- if (L == LayoutPolicy::LEVELING) {
- return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
- } else {
- return m_levels[idx]->get_shard_count() < m_scale_factor;
- }
-
- // unreachable
- assert(true);
- }
};
-
}
diff --git a/include/framework/ExtensionStructure.h b/include/framework/ExtensionStructure.h
new file mode 100644
index 0000000..1e756db
--- /dev/null
+++ b/include/framework/ExtensionStructure.h
@@ -0,0 +1,374 @@
+/*
+ * include/framework/ExtensionStructure.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <atomic>
+#include <numeric>
+#include <cstdio>
+#include <vector>
+#include <cmath>
+#include <memory>
+
+#include "framework/MutableBuffer.h"
+#include "framework/InternalLevel.h"
+#include "framework/ShardInterface.h"
+#include "framework/QueryInterface.h"
+#include "framework/RecordInterface.h"
+
+#include "framework/Configuration.h"
+
+#include "psu-util/timer.h"
+#include "psu-ds/Alias.h"
+
+namespace de {
+
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING>
+class ExtensionStructure {
+ typedef S Shard;
+ typedef MutableBuffer<R> Buffer;
+
+public:
+ ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
+ : m_scale_factor(scale_factor)
+ , m_max_delete_prop(max_delete_prop)
+ , m_buffer_size(buffer_size)
+ {}
+
+ ~ExtensionStructure() = default;
+
+ /*
+ * Create a shallow copy of this extension structure. The copy will share references to the
+ * same levels/shards as the original, but will have its own lists. As all of the shards are
+ * immutable (with the exception of deletes), the copy can be restructured with merges, etc.,
+ * without affecting the original.
+ *
+ * NOTE: When using tagged deletes, a delete of a record in the original structure will affect
+ * the copy, so long as the copy retains a reference to the same shard as the original. This could
+ * cause synchronization problems under tagging with concurrency. Any deletes in this context will
+ * need to be forwarded to the appropriate structures manually.
+ */
+ ExtensionStructure<R, S, Q, L> *copy() {
+ auto new_struct = new ExtensionStructure<R, S, Q, L>(m_buffer_size, m_scale_factor, m_max_delete_prop);
+ for (size_t i=0; i<m_levels.size(); i++) {
+ new_struct->m_levels.push_back(m_levels[i]);
+ }
+
+ return new_struct;
+ }
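+
+ /*
+ * Illustrative sketch of the assumed usage under concurrency: a merge
+ * restructures a copy, which is then installed as the new active version.
+ *
+ *     auto next_vers = active_vers->copy();  // shares shard references
+ *     next_vers->merge_buffer(buffer);       // restructure the copy
+ *     install_version(next_vers);            // hypothetical swap step
+ *
+ * install_version() is a placeholder name for whatever epoch-based pointer
+ * swap the DynamicExtension eventually performs.
+ */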
+
+ /*
+ * Search for a record matching the argument and mark it deleted by
+ * setting the delete bit in its wrapped header. Returns 1 if a matching
+ * record was found and deleted, and 0 if a matching record was not found.
+ *
+ * This function will stop after finding the first matching record. It is assumed
+ * that no duplicate records exist. In the case of duplicates, this function will
+ * still "work", but in the sense of "delete first match".
+ */
+ int tagged_delete(const R &rec) {
+ for (auto level : m_levels) {
+ if (level && level->delete_record(rec)) {
+ return 1;
+ }
+ }
+
+ /*
+ * If the record to be erased wasn't found, return 0. The
+ * DynamicExtension itself will then search the active
+ * Buffers.
+ */
+ return 0;
+ }
+
+ /*
+ * Merge the memory table down into the tree, completing any required other
+ * merges to make room for it.
+ */
+ inline bool merge_buffer(Buffer *buffer) {
+ if (!can_merge_with(0, buffer->get_record_count())) {
+ merge_down(0);
+ }
+
+ merge_buffer_into_l0(buffer);
+ enforce_delete_maximum(0);
+
+ buffer->truncate();
+ return true;
+ }
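+
+ /*
+ * For example, under LEVELING with a scale factor of 2: if L0 cannot
+ * absorb the buffer's records, merge_down(0) first merges L0 into L1
+ * (growing the structure if necessary), after which the buffer is merged
+ * into the newly emptied L0 and the tombstone limit is re-checked.
+ */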
+
+ /*
+ * Return the total number of records (including tombstones) within all
+ * of the levels of the structure.
+ */
+ size_t get_record_count() {
+ size_t cnt = 0;
+
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) cnt += m_levels[i]->get_record_count();
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Return the total number of tombstones contained within all of the
+ * levels of the structure.
+ */
+ size_t get_tombstone_cnt() {
+ size_t cnt = 0;
+
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count();
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Return the number of levels within the structure. Note that not
+ * all of these levels are necessarily populated.
+ */
+ size_t get_height() {
+ return m_levels.size();
+ }
+
+ /*
+ * Return the amount of memory (in bytes) used by the shards within the
+ * structure for storing the primary data structure and raw data.
+ */
+ size_t get_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) cnt += m_levels[i]->get_memory_usage();
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Return the amount of memory (in bytes) used by the shards within the
+ * structure for storing auxiliary data structures. This total does not
+ * include memory used for the main data structure, or raw data.
+ */
+ size_t get_aux_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) {
+ cnt += m_levels[i]->get_aux_memory_usage();
+ }
+ }
+
+ return cnt;
+ }
+
+ /*
+ * Validate that no level in the structure exceeds its maximum tombstone capacity. This is
+ * used to trigger preemptive compactions at the end of the merge process.
+ */
+ bool validate_tombstone_proportion() {
+ long double ts_prop;
+ for (size_t i=0; i<m_levels.size(); i++) {
+ if (m_levels[i]) {
+ ts_prop = (long double) m_levels[i]->get_tombstone_count() / (long double) calc_level_record_capacity(i);
+ if (ts_prop > (long double) m_max_delete_prop) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ /*
+ * Return a reference to the underlying vector of levels within the
+ * structure.
+ */
+ std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> &get_levels() {
+ return m_levels;
+ }
+
+private:
+ size_t m_scale_factor;
+ double m_max_delete_prop;
+ size_t m_buffer_size;
+
+ std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;
+
+ /*
+ * Add a new level to the LSM Tree and return that level's index. Will
+ * automatically determine whether the level should be on memory or on disk,
+ * and act appropriately.
+ */
+ inline level_index grow() {
+ level_index new_idx;
+
+ size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+ new_idx = m_levels.size();
+ if (new_idx > 0) {
+ assert(m_levels[new_idx - 1]->get_shard(0)->get_tombstone_count() == 0);
+ }
+ m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));
+
+ return new_idx;
+ }
+
+
+ /*
+ * Merge the specified level down into the tree. The level index must be
+ * non-negative (i.e., this function cannot be used to merge the buffer). This
+ * routine will recursively perform any necessary merges to make room for the
+ * specified level.
+ */
+ inline void merge_down(level_index idx) {
+ level_index merge_base_level = find_mergable_level(idx);
+ if (merge_base_level == -1) {
+ merge_base_level = grow();
+ }
+
+ for (level_index i=merge_base_level; i>idx; i--) {
+ merge_levels(i, i-1);
+ enforce_delete_maximum(i);
+ }
+
+ return;
+ }
+
+ /*
+ * Find the first level below the level indicated by idx that
+ * is capable of sustaining a merge operation and return its
+ * level index. If no such level exists, returns -1. Also
+ * returns -1 if idx==0, and no such level exists, to simplify
+ * the logic of the first merge.
+ */
+ inline level_index find_mergable_level(level_index idx, Buffer *buffer=nullptr) {
+
+ if (idx == 0 && m_levels.size() == 0) return -1;
+
+ size_t incoming_rec_cnt = get_level_record_count(idx, buffer);
+ for (level_index i=idx+1; i<m_levels.size(); i++) {
+ if (can_merge_with(i, incoming_rec_cnt)) {
+ return i;
+ }
+
+ incoming_rec_cnt = get_level_record_count(i);
+ }
+
+ return -1;
+ }
+
+ /*
+ * Merge the level specified by incoming level into the level specified
+ * by base level. The two levels should be sequential--i.e. no levels
+ * are skipped in the merge process--otherwise the tombstone ordering
+ * invariant may be violated by the merge operation.
+ */
+ inline void merge_levels(level_index base_level, level_index incoming_level) {
+ // merging two memory levels
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ auto tmp = m_levels[base_level];
+ m_levels[base_level] = InternalLevel<R, Shard, Q>::merge_levels(m_levels[base_level].get(), m_levels[incoming_level].get());
+ } else {
+ m_levels[base_level]->append_merged_shards(m_levels[incoming_level].get());
+ }
+
+ m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
+ }
+
+
+ inline void merge_buffer_into_l0(Buffer *buffer) {
+ assert(m_levels[0]);
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ // FIXME: Kludgey implementation due to interface constraints.
+ auto old_level = m_levels[0].get();
+ auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
+ temp_level->append_buffer(buffer);
+ auto new_level = InternalLevel<R, Shard, Q>::merge_levels(old_level, temp_level);
+
+ m_levels[0] = new_level;
+ delete temp_level;
+ } else {
+ m_levels[0]->append_buffer(buffer);
+ }
+ }
+
+ /*
+ * Mark a given memory level as no-longer in use by the tree. For now this
+ * will just free the level. In future, this will be more complex as the
+ * level may not be able to immediately be deleted, depending upon who
+ * else is using it.
+ */
+ inline void mark_as_unused(std::shared_ptr<InternalLevel<R, Shard, Q>> level) {
+ level.reset();
+ }
+
+ /*
+ * Check the tombstone proportion for the specified level and
+ * if the limit is exceeded, forcibly merge levels until all
+ * levels below idx are below the limit.
+ */
+ inline void enforce_delete_maximum(level_index idx) {
+ long double ts_prop = (long double) m_levels[idx]->get_tombstone_count() / (long double) calc_level_record_capacity(idx);
+
+ if (ts_prop > (long double) m_max_delete_prop) {
+ merge_down(idx);
+ }
+
+ return;
+ }
+
+ /*
+ * Assume that level "0" should be larger than the buffer. The buffer
+ * itself is index -1, which should return simply the buffer capacity.
+ */
+ inline size_t calc_level_record_capacity(level_index idx) {
+ return m_buffer_size * pow(m_scale_factor, idx+1);
+ }
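+
+ /*
+ * For example, with m_buffer_size = 1000 and m_scale_factor = 8, level 0
+ * holds up to 8,000 records, level 1 up to 64,000, and so on.
+ */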
+
+ /*
+ * Returns the actual number of records present on a specified level. An
+ * index value of -1 indicates the memory table. Can optionally pass in
+ * a pointer to the memory table to use, if desired. Otherwise, there are
+ * no guarantees about which buffer will be accessed if level_index is -1.
+ */
+ inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) {
+ if (buffer) {
+ return buffer->get_record_count();
+ }
+
+ return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
+ }
+
+ /*
+ * Determines if the specific level can merge with another record containing
+ * incoming_rec_cnt number of records. The provided level index should be
+ * non-negative (i.e., not refer to the buffer) and will be automatically
+ * translated into the appropriate index into either the disk or memory level
+ * vector.
+ */
+ inline bool can_merge_with(level_index idx, size_t incoming_rec_cnt) {
+ if (idx>= m_levels.size() || !m_levels[idx]) {
+ return false;
+ }
+
+ if (L == LayoutPolicy::LEVELING) {
+ return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
+ } else {
+ return m_levels[idx]->get_shard_count() < m_scale_factor;
+ }
+
+ // unreachable
+ assert(false);
+ }
+};
+
+}
+
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h
index ec8ffc4..983ec6a 100644
--- a/include/framework/InternalLevel.h
+++ b/include/framework/InternalLevel.h
@@ -19,6 +19,10 @@
#include "framework/MutableBuffer.h"
namespace de {
+template <RecordInterface R, ShardInterface S, QueryInterface Q>
+class InternalLevel;
+
+
template <RecordInterface R, ShardInterface S, QueryInterface Q>
class InternalLevel {
@@ -55,7 +59,7 @@ public:
// WARNING: for leveling only.
// assuming the base level is the level new level is merging into. (base_level is larger.)
- static InternalLevel* merge_levels(InternalLevel* base_level, InternalLevel* new_level) {
+ static std::shared_ptr<InternalLevel> merge_levels(InternalLevel* base_level, InternalLevel* new_level) {
assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0));
auto res = new InternalLevel(base_level->m_level_no, 1);
res->m_shard_cnt = 1;
@@ -64,7 +68,7 @@ public:
shards[1] = new_level->m_shards[0];
res->m_shards[0] = new S(shards, 2);
- return res;
+ return std::shared_ptr<InternalLevel>(res);
}
void append_buffer(Buffer* buffer) {
diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h
index b79fc02..cadecb6 100644
--- a/include/framework/MutableBuffer.h
+++ b/include/framework/MutableBuffer.h
@@ -11,6 +11,7 @@
#include <cstdlib>
#include <atomic>
+#include <condition_variable>
+#include <cstring>
+#include <mutex>
#include <cassert>
#include <numeric>
#include <algorithm>
@@ -33,16 +34,22 @@ public:
MutableBuffer(size_t capacity, size_t max_tombstone_cap)
: m_cap(capacity), m_tombstone_cap(max_tombstone_cap), m_reccnt(0)
, m_tombstonecnt(0), m_weight(0), m_max_weight(0) {
- auto len = capacity * sizeof(Wrapped<R>);
- size_t aligned_buffersize = len + (CACHELINE_SIZE - (len % CACHELINE_SIZE));
- m_data = (Wrapped<R>*) std::aligned_alloc(CACHELINE_SIZE, aligned_buffersize);
+ m_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
+ m_merge_data = (Wrapped<R>*) psudb::sf_aligned_alloc(CACHELINE_SIZE, capacity*sizeof(Wrapped<R>));
m_tombstone_filter = nullptr;
if (max_tombstone_cap > 0) {
m_tombstone_filter = new psudb::BloomFilter<R>(BF_FPR, max_tombstone_cap, BF_HASH_FUNCS);
}
+
+ m_refcnt.store(0);
+ m_deferred_truncate.store(false);
+ m_merging.store(false);
}
~MutableBuffer() {
+ assert(m_refcnt.load() == 0);
+ assert(m_merging.load() == false);
+
if (m_data) free(m_data);
+ if (m_merge_data) free(m_merge_data);
if (m_tombstone_filter) delete m_tombstone_filter;
}
@@ -157,6 +164,50 @@ public:
return m_max_weight;
}
+ bool start_merge() {
+ if (m_merge_lock.try_lock()) {
+ /* there cannot already be an active merge */
+ if (m_merging.load()) {
+ m_merge_lock.unlock();
+ return false;
+ }
+
+ m_merging.store(true);
+ memcpy(m_merge_data, m_data, sizeof(Wrapped<R>) * m_reccnt.load());
+ return true;
+ }
+
+ /* lock could not be obtained */
+ return false;
+ }
+
+ bool finish_merge() {
+ m_merge_lock.unlock();
+ return true;
+ }
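+
+ /*
+ * Sketch of the intended calling protocol (an assumption based on the
+ * members above; nothing here enforces it):
+ *
+ *     if (buffer->start_merge()) {
+ *         // build new shard(s) from the m_merge_data snapshot
+ *         buffer->finish_merge();
+ *         buffer->truncate();
+ *     }
+ *
+ * start_merge() snapshots m_data under m_merge_lock, so appends can
+ * proceed against m_data while the merge reads the snapshot.
+ */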
+
+ /*
+ * Concurrency-related operations
+ */
+ bool take_reference() {
+ m_refcnt.fetch_add(1);
+ return true;
+ }
+
+ bool release_reference() {
+ m_refcnt.fetch_add(-1);
+
+ if (m_refcnt.load() == 0 && m_deferred_truncate.load()) {
+ /* keep the truncate outside assert() so it still runs under NDEBUG */
+ bool truncated = this->truncate();
+ assert(truncated);
+ }
+
+ return true;
+ }
+
+ bool active_merge() {
+ return m_merging.load();
+ }
+
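+ /*
+ * Assumed reader protocol (not enforced by the buffer itself): queries
+ * bracket buffer accesses with the reference count so that a deferred
+ * truncate cannot reclaim records mid-read.
+ *
+ *     buffer->take_reference();
+ *     auto state = Q::get_buffer_query_state(buffer, parms);
+ *     // ... run the buffer query ...
+ *     buffer->release_reference();  // may perform a deferred truncate
+ */
+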
private:
int32_t try_advance_tail() {
size_t new_tail = m_reccnt.fetch_add(1);
@@ -169,12 +220,22 @@ private:
size_t m_tombstone_cap;
Wrapped<R>* m_data;
+ Wrapped<R>* m_merge_data;
+
psudb::BloomFilter<R>* m_tombstone_filter;
alignas(64) std::atomic<size_t> m_tombstonecnt;
alignas(64) std::atomic<uint32_t> m_reccnt;
alignas(64) std::atomic<double> m_weight;
alignas(64) std::atomic<double> m_max_weight;
+ alignas(64) std::atomic<bool> m_merging;
+ alignas(64) std::atomic<bool> m_deferred_truncate;
+ alignas(64) std::atomic<size_t> m_refcnt;
+
+ alignas(64) std::mutex m_merge_lock;
+ alignas(64) std::mutex m_trunc_lock;
+ alignas(64) std::condition_variable m_trunc_signal;
+
};
}
diff --git a/include/framework/RecordInterface.h b/include/framework/RecordInterface.h
index f78918c..1ef1984 100644
--- a/include/framework/RecordInterface.h
+++ b/include/framework/RecordInterface.h
@@ -207,7 +207,7 @@ struct EuclidPoint{
template<RecordInterface R>
struct RecordHash {
size_t operator()(R const &rec) const {
- return psudb::hash_bytes((char *) &rec, sizeof(R));
+ return psudb::hash_bytes((std::byte *) &rec, sizeof(R));
}
};
diff --git a/include/framework/Scheduler.h b/include/framework/Scheduler.h
new file mode 100644
index 0000000..cd3f430
--- /dev/null
+++ b/include/framework/Scheduler.h
@@ -0,0 +1,76 @@
+/*
+ * include/framework/Scheduler.h
+ *
+ * Copyright (C) 2023 Douglas Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * All rights reserved. Published under the Modified BSD License.
+ *
+ */
+#pragma once
+
+#include <vector>
+#include <memory>
+#include <queue>
+#include <mutex>
+#include <cstdint>
+
+#include "util/types.h"
+#include "framework/ShardInterface.h"
+#include "framework/QueryInterface.h"
+#include "framework/RecordInterface.h"
+#include "framework/MutableBuffer.h"
+#include "framework/Configuration.h"
+#include "framework/ExtensionStructure.h"
+
+namespace de {
+
+
+struct MergeTask {
+ level_index m_source_level;
+ level_index m_target_level;
+ size_t m_size;
+ size_t m_timestamp;
+
+ bool operator<(const MergeTask &other) const {
+ return m_timestamp < other.m_timestamp;
+ }
+};
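+
+/*
+ * Note: std::priority_queue is a max-heap, so this comparison serves the
+ * task with the largest timestamp (the newest) first; it would need to be
+ * reversed for oldest-first dispatch.
+ */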
+
+
+template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L>
+class Scheduler {
+ typedef ExtensionStructure<R, S, Q, L> Structure;
+public:
+ /*
+ * Memory budget stated in bytes, with 0 meaning unlimited. Likewise, 0 threads means
+ * unlimited.
+ */
+ Scheduler(size_t memory_budget, size_t thread_cnt)
+ : m_memory_budget((memory_budget) ? memory_budget : UINT64_MAX)
+ , m_thread_cnt((thread_cnt) ? thread_cnt : UINT64_MAX)
+ , m_used_memory(0)
+ , m_used_threads(0)
+ {}
+
+ bool schedule_merge(Structure *version, MutableBuffer<R> *buffer) {
+ // FIXME: this is a non-concurrent implementation
+ return version->merge_buffer(buffer);
+ }
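+
+ /*
+ * A concurrent implementation would presumably enqueue a MergeTask for a
+ * worker thread rather than merging inline, along these lines:
+ *
+ *     MergeTask task {0, 1, buffer->get_record_count(), get_timestamp()};
+ *     std::unique_lock<std::mutex> lock(m_merge_queue_lock);
+ *     m_merge_queue.push(task);
+ *
+ * The task fields, queue, and lock all exist in this file; only the
+ * dispatch policy is unimplemented.
+ */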
+
+private:
+ size_t get_timestamp() {
+ auto ts = m_timestamp.fetch_add(1);
+ return ts;
+ }
+
+ size_t m_memory_budget;
+ size_t m_thread_cnt;
+
+ alignas(64) std::atomic<size_t> m_used_memory;
+ alignas(64) std::atomic<size_t> m_used_threads;
+ alignas(64) std::atomic<size_t> m_timestamp;
+
+ std::priority_queue<MergeTask> m_merge_queue;
+ std::mutex m_merge_queue_lock;
+};
+
+}
diff --git a/tests/internal_level_tests.cpp b/tests/internal_level_tests.cpp
index 9deb485..b0dfacb 100644
--- a/tests/internal_level_tests.cpp
+++ b/tests/internal_level_tests.cpp
@@ -37,11 +37,11 @@ START_TEST(t_memlevel_merge)
ck_assert_int_eq(merging_level->get_record_count(), 100);
auto old_level = base_level;
- base_level = ILevel::merge_levels(old_level, merging_level);
+ auto new_level = ILevel::merge_levels(old_level, merging_level);
delete old_level;
delete merging_level;
- ck_assert_int_eq(base_level->get_record_count(), 200);
+ ck_assert_int_eq(new_level->get_record_count(), 200);
-    delete base_level;
delete tbl1;