author    Douglas B. Rumbaugh <dbr4@psu.edu>    2024-12-06 13:13:51 -0500
committer GitHub <noreply@github.com>           2024-12-06 18:13:51 +0000
commit    9fe305c7d28e993e55c55427f377ae7e3251ea4f (patch)
tree      384b687f64b84eb81bde2becac8a5f24916b07b4 /include/framework/structure/ExtensionStructure.h
parent    47916da2ba5ed5bee2dda3cbcc58d39e1e931bfc (diff)
download  dynamic-extension-9fe305c7d28e993e55c55427f377ae7e3251ea4f.tar.gz
Interface update (#5)
* Query Interface Adjustments/Refactoring

  Began the process of adjusting the query interface (and also the shard
  interface, to a lesser degree) to better accommodate the user. In
  particular, the following changes have been made:

  1. The number of necessary template arguments for the query type has been
     drastically reduced, while also removing the void pointers and manual
     delete functions from the interface. This was accomplished by requiring
     many of the sub-types associated with a query (parameters, etc.) to be
     nested inside the main query class, and by forcing the SHARD type to
     expose its associated record type.

  2. User-defined query return types are now supported. Queries are no longer
     required to return strictly sets of records. Instead, the query now has
     LocalResultType and ResultType template parameters (which can be
     defaulted using a typedef in the Query type itself), allowing much more
     flexibility. Note that, at least for the short term, the LocalResultType
     must still expose the same is_deleted/is_tombstone interface as a
     Wrapped<R> used to, as this is currently needed for delete filtering. A
     better approach to this is, hopefully, forthcoming.

  3. Updated the ISAMTree.h shard and rangequery.h query to use the new
     interfaces, and adjusted the associated unit tests as well.

  4. Dropped the unnecessary "get_data()" function from the ShardInterface
     concept.

  5. Dropped the need to specify a record type in the ShardInterface concept.
     This is now handled using a required Shard::RECORD member of the Shard
     class itself, which should expose the name of the record type.

* Updates to framework to support new Query/Shard interfaces

  Pretty extensive adjustments to the framework, particularly to the
  templates themselves, along with some type-renaming work, to support the
  new query and shard interfaces. Adjusted the external query interface to
  take an rvalue reference, rather than a pointer, to the query parameters.

* Removed framework-level delete filtering

  This was causing some issues with the new query interface, and should
  probably be reworked anyway, so I'm temporarily (TM) removing the feature.

* Updated benchmarks + remaining code for new interface
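For orientation, here is a minimal sketch of what a shard/query pair might look like under the interface described above. It is illustrative only: the names MyRecord, MyShard, and MyRangeQuery, the member layouts, and the omitted member functions are assumptions; only RECORD, Parameters, LocalQuery, LocalResultType, ResultType, and the is_deleted/is_tombstone requirement are taken from the commit message and the diff below.

    #include <cstdint>
    #include <vector>

    struct MyRecord {
      uint64_t key;
      uint64_t value;
    };

    /* Shards now expose their record type via a RECORD member, instead of the
     * framework taking a separate record template parameter. */
    class MyShard {
    public:
      typedef MyRecord RECORD;
      /* construction from a buffer/other shards, lookups, etc. omitted */
    };

    class MyRangeQuery {
    public:
      /* Sub-types are nested inside the query class rather than passed as
       * additional template arguments (with void pointers) as before. */
      struct Parameters {
        uint64_t lower_bound;
        uint64_t upper_bound;
      };

      struct LocalQuery {
        Parameters global_parms; /* per-shard query state */
      };

      /* User-defined result types. For now, LocalResultType must still expose
       * the is_deleted/is_tombstone interface that Wrapped<R> used to provide,
       * since it is needed for delete filtering. */
      struct LocalResultType {
        MyRecord rec;
        bool deleted;
        bool is_deleted() const { return deleted; }
        bool is_tombstone() const { return false; }
      };

      typedef std::vector<MyRecord> ResultType;

      /* per-shard query execution and result combination omitted */
    };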
Diffstat (limited to 'include/framework/structure/ExtensionStructure.h')
-rw-r--r--  include/framework/structure/ExtensionStructure.h | 1080
1 file changed, 559 insertions, 521 deletions
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index b83674b..2728246 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -1,8 +1,8 @@
/*
* include/framework/structure/ExtensionStructure.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
- * Dong Xie <dongx@psu.edu>
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -22,622 +22,660 @@
namespace de {
-template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L=LayoutPolicy::TEIRING>
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
+ LayoutPolicy L = LayoutPolicy::TEIRING>
class ExtensionStructure {
- typedef S Shard;
- typedef BufferView<R> BuffView;
+ typedef typename ShardType::RECORD RecordType;
+ typedef BufferView<RecordType> BuffView;
- typedef struct {
- size_t reccnt;
- size_t reccap;
+ typedef struct {
+ size_t reccnt;
+ size_t reccap;
- size_t shardcnt;
- size_t shardcap;
- } level_state;
+ size_t shardcnt;
+ size_t shardcap;
+ } level_state;
- typedef std::vector<level_state> state_vector;
+ typedef std::vector<level_state> state_vector;
public:
- ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
- : m_scale_factor(scale_factor)
- , m_max_delete_prop(max_delete_prop)
- , m_buffer_size(buffer_size)
- {}
-
- ~ExtensionStructure() = default;
-
- /*
- * Create a shallow copy of this extension structure. The copy will share
- * references to the same levels/shards as the original, but will have its
- * own lists. As all of the shards are immutable (with the exception of
- * deletes), the copy can be restructured with reconstructions and flushes
- * without affecting the original. The copied structure will be returned
- * with a reference count of 0; generally you will want to immediately call
- * take_reference() on it.
- *
- * NOTE: When using tagged deletes, a delete of a record in the original
- * structure will affect the copy, so long as the copy retains a reference
- * to the same shard as the original. This could cause synchronization
- * problems under tagging with concurrency. Any deletes in this context will
- * need to be forwarded to the appropriate structures manually.
- */
- ExtensionStructure<R, S, Q, L> *copy() {
- auto new_struct = new ExtensionStructure<R, S, Q, L>(m_buffer_size, m_scale_factor,
- m_max_delete_prop);
- for (size_t i=0; i<m_levels.size(); i++) {
- new_struct->m_levels.push_back(m_levels[i]->clone());
- }
-
- new_struct->m_refcnt = 0;
- new_struct->m_current_state = m_current_state;
+ ExtensionStructure(size_t buffer_size, size_t scale_factor,
+ double max_delete_prop)
+ : m_scale_factor(scale_factor), m_max_delete_prop(max_delete_prop),
+ m_buffer_size(buffer_size) {}
+
+ ~ExtensionStructure() = default;
+
+ /*
+ * Create a shallow copy of this extension structure. The copy will share
+ * references to the same levels/shards as the original, but will have its
+ * own lists. As all of the shards are immutable (with the exception of
+ * deletes), the copy can be restructured with reconstructions and flushes
+ * without affecting the original. The copied structure will be returned
+ * with a reference count of 0; generally you will want to immediately call
+ * take_reference() on it.
+ *
+ * NOTE: When using tagged deletes, a delete of a record in the original
+ * structure will affect the copy, so long as the copy retains a reference
+ * to the same shard as the original. This could cause synchronization
+ * problems under tagging with concurrency. Any deletes in this context will
+ * need to be forwarded to the appropriate structures manually.
+ */
+ ExtensionStructure<ShardType, QueryType, L> *copy() {
+ auto new_struct = new ExtensionStructure<ShardType, QueryType, L>(
+ m_buffer_size, m_scale_factor, m_max_delete_prop);
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ new_struct->m_levels.push_back(m_levels[i]->clone());
+ }
- return new_struct;
+ new_struct->m_refcnt = 0;
+ new_struct->m_current_state = m_current_state;
+
+ return new_struct;
+ }
+
+ /*
+ * Search for a record matching the argument and mark it deleted by
+ * setting the delete bit in its wrapped header. Returns 1 if a matching
+ * record was found and deleted, and 0 if a matching record was not found.
+ *
+ * This function will stop after finding the first matching record. It is
+ * assumed that no duplicate records exist. In the case of duplicates, this
+ * function will still "work", but in the sense of "delete first match".
+ */
+ int tagged_delete(const RecordType &rec) {
+ for (auto level : m_levels) {
+ if (level && level->delete_record(rec)) {
+ return 1;
+ }
}
/*
- * Search for a record matching the argument and mark it deleted by
- * setting the delete bit in its wrapped header. Returns 1 if a matching
- * record was found and deleted, and 0 if a matching record was not found.
- *
- * This function will stop after finding the first matching record. It is
- * assumed that no duplicate records exist. In the case of duplicates, this
- * function will still "work", but in the sense of "delete first match".
+ * If the record to be erased wasn't found, return 0. The
+ * DynamicExtension itself will then search the active
+ * Buffers.
*/
- int tagged_delete(const R &rec) {
- for (auto level : m_levels) {
- if (level && level->delete_record(rec)) {
- return 1;
- }
- }
-
- /*
- * If the record to be erased wasn't found, return 0. The
- * DynamicExtension itself will then search the active
- * Buffers.
- */
- return 0;
+ return 0;
+ }
+
+ /*
+ * Flush a buffer into the extension structure, performing any necessary
+ * reconstructions to free up room in L0.
+ *
+ * FIXME: arguably, this should be a method attached to the buffer that
+ * takes a structure as input.
+ */
+ inline bool flush_buffer(BuffView buffer) {
+ state_vector tmp = m_current_state;
+
+ if (tmp.size() == 0) {
+ grow(tmp);
}
- /*
- * Flush a buffer into the extension structure, performing any necessary
- * reconstructions to free up room in L0.
- *
- * FIXME: arguably, this should be a method attached to the buffer that
- * takes a structure as input.
- */
- inline bool flush_buffer(BuffView buffer) {
- state_vector tmp = m_current_state;
+ assert(can_reconstruct_with(0, buffer.get_record_count(), tmp));
+ flush_buffer_into_l0(std::move(buffer));
- if (tmp.size() == 0) {
- grow(tmp);
- }
+ return true;
+ }
- assert(can_reconstruct_with(0, buffer.get_record_count(), tmp));
- flush_buffer_into_l0(std::move(buffer));
+ /*
+ * Return the total number of records (including tombstones) within all
+ * of the levels of the structure.
+ */
+ size_t get_record_count() {
+ size_t cnt = 0;
- return true;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i])
+ cnt += m_levels[i]->get_record_count();
}
- /*
- * Return the total number of records (including tombstones) within all
- * of the levels of the structure.
- */
- size_t get_record_count() {
- size_t cnt = 0;
+ return cnt;
+ }
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_record_count();
- }
+ /*
+ * Return the total number of tombstones contained within all of the
+ * levels of the structure.
+ */
+ size_t get_tombstone_count() {
+ size_t cnt = 0;
- return cnt;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i])
+ cnt += m_levels[i]->get_tombstone_count();
}
- /*
- * Return the total number of tombstones contained within all of the
- * levels of the structure.
- */
- size_t get_tombstone_count() {
- size_t cnt = 0;
-
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_tombstone_count();
- }
-
- return cnt;
+ return cnt;
+ }
+
+ /*
+ * Return the number of levels within the structure. Note that not
+ * all of these levels are necessarily populated.
+ */
+ size_t get_height() { return m_levels.size(); }
+
+ /*
+ * Return the amount of memory (in bytes) used by the shards within the
+ * structure for storing the primary data structure and raw data.
+ */
+ size_t get_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i])
+ cnt += m_levels[i]->get_memory_usage();
}
- /*
- * Return the number of levels within the structure. Note that not
- * all of these levels are necessarily populated.
- */
- size_t get_height() {
- return m_levels.size();
+ return cnt;
+ }
+
+ /*
+ * Return the amount of memory (in bytes) used by the shards within the
+ * structure for storing auxiliary data structures. This total does not
+ * include memory used for the main data structure, or raw data.
+ */
+ size_t get_aux_memory_usage() {
+ size_t cnt = 0;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i]) {
+ cnt += m_levels[i]->get_aux_memory_usage();
+ }
}
- /*
- * Return the amount of memory (in bytes) used by the shards within the
- * structure for storing the primary data structure and raw data.
- */
- size_t get_memory_usage() {
- size_t cnt = 0;
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) cnt += m_levels[i]->get_memory_usage();
+ return cnt;
+ }
+
+ /*
+ * Validate that no level in the structure exceeds its maximum tombstone
+ * capacity. This is used to trigger preemptive compactions at the end of
+ * the reconstruction process.
+ */
+ bool validate_tombstone_proportion() {
+ long double ts_prop;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i]) {
+ ts_prop = (long double)m_levels[i]->get_tombstone_count() /
+ (long double)calc_level_record_capacity(i);
+ if (ts_prop > (long double)m_max_delete_prop) {
+ return false;
}
-
- return cnt;
+ }
}
- /*
- * Return the amount of memory (in bytes) used by the shards within the
- * structure for storing auxiliary data structures. This total does not
- * include memory used for the main data structure, or raw data.
+ return true;
+ }
+
+ bool validate_tombstone_proportion(level_index level) {
+ long double ts_prop = (long double)m_levels[level]->get_tombstone_count() /
+ (long double)calc_level_record_capacity(level);
+ return ts_prop <= (long double)m_max_delete_prop;
+ }
+
+ /*
+ * Return a reference to the underlying vector of levels within the
+ * structure.
+ */
+ std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> &
+ get_levels() {
+ return m_levels;
+ }
+
+ /*
+ * NOTE: This cannot be simulated, because tombstone cancellation is not
+ * cheaply predictable. It is possible that the worst case number could
+ * be used instead, to allow for prediction, but compaction isn't a
+ * major concern outside of sampling; at least for now. So I'm not
+ * going to focus too much time on it at the moment.
+ */
+ ReconstructionVector get_compaction_tasks() {
+ ReconstructionVector tasks;
+ state_vector scratch_state = m_current_state;
+
+ /* if the tombstone/delete invariant is satisfied, no need for compactions
*/
- size_t get_aux_memory_usage() {
- size_t cnt = 0;
- for (size_t i=0; i<m_levels.size(); i++) {
- if (m_levels[i]) {
- cnt += m_levels[i]->get_aux_memory_usage();
- }
- }
-
- return cnt;
+ if (validate_tombstone_proportion()) {
+ return tasks;
}
- /*
- * Validate that no level in the structure exceeds its maximum tombstone
- * capacity. This is used to trigger preemptive compactions at the end of
- * the reconstruction process.
- */
- bool validate_tombstone_proportion() {
- long double ts_prop;
- for (size_t i = 0; i < m_levels.size(); i++) {
- if (m_levels[i]) {
- ts_prop = (long double)m_levels[i]->get_tombstone_count() /
- (long double)calc_level_record_capacity(i);
- if (ts_prop > (long double)m_max_delete_prop) {
- return false;
- }
- }
+ /* locate the first level to violate the invariant */
+ level_index violation_idx = -1;
+ for (level_index i = 0; i < m_levels.size(); i++) {
+ if (!validate_tombstone_proportion(i)) {
+ violation_idx = i;
+ break;
}
-
- return true;
}
- bool validate_tombstone_proportion(level_index level) {
- long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) calc_level_record_capacity(level);
- return ts_prop <= (long double) m_max_delete_prop;
- }
+ assert(violation_idx != -1);
- /*
- * Return a reference to the underlying vector of levels within the
- * structure.
- */
- std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> &get_levels() {
- return m_levels;
+ level_index base_level =
+ find_reconstruction_target(violation_idx, scratch_state);
+ if (base_level == -1) {
+ base_level = grow(scratch_state);
}
- /*
- * NOTE: This cannot be simulated, because tombstone cancellation is not
- * cheaply predictable. It is possible that the worst case number could
- * be used instead, to allow for prediction, but compaction isn't a
- * major concern outside of sampling; at least for now. So I'm not
- * going to focus too much time on it at the moment.
- */
- ReconstructionVector get_compaction_tasks() {
- ReconstructionVector tasks;
- state_vector scratch_state = m_current_state;
-
- /* if the tombstone/delete invariant is satisfied, no need for compactions */
- if (validate_tombstone_proportion()) {
- return tasks;
- }
-
- /* locate the first level to violate the invariant */
- level_index violation_idx = -1;
- for (level_index i=0; i<m_levels.size(); i++) {
- if (!validate_tombstone_proportion(i)) {
- violation_idx = i;
- break;
- }
- }
-
- assert(violation_idx != -1);
-
- level_index base_level = find_reconstruction_target(violation_idx, scratch_state);
- if (base_level == -1) {
- base_level = grow(scratch_state);
- }
-
- for (level_index i=base_level; i>0; i--) {
- /*
- * The amount of storage required for the reconstruction accounts
- * for the cost of storing the new records, along with the
- * cost of retaining the old records during the process
- * (hence the 2x multiplier).
- *
- * FIXME: currently does not account for the *actual* size
- * of the shards, only the storage for the records
- * themselves.
- */
- size_t reccnt = m_levels[i - 1]->get_record_count();
- if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_reconstruct_with(i, reccnt, scratch_state)) {
- reccnt += m_levels[i]->get_record_count();
- }
- }
- tasks.add_reconstruction(i-i, i, reccnt);
+ for (level_index i = base_level; i > 0; i--) {
+ /*
+ * The amount of storage required for the reconstruction accounts
+ * for the cost of storing the new records, along with the
+ * cost of retaining the old records during the process
+ * (hence the 2x multiplier).
+ *
+ * FIXME: currently does not account for the *actual* size
+ * of the shards, only the storage for the records
+ * themselves.
+ */
+ size_t reccnt = m_levels[i - 1]->get_record_count();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_reconstruct_with(i, reccnt, scratch_state)) {
+ reccnt += m_levels[i]->get_record_count();
}
-
- return tasks;
+ }
+ tasks.add_reconstruction(i - i, i, reccnt);
}
+ return tasks;
+ }
+
+ /*
+ *
+ */
+ ReconstructionVector
+ get_reconstruction_tasks(size_t buffer_reccnt,
+ state_vector scratch_state = {}) {
/*
- *
+ * If no scratch state vector is provided, use a copy of the
+ * current one. The only time an empty vector could be used as
+ * *real* input to this function is when the current state is also
+ * empty, so this should work even in that case.
*/
- ReconstructionVector get_reconstruction_tasks(size_t buffer_reccnt,
- state_vector scratch_state={}) {
- /*
- * If no scratch state vector is provided, use a copy of the
- * current one. The only time an empty vector could be used as
- * *real* input to this function is when the current state is also
- * empty, so this should would even in that case.
- */
- if (scratch_state.size() == 0) {
- scratch_state = m_current_state;
- }
-
- ReconstructionVector reconstructions;
- size_t LOOKAHEAD = 1;
- for (size_t i=0; i<LOOKAHEAD; i++) {
- /*
- * If L0 cannot support a direct buffer flush, figure out what
- * work must be done to free up space first. Otherwise, the
- * reconstruction vector will be initially empty.
- */
- if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) {
- auto local_recon = get_reconstruction_tasks_from_level(0, scratch_state);
-
- /*
- * for the first iteration, we need to do all of the
- * reconstructions, so use these to initially the returned
- * reconstruction list
- */
- if (i == 0) {
- reconstructions = local_recon;
- /*
- * Quick sanity test of idea: if the next reconstruction
- * would be larger than this one, steal the largest
- * task from it and run it now instead.
- */
- } else if (local_recon.get_total_reccnt() > reconstructions.get_total_reccnt()) {
- auto t = local_recon.remove_reconstruction(0);
- reconstructions.add_reconstruction(t);
- }
- }
-
- /* simulate the buffer flush in the scratch state */
- scratch_state[0].reccnt += buffer_reccnt;
- if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) {
- scratch_state[0].shardcnt += 1;
- }
-
- }
-
- return std::move(reconstructions);
+ if (scratch_state.size() == 0) {
+ scratch_state = m_current_state;
}
-
- /*
- *
- */
- ReconstructionVector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) {
- ReconstructionVector reconstructions;
+ ReconstructionVector reconstructions;
+ size_t LOOKAHEAD = 1;
+ for (size_t i = 0; i < LOOKAHEAD; i++) {
+ /*
+ * If L0 cannot support a direct buffer flush, figure out what
+ * work must be done to free up space first. Otherwise, the
+ * reconstruction vector will be initially empty.
+ */
+ if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) {
+ auto local_recon =
+ get_reconstruction_tasks_from_level(0, scratch_state);
/*
- * Find the first level capable of sustaining a reconstruction from
- * the level above it. If no such level exists, add a new one at
- * the bottom of the structure.
+ * for the first iteration, we need to do all of the
+ * reconstructions, so use these to initialize the returned
+ * reconstruction list
*/
- level_index base_level = find_reconstruction_target(source_level, scratch_state);
- if (base_level == -1) {
- base_level = grow(scratch_state);
+ if (i == 0) {
+ reconstructions = local_recon;
+ /*
+ * Quick sanity test of idea: if the next reconstruction
+ * would be larger than this one, steal the largest
+ * task from it and run it now instead.
+ */
+ } else if (local_recon.get_total_reccnt() >
+ reconstructions.get_total_reccnt()) {
+ auto t = local_recon.remove_reconstruction(0);
+ reconstructions.add_reconstruction(t);
}
+ }
- if constexpr (L == LayoutPolicy::BSM) {
- if (base_level == 0) {
- return std::move(reconstructions);
- }
-
- ReconstructionTask task;
- task.target = base_level;
-
- size_t base_reccnt = 0;
- for (level_index i=base_level; i>source_level; i--) {
- auto recon_reccnt = scratch_state[i-1].reccnt;
- base_reccnt += recon_reccnt;
- scratch_state[i-1].reccnt = 0;
- scratch_state[i-1].shardcnt = 0;
- task.add_source(i-1, recon_reccnt);
- }
+ /* simulate the buffer flush in the scratch state */
+ scratch_state[0].reccnt += buffer_reccnt;
+ if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) {
+ scratch_state[0].shardcnt += 1;
+ }
+ }
- reconstructions.add_reconstruction(task);
- scratch_state[base_level].reccnt = base_reccnt;
- scratch_state[base_level].shardcnt = 1;
+ return reconstructions;
+ }
- return std::move(reconstructions);
- }
+ /*
+ *
+ */
+ ReconstructionVector
+ get_reconstruction_tasks_from_level(level_index source_level,
+ state_vector &scratch_state) {
+ ReconstructionVector reconstructions;
- /*
- * Determine the full set of reconstructions necessary to open up
- * space in the source level.
- */
- for (level_index i=base_level; i>source_level; i--) {
- size_t recon_reccnt = scratch_state[i-1].reccnt;
- size_t base_reccnt = recon_reccnt;
-
- /*
- * If using Leveling, the total reconstruction size will be the
- * records in *both* base and target, because they will need to
- * be merged (assuming that target isn't empty).
- */
- if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_reconstruct_with(i, base_reccnt, scratch_state)) {
- recon_reccnt += scratch_state[i].reccnt;
- }
- }
- reconstructions.add_reconstruction(i-1, i, recon_reccnt);
-
- /*
- * The base level will be emptied and its records moved to
- * the target.
- */
- scratch_state[i-1].reccnt = 0;
- scratch_state[i-1].shardcnt = 0;
-
- /*
- * The target level will have the records from the base level
- * added to it, and potentially gain a shard if the LayoutPolicy
- * is tiering or the level currently lacks any shards at all.
- */
- scratch_state[i].reccnt += base_reccnt;
- if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) {
- scratch_state[i].shardcnt += 1;
- }
- }
-
- return std::move(reconstructions);
+ /*
+ * Find the first level capable of sustaining a reconstruction from
+ * the level above it. If no such level exists, add a new one at
+ * the bottom of the structure.
+ */
+ level_index base_level =
+ find_reconstruction_target(source_level, scratch_state);
+ if (base_level == -1) {
+ base_level = grow(scratch_state);
}
- inline void reconstruction(ReconstructionTask task) {
- static_assert(L == LayoutPolicy::BSM);
- std::vector<InternalLevel<R, Shard, Q>*> levels(task.sources.size());
- for (size_t i=0; i<task.sources.size(); i++) {
- levels[i] = m_levels[task.sources[i]].get();
- }
+ if constexpr (L == LayoutPolicy::BSM) {
+ if (base_level == 0) {
+ return reconstructions;
+ }
- auto new_level = InternalLevel<R, Shard, Q>::reconstruction(levels, task.target);
- if (task.target >= m_levels.size()) {
- m_current_state.push_back({new_level->get_record_count(), calc_level_record_capacity(task.target),
- 1, 1});
- m_levels.emplace_back(new_level);
- } else {
- m_current_state[task.target] = {new_level->get_record_count(), calc_level_record_capacity(task.target),
- 1, 1};
- m_levels[task.target] = new_level;
- }
+ ReconstructionTask task;
+ task.target = base_level;
- /* remove all of the levels that have been flattened */
- for (size_t i=0; i<task.sources.size(); i++) {
- m_levels[task.sources[i]] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(task.sources[i], 1));
- m_current_state[task.sources[i]] = {0, calc_level_record_capacity(task.target), 0, 1};
- }
+ size_t base_reccnt = 0;
+ for (level_index i = base_level; i > source_level; i--) {
+ auto recon_reccnt = scratch_state[i - 1].reccnt;
+ base_reccnt += recon_reccnt;
+ scratch_state[i - 1].reccnt = 0;
+ scratch_state[i - 1].shardcnt = 0;
+ task.add_source(i - 1, recon_reccnt);
+ }
+
+ reconstructions.add_reconstruction(task);
+ scratch_state[base_level].reccnt = base_reccnt;
+ scratch_state[base_level].shardcnt = 1;
- return;
+ return reconstructions;
}
/*
- * Combine incoming_level with base_level and reconstruct the shard,
- * placing it in base_level. The two levels should be sequential--i.e. no
- * levels are skipped in the reconstruction process--otherwise the
- * tombstone ordering invariant may be violated.
+ * Determine the full set of reconstructions necessary to open up
+ * space in the source level.
*/
- inline void reconstruction(level_index base_level, level_index incoming_level) {
- size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
-
- if (base_level >= m_levels.size()) {
- m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(base_level, shard_capacity)));
- m_current_state.push_back({0, calc_level_record_capacity(base_level),
- 0, shard_capacity});
- }
-
- if constexpr (L == LayoutPolicy::LEVELING) {
- /* if the base level has a shard, merge the base and incoming together to make a new one */
- if (m_levels[base_level]->get_shard_count() > 0) {
- m_levels[base_level] = InternalLevel<R, Shard, Q>::reconstruction(m_levels[base_level].get(), m_levels[incoming_level].get());
- /* otherwise, we can just move the incoming to the base */
- } else {
- m_levels[base_level] = m_levels[incoming_level];
- }
-
- } else {
- m_levels[base_level]->append_level(m_levels[incoming_level].get());
- m_levels[base_level]->finalize();
+ for (level_index i = base_level; i > source_level; i--) {
+ size_t recon_reccnt = scratch_state[i - 1].reccnt;
+ size_t base_reccnt = recon_reccnt;
+
+ /*
+ * If using Leveling, the total reconstruction size will be the
+ * records in *both* base and target, because they will need to
+ * be merged (assuming that target isn't empty).
+ */
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ if (can_reconstruct_with(i, base_reccnt, scratch_state)) {
+ recon_reccnt += scratch_state[i].reccnt;
}
+ }
+ reconstructions.add_reconstruction(i - 1, i, recon_reccnt);
+
+ /*
+ * The base level will be emptied and its records moved to
+ * the target.
+ */
+ scratch_state[i - 1].reccnt = 0;
+ scratch_state[i - 1].shardcnt = 0;
+
+ /*
+ * The target level will have the records from the base level
+ * added to it, and potentially gain a shard if the LayoutPolicy
+ * is tiering or the level currently lacks any shards at all.
+ */
+ scratch_state[i].reccnt += base_reccnt;
+ if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) {
+ scratch_state[i].shardcnt += 1;
+ }
+ }
- /* place a new, empty level where the incoming level used to be */
- m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
+ return reconstructions;
+ }
- /*
- * Update the state vector to match the *real* state following
- * the reconstruction
- */
- m_current_state[base_level] = {m_levels[base_level]->get_record_count(),
- calc_level_record_capacity(base_level), m_levels[base_level]->get_shard_count(), shard_capacity};
- m_current_state[incoming_level] = {0, calc_level_record_capacity(incoming_level), 0, shard_capacity};
+ inline void reconstruction(ReconstructionTask task) {
+ static_assert(L == LayoutPolicy::BSM);
+ std::vector<InternalLevel<ShardType, QueryType> *> levels(
+ task.sources.size());
+ for (size_t i = 0; i < task.sources.size(); i++) {
+ levels[i] = m_levels[task.sources[i]].get();
}
- bool take_reference() {
- m_refcnt.fetch_add(1);
- return true;
+ auto new_level = InternalLevel<ShardType, QueryType>::reconstruction(
+ levels, task.target);
+ if (task.target >= m_levels.size()) {
+ m_current_state.push_back({new_level->get_record_count(),
+ calc_level_record_capacity(task.target), 1,
+ 1});
+ m_levels.emplace_back(new_level);
+ } else {
+ m_current_state[task.target] = {new_level->get_record_count(),
+ calc_level_record_capacity(task.target),
+ 1, 1};
+ m_levels[task.target] = new_level;
}
- bool release_reference() {
- assert(m_refcnt.load() > 0);
- m_refcnt.fetch_add(-1);
- return true;
+ /* remove all of the levels that have been flattened */
+ for (size_t i = 0; i < task.sources.size(); i++) {
+ m_levels[task.sources[i]] =
+ std::shared_ptr<InternalLevel<ShardType, QueryType>>(
+ new InternalLevel<ShardType, QueryType>(task.sources[i], 1));
+ m_current_state[task.sources[i]] = {
+ 0, calc_level_record_capacity(task.target), 0, 1};
}
- size_t get_reference_count() {
- return m_refcnt.load();
+ return;
+ }
+
+ /*
+ * Combine incoming_level with base_level and reconstruct the shard,
+ * placing it in base_level. The two levels should be sequential--i.e. no
+ * levels are skipped in the reconstruction process--otherwise the
+ * tombstone ordering invariant may be violated.
+ */
+ inline void reconstruction(level_index base_level,
+ level_index incoming_level) {
+ size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+ if (base_level >= m_levels.size()) {
+ m_levels.emplace_back(
+ std::shared_ptr<InternalLevel<ShardType, QueryType>>(
+ new InternalLevel<ShardType, QueryType>(base_level,
+ shard_capacity)));
+ m_current_state.push_back(
+ {0, calc_level_record_capacity(base_level), 0, shard_capacity});
}
- std::vector<void *> get_query_states(std::vector<std::pair<ShardID, Shard*>> &shards, void *parms) {
- std::vector<void*> states;
-
- for (auto &level : m_levels) {
- level->get_query_states(shards, states, parms);
- }
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ /* if the base level has a shard, merge the base and incoming together to
+ * make a new one */
+ if (m_levels[base_level]->get_shard_count() > 0) {
+ m_levels[base_level] =
+ InternalLevel<ShardType, QueryType>::reconstruction(
+ m_levels[base_level].get(), m_levels[incoming_level].get());
+ /* otherwise, we can just move the incoming to the base */
+ } else {
+ m_levels[base_level] = m_levels[incoming_level];
+ }
- return states;
+ } else {
+ m_levels[base_level]->append_level(m_levels[incoming_level].get());
+ m_levels[base_level]->finalize();
}
-private:
- size_t m_scale_factor;
- double m_max_delete_prop;
- size_t m_buffer_size;
-
- std::atomic<size_t> m_refcnt;
-
- std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;
-
- /*
- * A pair of <record_count, shard_count> for each level in the
- * structure. Record counts may be slightly inaccurate due to
- * deletes.
- */
- state_vector m_current_state;
+ /* place a new, empty level where the incoming level used to be */
+ m_levels[incoming_level] =
+ std::shared_ptr<InternalLevel<ShardType, QueryType>>(
+ new InternalLevel<ShardType, QueryType>(
+ incoming_level,
+ (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
/*
- * Add a new level to the scratch state and return its index.
- *
- * IMPORTANT: This does _not_ add a level to the extension structure
- * anymore. This is handled by the appropriate reconstruction and flush
- * methods as needed. This function is for use in "simulated"
- * reconstructions.
+ * Update the state vector to match the *real* state following
+ * the reconstruction
*/
- inline level_index grow(state_vector &scratch_state) {
- level_index new_idx = m_levels.size();
- size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
-
- scratch_state.push_back({0, calc_level_record_capacity(new_idx),
- 0, new_shard_cap});
- return new_idx;
+ m_current_state[base_level] = {m_levels[base_level]->get_record_count(),
+ calc_level_record_capacity(base_level),
+ m_levels[base_level]->get_shard_count(),
+ shard_capacity};
+ m_current_state[incoming_level] = {
+ 0, calc_level_record_capacity(incoming_level), 0, shard_capacity};
+ }
+
+ bool take_reference() {
+ m_refcnt.fetch_add(1);
+ return true;
+ }
+
+ bool release_reference() {
+ assert(m_refcnt.load() > 0);
+ m_refcnt.fetch_add(-1);
+ return true;
+ }
+
+ size_t get_reference_count() { return m_refcnt.load(); }
+
+ std::vector<typename QueryType::LocalQuery *>
+ get_local_queries(std::vector<std::pair<ShardID, ShardType *>> &shards,
+ typename QueryType::Parameters *parms) {
+
+ std::vector<typename QueryType::LocalQuery *> queries;
+
+ for (auto &level : m_levels) {
+ level->get_local_queries(shards, queries, parms);
}
- /*
- * Find the first level below the level indicated by idx that
- * is capable of sustaining a reconstruction and return its
- * level index. If no such level exists, returns -1. Also
- * returns -1 if idx==0, and no such level exists, to simplify
- * the logic of the first buffer flush.
- */
- inline level_index find_reconstruction_target(level_index idx, state_vector &state) {
+ return queries;
+ }
- /*
- * this handles the very first buffer flush, when the state vector
- * is empty.
- */
- if (idx == 0 && state.size() == 0) return -1;
+private:
+ size_t m_scale_factor;
+ double m_max_delete_prop;
+ size_t m_buffer_size;
+
+ std::atomic<size_t> m_refcnt;
+
+ std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> m_levels;
+
+ /*
+ * A pair of <record_count, shard_count> for each level in the
+ * structure. Record counts may be slightly inaccurate due to
+ * deletes.
+ */
+ state_vector m_current_state;
+
+ /*
+ * Add a new level to the scratch state and return its index.
+ *
+ * IMPORTANT: This does _not_ add a level to the extension structure
+ * anymore. This is handled by the appropriate reconstruction and flush
+ * methods as needed. This function is for use in "simulated"
+ * reconstructions.
+ */
+ inline level_index grow(state_vector &scratch_state) {
+ level_index new_idx = m_levels.size();
+ size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+ scratch_state.push_back(
+ {0, calc_level_record_capacity(new_idx), 0, new_shard_cap});
+ return new_idx;
+ }
+
+ /*
+ * Find the first level below the level indicated by idx that
+ * is capable of sustaining a reconstruction and return its
+ * level index. If no such level exists, returns -1. Also
+ * returns -1 if idx==0, and no such level exists, to simplify
+ * the logic of the first buffer flush.
+ */
+ inline level_index find_reconstruction_target(level_index idx,
+ state_vector &state) {
- size_t incoming_rec_cnt = state[idx].reccnt;
- for (level_index i=idx+1; i<state.size(); i++) {
- if (can_reconstruct_with(i, incoming_rec_cnt, state)) {
- return i;
- }
+ /*
+ * this handles the very first buffer flush, when the state vector
+ * is empty.
+ */
+ if (idx == 0 && state.size() == 0)
+ return -1;
- incoming_rec_cnt = state[idx].reccnt;
- }
+ size_t incoming_rec_cnt = state[idx].reccnt;
+ for (level_index i = idx + 1; i < state.size(); i++) {
+ if (can_reconstruct_with(i, incoming_rec_cnt, state)) {
+ return i;
+ }
- return -1;
+ incoming_rec_cnt = state[idx].reccnt;
}
- inline void flush_buffer_into_l0(BuffView buffer) {
- size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
-
- if (m_levels.size() == 0) {
- m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(0, shard_capacity)));
+ return -1;
+ }
- m_current_state.push_back({0, calc_level_record_capacity(0),
- 0, shard_capacity});
- }
+ inline void flush_buffer_into_l0(BuffView buffer) {
+ size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
- if constexpr (L == LayoutPolicy::LEVELING) {
- // FIXME: Kludgey implementation due to interface constraints.
- auto old_level = m_levels[0].get();
- auto temp_level = new InternalLevel<R, Shard, Q>(0, 1);
- temp_level->append_buffer(std::move(buffer));
-
- if (old_level->get_shard_count() > 0) {
- m_levels[0] = InternalLevel<R, Shard, Q>::reconstruction(old_level, temp_level);
- delete temp_level;
- } else {
- m_levels[0] = std::shared_ptr<InternalLevel<R, Shard, Q>>(temp_level);
- }
- } else {
- m_levels[0]->append_buffer(std::move(buffer));
- }
+ if (m_levels.size() == 0) {
+ m_levels.emplace_back(
+ std::shared_ptr<InternalLevel<ShardType, QueryType>>(
+ new InternalLevel<ShardType, QueryType>(0, shard_capacity)));
- /* update the state vector */
- m_current_state[0].reccnt = m_levels[0]->get_record_count();
- m_current_state[0].shardcnt = m_levels[0]->get_shard_count();
+ m_current_state.push_back(
+ {0, calc_level_record_capacity(0), 0, shard_capacity});
}
- /*
- * Mark a given memory level as no-longer in use by the tree. For now this
- * will just free the level. In future, this will be more complex as the
- * level may not be able to immediately be deleted, depending upon who
- * else is using it.
- */
- inline void mark_as_unused(std::shared_ptr<InternalLevel<R, Shard, Q>> level) {
- level.reset();
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ // FIXME: Kludgey implementation due to interface constraints.
+ auto old_level = m_levels[0].get();
+ auto temp_level = new InternalLevel<ShardType, QueryType>(0, 1);
+ temp_level->append_buffer(std::move(buffer));
+
+ if (old_level->get_shard_count() > 0) {
+ m_levels[0] = InternalLevel<ShardType, QueryType>::reconstruction(
+ old_level, temp_level);
+ delete temp_level;
+ } else {
+ m_levels[0] =
+ std::shared_ptr<InternalLevel<ShardType, QueryType>>(temp_level);
+ }
+ } else {
+ m_levels[0]->append_buffer(std::move(buffer));
}
- /*
- * Assume that level "0" should be larger than the buffer. The buffer
- * itself is index -1, which should return simply the buffer capacity.
- */
- inline size_t calc_level_record_capacity(level_index idx) {
- return m_buffer_size * pow(m_scale_factor, idx+1);
+ /* update the state vector */
+ m_current_state[0].reccnt = m_levels[0]->get_record_count();
+ m_current_state[0].shardcnt = m_levels[0]->get_shard_count();
+ }
+
+ /*
+ * Mark a given memory level as no-longer in use by the tree. For now this
+ * will just free the level. In future, this will be more complex as the
+ * level may not be able to immediately be deleted, depending upon who
+ * else is using it.
+ */
+ inline void
+ mark_as_unused(std::shared_ptr<InternalLevel<ShardType, QueryType>> level) {
+ level.reset();
+ }
+
+ /*
+ * Assume that level "0" should be larger than the buffer. The buffer
+ * itself is index -1, which should return simply the buffer capacity.
+ */
+ inline size_t calc_level_record_capacity(level_index idx) {
+ return m_buffer_size * pow(m_scale_factor, idx + 1);
+ }
+
+ /*
+ * Returns the number of records present on a specified level.
+ */
+ inline size_t get_level_record_count(level_index idx) {
+ return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
+ }
+
+ /*
+ * Determines if a level can sustain a reconstruction with incoming_rec_cnt
+ * additional records without exceeding its capacity.
+ */
+ inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt,
+ state_vector &state) {
+ if (idx >= state.size()) {
+ return false;
}
- /*
- * Returns the number of records present on a specified level.
- */
- inline size_t get_level_record_count(level_index idx) {
- return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0;
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap;
+ } else if constexpr (L == LayoutPolicy::BSM) {
+ return state[idx].reccnt == 0;
+ } else {
+ return state[idx].shardcnt < state[idx].shardcap;
}
- /*
- * Determines if a level can sustain a reconstruction with incoming_rec_cnt
- * additional records without exceeding its capacity.
- */
- inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, state_vector &state) {
- if (idx >= state.size()) {
- return false;
- }
-
- if constexpr (L == LayoutPolicy::LEVELING) {
- return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap;
- } else if constexpr (L == LayoutPolicy::BSM) {
- return state[idx].reccnt == 0;
- } else {
- return state[idx].shardcnt < state[idx].shardcap;
- }
-
- /* unreachable */
- assert(true);
- }
+ /* unreachable */
+ assert(true);
+ }
};
-}
-
+} // namespace de
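With the new template signature in the diff, instantiating the structure only requires the shard and query types (the layout policy defaults to TEIRING). A hedged usage sketch, reusing the illustrative MyShard/MyRangeQuery types from earlier and assuming they satisfy the ShardInterface/QueryInterface concepts; the include path and constructor argument values are assumptions, with the arguments mirroring ExtensionStructure(buffer_size, scale_factor, max_delete_prop):

    #include "framework/structure/ExtensionStructure.h" /* assumed include path */

    /* MyShard and MyRangeQuery are the illustrative types sketched above,
     * not real framework-provided types. */
    de::ExtensionStructure<MyShard, MyRangeQuery>
        structure(/* buffer_size */ 12000, /* scale_factor */ 8,
                  /* max_delete_prop */ 0.05);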