From cf5f3bbb0cb58430ed68ad3ebfcefc009e553d71 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Thu, 25 Sep 2025 14:42:44 -0400 Subject: Code reformatting --- include/framework/DynamicExtension.h | 73 ++- include/framework/QueryRequirements.h | 6 +- include/framework/ShardRequirements.h | 6 +- include/framework/interface/Query.h | 176 +++--- include/framework/interface/Scheduler.h | 2 +- include/framework/interface/Shard.h | 5 +- include/framework/reconstruction/BSMPolicy.h | 12 +- .../reconstruction/BackgroundTieringPolicy.h | 38 +- include/framework/reconstruction/CompactOnFull.h | 37 +- .../reconstruction/FixedShardCountPolicy.h | 30 +- include/framework/reconstruction/FloodL0Policy.h | 3 +- include/framework/reconstruction/LevelingPolicy.h | 10 +- .../reconstruction/ReconstructionPolicy.h | 19 +- include/framework/reconstruction/TieringPolicy.h | 26 +- include/framework/scheduling/Epoch.h | 4 +- include/framework/scheduling/FIFOScheduler.h | 4 +- include/framework/scheduling/LockManager.h | 20 +- include/framework/scheduling/SerialScheduler.h | 3 +- include/framework/scheduling/Task.h | 11 +- include/framework/scheduling/Version.h | 20 +- include/framework/scheduling/statistics.h | 65 ++- include/framework/structure/BufferView.h | 11 +- include/framework/structure/ExtensionStructure.h | 139 ++--- include/framework/structure/InternalLevel.h | 26 +- include/framework/structure/MutableBuffer.h | 31 +- include/framework/util/Configuration.h | 53 +- include/query/irs.h | 10 +- include/query/pointlookup.h | 14 +- include/query/rangecount.h | 12 +- include/query/wss.h | 14 +- include/shard/Alias.h | 259 ++++----- include/shard/FSTrie.h | 282 +++++----- include/shard/ISAMTree.h | 4 +- include/shard/LoudsPatricia.h | 278 +++++----- include/shard/PGM.h | 451 ++++++++------- include/shard/TrieSpline.h | 503 +++++++++-------- include/shard/VPTree.h | 605 ++++++++++----------- include/util/bf_config.h | 4 +- include/util/types.h | 24 +- 39 files changed, 1626 insertions(+), 1664 deletions(-) diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 6cfe72b..63264a0 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -70,7 +70,8 @@ public: * for various configuration parameters in the system. See * include/framework/util/Configuration.h for details. */ - DynamicExtension(ConfType &&config, double insertion_rate=1.0) : m_config(std::move(config)) { + DynamicExtension(ConfType &&config, double insertion_rate = 1.0) + : m_config(std::move(config)) { m_buffer = std::make_unique(m_config.buffer_flush_trigger, m_config.buffer_size); @@ -426,13 +427,10 @@ private: alignas(64) std::atomic m_scheduled_records; - enum reconstruction_phase { - RECON_SCHEDULED = 1, - RECON_ENDED = -1 - }; + enum reconstruction_phase { RECON_SCHEDULED = 1, RECON_ENDED = -1 }; - - void update_insertion_delay(long runtime, size_t reccnt, reconstruction_phase phase) { + void update_insertion_delay(long runtime, size_t reccnt, + reconstruction_phase phase) { if (!m_config.dynamic_ratelimiting) { return; @@ -441,28 +439,19 @@ private: m_scheduled_reconstruction_time.fetch_add(runtime * phase); m_scheduled_records.fetch_add(reccnt * phase); - long time_per_record = (m_scheduled_records.load()) - ? m_scheduled_reconstruction_time.load() / - m_scheduled_records.load() - : 0; + size_t total_reccnt = get_record_count(); + + long time_per_record = + (total_reccnt) ? 
m_scheduled_reconstruction_time.load() / total_reccnt + : 0; long us = time_per_record / 1000; - long excess_ns = time_per_record - us*1000; + long excess_ns = time_per_record - us * 1000; m_stall_us.store(us); size_t records_to_1us = (excess_ns) ? 1000 / excess_ns : 0; - m_insertion_rate.store(1.0 - 1.0 / (double) records_to_1us ); - - fprintf(stderr, "[%d] Updated delay: %ld and %ld\n", phase, us, excess_ns); - - if (runtime == 0) { - fprintf(stderr, "\t[W] Predicted runtime change of 0\n"); - } - - if (reccnt == 0) { - fprintf(stderr, "\t[W] Predicted reccnt change of 0\n"); - } + m_insertion_rate.store(1.0 - 1.0 / (double)records_to_1us); } bool restart_query(QueryArgs *args, @@ -537,9 +526,9 @@ private: * this code will be bypassed in that case. */ if (args->priority == ReconstructionPriority::FLUSH) { - #ifdef DE_PRINT_SHARD_COUNT - fprintf(stdout, "S\t%ld\n", extension->get_shard_count()); - #endif +#ifdef DE_PRINT_SHARD_COUNT + fprintf(stdout, "S\t%ld\n", extension->get_shard_count()); +#endif // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id); // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n", @@ -694,9 +683,8 @@ private: extension->m_lock_mngr.release_lock(level, args->version->get_id()); } - extension->update_insertion_delay(args->predicted_runtime, - args->tasks.get_total_reccnt(), - RECON_ENDED); + extension->update_insertion_delay( + args->predicted_runtime, args->tasks.get_total_reccnt(), RECON_ENDED); } if (args->priority == ReconstructionPriority::FLUSH) { @@ -708,7 +696,6 @@ private: // args->version->get_id(), recon_id); // - /* manually delete the argument object */ delete args; } @@ -937,23 +924,33 @@ private: args->initial_version = active_version->get_id(); size_t level_idx = 0; - for (size_t i=0; i< recon.size(); i++) { - if (recon[i].target < (level_index) active_version->get_structure()->get_level_vector().size()) { + for (size_t i = 0; i < recon.size(); i++) { + if (recon[i].target < (level_index)active_version->get_structure() + ->get_level_vector() + .size()) { level_idx = recon[i].target; - args->predicted_runtime += active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt); + args->predicted_runtime += + active_version->get_structure() + ->get_level_vector()[level_idx] + ->predict_reconstruction_time(recon[i].reccnt); } else { level_idx = recon[i].target - 1; - args->predicted_runtime += m_config.rt_level_scale * active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt); + args->predicted_runtime += + m_config.rt_level_scale * + active_version->get_structure() + ->get_level_vector()[level_idx] + ->predict_reconstruction_time(recon[i].reccnt); } } if (args->predicted_runtime == 0) { - fprintf(stderr, "Runtime Prediction of 0 for Level %ld with %ld records\n", level_idx, args->tasks.get_total_reccnt()); + fprintf(stderr, + "Runtime Prediction of 0 for Level %ld with %ld records\n", + level_idx, args->tasks.get_total_reccnt()); } update_insertion_delay(args->predicted_runtime, - args->tasks.get_total_reccnt(), - RECON_SCHEDULED); + args->tasks.get_total_reccnt(), RECON_SCHEDULED); m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(), args, RECONSTRUCTION); } @@ -1002,7 +999,7 @@ private: return m_buffer->append(rec, ts); } -//#ifdef _GNU_SOURCE + //#ifdef _GNU_SOURCE void set_thread_affinity() { if constexpr (std::same_as) { return; diff --git a/include/framework/QueryRequirements.h b/include/framework/QueryRequirements.h index 
dcba67e..9f56e4a 100644 --- a/include/framework/QueryRequirements.h +++ b/include/framework/QueryRequirements.h @@ -1,7 +1,7 @@ /* * include/framework/QueryRequirements.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * @@ -11,7 +11,7 @@ */ #pragma once -#include "framework/structure/BufferView.h" #include "framework/interface/Record.h" -#include "framework/interface/Shard.h" +#include "framework/structure/BufferView.h" #include "framework/interface/Query.h" +#include "framework/interface/Shard.h" diff --git a/include/framework/ShardRequirements.h b/include/framework/ShardRequirements.h index d054030..335c916 100644 --- a/include/framework/ShardRequirements.h +++ b/include/framework/ShardRequirements.h @@ -1,7 +1,7 @@ /* * include/framework/ShardRequirements.h * - * Copyright (C) 2023 Douglas B. Rumbaugh + * Copyright (C) 2023 Douglas B. Rumbaugh * * Distributed under the Modified BSD License. * @@ -11,7 +11,7 @@ */ #pragma once -#include "framework/structure/BufferView.h" +#include "framework/interface/Query.h" #include "framework/interface/Record.h" #include "framework/interface/Shard.h" -#include "framework/interface/Query.h" +#include "framework/structure/BufferView.h" diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h index 97a973d..4e9a3fb 100644 --- a/include/framework/interface/Query.h +++ b/include/framework/interface/Query.h @@ -12,7 +12,6 @@ namespace de { - /* * * @@ -23,100 +22,99 @@ template -concept QueryInterface = - requires(PARAMETERS *parameters, LOCAL *local, LOCAL_BUFFER *buffer_query, - SHARD *shard, std::vector &local_queries, - std::vector &local_results, RESULT &result, - BufferView *bv) { - /* - * Given a set of query parameters and a shard, return a local query - * object for that shard. - */ - { - QUERY::local_preproc(shard, parameters) - } -> std::convertible_to; +concept QueryInterface = requires(PARAMETERS *parameters, LOCAL *local, + LOCAL_BUFFER *buffer_query, SHARD *shard, + std::vector &local_queries, + std::vector &local_results, + RESULT &result, + BufferView *bv) { + /* + * Given a set of query parameters and a shard, return a local query + * object for that shard. + */ + { QUERY::local_preproc(shard, parameters) } -> std::convertible_to; - /* - * Given a set of query parameters and a buffer view, return a local - * query object for the buffer. - * NOTE: for interface reasons, the pointer to the buffer view MUST be - * stored inside of the local query object. The future buffer - * query routine will access the buffer by way of this pointer. - */ - { - QUERY::local_preproc_buffer(bv, parameters) - } -> std::convertible_to; + /* + * Given a set of query parameters and a buffer view, return a local + * query object for the buffer. + * NOTE: for interface reasons, the pointer to the buffer view MUST be + * stored inside of the local query object. The future buffer + * query routine will access the buffer by way of this pointer. + */ + { + QUERY::local_preproc_buffer(bv, parameters) + } -> std::convertible_to; - /* - * Given a full set of local queries, and the buffer query, make any - * necessary adjustments to the local queries in-place, to account for - * global information. If no additional processing is required, this - * function can be left empty. 
- */ - { QUERY::distribute_query(parameters, local_queries, buffer_query) }; + /* + * Given a full set of local queries, and the buffer query, make any + * necessary adjustments to the local queries in-place, to account for + * global information. If no additional processing is required, this + * function can be left empty. + */ + {QUERY::distribute_query(parameters, local_queries, buffer_query)}; - /* - * Answer the local query, defined by `local` against `shard` and return - * a vector of LOCAL_RESULT objects defining the query result. - */ - { QUERY::local_query(shard, local) } -> std::convertible_to; + /* + * Answer the local query, defined by `local` against `shard` and return + * a vector of LOCAL_RESULT objects defining the query result. + */ + { QUERY::local_query(shard, local) } -> std::convertible_to; - /* - * Answer the local query defined by `local` against the buffer (which - * should be accessed by a pointer inside of `local`) and return a vector - * of LOCAL_RESULT objects defining the query result. - */ - { - QUERY::local_query_buffer(buffer_query) - } -> std::convertible_to; + /* + * Answer the local query defined by `local` against the buffer (which + * should be accessed by a pointer inside of `local`) and return a vector + * of LOCAL_RESULT objects defining the query result. + */ + { + QUERY::local_query_buffer(buffer_query) + } -> std::convertible_to; - /* - * Process the local results from the buffer and all of the shards, - * stored in `local_results`, and insert the associated ResultType - * objects into the `result` vector, which represents the final result - * of the query. Updates to this vector are done in-place. - */ - { QUERY::combine(local_results, parameters, result) }; + /* + * Process the local results from the buffer and all of the shards, + * stored in `local_results`, and insert the associated ResultType + * objects into the `result` vector, which represents the final result + * of the query. Updates to this vector are done in-place. + */ + {QUERY::combine(local_results, parameters, result)}; - /* - * Process the post-combine `result` vector of ResultType objects, - * in the context of the global and local query parameters, to determine - * if the query should be repeated. If so, make any necessary adjustments - * to the local query objects and return True. Otherwise, return False. - * - * If no repetition is needed for a given problem type, simply return - * False immediately and the query will end. - */ - { - QUERY::repeat(parameters, result, local_queries, buffer_query) - } -> std::same_as; + /* + * Process the post-combine `result` vector of ResultType objects, + * in the context of the global and local query parameters, to determine + * if the query should be repeated. If so, make any necessary adjustments + * to the local query objects and return True. Otherwise, return False. + * + * If no repetition is needed for a given problem type, simply return + * False immediately and the query will end. + */ + { + QUERY::repeat(parameters, result, local_queries, buffer_query) + } -> std::same_as; - /* - * If this flag is True, then the query will immediately stop and return - * a result as soon as the first non-deleted LocalRecordType is found. - * Otherwise, every Shard and the buffer will be queried and the results - * merged, like normal. 
- * - * This is largely an optimization flag for use with point-lookup, or - * other single-record result queries - */ - { QUERY::EARLY_ABORT } -> std::convertible_to; + /* + * If this flag is True, then the query will immediately stop and return + * a result as soon as the first non-deleted LocalRecordType is found. + * Otherwise, every Shard and the buffer will be queried and the results + * merged, like normal. + * + * This is largely an optimization flag for use with point-lookup, or + * other single-record result queries + */ + { QUERY::EARLY_ABORT } -> std::convertible_to; - /* - * If false, the built-in delete filtering that the framework can - * apply to the local results, prior to calling combine, will be skipped. - * This general filtering can be inefficient, particularly for tombstone - * -based deletes, and so if a more efficient manual filtering can be - * performed, it is worth setting this to True and doing that filtering - * in the combine step. - * - * If deletes are not a consideration for your problem, it's also best - * to turn this off, as it'll avoid the framework making an extra pass - * over the local results prior to combining them. - * - * TODO: Temporarily disabling this, as we've dropped framework-level - * delete filtering for the time being. - */ - /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to; */ - }; + /* + * If false, the built-in delete filtering that the framework can + * apply to the local results, prior to calling combine, will be skipped. + * This general filtering can be inefficient, particularly for tombstone + * -based deletes, and so if a more efficient manual filtering can be + * performed, it is worth setting this to True and doing that filtering + * in the combine step. + * + * If deletes are not a consideration for your problem, it's also best + * to turn this off, as it'll avoid the framework making an extra pass + * over the local results prior to combining them. + * + * TODO: Temporarily disabling this, as we've dropped framework-level + * delete filtering for the time being. 
+ */ + /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to; */ +}; } // namespace de diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h index d76a6c8..0724bce 100644 --- a/include/framework/interface/Scheduler.h +++ b/include/framework/interface/Scheduler.h @@ -14,7 +14,7 @@ template concept SchedulerInterface = requires(SchedType s, size_t i, void *vp, de::Job j) { {SchedType(i, i)}; - {s.schedule_job(j, i, vp, i)} -> std::convertible_to; + { s.schedule_job(j, i, vp, i) } -> std::convertible_to; {s.shutdown()}; {s.print_statistics()}; }; diff --git a/include/framework/interface/Shard.h b/include/framework/interface/Shard.h index fb5ce1a..e3a2de4 100644 --- a/include/framework/interface/Shard.h +++ b/include/framework/interface/Shard.h @@ -14,8 +14,8 @@ namespace de { template concept ShardInterface = RecordInterface && - requires(SHARD shard, const std::vector &shard_vector, bool b, - BufferView bv, + requires(SHARD shard, const std::vector &shard_vector, + bool b, BufferView bv, typename SHARD::RECORD rec) { /* construct a shard from a vector of shards of the same type */ {SHARD(shard_vector)}; @@ -52,7 +52,6 @@ concept ShardInterface = RecordInterface && * use only at the moment */ { shard.get_aux_memory_usage() } -> std::convertible_to; - }; template diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h index 61f379e..db4c8d4 100644 --- a/include/framework/reconstruction/BSMPolicy.h +++ b/include/framework/reconstruction/BSMPolicy.h @@ -36,7 +36,8 @@ public: ReconstructionVector reconstructions; auto levels = version->get_structure()->get_level_vector(); - level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count()); + level_index target_level = find_reconstruction_target( + levels, version->get_structure()->get_record_count()); assert(target_level != -1); level_index source_level = 0; @@ -49,7 +50,8 @@ public: task.target = target_level; size_t reccnt = 0; - if (target_level < (ssize_t)levels.size() && levels[target_level]->get_record_count() > 0) { + if (target_level < (ssize_t)levels.size() && + levels[target_level]->get_record_count() > 0) { task.sources.push_back({target_level, all_shards_idx}); task.type = ReconstructionType::Merge; } else { @@ -71,7 +73,8 @@ public: } private: - level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const { + level_index find_reconstruction_target(LevelVector &levels, + size_t reccnt) const { level_index target_level = invalid_level_idx; for (level_index i = 1; i < (level_index)levels.size(); i++) { @@ -85,7 +88,8 @@ private: } inline size_t capacity(level_index level, size_t reccnt) const { - double base = std::ceil(m_scale_factor * std::pow(std::log10(reccnt), m_size_modifier)); + double base = std::ceil( + m_scale_factor * std::pow(std::log10(reccnt), m_size_modifier)); return m_buffer_size * (base - 1) * pow(base, level + 1); } diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h index 36556a2..1c000b9 100644 --- a/include/framework/reconstruction/BackgroundTieringPolicy.h +++ b/include/framework/reconstruction/BackgroundTieringPolicy.h @@ -16,16 +16,20 @@ namespace de { template QueryType> -class BackgroundTieringPolicy : public ReconstructionPolicy { +class BackgroundTieringPolicy + : public ReconstructionPolicy { typedef std::vector>> LevelVector; public: - BackgroundTieringPolicy(size_t 
scale_factor, size_t buffer_size, size_t size_modifier=0) - : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(size_modifier) {} - - std::vector get_reconstruction_tasks( - const Version *version, LockManager &lock_mngr) const override { + BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size, + size_t size_modifier = 0) + : m_scale_factor(scale_factor), m_buffer_size(buffer_size), + m_size_modifier(size_modifier) {} + + std::vector + get_reconstruction_tasks(const Version *version, + LockManager &lock_mngr) const override { std::vector reconstructions; auto levels = version->get_structure()->get_level_vector(); @@ -34,7 +38,8 @@ public: return {}; } - level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count()); + level_index target_level = find_reconstruction_target( + levels, version->get_structure()->get_record_count()); assert(target_level != -1); level_index source_level = 0; @@ -44,19 +49,21 @@ public: } for (level_index i = target_level; i > source_level; i--) { - if (lock_mngr.take_lock(i-1, version->get_id())) { + if (lock_mngr.take_lock(i - 1, version->get_id())) { ReconstructionVector recon; size_t total_reccnt = levels[i - 1]->get_record_count(); std::vector shards; - for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) { - shards.push_back({i-1, j}); + for (ssize_t j = 0; j < (ssize_t)levels[i - 1]->get_shard_count(); + j++) { + shards.push_back({i - 1, j}); } - recon.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact); + recon.add_reconstruction(shards, i, total_reccnt, + ReconstructionType::Compact); reconstructions.push_back(recon); } } - + return reconstructions; } @@ -68,7 +75,8 @@ public: } private: - level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const { + level_index find_reconstruction_target(LevelVector &levels, + size_t reccnt) const { level_index target_level = invalid_level_idx; for (level_index i = 1; i < (level_index)levels.size(); i++) { @@ -81,7 +89,9 @@ private: return target_level; } - inline size_t capacity(size_t reccnt) const { return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); } + inline size_t capacity(size_t reccnt) const { + return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); + } size_t m_scale_factor; size_t m_buffer_size; diff --git a/include/framework/reconstruction/CompactOnFull.h b/include/framework/reconstruction/CompactOnFull.h index f5e0400..f0b549d 100644 --- a/include/framework/reconstruction/CompactOnFull.h +++ b/include/framework/reconstruction/CompactOnFull.h @@ -21,11 +21,14 @@ class CompactOnFull : public ReconstructionPolicy { LevelVector; public: - CompactOnFull(size_t scale_factor, size_t buffer_size, size_t size_modifier=0) - : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(size_modifier) {} - - std::vector get_reconstruction_tasks( - const Version *version, LockManager &lock_mngr) const override { + CompactOnFull(size_t scale_factor, size_t buffer_size, + size_t size_modifier = 0) + : m_scale_factor(scale_factor), m_buffer_size(buffer_size), + m_size_modifier(size_modifier) {} + + std::vector + get_reconstruction_tasks(const Version *version, + LockManager &lock_mngr) const override { std::vector reconstructions; auto levels = version->get_structure()->get_level_vector(); @@ -34,23 +37,24 @@ public: return {}; } - for (level_index i=0; i < (ssize_t) levels.size(); i++) { - if (levels[i]->get_shard_count() >= m_scale_factor && 
lock_mngr.take_lock(i, version->get_id())) { + for (level_index i = 0; i < (ssize_t)levels.size(); i++) { + if (levels[i]->get_shard_count() >= m_scale_factor && + lock_mngr.take_lock(i, version->get_id())) { ReconstructionVector recon; size_t total_reccnt = levels[i]->get_record_count(); std::vector shards; - for (ssize_t j=0; j<(ssize_t) levels[i]->get_shard_count(); j++) { - shards.push_back({i,j}); + for (ssize_t j = 0; j < (ssize_t)levels[i]->get_shard_count(); j++) { + shards.push_back({i, j}); } - recon.add_reconstruction(shards, i+1, total_reccnt, ReconstructionType::Compact); + recon.add_reconstruction(shards, i + 1, total_reccnt, + ReconstructionType::Compact); reconstructions.push_back(recon); } } - return reconstructions; - } - + return reconstructions; + } ReconstructionVector get_flush_tasks(const Version *version) const override { @@ -60,7 +64,8 @@ public: } private: - level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const { + level_index find_reconstruction_target(LevelVector &levels, + size_t reccnt) const { level_index target_level = invalid_level_idx; for (level_index i = 1; i < (level_index)levels.size(); i++) { @@ -73,7 +78,9 @@ private: return target_level; } - inline size_t capacity(size_t reccnt) const { return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); } + inline size_t capacity(size_t reccnt) const { + return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); + } size_t m_scale_factor; size_t m_buffer_size; diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h index cc8dce4..52d9f39 100644 --- a/include/framework/reconstruction/FixedShardCountPolicy.h +++ b/include/framework/reconstruction/FixedShardCountPolicy.h @@ -16,16 +16,20 @@ namespace de { template QueryType> -class FixedShardCountPolicy : public ReconstructionPolicy { +class FixedShardCountPolicy + : public ReconstructionPolicy { typedef std::vector>> LevelVector; public: - FixedShardCountPolicy(size_t buffer_size, size_t shard_count, size_t max_record_count) - : m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {} + FixedShardCountPolicy(size_t buffer_size, size_t shard_count, + size_t max_record_count) + : m_buffer_size(buffer_size), m_shard_count(shard_count), + m_max_reccnt(max_record_count) {} std::vector - get_reconstruction_tasks(const Version *version, LockManager &lock_mngr) const override { + get_reconstruction_tasks(const Version *version, + LockManager &lock_mngr) const override { return {}; } @@ -37,26 +41,26 @@ public: /* if this is the very first flush, there won't be an L1 yet */ if (levels.size() > 1 && levels[1]->get_shard_count() > 0) { - ShardID last_shid = {1, (shard_index) (levels[1]->get_shard_count() - 1)}; - if (levels[1]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) { - auto task = ReconstructionTask { - {{0, 0}, last_shid}, 1, m_buffer_size,ReconstructionType::Merge - }; + ShardID last_shid = {1, (shard_index)(levels[1]->get_shard_count() - 1)}; + if (levels[1]->get_shard(last_shid.shard_idx)->get_record_count() + + m_buffer_size <= + capacity()) { + auto task = ReconstructionTask{ + {{0, 0}, last_shid}, 1, m_buffer_size, ReconstructionType::Merge}; v.add_reconstruction(task); return v; } } - auto task = ReconstructionTask { - {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append - }; + auto task = ReconstructionTask{ + {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append}; 
v.add_reconstruction(task); return v; } private: inline size_t capacity() const { - double bps = (double) m_max_reccnt / (double) m_buffer_size / m_shard_count; + double bps = (double)m_max_reccnt / (double)m_buffer_size / m_shard_count; return ceil(bps) * m_buffer_size; } diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h index c0d29fe..7e38e02 100644 --- a/include/framework/reconstruction/FloodL0Policy.h +++ b/include/framework/reconstruction/FloodL0Policy.h @@ -24,7 +24,8 @@ public: FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {} std::vector - get_reconstruction_tasks(const Version *version, LockManager &lock_mngr) const override { + get_reconstruction_tasks(const Version *version, + LockManager &lock_mngr) const override { return {}; } diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h index 955bc02..3e0afaa 100644 --- a/include/framework/reconstruction/LevelingPolicy.h +++ b/include/framework/reconstruction/LevelingPolicy.h @@ -58,8 +58,9 @@ public: (target_level == 1) ? m_buffer_size + target_reccnt : levels[target_level - 1]->get_record_count() + target_reccnt; - auto type = (target_level >= (level_index) levels.size()) ? ReconstructionType::Append - : ReconstructionType::Merge; + auto type = (target_level >= (level_index)levels.size()) + ? ReconstructionType::Append + : ReconstructionType::Merge; reconstructions.add_reconstruction(target_level - 1, target_level, total_reccnt, type); @@ -95,8 +96,9 @@ private: inline size_t capacity(level_index level, size_t reccnt) const { return m_buffer_size * - pow(m_scale_factor * std::ceil(std::pow(std::log10(reccnt), m_size_modifier)), level); - + pow(m_scale_factor * std::ceil(std::pow(std::log10(reccnt), + m_size_modifier)), + level); } size_t m_scale_factor; diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h index 41a2092..3c842b2 100644 --- a/include/framework/reconstruction/ReconstructionPolicy.h +++ b/include/framework/reconstruction/ReconstructionPolicy.h @@ -12,19 +12,22 @@ */ #pragma once -#include "util/types.h" -#include "framework/structure/ExtensionStructure.h" -#include "framework/scheduling/Version.h" #include "framework/scheduling/LockManager.h" +#include "framework/scheduling/Version.h" +#include "framework/structure/ExtensionStructure.h" +#include "util/types.h" namespace de { -template QueryType> +template QueryType> class ReconstructionPolicy { typedef ExtensionStructure StructureType; public: ReconstructionPolicy() {} - virtual std::vector get_reconstruction_tasks(const Version *version, LockManager &lock_mngr) const = 0; - virtual ReconstructionVector get_flush_tasks(const Version *version) const = 0; - }; -} + virtual std::vector + get_reconstruction_tasks(const Version *version, + LockManager &lock_mngr) const = 0; + virtual ReconstructionVector + get_flush_tasks(const Version *version) const = 0; +}; +} // namespace de diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h index b1fcb49..c16c427 100644 --- a/include/framework/reconstruction/TieringPolicy.h +++ b/include/framework/reconstruction/TieringPolicy.h @@ -21,11 +21,13 @@ class TieringPolicy : public ReconstructionPolicy { LevelVector; public: - TieringPolicy(size_t scale_factor, size_t buffer_size, double modifier=0) - : m_scale_factor(scale_factor), m_buffer_size(buffer_size), 
m_size_modifier(modifier) {} + TieringPolicy(size_t scale_factor, size_t buffer_size, double modifier = 0) + : m_scale_factor(scale_factor), m_buffer_size(buffer_size), + m_size_modifier(modifier) {} - std::vector get_reconstruction_tasks( - const Version *version, LockManager &lock_mngr) const override { + std::vector + get_reconstruction_tasks(const Version *version, + LockManager &lock_mngr) const override { return {}; } @@ -34,7 +36,8 @@ public: ReconstructionVector reconstructions; auto levels = version->get_structure()->get_level_vector(); - level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count()); + level_index target_level = find_reconstruction_target( + levels, version->get_structure()->get_record_count()); assert(target_level != -1); level_index source_level = 0; @@ -47,12 +50,13 @@ public: size_t total_reccnt = levels[i - 1]->get_record_count(); std::vector shards; - for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) { - shards.push_back({i-1, j}); + for (ssize_t j = 0; j < (ssize_t)levels[i - 1]->get_shard_count(); j++) { + shards.push_back({i - 1, j}); } if (total_reccnt > 0 || shards.size() > 0) { - reconstructions.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact); + reconstructions.add_reconstruction(shards, i, total_reccnt, + ReconstructionType::Compact); } } @@ -60,7 +64,8 @@ public: } private: - level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const { + level_index find_reconstruction_target(LevelVector &levels, + size_t reccnt) const { level_index target_level = invalid_level_idx; for (level_index i = 1; i < (level_index)levels.size(); i++) { @@ -74,7 +79,8 @@ private: } inline size_t capacity(size_t reccnt) const { - return std::ceil((double) m_scale_factor * std::pow(std::log10(reccnt), m_size_modifier)); + return std::ceil((double)m_scale_factor * + std::pow(std::log10(reccnt), m_size_modifier)); } size_t m_scale_factor; diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h index 7583727..a642b31 100644 --- a/include/framework/scheduling/Epoch.h +++ b/include/framework/scheduling/Epoch.h @@ -60,7 +60,9 @@ public: Structure *get_mutable_structure() { return m_structure; } - BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); } + BufView get_buffer() const { + return m_buffer->get_buffer_view(m_buffer_head); + } /* * Returns a new Epoch object that is a copy of this one. 
The new object diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h index 8fbe07c..ef6dde5 100644 --- a/include/framework/scheduling/FIFOScheduler.h +++ b/include/framework/scheduling/FIFOScheduler.h @@ -61,7 +61,8 @@ public: std::unique_lock lk(m_cv_lock); m_stats.job_queued(ts, type, size); - m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv)); + m_task_queue.push( + Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv)); m_cv.notify_all(); } @@ -81,7 +82,6 @@ private: [[maybe_unused]] size_t m_memory_budget; size_t m_thrd_cnt; - std::atomic m_counter; std::mutex m_cv_lock; std::condition_variable m_cv; diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h index a40cf7a..820b975 100644 --- a/include/framework/scheduling/LockManager.h +++ b/include/framework/scheduling/LockManager.h @@ -3,15 +3,15 @@ */ #pragma once -#include #include #include +#include namespace de { class LockManager { public: - LockManager(size_t levels=1) { - for (size_t i=0; i < levels; i++) { + LockManager(size_t levels = 1) { + for (size_t i = 0; i < levels; i++) { m_lks.emplace_back(false); } @@ -20,9 +20,7 @@ public: ~LockManager() = default; - void add_lock() { - m_lks.emplace_back(false); - } + void add_lock() { m_lks.emplace_back(false); } void release_lock(size_t idx, size_t version) { if (idx < m_lks.size()) { @@ -72,17 +70,13 @@ public: return false; } - bool is_buffer_locked() { - return m_buffer_lk.load(); - } + bool is_buffer_locked() { return m_buffer_lk.load(); } - void release_buffer_lock() { - m_buffer_lk.store(false); - } + void release_buffer_lock() { m_buffer_lk.store(false); } private: std::deque> m_lks; std::atomic m_buffer_lk; std::atomic m_last_unlocked_version; }; -} +} // namespace de diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h index b6ebe53..6a8ed58 100644 --- a/include/framework/scheduling/SerialScheduler.h +++ b/include/framework/scheduling/SerialScheduler.h @@ -37,7 +37,8 @@ public: t(0); } - void shutdown() { /* intentionally left blank */ } + void shutdown() { /* intentionally left blank */ + } void print_statistics() { m_stats.print_statistics(); } void print_query_time_data() { m_stats.print_query_time_data(); } diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h index 4529b2e..00ddbbb 100644 --- a/include/framework/scheduling/Task.h +++ b/include/framework/scheduling/Task.h @@ -14,9 +14,9 @@ #pragma once #include +#include #include #include -#include #include "framework/scheduling/Version.h" #include "framework/scheduling/statistics.h" @@ -24,11 +24,7 @@ namespace de { -enum class ReconstructionPriority { - FLUSH = 0, - CMPCT = 1, - MAINT = 2 -}; +enum class ReconstructionPriority { FLUSH = 0, CMPCT = 1, MAINT = 2 }; template QueryType> struct ReconstructionArgs { @@ -51,7 +47,8 @@ typedef std::function Job; struct Task { Task(size_t size, size_t ts, Job job, void *args, size_t type = 0, - SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr, std::condition_variable *cv=nullptr) + SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr, + std::condition_variable *cv = nullptr) : m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type), m_stats(stats), m_lk(lk), m_cv(cv) {} diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h index be54c84..bbcbe25 100644 --- 
a/include/framework/scheduling/Version.h +++ b/include/framework/scheduling/Version.h @@ -25,18 +25,17 @@ private: typedef BufferView BufferViewType; public: - Version(size_t vid = 0) : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0) {} - Version(size_t number, std::unique_ptr structure, BufferType *buff, - size_t head) + Version(size_t number, std::unique_ptr structure, + BufferType *buff, size_t head) : m_buffer(buff), m_structure(std::move(structure)), m_id(number), m_buffer_head(head) { - if (m_buffer) { - m_buffer->take_head_reference(m_buffer_head); - } - } + if (m_buffer) { + m_buffer->take_head_reference(m_buffer_head); + } + } ~Version() { if (m_buffer) { @@ -55,7 +54,7 @@ public: size_t get_id() const { return m_id; } - void set_id(size_t id) { m_id = id;} + void set_id(size_t id) { m_id = id; } const StructureType *get_structure() const { return m_structure.get(); } @@ -107,10 +106,7 @@ public: m_structure->update_shard_version(version); } - size_t get_head() { - return m_buffer_head; - } - + size_t get_head() { return m_buffer_head; } void set_buffer(BufferType *buffer, size_t head) { assert(m_buffer == nullptr); diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h index 6d9f9f0..8cb6dbd 100644 --- a/include/framework/scheduling/statistics.h +++ b/include/framework/scheduling/statistics.h @@ -16,12 +16,12 @@ #include #include #include +#include #include #include #include #include #include -#include namespace de { @@ -68,7 +68,6 @@ private: queue_time) .count(); } - }; public: @@ -117,10 +116,10 @@ public: size_t worst_query = 0; size_t first_query = UINT64_MAX; - for (auto &job : m_jobs) { if (job.second.type != 1) { - fprintf(stdout, "%ld %ld %ld %ld\n", job.second.id, job.second.size, job.second.runtime(), job.second.runtime() / (job.second.size)); + fprintf(stdout, "%ld %ld %ld %ld\n", job.second.id, job.second.size, + job.second.runtime(), job.second.runtime() / (job.second.size)); } if (job.first < first_query) { @@ -149,7 +148,6 @@ public: query_cnt++; } - int64_t average_queue_time = (query_cnt) ? total_queue_time / query_cnt : 0; int64_t average_runtime = (query_cnt) ? total_runtime / query_cnt : 0; @@ -162,28 +160,37 @@ public: continue; } - queue_deviation_sum += std::pow(job.second.time_in_queue() - average_queue_time, 2); - runtime_deviation_sum += std::pow(job.second.runtime() - average_runtime, 2); - + queue_deviation_sum += + std::pow(job.second.time_in_queue() - average_queue_time, 2); + runtime_deviation_sum += + std::pow(job.second.runtime() - average_runtime, 2); } - int64_t queue_stddev = (query_cnt) ? std::sqrt(queue_deviation_sum / query_cnt) : 0; - int64_t runtime_stddev = (query_cnt) ? std::sqrt(runtime_deviation_sum / query_cnt) : 0; - - - fprintf(stdout, "Query Count: %ld\tWorst Query: %ld\tFirst Query: %ld\n", query_cnt, worst_query, first_query); - fprintf(stdout, "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t Max Scheduling Delay: %ld\tStandard Deviation: %ld\n", average_queue_time, min_queue_time, max_queue_time, queue_stddev); - fprintf(stdout, "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query Latency: %ld\tStandard Deviation: %ld\n", average_runtime, min_runtime, max_runtime, runtime_stddev); + int64_t queue_stddev = + (query_cnt) ? std::sqrt(queue_deviation_sum / query_cnt) : 0; + int64_t runtime_stddev = + (query_cnt) ? 
std::sqrt(runtime_deviation_sum / query_cnt) : 0; + + fprintf(stdout, "Query Count: %ld\tWorst Query: %ld\tFirst Query: %ld\n", + query_cnt, worst_query, first_query); + fprintf(stdout, + "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t " + "Max Scheduling Delay: %ld\tStandard Deviation: %ld\n", + average_queue_time, min_queue_time, max_queue_time, queue_stddev); + fprintf(stdout, + "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query " + "Latency: %ld\tStandard Deviation: %ld\n", + average_runtime, min_runtime, max_runtime, runtime_stddev); } - void print_query_time_data() { std::unique_lock lk(m_mutex); update_job_data_from_log(); for (auto &job : m_jobs) { if (job.second.type == 1) { - fprintf(stdout, "%ld\t%ld\t%ld\n", job.second.time_in_queue(), job.second.runtime(), job.second.end_to_end_time()); + fprintf(stdout, "%ld\t%ld\t%ld\n", job.second.time_in_queue(), + job.second.runtime(), job.second.end_to_end_time()); } } } @@ -211,18 +218,18 @@ private: auto &job = m_jobs.at(event.id); switch (event.type) { - case EventType::QUEUED: - job.queue_time = event.time; - break; - case EventType::FINISHED: - job.stop_time = event.time; - break; - case EventType::SCHEDULED: - job.scheduled_time = event.time; - break; - case EventType::STARTED: - job.start_time = event.time; - break; + case EventType::QUEUED: + job.queue_time = event.time; + break; + case EventType::FINISHED: + job.stop_time = event.time; + break; + case EventType::SCHEDULED: + job.scheduled_time = event.time; + break; + case EventType::STARTED: + job.start_time = event.time; + break; } } diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h index a9fb12d..afe5dbb 100644 --- a/include/framework/structure/BufferView.h +++ b/include/framework/structure/BufferView.h @@ -46,10 +46,9 @@ public: BufferView(Wrapped *buffer, size_t cap, size_t head, size_t tail, size_t tombstone_cnt, psudb::BloomFilter *filter) - : m_data(buffer), m_head(head), m_tail(tail), - m_start(m_head % cap), m_stop(m_tail % cap), m_cap(cap), - m_approx_ts_cnt(tombstone_cnt), m_tombstone_filter(filter), - m_active(true) {} + : m_data(buffer), m_head(head), m_tail(tail), m_start(m_head % cap), + m_stop(m_tail % cap), m_cap(cap), m_approx_ts_cnt(tombstone_cnt), + m_tombstone_filter(filter), m_active(true) {} ~BufferView() = default; @@ -104,9 +103,7 @@ public: */ size_t get_tombstone_count() { return m_approx_ts_cnt; } - Wrapped *get(size_t i) { - return m_data + to_idx(i); - } + Wrapped *get(size_t i) { return m_data + to_idx(i); } void copy_to_buffer(psudb::byte *buffer) { /* check if the region to be copied circles back to start. If so, do it in diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index bb8a480..e03c7ad 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -27,12 +27,14 @@ class ExtensionStructure { typedef BufferView BuffView; typedef std::vector>> LevelVector; + public: - ExtensionStructure(bool default_level=true) { + ExtensionStructure(bool default_level = true) { if (default_level) - m_levels.emplace_back(std::make_shared>(0)); + m_levels.emplace_back( + std::make_shared>(0)); } - + ~ExtensionStructure() = default; /* @@ -162,34 +164,37 @@ public: return cnt; } - /* * Perform the reconstruction described by task. If the resulting * reconstruction grows the structure (i.e., adds a level), returns * true. Otherwise, returns false. 
*/ - inline reconstruction_results perform_reconstruction(ReconstructionTask task) const { + inline reconstruction_results + perform_reconstruction(ReconstructionTask task) const { reconstruction_results result; result.target_level = task.target; /* if there is only one source, then we don't need to actually rebuild */ if (task.sources.size() == 1) { auto shid = task.sources[0]; - if (shid.shard_idx == all_shards_idx && m_levels[shid.level_idx]->get_shard_count() > 1) { + if (shid.shard_idx == all_shards_idx && + m_levels[shid.level_idx]->get_shard_count() > 1) { /* there's more than one shard, so we need to do the reconstruction */ } else { - auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx); + auto raw_shard_ptr = + m_levels[shid.level_idx]->get_shard(shid.shard_idx); assert(raw_shard_ptr); result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr); - result.new_shard = m_levels[shid.level_idx]->get_shard_ptr(shid.shard_idx).first; + result.new_shard = + m_levels[shid.level_idx]->get_shard_ptr(shid.shard_idx).first; return result; } } - - std::vector shards; + + std::vector shards; for (ShardID shid : task.sources) { - assert(shid.level_idx < (level_index) m_levels.size()); + assert(shid.level_idx < (level_index)m_levels.size()); assert(shid.shard_idx >= -1); auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx); @@ -197,12 +202,13 @@ public: result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr); } - auto start = std::chrono::high_resolution_clock::now(); result.new_shard = std::make_shared(shards); auto stop = std::chrono::high_resolution_clock::now(); - result.runtime = std::chrono::duration_cast(stop- start).count(); + result.runtime = + std::chrono::duration_cast(stop - start) + .count(); result.reccnt = result.new_shard->get_record_count(); return result; @@ -221,11 +227,10 @@ public: return queries; } - size_t l0_size() const { - return m_levels[0]->get_shard_count(); - } + size_t l0_size() const { return m_levels[0]->get_shard_count(); } - bool apply_reconstruction(reconstruction_results &recon, size_t version) { + bool apply_reconstruction(reconstruction_results &recon, + size_t version) { bool res = append_shard(recon.new_shard, version, recon.target_level); m_levels[recon.target_level]->update_reconstruction_model(recon); delete_shards(recon.source_shards); @@ -233,13 +238,15 @@ public: return res; } - bool append_shard(std::shared_ptr shard, size_t version, size_t level) { + bool append_shard(std::shared_ptr shard, size_t version, + size_t level) { assert(level <= m_levels.size()); auto rc = false; if (level == m_levels.size()) { /* grow the structure */ - m_levels.push_back(std::make_shared>(level)); + m_levels.push_back( + std::make_shared>(level)); rc = true; } @@ -248,12 +255,15 @@ public: return rc; } - void delete_shards(std::vector> shards) { - for (size_t i=0; i> shards) { + for (size_t i = 0; i < shards.size(); i++) { + assert(shards[i].first < (level_index)m_levels.size()); ssize_t shard_idx = -1; - for (size_t j=0; jget_shard_count(); j++) { - if (m_levels[shards[i].first]->get_shard_ptr(j).first.get() == shards[i].second) { + for (size_t j = 0; j < m_levels[shards[i].first]->get_shard_count(); + j++) { + if (m_levels[shards[i].first]->get_shard_ptr(j).first.get() == + shards[i].second) { shard_idx = j; break; } @@ -262,7 +272,8 @@ public: if (shard_idx != -1) { m_levels[shards[i].first]->delete_shard(shard_idx); } else { - fprintf(stderr, "ERROR: failed to delete shard %ld\t%p\n", shards[i].first, 
shards[i].second); + fprintf(stderr, "ERROR: failed to delete shard %ld\t%p\n", + shards[i].first, shards[i].second); exit(EXIT_FAILURE); } } @@ -270,51 +281,55 @@ public: LevelVector const &get_level_vector() const { return m_levels; } - - /* - * Validate that no level in the structure exceeds its maximum tombstone - * capacity. This is used to trigger preemptive compactions at the end of - * the reconstruction process. - */ - bool validate_tombstone_proportion(double max_delete_prop) const { - long double ts_prop; - for (size_t i = 0; i < m_levels.size(); i++) { - if (m_levels[i]) { - ts_prop = (long double)m_levels[i]->get_tombstone_count() / - (long double)m_levels[i]->get_record_count(); - if (ts_prop > (long double)max_delete_prop) { - return false; - } + /* + * Validate that no level in the structure exceeds its maximum tombstone + * capacity. This is used to trigger preemptive compactions at the end of + * the reconstruction process. + */ + bool validate_tombstone_proportion(double max_delete_prop) const { + long double ts_prop; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + ts_prop = (long double)m_levels[i]->get_tombstone_count() / + (long double)m_levels[i]->get_record_count(); + if (ts_prop > (long double)max_delete_prop) { + return false; } } - - return true; } - bool validate_tombstone_proportion(level_index level, double max_delete_prop) const { - long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) m_levels[level]->get_record_count(); - return ts_prop <= (long double) max_delete_prop; - } + return true; + } - void print_structure(bool debug=false) const { - for (size_t i=0; iget_tombstone_count() / + (long double)m_levels[level]->get_record_count(); + return ts_prop <= (long double)max_delete_prop; + } - if (m_levels[i]) { - for (size_t j=0; jget_shard_count(); j++) { - fprintf(stdout, "(%ld, %ld, %p: %ld) ", j, m_levels[i]->get_shard_ptr(j).second, m_levels[i]->get_shard_ptr(j).first.get(), m_levels[i]->get_shard(j)->get_record_count()); - } - } else { - fprintf(stdout, "[Empty]"); - } + void print_structure(bool debug = false) const { + for (size_t i = 0; i < m_levels.size(); i++) { + if (debug) { + fprintf(stdout, "[D] [%ld]:\t", i); + } else { + fprintf(stdout, "[%ld]:\t", i); + } - fprintf(stdout, "\n"); + if (m_levels[i]) { + for (size_t j = 0; j < m_levels[i]->get_shard_count(); j++) { + fprintf(stdout, "(%ld, %ld, %p: %ld) ", j, + m_levels[i]->get_shard_ptr(j).second, + m_levels[i]->get_shard_ptr(j).first.get(), + m_levels[i]->get_shard(j)->get_record_count()); + } + } else { + fprintf(stdout, "[Empty]"); } - } + + fprintf(stdout, "\n"); + } + } private: LevelVector m_levels; diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index 205dbf9..cf2b16a 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -15,11 +15,11 @@ */ #pragma once +#include +#include #include #include #include -#include -#include #include "framework/interface/Query.h" #include "framework/interface/Shard.h" @@ -34,7 +34,7 @@ class InternalLevel { typedef std::pair, size_t> shard_ptr; public: - InternalLevel(ssize_t level_no) : m_level_no(level_no) { } + InternalLevel(ssize_t level_no) : m_level_no(level_no) {} ~InternalLevel() = default; @@ -109,7 +109,7 @@ public: idx = 0; } - if (idx >= (ssize_t) m_shards.size()) { + if (idx >= (ssize_t)m_shards.size()) { return nullptr; } @@ -183,7 +183,7 @@ public: } size_t 
get_nonempty_shard_count() const { - size_t cnt = 0; + size_t cnt = 0; for (size_t i = 0; i < m_shards.size(); i++) { if (m_shards[i].first && m_shards[i].first->get_record_count() > 0) { cnt += 1; @@ -199,7 +199,7 @@ public: new_level->append(m_shards[i].first, m_shards[i].second); } - for (auto itr=m_rt_window.begin(); itr < m_rt_window.end(); itr++) { + for (auto itr = m_rt_window.begin(); itr < m_rt_window.end(); itr++) { new_level->m_rt_window.push_front(*itr); } @@ -208,23 +208,21 @@ public: void truncate() { m_shards.erase(m_shards.begin(), m_shards.end()); } - void delete_shard(shard_index shard, bool log_delete=true) { + void delete_shard(shard_index shard, bool log_delete = true) { size_t before = m_shards.size(); m_shards.erase(m_shards.begin() + shard); size_t after = m_shards.size(); - assert( before > after); + assert(before > after); } - void append(std::shared_ptr shard, size_t version=0) { + void append(std::shared_ptr shard, size_t version = 0) { m_shards.push_back({shard, version}); } - void append(shard_ptr shard) { - m_shards.push_back(shard); - } + void append(shard_ptr shard) { m_shards.push_back(shard); } const shard_ptr get_shard_ptr(ssize_t idx) const { - if (idx >= 0 && idx < (ssize_t) m_shards.size()) { + if (idx >= 0 && idx < (ssize_t)m_shards.size()) { return m_shards[idx]; } else if (idx == all_shards_idx && m_shards.size() == 1) { return m_shards[0]; @@ -247,7 +245,7 @@ public: size_t total = 0; for (auto rt : m_rt_window) { total += rt; - } + } return total / m_rt_window.size(); } diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 7357915..156b718 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -97,7 +97,9 @@ public: return true; } - size_t get_record_count() const { return m_tail.load() - m_head.load().head_idx; } + size_t get_record_count() const { + return m_tail.load() - m_head.load().head_idx; + } size_t get_capacity() const { return m_cap; } @@ -139,14 +141,16 @@ public: m_active_head_advance.store(true); if (m_old_head.load().refcnt > 0) { - //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts [2]\n"); + // fprintf(stderr, "[W]: Refusing to advance head due to remaining + // reference counts [2]\n"); m_active_head_advance.store(false); return false; } - // fprintf(stderr, "[I] Advancing head pointer: %ld %ld %ld\n", m_old_head.load().head_idx, m_head.load().head_idx, new_head); - // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, m_head.load().refcnt); - + // fprintf(stderr, "[I] Advancing head pointer: %ld %ld %ld\n", + // m_old_head.load().head_idx, m_head.load().head_idx, new_head); + // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, + // m_head.load().refcnt); buffer_head new_hd = {new_head, 1}; buffer_head cur_hd; @@ -163,7 +167,6 @@ public: return true; } - void set_low_watermark(size_t lwm) { assert(lwm < m_hwm); m_lwm = lwm; @@ -197,13 +200,9 @@ public: return m_cap - (m_tail.load() - m_old_head.load().head_idx); } - size_t debug_get_old_head() const { - return m_old_head.load().head_idx; - } + size_t debug_get_old_head() const { return m_old_head.load().head_idx; } - size_t debug_get_head() const { - return m_head.load().head_idx; - } + size_t debug_get_head() const { return m_head.load().head_idx; } bool take_head_reference(size_t target_head) { buffer_head cur_hd, new_hd; @@ -225,7 +224,7 @@ public: return head_acquired; } - + bool 
release_head_reference(size_t head) { buffer_head cur_hd, new_hd; bool head_released = false; @@ -261,8 +260,8 @@ private: buffer_head cur_hd, new_hd; bool head_acquired = false; - - //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx); + // fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, + // m_old_head.load().head_idx, m_head.load().head_idx); do { if (m_old_head.load().head_idx == target_head) { cur_hd = m_old_head.load(); @@ -279,7 +278,7 @@ private: return new_hd.head_idx; } - + ssize_t try_advance_tail() { size_t old_value = m_tail.load(); diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index 3ae3492..e2484d5 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -8,48 +8,49 @@ */ #pragma once +#include "framework/interface/Scheduler.h" #include "framework/reconstruction/ReconstructionPolicy.h" #include "util/types.h" -#include "framework/interface/Scheduler.h" #include namespace de { template QueryType, -DeletePolicy D, SchedulerInterface SchedType> + DeletePolicy D, SchedulerInterface SchedType> class DEConfiguration { - public: - DEConfiguration(std::unique_ptr> recon_policy) - : recon_policy(std::move(recon_policy)) {} +public: + DEConfiguration( + std::unique_ptr> recon_policy) + : recon_policy(std::move(recon_policy)) {} - std::unique_ptr> recon_policy; + std::unique_ptr> recon_policy; - /* buffer parameters */ - size_t buffer_count = 1; - size_t buffer_size = 8000; - size_t buffer_flush_trigger = buffer_size / 2; + /* buffer parameters */ + size_t buffer_count = 1; + size_t buffer_size = 8000; + size_t buffer_flush_trigger = buffer_size / 2; - /* reconstruction triggers */ - bool recon_enable_seek_trigger = false; - bool recon_enable_maint_on_flush = false; - bool recon_enable_delete_cmpct = false; - bool recon_maint_disabled = true; + /* reconstruction triggers */ + bool recon_enable_seek_trigger = false; + bool recon_enable_maint_on_flush = false; + bool recon_enable_delete_cmpct = false; + bool recon_maint_disabled = true; - size_t recon_l0_capacity = 0; /* 0 for unbounded */ - double maximum_delete_proportion = 1; + size_t recon_l0_capacity = 0; /* 0 for unbounded */ + double maximum_delete_proportion = 1; - /* resource management */ - size_t maximum_threads = 16; - size_t minimum_recon_threads = 1; - size_t minimum_query_threads = 4; - size_t maximum_memory_usage = 0; /* o for unbounded */ + /* resource management */ + size_t maximum_threads = 16; + size_t minimum_recon_threads = 1; + size_t minimum_query_threads = 4; + size_t maximum_memory_usage = 0; /* o for unbounded */ - size_t physical_core_count = 6; + size_t physical_core_count = 6; - size_t buffer_flush_query_preemption_trigger = UINT64_MAX; + size_t buffer_flush_query_preemption_trigger = UINT64_MAX; - bool dynamic_ratelimiting = false; - size_t rt_level_scale = 1; + bool dynamic_ratelimiting = false; + size_t rt_level_scale = 1; }; } // namespace de diff --git a/include/query/irs.h b/include/query/irs.h index ec6fa29..eb584ae 100644 --- a/include/query/irs.h +++ b/include/query/irs.h @@ -64,7 +64,7 @@ public: if (query->lower_idx == shard->get_record_count()) { query->total_weight = 0; } else { - query->total_weight = query->upper_idx - query->lower_idx; + query->total_weight = query->upper_idx - query->lower_idx; } query->sample_size = 0; @@ -192,8 +192,7 @@ public: return result_set; } - static LocalResultType - 
local_query_buffer(LocalQueryBuffer *query) { + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { LocalResultType result; result.reserve(query->sample_size); @@ -221,9 +220,8 @@ public: return result; } - static void - combine(std::vector const &local_results, - Parameters *parms, ResultType &output) { + static void combine(std::vector const &local_results, + Parameters *parms, ResultType &output) { for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { output.emplace_back(local_results[i][j]); diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h index b42ec85..ab3d93f 100644 --- a/include/query/pointlookup.h +++ b/include/query/pointlookup.h @@ -40,7 +40,7 @@ public: typedef std::vector> LocalResultType; typedef std::vector ResultType; - + constexpr static bool EARLY_ABORT = false; constexpr static bool SKIP_DELETE_FILTER = true; @@ -58,7 +58,7 @@ public: return query; } - + static void distribute_query(Parameters *parms, std::vector const &local_queries, LocalQueryBuffer *buffer_query) { @@ -76,7 +76,7 @@ public: return result; } - + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { LocalResultType result; @@ -91,11 +91,9 @@ public: return result; } - - static void - combine(std::vector const &local_results, - Parameters *parms, ResultType &output) { + static void combine(std::vector const &local_results, + Parameters *parms, ResultType &output) { for (auto r : local_results) { if (r.size() > 0) { if (r[0].is_deleted() || r[0].is_tombstone()) { @@ -107,7 +105,7 @@ public: } } } - + static bool repeat(Parameters *parms, ResultType &output, std::vector const &local_queries, LocalQueryBuffer *buffer_query) { diff --git a/include/query/rangecount.h b/include/query/rangecount.h index 0743ee6..89774a8 100644 --- a/include/query/rangecount.h +++ b/include/query/rangecount.h @@ -40,8 +40,8 @@ public: size_t record_count; size_t tombstone_count; - bool is_deleted() {return false;} - bool is_tombstone() {return false;} + bool is_deleted() { return false; } + bool is_tombstone() { return false; } }; typedef size_t ResultType; @@ -116,8 +116,7 @@ public: return result; } - static LocalResultType - local_query_buffer(LocalQueryBuffer *query) { + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { LocalResultType result = {0, 0}; for (size_t i = 0; i < query->buffer->get_record_count(); i++) { @@ -137,9 +136,8 @@ public: return result; } - static void - combine(std::vector const &local_results, - Parameters *parms, ResultType &output) { + static void combine(std::vector const &local_results, + Parameters *parms, ResultType &output) { size_t reccnt = 0; size_t tscnt = 0; diff --git a/include/query/wss.h b/include/query/wss.h index d4e75f3..311de78 100644 --- a/include/query/wss.h +++ b/include/query/wss.h @@ -139,7 +139,8 @@ public: for (size_t i = 0; i < query->sample_size; i++) { size_t idx = shard->get_weighted_sample(query->global_parms.rng); - if (!shard->get_record_at(idx)->is_deleted() && !shard->get_record_at(idx)->is_tombstone()) { + if (!shard->get_record_at(idx)->is_deleted() && + !shard->get_record_at(idx)->is_tombstone()) { result.emplace_back(shard->get_record_at(idx)->rec); } } @@ -147,8 +148,7 @@ public: return result; } - static LocalResultType - local_query_buffer(LocalQueryBuffer *query) { + static LocalResultType local_query_buffer(LocalQueryBuffer *query) { LocalResultType result; for (size_t i = 0; i < query->sample_size; i++) { @@ -156,7 +156,8 @@ public: 
auto rec = query->buffer->get(idx); auto test = gsl_rng_uniform(query->global_parms.rng) * query->max_weight; - if (test <= rec->rec.weight && !rec->is_deleted() && !rec->is_tombstone()) { + if (test <= rec->rec.weight && !rec->is_deleted() && + !rec->is_tombstone()) { result.emplace_back(rec->rec); } } @@ -164,9 +165,8 @@ public: return result; } - static void - combine(std::vector const &local_results, - Parameters *parms, ResultType &output) { + static void combine(std::vector const &local_results, + Parameters *parms, ResultType &output) { for (size_t i = 0; i < local_results.size(); i++) { for (size_t j = 0; j < local_results[i].size(); j++) { output.emplace_back(local_results[i][j]); diff --git a/include/shard/Alias.h b/include/shard/Alias.h index 15b0884..c176fa2 100644 --- a/include/shard/Alias.h +++ b/include/shard/Alias.h @@ -20,186 +20,163 @@ #include "psu-ds/Alias.h" #include "psu-ds/BloomFilter.h" -#include "util/bf_config.h" #include "util/SortedMerge.h" +#include "util/bf_config.h" -using psudb::CACHELINE_SIZE; using psudb::BloomFilter; using psudb::byte; +using psudb::CACHELINE_SIZE; namespace de { -template -class Alias { +template class Alias { public: - typedef R RECORD; -private: - typedef decltype(R::key) K; - typedef decltype(R::value) V; - typedef decltype(R::weight) W; + typedef R RECORD; +private: + typedef decltype(R::key) K; + typedef decltype(R::value) V; + typedef decltype(R::weight) W; public: - Alias(BufferView buffer) - : m_data(nullptr) - , m_alias(nullptr) - , m_total_weight(0) - , m_reccnt(0) - , m_tombstone_cnt(0) - , m_alloc_size(0) - , m_bf(new BloomFilter(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS)) { - - - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - buffer.get_record_count() * - sizeof(Wrapped), - (byte**) &m_data); - - auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; - - if (m_reccnt > 0) { - std::vector weights; - for (size_t i=0; i buffer) + : m_data(nullptr), m_alias(nullptr), m_total_weight(0), m_reccnt(0), + m_tombstone_cnt(0), m_alloc_size(0), + m_bf(new BloomFilter(BF_FPR, buffer.get_tombstone_count(), + BF_HASH_FUNCS)) { + + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped), + (byte **)&m_data); + + auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; + + if (m_reccnt > 0) { + std::vector weights; + for (size_t i = 0; i < m_reccnt; i++) { + weights.emplace_back(m_data[i].rec.weight); + m_total_weight += m_data[i].rec.weight; + } + + build_alias_structure(weights); } + } - Alias(std::vector const &shards) - : m_data(nullptr) - , m_alias(nullptr) - , m_total_weight(0) - , m_reccnt(0) - , m_tombstone_cnt(0) - , m_alloc_size(0) - , m_bf(nullptr) { - - size_t attemp_reccnt = 0; - size_t tombstone_count = 0; - auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - - m_bf = new BloomFilter(BF_FPR, tombstone_count, BF_HASH_FUNCS); - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - attemp_reccnt * sizeof(Wrapped), - (byte **) &m_data); - - auto res = sorted_array_merge(cursors, m_data, m_bf); - m_reccnt = res.record_count; - m_tombstone_cnt = res.tombstone_count; - - if (m_reccnt > 0) { - std::vector weights; - for (size_t i=0; i const &shards) + : m_data(nullptr), m_alias(nullptr), m_total_weight(0), m_reccnt(0), + m_tombstone_cnt(0), m_alloc_size(0), 
m_bf(nullptr) { - Wrapped *point_lookup(const R &rec, bool filter=false) { - if (filter && !m_bf->lookup(rec)) { - return nullptr; - } + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = + build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - size_t idx = get_lower_bound(rec.key); - if (idx >= m_reccnt) { - return nullptr; - } + m_bf = new BloomFilter(BF_FPR, tombstone_count, BF_HASH_FUNCS); + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **)&m_data); - while (idx < (m_reccnt-1) && m_data[idx].rec < rec) ++idx; + auto res = sorted_array_merge(cursors, m_data, m_bf); + m_reccnt = res.record_count; + m_tombstone_cnt = res.tombstone_count; - if (m_data[idx].rec == rec) { - return m_data + idx; - } + if (m_reccnt > 0) { + std::vector weights; + for (size_t i = 0; i < m_reccnt; i++) { + weights.emplace_back(m_data[i].rec.weight); + m_total_weight += m_data[i].rec.weight; + } - return nullptr; + build_alias_structure(weights); } + } - Wrapped* get_data() const { - return m_data; - } - - size_t get_record_count() const { - return m_reccnt; - } + ~Alias() { + free(m_data); + delete m_alias; + delete m_bf; + } - size_t get_tombstone_count() const { - return m_tombstone_cnt; + Wrapped *point_lookup(const R &rec, bool filter = false) { + if (filter && !m_bf->lookup(rec)) { + return nullptr; } - const Wrapped* get_record_at(size_t idx) const { - if (idx >= m_reccnt) return nullptr; - return m_data + idx; + size_t idx = get_lower_bound(rec.key); + if (idx >= m_reccnt) { + return nullptr; } + while (idx < (m_reccnt - 1) && m_data[idx].rec < rec) + ++idx; - size_t get_memory_usage() const { - return 0; + if (m_data[idx].rec == rec) { + return m_data + idx; } - size_t get_aux_memory_usage() const { - return (m_bf) ? m_bf->memory_usage() : 0; - } + return nullptr; + } - W get_total_weight() const { - return m_total_weight; - } + Wrapped *get_data() const { return m_data; } - size_t get_weighted_sample(gsl_rng *rng) const { - return m_alias->get(rng); - } + size_t get_record_count() const { return m_reccnt; } - size_t get_lower_bound(const K& key) const { - size_t min = 0; - size_t max = m_reccnt - 1; + size_t get_tombstone_count() const { return m_tombstone_cnt; } - while (min < max) { - size_t mid = (min + max) / 2; + const Wrapped *get_record_at(size_t idx) const { + if (idx >= m_reccnt) + return nullptr; + return m_data + idx; + } - if (key > m_data[mid].rec.key) { - min = mid + 1; - } else { - max = mid; - } - } + size_t get_memory_usage() const { return 0; } - return min; - } + size_t get_aux_memory_usage() const { + return (m_bf) ? 
m_bf->memory_usage() : 0; + } -private: + W get_total_weight() const { return m_total_weight; } + + size_t get_weighted_sample(gsl_rng *rng) const { return m_alias->get(rng); } - void build_alias_structure(std::vector &weights) { + size_t get_lower_bound(const K &key) const { + size_t min = 0; + size_t max = m_reccnt - 1; - // normalize the weights vector - std::vector norm_weights(weights.size()); + while (min < max) { + size_t mid = (min + max) / 2; + + if (key > m_data[mid].rec.key) { + min = mid + 1; + } else { + max = mid; + } + } + + return min; + } + +private: + void build_alias_structure(std::vector &weights) { - for (size_t i=0; i norm_weights(weights.size()); - // build the alias structure - m_alias = new psudb::Alias(norm_weights); + for (size_t i = 0; i < weights.size(); i++) { + norm_weights[i] = (double)weights[i] / (double)m_total_weight; } - Wrapped* m_data; - psudb::Alias *m_alias; - W m_total_weight; - size_t m_reccnt; - size_t m_tombstone_cnt; - size_t m_alloc_size; - BloomFilter *m_bf; + // build the alias structure + m_alias = new psudb::Alias(norm_weights); + } + + Wrapped *m_data; + psudb::Alias *m_alias; + W m_total_weight; + size_t m_reccnt; + size_t m_tombstone_cnt; + size_t m_alloc_size; + BloomFilter *m_bf; }; -} +} // namespace de diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h index 59ff116..31db40b 100644 --- a/include/shard/FSTrie.h +++ b/include/shard/FSTrie.h @@ -9,194 +9,182 @@ */ #pragma once - #include #include "framework/ShardRequirements.h" #include "fst.hpp" #include "util/SortedMerge.h" -using psudb::CACHELINE_SIZE; using psudb::BloomFilter; +using psudb::byte; +using psudb::CACHELINE_SIZE; using psudb::PriorityQueue; using psudb::queue_record; -using psudb::byte; namespace de { -template -class FSTrie { +template class FSTrie { public: - typedef R RECORD; -private: + typedef R RECORD; - typedef decltype(R::key) K; - typedef decltype(R::value) V; - static_assert(std::is_same_v, "FST requires const char* keys."); +private: + typedef decltype(R::key) K; + typedef decltype(R::value) V; + static_assert(std::is_same_v, + "FST requires const char* keys."); public: - FSTrie(BufferView buffer) - : m_data(nullptr) - , m_reccnt(0) - , m_alloc_size(0) - { - m_data = new Wrapped[buffer.get_record_count()](); - m_alloc_size = sizeof(Wrapped) * buffer.get_record_count(); - - size_t cnt = 0; - std::vector keys; - keys.reserve(buffer.get_record_count()); - - /* - * Copy the contents of the buffer view into a temporary buffer, and - * sort them. We still need to iterate over these temporary records to - * apply tombstone/deleted record filtering, as well as any possible - * per-record processing that is required by the shard being built. - */ - auto temp_buffer = new Wrapped[buffer.get_record_count()](); - for (size_t i=0; i>()); + FSTrie(BufferView buffer) : m_data(nullptr), m_reccnt(0), m_alloc_size(0) { + m_data = new Wrapped[buffer.get_record_count()](); + m_alloc_size = sizeof(Wrapped) * buffer.get_record_count(); + + size_t cnt = 0; + std::vector keys; + keys.reserve(buffer.get_record_count()); + + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. 
+ */ + auto temp_buffer = new Wrapped[buffer.get_record_count()](); + for (size_t i = 0; i < buffer.get_record_count(); i++) { + temp_buffer[i] = *(buffer.get(i)); + } - for (size_t i=0; i>()); - m_data[cnt] = temp_buffer[i]; - m_data[cnt].clear_timestamp(); + for (size_t i = 0; i < buffer.get_record_count(); i++) { + if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || + temp_buffer[i].rec.key[0] != '\0') { + continue; + } - keys.push_back(std::string(m_data[cnt].rec.key)); - cnt++; - } + m_data[cnt] = temp_buffer[i]; + m_data[cnt].clear_timestamp(); - m_reccnt = cnt; - if (m_reccnt > 0) { - m_fst = new fst::Trie(keys, true, 1); - } + keys.push_back(std::string(m_data[cnt].rec.key)); + cnt++; + } - delete[] temp_buffer; + m_reccnt = cnt; + if (m_reccnt > 0) { + m_fst = new fst::Trie(keys, true, 1); } - FSTrie(std::vector const &shards) - : m_data(nullptr) - , m_reccnt(0) - , m_alloc_size(0) - { - size_t attemp_reccnt = 0; - size_t tombstone_count = 0; - auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - - m_data = new Wrapped[attemp_reccnt](); - m_alloc_size = attemp_reccnt * sizeof(Wrapped); - - std::vector keys; - keys.reserve(attemp_reccnt); - - // FIXME: For smaller cursor arrays, it may be more efficient to skip - // the priority queue and just do a scan. - PriorityQueue> pq(cursors.size()); - for (size_t i=0; i 1 ? pq.peek(1) : queue_record>{nullptr, 0}; - /* - * if the current record is not a tombstone, and the next record is - * a tombstone that matches the current one, then the current one - * has been deleted, and both it and its tombstone can be skipped - * over. - */ - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - /* skip over records that have been deleted via tagging */ - if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key[0] != '\0') { - m_data[m_reccnt] = *cursor.ptr; - keys.push_back(std::string(m_data[m_reccnt].rec.key)); - - m_reccnt++; - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); - } - } + FSTrie(std::vector const &shards) + : m_data(nullptr), m_reccnt(0), m_alloc_size(0) { + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = + build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - if (m_reccnt > 0) { - m_fst = new fst::Trie(keys, true, 1); - } + m_data = new Wrapped[attemp_reccnt](); + m_alloc_size = attemp_reccnt * sizeof(Wrapped); + + std::vector keys; + keys.reserve(attemp_reccnt); + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue> pq(cursors.size()); + for (size_t i = 0; i < cursors.size(); i++) { + pq.push(cursors[i].ptr, i); } - ~FSTrie() { - delete[] m_data; - delete m_fst; + while (pq.size()) { + auto now = pq.peek(); + auto next = + pq.size() > 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. 
+ */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); + pq.pop(); + auto &cursor1 = cursors[now.version]; + auto &cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) + pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) + pq.push(cursor2.ptr, next.version); + } else { + auto &cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key[0] != '\0') { + m_data[m_reccnt] = *cursor.ptr; + keys.push_back(std::string(m_data[m_reccnt].rec.key)); + + m_reccnt++; + } + pq.pop(); + + if (advance_cursor(cursor)) + pq.push(cursor.ptr, now.version); + } } - Wrapped *point_lookup(const R &rec, bool filter=false) { + if (m_reccnt > 0) { + m_fst = new fst::Trie(keys, true, 1); + } + } - auto idx = m_fst->exactSearch(rec.key); + ~FSTrie() { + delete[] m_data; + delete m_fst; + } - if (idx == fst::kNotFound) { - return nullptr; - } + Wrapped *point_lookup(const R &rec, bool filter = false) { - // FIXME: for convenience, I'm treating this Trie as a unique index - // for now, so no need to scan forward and/or check values. This - // also makes the point lookup query class a lot easier to make. - // Ultimately, though, we can support non-unique indexes with some - // extra work. + auto idx = m_fst->exactSearch(rec.key); - return m_data + idx; + if (idx == fst::kNotFound) { + return nullptr; } - Wrapped* get_data() const { - return m_data; - } - - size_t get_record_count() const { - return m_reccnt; - } + // FIXME: for convenience, I'm treating this Trie as a unique index + // for now, so no need to scan forward and/or check values. This + // also makes the point lookup query class a lot easier to make. + // Ultimately, though, we can support non-unique indexes with some + // extra work. - size_t get_tombstone_count() const { - return 0; - } + return m_data + idx; + } - const Wrapped* get_record_at(size_t idx) const { - if (idx >= m_reccnt) return nullptr; - return m_data + idx; - } + Wrapped *get_data() const { return m_data; } + size_t get_record_count() const { return m_reccnt; } - size_t get_memory_usage() const { - return m_fst->getMemoryUsage(); - } + size_t get_tombstone_count() const { return 0; } - size_t get_aux_memory_usage() const { - return m_alloc_size; - } + const Wrapped *get_record_at(size_t idx) const { + if (idx >= m_reccnt) + return nullptr; + return m_data + idx; + } - size_t get_lower_bound(R &rec) {return 0;} - size_t get_upper_bound(R &rec) {return 0;} + size_t get_memory_usage() const { return m_fst->getMemoryUsage(); } -private: + size_t get_aux_memory_usage() const { return m_alloc_size; } + + size_t get_lower_bound(R &rec) { return 0; } + size_t get_upper_bound(R &rec) { return 0; } - Wrapped* m_data; - size_t m_reccnt; - size_t m_alloc_size; - fst::Trie *m_fst; +private: + Wrapped *m_data; + size_t m_reccnt; + size_t m_alloc_size; + fst::Trie *m_fst; }; -} +} // namespace de diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h index f6b525f..6722aaf 100644 --- a/include/shard/ISAMTree.h +++ b/include/shard/ISAMTree.h @@ -121,7 +121,9 @@ public: size_t get_memory_usage() const { return m_internal_node_cnt * NODE_SZ; } - size_t get_aux_memory_usage() const { return (m_bf) ? m_bf->memory_usage() : 0; } + size_t get_aux_memory_usage() const { + return (m_bf) ? 
m_bf->memory_usage() : 0; + } /* SortedShardInterface methods */ size_t get_lower_bound(const K &key) const { diff --git a/include/shard/LoudsPatricia.h b/include/shard/LoudsPatricia.h index fe0c30e..0b7c74c 100644 --- a/include/shard/LoudsPatricia.h +++ b/include/shard/LoudsPatricia.h @@ -9,191 +9,179 @@ */ #pragma once - #include #include "framework/ShardRequirements.h" #include "louds-patricia.hpp" #include "util/SortedMerge.h" -using psudb::CACHELINE_SIZE; using psudb::BloomFilter; +using psudb::byte; +using psudb::CACHELINE_SIZE; using psudb::PriorityQueue; using psudb::queue_record; -using psudb::byte; namespace de { -template -class LoudsPatricia { +template class LoudsPatricia { private: - - typedef decltype(R::key) K; - typedef decltype(R::value) V; - static_assert(std::is_same_v, "FST requires const char* keys."); + typedef decltype(R::key) K; + typedef decltype(R::value) V; + static_assert(std::is_same_v, + "FST requires const char* keys."); public: - LoudsPatricia(BufferView buffer) - : m_data(nullptr) - , m_reccnt(0) - , m_alloc_size(0) - { - m_data = new Wrapped[buffer.get_record_count()](); - m_alloc_size = sizeof(Wrapped) * buffer.get_record_count(); - - m_louds = new louds::Patricia(); - - size_t cnt = 0; - std::vector keys; - keys.reserve(buffer.get_record_count()); - - /* - * Copy the contents of the buffer view into a temporary buffer, and - * sort them. We still need to iterate over these temporary records to - * apply tombstone/deleted record filtering, as well as any possible - * per-record processing that is required by the shard being built. - */ - auto temp_buffer = new Wrapped[buffer.get_record_count()](); - for (size_t i=0; i>()); + LoudsPatricia(BufferView buffer) + : m_data(nullptr), m_reccnt(0), m_alloc_size(0) { + m_data = new Wrapped[buffer.get_record_count()](); + m_alloc_size = sizeof(Wrapped) * buffer.get_record_count(); + + m_louds = new louds::Patricia(); + + size_t cnt = 0; + std::vector keys; + keys.reserve(buffer.get_record_count()); + + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = new Wrapped[buffer.get_record_count()](); + for (size_t i = 0; i < buffer.get_record_count(); i++) { + temp_buffer[i] = *(buffer.get(i)); + } - for (size_t i=0; i>()); - m_data[cnt] = temp_buffer[i]; - m_data[cnt].clear_timestamp(); + for (size_t i = 0; i < buffer.get_record_count(); i++) { + if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || + temp_buffer[i].rec.key == "") { + continue; + } - m_louds->add(std::string(m_data[cnt].rec.key)); - cnt++; - } + m_data[cnt] = temp_buffer[i]; + m_data[cnt].clear_timestamp(); - m_reccnt = cnt; - if (m_reccnt > 0) { - m_louds->build(); - } + m_louds->add(std::string(m_data[cnt].rec.key)); + cnt++; + } - delete[] temp_buffer; + m_reccnt = cnt; + if (m_reccnt > 0) { + m_louds->build(); } - LoudsPatricia(std::vector &shards) - : m_data(nullptr) - , m_reccnt(0) - , m_alloc_size(0) - { - size_t attemp_reccnt = 0; - size_t tombstone_count = 0; - auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - - m_data = new Wrapped[attemp_reccnt](); - m_alloc_size = attemp_reccnt * sizeof(Wrapped); - - m_louds = new louds::Patricia(); - - // FIXME: For smaller cursor arrays, it may be more efficient to skip - // the priority queue and just do a scan. 
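// For illustration only: the cancellation rule the merge loops in these
// shard constructors apply -- a record is dropped when the next item in
// sorted order is a matching tombstone -- sketched over an already-sorted
// array. SimpleRec and cancel_tombstones are hypothetical stand-ins, not
// the framework's Wrapped<R> machinery.

#include <cstddef>
#include <string>
#include <vector>

struct SimpleRec {
  std::string key;
  bool tombstone;
};

std::vector<SimpleRec> cancel_tombstones(const std::vector<SimpleRec> &in) {
  std::vector<SimpleRec> out;
  for (size_t i = 0; i < in.size(); i++) {
    if (!in[i].tombstone && i + 1 < in.size() && in[i + 1].tombstone &&
        in[i + 1].key == in[i].key) {
      i++; // skip the record and its tombstone together
      continue;
    }
    out.push_back(in[i]); // unmatched tombstones are carried forward
  }
  return out;
}

// e.g. {"a"}, {"a", tombstone}, {"b"}  ->  {"b"}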
- PriorityQueue> pq(cursors.size()); - for (size_t i=0; i 1 ? pq.peek(1) : queue_record>{nullptr, 0}; - /* - * if the current record is not a tombstone, and the next record is - * a tombstone that matches the current one, then the current one - * has been deleted, and both it and its tombstone can be skipped - * over. - */ - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - /* skip over records that have been deleted via tagging */ - if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") { - m_data[m_reccnt] = *cursor.ptr; - m_louds->add(std::string(m_data[m_reccnt].rec.key)); - m_reccnt++; - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); - } - } + LoudsPatricia(std::vector &shards) + : m_data(nullptr), m_reccnt(0), m_alloc_size(0) { + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = build_cursor_vec(shards, &attemp_reccnt, + &tombstone_count); - if (m_reccnt > 0) { - m_louds->build(); + m_data = new Wrapped[attemp_reccnt](); + m_alloc_size = attemp_reccnt * sizeof(Wrapped); + + m_louds = new louds::Patricia(); + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue> pq(cursors.size()); + for (size_t i = 0; i < cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } + + while (pq.size()) { + auto now = pq.peek(); + auto next = + pq.size() > 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); + pq.pop(); + auto &cursor1 = cursors[now.version]; + auto &cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) + pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) + pq.push(cursor2.ptr, next.version); + } else { + auto &cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") { + m_data[m_reccnt] = *cursor.ptr; + m_louds->add(std::string(m_data[m_reccnt].rec.key)); + m_reccnt++; } + pq.pop(); + + if (advance_cursor(cursor)) + pq.push(cursor.ptr, now.version); + } } - ~LoudsPatricia() { - delete[] m_data; - delete m_louds; + if (m_reccnt > 0) { + m_louds->build(); } + } - Wrapped *point_lookup(const R &rec, bool filter=false) { + ~LoudsPatricia() { + delete[] m_data; + delete m_louds; + } - auto idx = m_louds->lookup(std::string(rec.key)); + Wrapped *point_lookup(const R &rec, bool filter = false) { - if (idx == -1) { - return nullptr; - } + auto idx = m_louds->lookup(std::string(rec.key)); - // FIXME: for convenience, I'm treating this Trie as a unique index - // for now, so no need to scan forward and/or check values. This - // also makes the point lookup query class a lot easier to make. - // Ultimately, though, we can support non-unique indexes with some - // extra work. 
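// The "extra work" mentioned above for non-unique keys would roughly be a
// forward scan over the run of equal keys, comparing values. A hypothetical
// standalone sketch (KV and lookup_non_unique are illustrative names only,
// assuming the records stay laid out in key order):

#include <cstddef>
#include <cstring>

struct KV {
  const char *key;
  int value;
};

// `first` is the index of the first record whose key matches the probe.
const KV *lookup_non_unique(const KV *data, size_t n, size_t first,
                            const KV &probe) {
  for (size_t i = first; i < n && std::strcmp(data[i].key, probe.key) == 0;
       i++) {
    if (data[i].value == probe.value)
      return data + i;
  }
  return nullptr;
}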
- return m_data + idx; + if (idx == -1) { + return nullptr; } - Wrapped* get_data() const { - return m_data; - } - - size_t get_record_count() const { - return m_reccnt; - } + // FIXME: for convenience, I'm treating this Trie as a unique index + // for now, so no need to scan forward and/or check values. This + // also makes the point lookup query class a lot easier to make. + // Ultimately, though, we can support non-unique indexes with some + // extra work. + return m_data + idx; + } - size_t get_tombstone_count() const { - return 0; - } + Wrapped *get_data() const { return m_data; } - const Wrapped* get_record_at(size_t idx) const { - if (idx >= m_reccnt) return nullptr; - return m_data + idx; - } + size_t get_record_count() const { return m_reccnt; } + size_t get_tombstone_count() const { return 0; } - size_t get_memory_usage() const { - return m_louds->size(); - } + const Wrapped *get_record_at(size_t idx) const { + if (idx >= m_reccnt) + return nullptr; + return m_data + idx; + } - size_t get_aux_memory_usage() const { - return m_alloc_size; - } + size_t get_memory_usage() const { return m_louds->size(); } - size_t get_lower_bound(R &rec) {return 0;} - size_t get_upper_bound(R &rec) {return 0;} + size_t get_aux_memory_usage() const { return m_alloc_size; } -private: + size_t get_lower_bound(R &rec) { return 0; } + size_t get_upper_bound(R &rec) { return 0; } - Wrapped* m_data; - size_t m_reccnt; - size_t m_alloc_size; - louds::Patricia *m_louds; +private: + Wrapped *m_data; + size_t m_reccnt; + size_t m_alloc_size; + louds::Patricia *m_louds; }; -} +} // namespace de diff --git a/include/shard/PGM.h b/include/shard/PGM.h index 5b39ab4..40c9141 100644 --- a/include/shard/PGM.h +++ b/include/shard/PGM.h @@ -13,7 +13,6 @@ */ #pragma once - #include #include "framework/ShardRequirements.h" @@ -23,278 +22,268 @@ #include "util/SortedMerge.h" #include "util/bf_config.h" -using psudb::CACHELINE_SIZE; using psudb::BloomFilter; +using psudb::byte; +using psudb::CACHELINE_SIZE; using psudb::PriorityQueue; using psudb::queue_record; -using psudb::byte; namespace de { -template -class PGM { +template class PGM { public: - typedef R RECORD; + typedef R RECORD; + private: - typedef decltype(R::key) K; - typedef decltype(R::value) V; + typedef decltype(R::key) K; + typedef decltype(R::value) V; public: - PGM(BufferView buffer) - : m_bf(nullptr) - , m_reccnt(0) - , m_tombstone_cnt(0) - , m_alloc_size(0) { - - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - buffer.get_record_count() * - sizeof(Wrapped), - (byte**) &m_data); - - std::vector keys; - /* - * Copy the contents of the buffer view into a temporary buffer, and - * sort them. We still need to iterate over these temporary records to - * apply tombstone/deleted record filtering, as well as any possible - * per-record processing that is required by the shard being built. 
- */ - auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, - buffer.get_record_count(), - sizeof(Wrapped)); - buffer.copy_to_buffer((byte *) temp_buffer); - - auto base = temp_buffer; - auto stop = base + buffer.get_record_count(); - std::sort(base, stop, std::less>()); - - merge_info info = {0, 0}; - - /* - * Iterate over the temporary buffer to process the records, copying - * them into buffer as needed - */ - while (base < stop) { - if (!base->is_tombstone() && (base + 1 < stop) - && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { - base += 2; - continue; - } else if (base->is_deleted()) { - base += 1; - continue; - } - - // FIXME: this shouldn't be necessary, but the tagged record - // bypass doesn't seem to be working on this code-path, so this - // ensures that tagged records from the buffer are able to be - // dropped, eventually. It should only need to be &= 1 - base->header &= 3; - keys.emplace_back(base->rec.key); - m_data[info.record_count++] = *base; - - if (base->is_tombstone()) { - info.tombstone_count++; - if (m_bf){ - m_bf->insert(base->rec); - } - } - - base++; + PGM(BufferView buffer) + : m_bf(nullptr), m_reccnt(0), m_tombstone_cnt(0), m_alloc_size(0) { + + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped), + (byte **)&m_data); + + std::vector keys; + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. + */ + auto temp_buffer = (Wrapped *)psudb::sf_aligned_calloc( + CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped)); + buffer.copy_to_buffer((byte *)temp_buffer); + + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less>()); + + merge_info info = {0, 0}; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) && + base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) { + base += 2; + continue; + } else if (base->is_deleted()) { + base += 1; + continue; + } + + // FIXME: this shouldn't be necessary, but the tagged record + // bypass doesn't seem to be working on this code-path, so this + // ensures that tagged records from the buffer are able to be + // dropped, eventually. It should only need to be &= 1 + base->header &= 3; + keys.emplace_back(base->rec.key); + m_data[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (m_bf) { + m_bf->insert(base->rec); } + } - free(temp_buffer); - - m_reccnt = info.record_count; - m_tombstone_cnt = info.tombstone_count; - - if (m_reccnt > 0) { - m_pgm = pgm::PGMIndex(keys); - } + base++; } - PGM(std::vector const &shards) - : m_data(nullptr) - , m_bf(nullptr) - , m_reccnt(0) - , m_tombstone_cnt(0) - , m_alloc_size(0) { - - size_t attemp_reccnt = 0; - size_t tombstone_count = 0; - auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - attemp_reccnt * sizeof(Wrapped), - (byte **) &m_data); - std::vector keys; - - // FIXME: For smaller cursor arrays, it may be more efficient to skip - // the priority queue and just do a scan. - PriorityQueue> pq(cursors.size()); - for (size_t i=0; i 1 ? 
pq.peek(1) : queue_record>{nullptr, 0}; - /* - * if the current record is not a tombstone, and the next record is - * a tombstone that matches the current one, then the current one - * has been deleted, and both it and its tombstone can be skipped - * over. - */ - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone()) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - /* skip over records that have been deleted via tagging */ - if (!cursor.ptr->is_deleted()) { - keys.emplace_back(cursor.ptr->rec.key); - m_data[info.record_count++] = *cursor.ptr; - - /* - * if the record is a tombstone, increment the ts count and - * insert it into the bloom filter if one has been - * provided. - */ - if (cursor.ptr->is_tombstone()) { - info.tombstone_count++; - if (m_bf) { - m_bf->insert(cursor.ptr->rec); - } - } - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); - } - } + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; - m_reccnt = info.record_count; - m_tombstone_cnt = info.tombstone_count; + if (m_reccnt > 0) { + m_pgm = pgm::PGMIndex(keys); + } + } + + PGM(std::vector const &shards) + : m_data(nullptr), m_bf(nullptr), m_reccnt(0), m_tombstone_cnt(0), + m_alloc_size(0) { + + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = + build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); + + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **)&m_data); + std::vector keys; + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. + PriorityQueue> pq(cursors.size()); + for (size_t i = 0; i < cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } - if (m_reccnt > 0) { - m_pgm = pgm::PGMIndex(keys); + merge_info info = {0, 0}; + while (pq.size()) { + auto now = pq.peek(); + auto next = + pq.size() > 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone()) { + + pq.pop(); + pq.pop(); + auto &cursor1 = cursors[now.version]; + auto &cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) + pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) + pq.push(cursor2.ptr, next.version); + } else { + auto &cursor = cursors[now.version]; + /* skip over records that have been deleted via tagging */ + if (!cursor.ptr->is_deleted()) { + keys.emplace_back(cursor.ptr->rec.key); + m_data[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. 
+ */ + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (m_bf) { + m_bf->insert(cursor.ptr->rec); + } + } } - } + pq.pop(); - ~PGM() { - free(m_data); - delete m_bf; + if (advance_cursor(cursor)) + pq.push(cursor.ptr, now.version); + } } - Wrapped *point_lookup(const R &rec, bool filter=false) const { - size_t idx = get_lower_bound(rec.key); - if (idx >= m_reccnt) { - return nullptr; - } + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; - while (idx < m_reccnt && m_data[idx].rec < rec) ++idx; + if (m_reccnt > 0) { + m_pgm = pgm::PGMIndex(keys); + } + } - if (m_data[idx].rec == rec) { - return m_data + idx; - } + ~PGM() { + free(m_data); + delete m_bf; + } - return nullptr; + Wrapped *point_lookup(const R &rec, bool filter = false) const { + size_t idx = get_lower_bound(rec.key); + if (idx >= m_reccnt) { + return nullptr; } - Wrapped* get_data() const { - return m_data; - } - - size_t get_record_count() const { - return m_reccnt; - } + while (idx < m_reccnt && m_data[idx].rec < rec) + ++idx; - size_t get_tombstone_count() const { - return m_tombstone_cnt; + if (m_data[idx].rec == rec) { + return m_data + idx; } - const Wrapped* get_record_at(size_t idx) const { - if (idx >= m_reccnt) return nullptr; - return m_data + idx; - } + return nullptr; + } + Wrapped *get_data() const { return m_data; } - size_t get_memory_usage() const { - return m_pgm.size_in_bytes(); - } + size_t get_record_count() const { return m_reccnt; } - size_t get_aux_memory_usage() const { - return (m_bf) ? m_bf->memory_usage() : 0; - } + size_t get_tombstone_count() const { return m_tombstone_cnt; } - size_t get_lower_bound(const K& key) const { - auto bound = m_pgm.search(key); - size_t idx = bound.lo; + const Wrapped *get_record_at(size_t idx) const { + if (idx >= m_reccnt) + return nullptr; + return m_data + idx; + } - if (idx >= m_reccnt) { - return m_reccnt; - } + size_t get_memory_usage() const { return m_pgm.size_in_bytes(); } - /* - * If the region to search is less than some pre-specified - * amount, perform a linear scan to locate the record. - */ - if (bound.hi - bound.lo < 256) { - while (idx < bound.hi && m_data[idx].rec.key < key) { - idx++; - } - } else { - /* Otherwise, perform a binary search */ - idx = bound.lo; - size_t max = bound.hi; - - while (idx < max) { - size_t mid = (idx + max) / 2; - if (key > m_data[mid].rec.key) { - idx = mid + 1; - } else { - max = mid; - } - } + size_t get_aux_memory_usage() const { + return (m_bf) ? m_bf->memory_usage() : 0; + } - } + size_t get_lower_bound(const K &key) const { + auto bound = m_pgm.search(key); + size_t idx = bound.lo; - /* - * the upper bound returned by PGM is one passed the end of the - * array. If we are at that point, we should just return "not found" - */ - if (idx == m_reccnt) { - return idx; - } + if (idx >= m_reccnt) { + return m_reccnt; + } - /* - * We may have walked one passed the actual lower bound, so check - * the index before the current one to see if it is the actual bound - */ - if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) { - return idx-1; + /* + * If the region to search is less than some pre-specified + * amount, perform a linear scan to locate the record. 
+ */ + if (bound.hi - bound.lo < 256) { + while (idx < bound.hi && m_data[idx].rec.key < key) { + idx++; + } + } else { + /* Otherwise, perform a binary search */ + idx = bound.lo; + size_t max = bound.hi; + + while (idx < max) { + size_t mid = (idx + max) / 2; + if (key > m_data[mid].rec.key) { + idx = mid + 1; + } else { + max = mid; } + } + } + + /* + * the upper bound returned by PGM is one passed the end of the + * array. If we are at that point, we should just return "not found" + */ + if (idx == m_reccnt) { + return idx; + } - /* - * Otherwise, check idx. If it is a valid bound, then return it, - * otherwise return "not found". - */ - return (m_data[idx].rec.key >= key) ? idx : m_reccnt; + /* + * We may have walked one passed the actual lower bound, so check + * the index before the current one to see if it is the actual bound + */ + if (m_data[idx].rec.key > key && idx > 0 && + m_data[idx - 1].rec.key <= key) { + return idx - 1; } + /* + * Otherwise, check idx. If it is a valid bound, then return it, + * otherwise return "not found". + */ + return (m_data[idx].rec.key >= key) ? idx : m_reccnt; + } + private: - Wrapped* m_data; - BloomFilter *m_bf; - size_t m_reccnt; - size_t m_tombstone_cnt; - size_t m_alloc_size; - K m_max_key; - K m_min_key; - pgm::PGMIndex m_pgm; + Wrapped *m_data; + BloomFilter *m_bf; + size_t m_reccnt; + size_t m_tombstone_cnt; + size_t m_alloc_size; + K m_max_key; + K m_min_key; + pgm::PGMIndex m_pgm; }; -} +} // namespace de diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h index 1a04afc..2cd514d 100644 --- a/include/shard/TrieSpline.h +++ b/include/shard/TrieSpline.h @@ -11,319 +11,306 @@ */ #pragma once - #include #include "framework/ShardRequirements.h" -#include "ts/builder.h" #include "psu-ds/BloomFilter.h" -#include "util/bf_config.h" +#include "ts/builder.h" #include "util/SortedMerge.h" +#include "util/bf_config.h" -using psudb::CACHELINE_SIZE; using psudb::BloomFilter; +using psudb::byte; +using psudb::CACHELINE_SIZE; using psudb::PriorityQueue; using psudb::queue_record; -using psudb::byte; namespace de { -template -class TrieSpline { +template class TrieSpline { public: - typedef R RECORD; + typedef R RECORD; + private: - typedef decltype(R::key) K; - typedef decltype(R::value) V; + typedef decltype(R::key) K; + typedef decltype(R::value) V; public: - TrieSpline(BufferView buffer) - : m_reccnt(0) - , m_tombstone_cnt(0) - , m_alloc_size(0) - , m_max_key(0) - , m_min_key(0) - , m_bf(nullptr) - { - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - buffer.get_record_count() * - sizeof(Wrapped), - (byte**) &m_data); + TrieSpline(BufferView buffer) + : m_reccnt(0), m_tombstone_cnt(0), m_alloc_size(0), m_max_key(0), + m_min_key(0), m_bf(nullptr) { + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped), + (byte **)&m_data); + + /* + * Copy the contents of the buffer view into a temporary buffer, and + * sort them. We still need to iterate over these temporary records to + * apply tombstone/deleted record filtering, as well as any possible + * per-record processing that is required by the shard being built. 
+ */ + auto temp_buffer = (Wrapped *)psudb::sf_aligned_calloc( + CACHELINE_SIZE, buffer.get_record_count(), sizeof(Wrapped)); + buffer.copy_to_buffer((byte *)temp_buffer); + + auto base = temp_buffer; + auto stop = base + buffer.get_record_count(); + std::sort(base, stop, std::less>()); + + auto tmp_min_key = temp_buffer[0].rec.key; + auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key; + auto bldr = ts::Builder(tmp_min_key, tmp_max_key, E); + + merge_info info = {0, 0}; + + m_min_key = tmp_min_key; + m_max_key = tmp_max_key; + + /* + * Iterate over the temporary buffer to process the records, copying + * them into buffer as needed + */ + while (base < stop) { + if (!base->is_tombstone() && (base + 1 < stop) && + base->rec == (base + 1)->rec && (base + 1)->is_tombstone() && + base->rec.key != m_max_key && base->rec.key != m_min_key) { + base += 2; + continue; + } else if (base->is_deleted() && base->rec.key != m_max_key && + base->rec.key != m_min_key) { + base += 1; + continue; + } + + base->header &= 3; + bldr.AddKey(base->rec.key); + m_data[info.record_count++] = *base; + + if (base->is_tombstone()) { + info.tombstone_count++; + if (m_bf) { + m_bf->insert(base->rec); + } + } - /* - * Copy the contents of the buffer view into a temporary buffer, and - * sort them. We still need to iterate over these temporary records to - * apply tombstone/deleted record filtering, as well as any possible - * per-record processing that is required by the shard being built. - */ - auto temp_buffer = (Wrapped *) psudb::sf_aligned_calloc(CACHELINE_SIZE, - buffer.get_record_count(), - sizeof(Wrapped)); - buffer.copy_to_buffer((byte *) temp_buffer); + base++; + } - auto base = temp_buffer; - auto stop = base + buffer.get_record_count(); - std::sort(base, stop, std::less>()); + free(temp_buffer); - auto tmp_min_key = temp_buffer[0].rec.key; - auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key; - auto bldr = ts::Builder(tmp_min_key, tmp_max_key, E); + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; - merge_info info = {0, 0}; + if (m_reccnt > 50) { + m_ts = bldr.Finalize(); + } + } + + TrieSpline(std::vector const &shards) + : m_reccnt(0), m_tombstone_cnt(0), m_alloc_size(0), m_max_key(0), + m_min_key(0), m_bf(nullptr) { + size_t attemp_reccnt = 0; + size_t tombstone_count = 0; + auto cursors = build_cursor_vec(shards, &attemp_reccnt, + &tombstone_count); + + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **)&m_data); + + // FIXME: For smaller cursor arrays, it may be more efficient to skip + // the priority queue and just do a scan. 
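// The FIXME above suggests that, for a small number of cursors, a plain scan
// could replace the priority queue. A minimal sketch of that idea with a
// simplified cursor type (SimpleCursor and min_cursor are hypothetical, not
// the framework's Cursor<R>):

#include <cstddef>
#include <vector>

struct SimpleCursor {
  const int *ptr; // current record
  const int *end; // one past the final record
};

// Index of the cursor with the smallest current record, or -1 if exhausted.
int min_cursor(const std::vector<SimpleCursor> &cursors) {
  int best = -1;
  for (size_t i = 0; i < cursors.size(); i++) {
    if (cursors[i].ptr == cursors[i].end)
      continue;
    if (best < 0 || *cursors[i].ptr < *cursors[best].ptr)
      best = (int)i;
  }
  return best;
}

// A merge loop would repeatedly take min_cursor(), emit or cancel that
// record, and advance the chosen cursor: O(k) per record rather than
// O(log k), which can win for small k.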
+ PriorityQueue> pq(cursors.size()); + for (size_t i = 0; i < cursors.size(); i++) { + pq.push(cursors[i].ptr, i); + } - m_min_key = tmp_min_key; - m_max_key = tmp_max_key; + auto tmp_max_key = shards[0]->m_max_key; + auto tmp_min_key = shards[0]->m_min_key; - /* - * Iterate over the temporary buffer to process the records, copying - * them into buffer as needed - */ - while (base < stop) { - if (!base->is_tombstone() && (base + 1 < stop) - && base->rec == (base + 1)->rec && (base + 1)->is_tombstone() - && base->rec.key != m_max_key && base->rec.key != m_min_key) { - base += 2; - continue; - } else if (base->is_deleted() && base->rec.key != m_max_key && base->rec.key != m_min_key) { - base += 1; - continue; - } + for (size_t i = 0; i < shards.size(); i++) { + if (shards[i]->m_max_key > tmp_max_key) { + tmp_max_key = shards[i]->m_max_key; + } - base->header &= 3; - bldr.AddKey(base->rec.key); - m_data[info.record_count++] = *base; + if (shards[i]->m_min_key < tmp_min_key) { + tmp_min_key = shards[i]->m_min_key; + } + } - if (base->is_tombstone()) { - info.tombstone_count++; - if (m_bf){ - m_bf->insert(base->rec); - } + auto bldr = ts::Builder(tmp_min_key, tmp_max_key, E); + + m_max_key = tmp_max_key; + m_min_key = tmp_min_key; + + merge_info info = {0, 0}; + while (pq.size()) { + auto now = pq.peek(); + auto next = + pq.size() > 1 ? pq.peek(1) : queue_record>{nullptr, 0}; + /* + * if the current record is not a tombstone, and the next record is + * a tombstone that matches the current one, then the current one + * has been deleted, and both it and its tombstone can be skipped + * over. Unless the tombstone would remove the maximum or + * minimum valued key, which cannot be removed at this point + * without breaking triespline + */ + if (!now.data->is_tombstone() && next.data != nullptr && + now.data->rec == next.data->rec && next.data->is_tombstone() && + now.data->rec.key != tmp_max_key && + now.data->rec.key != tmp_min_key) { + + pq.pop(); + pq.pop(); + auto &cursor1 = cursors[now.version]; + auto &cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) + pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) + pq.push(cursor2.ptr, next.version); + } else { + auto &cursor = cursors[now.version]; + /* + * skip over records that have been deleted via tagging, + * unless they are the max or min keys, which cannot be + * removed without breaking triespline + */ + if (!cursor.ptr->is_deleted() || cursor.ptr->rec.key == tmp_max_key || + cursor.ptr->rec.key == tmp_min_key) { + bldr.AddKey(cursor.ptr->rec.key); + m_data[info.record_count++] = *cursor.ptr; + + /* + * if the record is a tombstone, increment the ts count and + * insert it into the bloom filter if one has been + * provided. 
+ */ + if (cursor.ptr->is_tombstone()) { + info.tombstone_count++; + if (m_bf) { + m_bf->insert(cursor.ptr->rec); } - - base++; + } } + pq.pop(); - free(temp_buffer); - - m_reccnt = info.record_count; - m_tombstone_cnt = info.tombstone_count; - - if (m_reccnt > 50) { - m_ts = bldr.Finalize(); - } + if (advance_cursor(cursor)) + pq.push(cursor.ptr, now.version); + } } - TrieSpline(std::vector const &shards) - : m_reccnt(0) - , m_tombstone_cnt(0) - , m_alloc_size(0) - , m_max_key(0) - , m_min_key(0) - , m_bf(nullptr) - { - size_t attemp_reccnt = 0; - size_t tombstone_count = 0; - auto cursors = build_cursor_vec(shards, &attemp_reccnt, &tombstone_count); - - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - attemp_reccnt * sizeof(Wrapped), - (byte **) &m_data); - - // FIXME: For smaller cursor arrays, it may be more efficient to skip - // the priority queue and just do a scan. - PriorityQueue> pq(cursors.size()); - for (size_t i=0; im_max_key; - auto tmp_min_key = shards[0]->m_min_key; - - for (size_t i=0; im_max_key > tmp_max_key) { - tmp_max_key = shards[i]->m_max_key; - } - - if (shards[i]->m_min_key < tmp_min_key) { - tmp_min_key = shards[i]->m_min_key; - } - } + m_reccnt = info.record_count; + m_tombstone_cnt = info.tombstone_count; - auto bldr = ts::Builder(tmp_min_key, tmp_max_key, E); - - m_max_key = tmp_max_key; - m_min_key = tmp_min_key; - - merge_info info = {0, 0}; - while (pq.size()) { - auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record>{nullptr, 0}; - /* - * if the current record is not a tombstone, and the next record is - * a tombstone that matches the current one, then the current one - * has been deleted, and both it and its tombstone can be skipped - * over. Unless the tombstone would remove the maximum or - * minimum valued key, which cannot be removed at this point - * without breaking triespline - */ - if (!now.data->is_tombstone() && next.data != nullptr && - now.data->rec == next.data->rec && next.data->is_tombstone() - && now.data->rec.key != tmp_max_key && now.data->rec.key != tmp_min_key) { - - pq.pop(); pq.pop(); - auto& cursor1 = cursors[now.version]; - auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); - } else { - auto& cursor = cursors[now.version]; - /* - * skip over records that have been deleted via tagging, - * unless they are the max or min keys, which cannot be - * removed without breaking triespline - */ - if (!cursor.ptr->is_deleted() || - cursor.ptr->rec.key == tmp_max_key || cursor.ptr->rec.key == tmp_min_key) { - bldr.AddKey(cursor.ptr->rec.key); - m_data[info.record_count++] = *cursor.ptr; - - /* - * if the record is a tombstone, increment the ts count and - * insert it into the bloom filter if one has been - * provided. 
- */ - if (cursor.ptr->is_tombstone()) { - info.tombstone_count++; - if (m_bf) { - m_bf->insert(cursor.ptr->rec); - } - } - } - pq.pop(); - - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); - } - } + if (m_reccnt > 50) { + m_ts = bldr.Finalize(); + } + } - m_reccnt = info.record_count; - m_tombstone_cnt = info.tombstone_count; + ~TrieSpline() { + free(m_data); + delete m_bf; + } - if (m_reccnt > 50) { - m_ts = bldr.Finalize(); - } + Wrapped *point_lookup(const R &rec, bool filter = false) const { + if (filter && m_bf && !m_bf->lookup(rec)) { + return nullptr; } - ~TrieSpline() { - free(m_data); - delete m_bf; + size_t idx = get_lower_bound(rec.key); + if (idx >= m_reccnt) { + return nullptr; } - Wrapped *point_lookup(const R &rec, bool filter=false) const { - if (filter && m_bf && !m_bf->lookup(rec)) { - return nullptr; - } - - size_t idx = get_lower_bound(rec.key); - if (idx >= m_reccnt) { - return nullptr; - } - - while (idx < m_reccnt && m_data[idx].rec < rec) ++idx; - - if (m_data[idx].rec == rec) { - return m_data + idx; - } + while (idx < m_reccnt && m_data[idx].rec < rec) + ++idx; - return nullptr; + if (m_data[idx].rec == rec) { + return m_data + idx; } - Wrapped* get_data() const { - return m_data; - } - - size_t get_record_count() const { - return m_reccnt; - } + return nullptr; + } - size_t get_tombstone_count() const { - return m_tombstone_cnt; - } + Wrapped *get_data() const { return m_data; } - const Wrapped* get_record_at(size_t idx) const { - if (idx >= m_reccnt) return nullptr; - return m_data + idx; - } + size_t get_record_count() const { return m_reccnt; } + size_t get_tombstone_count() const { return m_tombstone_cnt; } - size_t get_memory_usage() const { - return m_ts.GetSize(); - } + const Wrapped *get_record_at(size_t idx) const { + if (idx >= m_reccnt) + return nullptr; + return m_data + idx; + } - size_t get_aux_memory_usage() const { - return (m_bf) ? m_bf->memory_usage() : 0; - } + size_t get_memory_usage() const { return m_ts.GetSize(); } - size_t get_lower_bound(const K& key) const { - if (m_reccnt < 50) { - size_t bd = m_reccnt; - for (size_t i=0; i= key) { - bd = i; - break; - } - } + size_t get_aux_memory_usage() const { + return (m_bf) ? m_bf->memory_usage() : 0; + } - return bd; + size_t get_lower_bound(const K &key) const { + if (m_reccnt < 50) { + size_t bd = m_reccnt; + for (size_t i = 0; i < m_reccnt; i++) { + if (m_data[i].rec.key >= key) { + bd = i; + break; } + } - auto bound = m_ts.GetSearchBound(key); - size_t idx = bound.begin; + return bd; + } - if (idx >= m_reccnt) { - return m_reccnt; - } + auto bound = m_ts.GetSearchBound(key); + size_t idx = bound.begin; - // If the region to search is less than some pre-specified - // amount, perform a linear scan to locate the record. - if (bound.end - bound.begin < 256) { - while (idx < bound.end && m_data[idx].rec.key < key) { - idx++; - } - } else { - // Otherwise, perform a binary search - idx = bound.begin; - size_t max = bound.end; - - while (idx < max) { - size_t mid = (idx + max) / 2; - if (key > m_data[mid].rec.key) { - idx = mid + 1; - } else { - max = mid; - } - } - } + if (idx >= m_reccnt) { + return m_reccnt; + } - if (idx == m_reccnt) { - return m_reccnt; + // If the region to search is less than some pre-specified + // amount, perform a linear scan to locate the record. 
+ if (bound.end - bound.begin < 256) { + while (idx < bound.end && m_data[idx].rec.key < key) { + idx++; + } + } else { + // Otherwise, perform a binary search + idx = bound.begin; + size_t max = bound.end; + + while (idx < max) { + size_t mid = (idx + max) / 2; + if (key > m_data[mid].rec.key) { + idx = mid + 1; + } else { + max = mid; } + } + } - if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) { - return idx-1; - } + if (idx == m_reccnt) { + return m_reccnt; + } - return idx; + if (m_data[idx].rec.key > key && idx > 0 && + m_data[idx - 1].rec.key <= key) { + return idx - 1; } -private: + return idx; + } - Wrapped* m_data; - size_t m_reccnt; - size_t m_tombstone_cnt; - size_t m_alloc_size; - K m_max_key; - K m_min_key; - ts::TrieSpline m_ts; - BloomFilter *m_bf; +private: + Wrapped *m_data; + size_t m_reccnt; + size_t m_tombstone_cnt; + size_t m_alloc_size; + K m_max_key; + K m_min_key; + ts::TrieSpline m_ts; + BloomFilter *m_bf; }; -} +} // namespace de diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h index 7130efe..33ce9b9 100644 --- a/include/shard/VPTree.h +++ b/include/shard/VPTree.h @@ -15,379 +15,376 @@ #include -#include #include "framework/ShardRequirements.h" #include "psu-ds/PriorityQueue.h" +#include +using psudb::byte; using psudb::CACHELINE_SIZE; using psudb::PriorityQueue; -using psudb::byte; namespace de { -template +template class VPTree { public: - typedef R RECORD; + typedef R RECORD; private: - struct vpnode { - size_t start; - size_t stop; - bool leaf; + struct vpnode { + size_t start; + size_t stop; + bool leaf; + + double radius; + vpnode *inside; + vpnode *outside; + + vpnode() + : start(0), stop(0), leaf(false), radius(0.0), inside(nullptr), + outside(nullptr) {} + + ~vpnode() { + delete inside; + delete outside; + } + }; - double radius; - vpnode *inside; - vpnode *outside; +public: + VPTree(BufferView buffer) + : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) { + + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, buffer.get_record_count() * sizeof(Wrapped), + (byte **)&m_data); + + m_ptrs = new vp_ptr[buffer.get_record_count()]; + m_reccnt = 0; + + // FIXME: will eventually need to figure out tombstones + // this one will likely require the multi-pass + // approach, as otherwise we'll need to sort the + // records repeatedly on each reconstruction. + for (size_t i = 0; i < buffer.get_record_count(); i++) { + auto rec = buffer.get(i); + + if (rec->is_deleted()) { + continue; + } + + rec->header &= 3; + m_data[m_reccnt] = *rec; + m_ptrs[m_reccnt].ptr = &m_data[m_reccnt]; + m_reccnt++; + } - vpnode() : start(0), stop(0), leaf(false), radius(0.0), inside(nullptr), outside(nullptr) {} + if (m_reccnt > 0) { + m_root = build_vptree(); + build_map(); + } + } - ~vpnode() { - delete inside; - delete outside; - } - }; + VPTree(std::vector shards) + : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) { + size_t attemp_reccnt = 0; + for (size_t i = 0; i < shards.size(); i++) { + attemp_reccnt += shards[i]->get_record_count(); + } + m_alloc_size = psudb::sf_aligned_alloc( + CACHELINE_SIZE, attemp_reccnt * sizeof(Wrapped), (byte **)&m_data); + m_ptrs = new vp_ptr[attemp_reccnt]; + + // FIXME: will eventually need to figure out tombstones + // this one will likely require the multi-pass + // approach, as otherwise we'll need to sort the + // records repeatedly on each reconstruction. 
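// For orientation, a vantage-point tree conventionally partitions each
// node's records around a chosen vantage point at the median distance:
// records closer than that radius go to the "inside" child, the rest to the
// "outside" child, matching the traversal in point_lookup below. A minimal
// sketch of one partition step, with a hypothetical Point type standing in
// for R and its calc_distance(); the shard's own build_subtree() may differ
// in its details.

#include <algorithm>
#include <cstddef>
#include <vector>

struct Point {
  double x, y;
  double calc_distance(const Point &o) const {
    double dx = x - o.x, dy = y - o.y;
    return dx * dx + dy * dy;
  }
};

// pts[0] stands in for the chosen vantage point; the remaining points are
// reordered so those closer than the median distance come first. Returns the
// node's radius. Assumes pts holds the vantage point plus at least one other
// record.
double partition_by_vantage(std::vector<Point> &pts) {
  const Point vp = pts.front();
  auto mid = pts.begin() + 1 + (pts.size() - 1) / 2;
  std::nth_element(pts.begin() + 1, mid, pts.end(),
                   [&vp](const Point &a, const Point &b) {
                     return vp.calc_distance(a) < vp.calc_distance(b);
                   });
  return vp.calc_distance(*mid);
}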
+ for (size_t i = 0; i < shards.size(); i++) { + for (size_t j = 0; j < shards[i]->get_record_count(); j++) { + if (shards[i]->get_record_at(j)->is_deleted()) { + continue; + } -public: - VPTree(BufferView buffer) - : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) { + m_data[m_reccnt] = *shards[i]->get_record_at(j); + m_ptrs[m_reccnt].ptr = &m_data[m_reccnt]; + m_reccnt++; + } + } + if (m_reccnt > 0) { + m_root = build_vptree(); + build_map(); + } + } - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - buffer.get_record_count() * - sizeof(Wrapped), - (byte**) &m_data); + ~VPTree() { + free(m_data); + delete m_root; + delete[] m_ptrs; + } - m_ptrs = new vp_ptr[buffer.get_record_count()]; - m_reccnt = 0; + Wrapped *point_lookup(const R &rec, bool filter = false) { + if constexpr (HMAP) { + auto idx = m_lookup_map.find(rec); - // FIXME: will eventually need to figure out tombstones - // this one will likely require the multi-pass - // approach, as otherwise we'll need to sort the - // records repeatedly on each reconstruction. - for (size_t i=0; iis_deleted()) { - continue; - } + return m_data + idx->second; + } else { + vpnode *node = m_root; - rec->header &= 3; - m_data[m_reccnt] = *rec; - m_ptrs[m_reccnt].ptr = &m_data[m_reccnt]; - m_reccnt++; + while (!node->leaf && m_ptrs[node->start].ptr->rec != rec) { + if (rec.calc_distance((m_ptrs[node->start].ptr->rec)) >= node->radius) { + node = node->outside; + } else { + node = node->inside; } + } - if (m_reccnt > 0) { - m_root = build_vptree(); - build_map(); + for (size_t i = node->start; i <= node->stop; i++) { + if (m_ptrs[i].ptr->rec == rec) { + return m_ptrs[i].ptr; } + } + + return nullptr; } + } - VPTree(std::vector shards) - : m_reccnt(0), m_tombstone_cnt(0), m_node_cnt(0), m_root(nullptr) { + Wrapped *get_data() const { return m_data; } - size_t attemp_reccnt = 0; - for (size_t i=0; iget_record_count(); - } + size_t get_record_count() const { return m_reccnt; } - m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE, - attemp_reccnt * sizeof(Wrapped), - (byte **) &m_data); - m_ptrs = new vp_ptr[attemp_reccnt]; - - // FIXME: will eventually need to figure out tombstones - // this one will likely require the multi-pass - // approach, as otherwise we'll need to sort the - // records repeatedly on each reconstruction. 
- for (size_t i=0; iget_record_count(); j++) { - if (shards[i]->get_record_at(j)->is_deleted()) { - continue; - } - - m_data[m_reccnt] = *shards[i]->get_record_at(j); - m_ptrs[m_reccnt].ptr = &m_data[m_reccnt]; - m_reccnt++; - } - } + size_t get_tombstone_count() const { return m_tombstone_cnt; } - if (m_reccnt > 0) { - m_root = build_vptree(); - build_map(); - } - } + const Wrapped *get_record_at(size_t idx) const { + if (idx >= m_reccnt) + return nullptr; + return m_data + idx; + } - ~VPTree() { - free(m_data); - delete m_root; - delete[] m_ptrs; - } + size_t get_memory_usage() const { + return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R *); + } - Wrapped *point_lookup(const R &rec, bool filter=false) { - if constexpr (HMAP) { - auto idx = m_lookup_map.find(rec); + size_t get_aux_memory_usage() const { + // FIXME: need to return the size of the unordered_map + return 0; + } - if (idx == m_lookup_map.end()) { - return nullptr; - } + void search(const R &point, size_t k, + PriorityQueue, DistCmpMax>> &pq) { + double farthest = std::numeric_limits::max(); - return m_data + idx->second; - } else { - vpnode *node = m_root; - - while (!node->leaf && m_ptrs[node->start].ptr->rec != rec) { - if (rec.calc_distance((m_ptrs[node->start].ptr->rec)) >= node->radius) { - node = node->outside; - } else { - node = node->inside; - } - } - - for (size_t i=node->start; i<=node->stop; i++) { - if (m_ptrs[i].ptr->rec == rec) { - return m_ptrs[i].ptr; - } - } - - return nullptr; - } - } + internal_search(m_root, point, k, pq, &farthest); + } - Wrapped* get_data() const { - return m_data; - } - - size_t get_record_count() const { - return m_reccnt; +private: + struct vp_ptr { + Wrapped *ptr; + double dist; + }; + Wrapped *m_data; + vp_ptr *m_ptrs; + std::unordered_map> m_lookup_map; + size_t m_reccnt; + size_t m_tombstone_cnt; + size_t m_node_cnt; + size_t m_alloc_size; + + vpnode *m_root; + + vpnode *build_vptree() { + if (m_reccnt == 0) { + return nullptr; } - size_t get_tombstone_count() const { - return m_tombstone_cnt; - } + size_t lower = 0; + size_t upper = m_reccnt - 1; - const Wrapped* get_record_at(size_t idx) const { - if (idx >= m_reccnt) return nullptr; - return m_data + idx; - } + auto rng = gsl_rng_alloc(gsl_rng_mt19937); + auto root = build_subtree(lower, upper, rng); + gsl_rng_free(rng); + return root; + } - size_t get_memory_usage() const { - return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R*); + void build_map() { + // Skip constructing the hashmap if disabled in the + // template parameters. + if constexpr (!HMAP) { + return; } - size_t get_aux_memory_usage() const { - // FIXME: need to return the size of the unordered_map - return 0; + for (size_t i = 0; i < m_reccnt; i++) { + // FIXME: Will need to account for tombstones here too. Under + // tombstones, it is technically possible for two otherwise identical + // instances of the same record to exist within the same shard, so + // long as one of them is a tombstone. Because the table is currently + // using the unwrapped records for the key, it isn't possible for it + // to handle this case right now. 
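+      // map the unwrapped record to its index in m_data; when HMAP is set,
+      // point_lookup() consults this table instead of descending the tree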
+ m_lookup_map.insert({m_data[i].rec, i}); } - - void search(const R &point, size_t k, PriorityQueue, - DistCmpMax>> &pq) { - double farthest = std::numeric_limits::max(); - - internal_search(m_root, point, k, pq, &farthest); + } + + vpnode *build_subtree(size_t start, size_t stop, gsl_rng *rng) { + /* + * base-case: sometimes happens (probably because of the +1 and -1 + * in the first recursive call) + */ + if (start > stop) { + return nullptr; } -private: - struct vp_ptr { - Wrapped *ptr; - double dist; - }; - Wrapped* m_data; - vp_ptr* m_ptrs; - std::unordered_map> m_lookup_map; - size_t m_reccnt; - size_t m_tombstone_cnt; - size_t m_node_cnt; - size_t m_alloc_size; - - vpnode *m_root; - - vpnode *build_vptree() { - if (m_reccnt == 0) { - return nullptr; - } - - size_t lower = 0; - size_t upper = m_reccnt - 1; + /* base-case: create a leaf node */ + if (stop - start <= LEAFSZ) { + vpnode *node = new vpnode(); + node->start = start; + node->stop = stop; + node->leaf = true; - auto rng = gsl_rng_alloc(gsl_rng_mt19937); - auto root = build_subtree(lower, upper, rng); - gsl_rng_free(rng); - return root; + m_node_cnt++; + return node; } - void build_map() { - // Skip constructing the hashmap if disabled in the - // template parameters. - if constexpr (!HMAP) { - return; - } - - for (size_t i=0; irec.calc_distance(m_ptrs[i].ptr->rec); } - vpnode *build_subtree(size_t start, size_t stop, gsl_rng *rng) { - /* - * base-case: sometimes happens (probably because of the +1 and -1 - * in the first recursive call) - */ - if (start > stop) { - return nullptr; - } - - /* base-case: create a leaf node */ - if (stop - start <= LEAFSZ) { - vpnode *node = new vpnode(); - node->start = start; - node->stop = stop; - node->leaf = true; - - m_node_cnt++; - return node; - } - - /* - * select a random element to be the root of the - * subtree - */ - auto i = start + gsl_rng_uniform_int(rng, stop - start + 1); - swap(start, i); - - /* for efficiency, we'll pre-calculate the distances between each point and the root */ - for (size_t i=start+1; i<=stop; i++) { - m_ptrs[i].dist = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[i].ptr->rec); - } - - /* - * partition elements based on their distance from the start, - * with those elements with distance falling below the median - * distance going into the left sub-array and those above - * the median in the right. This is easily done using QuickSelect. - */ - auto mid = (start + 1 + stop) / 2; - quickselect(start + 1, stop, mid, m_ptrs[start].ptr, rng); - - /* Create a new node based on this partitioning */ - vpnode *node = new vpnode(); - node->start = start; + /* + * partition elements based on their distance from the start, + * with those elements with distance falling below the median + * distance going into the left sub-array and those above + * the median in the right. This is easily done using QuickSelect. + */ + auto mid = (start + 1 + stop) / 2; + quickselect(start + 1, stop, mid, m_ptrs[start].ptr, rng); - /* store the radius of the circle used for partitioning the node. */ - node->radius = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[mid].ptr->rec); - m_ptrs[start].dist = node->radius; + /* Create a new node based on this partitioning */ + vpnode *node = new vpnode(); + node->start = start; - /* recursively construct the left and right subtrees */ - node->inside = build_subtree(start + 1, mid-1, rng); - node->outside = build_subtree(mid, stop, rng); + /* store the radius of the circle used for partitioning the node. 
*/ + node->radius = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[mid].ptr->rec); + m_ptrs[start].dist = node->radius; - m_node_cnt++; + /* recursively construct the left and right subtrees */ + node->inside = build_subtree(start + 1, mid - 1, rng); + node->outside = build_subtree(mid, stop, rng); - return node; - } + m_node_cnt++; - void quickselect(size_t start, size_t stop, size_t k, Wrapped *p, gsl_rng *rng) { - if (start == stop) return; + return node; + } - auto pivot = partition(start, stop, p, rng); + void quickselect(size_t start, size_t stop, size_t k, Wrapped *p, + gsl_rng *rng) { + if (start == stop) + return; - if (k < pivot) { - quickselect(start, pivot - 1, k, p, rng); - } else if (k > pivot) { - quickselect(pivot + 1, stop, k, p, rng); - } - } + auto pivot = partition(start, stop, p, rng); - // TODO: The quickselect code can probably be generalized and moved out - // to psudb-common instead. - size_t partition(size_t start, size_t stop, Wrapped *p, gsl_rng *rng) { - auto pivot = start + gsl_rng_uniform_int(rng, stop - start); - //double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec); - - swap(pivot, stop); - - size_t j = start; - for (size_t i=start; irec.calc_distance(m_ptrs[i]->rec)); - //if (distances[i - start] < distances[stop - start]) { - //if (p->rec .calc_distance(m_ptrs[i]->rec) < pivot_dist) { - swap(j, i); - j++; - } - } - - swap(j, stop); - return j; + if (k < pivot) { + quickselect(start, pivot - 1, k, p, rng); + } else if (k > pivot) { + quickselect(pivot + 1, stop, k, p, rng); } - - void swap(size_t idx1, size_t idx2) { - auto tmp = m_ptrs[idx1]; - m_ptrs[idx1] = m_ptrs[idx2]; - m_ptrs[idx2] = tmp; + } + + // TODO: The quickselect code can probably be generalized and moved out + // to psudb-common instead. + size_t partition(size_t start, size_t stop, Wrapped *p, gsl_rng *rng) { + auto pivot = start + gsl_rng_uniform_int(rng, stop - start); + // double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec); + + swap(pivot, stop); + + size_t j = start; + for (size_t i = start; i < stop; i++) { + if (m_ptrs[i].dist < m_ptrs[stop].dist) { + // assert(distances[i - start] == p->rec.calc_distance(m_ptrs[i]->rec)); + // if (distances[i - start] < distances[stop - start]) { + // if (p->rec .calc_distance(m_ptrs[i]->rec) < pivot_dist) { + swap(j, i); + j++; + } } - void internal_search(vpnode *node, const R &point, size_t k, PriorityQueue, - DistCmpMax>> &pq, double *farthest) { - - if (node == nullptr) return; + swap(j, stop); + return j; + } - if (node->leaf) { - for (size_t i=node->start; i<=node->stop; i++) { - double d = point.calc_distance(m_ptrs[i].ptr->rec); - if (d < *farthest) { - if (pq.size() == k) { - pq.pop(); - } + void swap(size_t idx1, size_t idx2) { + auto tmp = m_ptrs[idx1]; + m_ptrs[idx1] = m_ptrs[idx2]; + m_ptrs[idx2] = tmp; + } - pq.push(m_ptrs[i].ptr); - if (pq.size() == k) { - *farthest = point.calc_distance(pq.peek().data->rec); - } - } - } + void internal_search(vpnode *node, const R &point, size_t k, + PriorityQueue, DistCmpMax>> &pq, + double *farthest) { - return; - } - - double d = point.calc_distance(m_ptrs[node->start].ptr->rec); + if (node == nullptr) + return; + if (node->leaf) { + for (size_t i = node->start; i <= node->stop; i++) { + double d = point.calc_distance(m_ptrs[i].ptr->rec); if (d < *farthest) { - if (pq.size() == k) { - pq.pop(); - } - pq.push(m_ptrs[node->start].ptr); - if (pq.size() == k) { - *farthest = point.calc_distance(pq.peek().data->rec); - } + if (pq.size() == k) { + pq.pop(); + } + + 
pq.push(m_ptrs[i].ptr); + if (pq.size() == k) { + *farthest = point.calc_distance(pq.peek().data->rec); + } } + } - if (d < node->radius) { - if (d - (*farthest) <= node->radius) { - internal_search(node->inside, point, k, pq, farthest); - } + return; + } - if (d + (*farthest) >= node->radius) { - internal_search(node->outside, point, k, pq, farthest); - } - } else { - if (d + (*farthest) >= node->radius) { - internal_search(node->outside, point, k, pq, farthest); - } + double d = point.calc_distance(m_ptrs[node->start].ptr->rec); - if (d - (*farthest) <= node->radius) { - internal_search(node->inside, point, k, pq, farthest); - } - } + if (d < *farthest) { + if (pq.size() == k) { + pq.pop(); + } + pq.push(m_ptrs[node->start].ptr); + if (pq.size() == k) { + *farthest = point.calc_distance(pq.peek().data->rec); + } + } + + if (d < node->radius) { + if (d - (*farthest) <= node->radius) { + internal_search(node->inside, point, k, pq, farthest); + } + + if (d + (*farthest) >= node->radius) { + internal_search(node->outside, point, k, pq, farthest); + } + } else { + if (d + (*farthest) >= node->radius) { + internal_search(node->outside, point, k, pq, farthest); + } + + if (d - (*farthest) <= node->radius) { + internal_search(node->inside, point, k, pq, farthest); + } } - }; -} + } +}; +} // namespace de diff --git a/include/util/bf_config.h b/include/util/bf_config.h index 836e452..7d823a7 100644 --- a/include/util/bf_config.h +++ b/include/util/bf_config.h @@ -35,6 +35,8 @@ static size_t BF_HASH_FUNCS = 7; * Adjust the value of BF_HASH_FUNCS. The argument must be on the interval * (0, INT64_MAX], or the behavior of bloom filters is undefined. */ -[[maybe_unused]] static void BF_SET_HASHFUNC(size_t func_cnt) { BF_HASH_FUNCS = func_cnt; } +[[maybe_unused]] static void BF_SET_HASHFUNC(size_t func_cnt) { + BF_HASH_FUNCS = func_cnt; +} } // namespace de diff --git a/include/util/types.h b/include/util/types.h index 64dc773..be04d05 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -20,8 +20,8 @@ #include #include #include -#include #include +#include namespace de { @@ -76,21 +76,18 @@ constexpr ShardID buffer_shid = {buffer_level_idx, all_shards_idx}; enum class ReconstructionType { Invalid, /* placeholder type */ - Flush, /* a flush of the buffer into L0 */ - Merge, /* the merging of shards in two seperate levels */ - Append, /* adding a shard directly to a level */ - Compact /* the merging of shards on one level */ + Flush, /* a flush of the buffer into L0 */ + Merge, /* the merging of shards in two seperate levels */ + Append, /* adding a shard directly to a level */ + Compact /* the merging of shards on one level */ }; - -template -struct reconstruction_results { +template struct reconstruction_results { std::shared_ptr new_shard; std::vector> source_shards; size_t target_level; size_t reccnt; long runtime; - }; typedef struct ReconstructionTask { @@ -121,11 +118,14 @@ public: total_reccnt += reccnt; } - void add_reconstruction(level_index source, level_index target, - size_t reccnt, ReconstructionType type) { + void add_reconstruction(level_index source, level_index target, size_t reccnt, + ReconstructionType type) { if (type == ReconstructionType::Merge) { - m_tasks.push_back({{{source, all_shards_idx}, {target, all_shards_idx}}, target, reccnt, type}); + m_tasks.push_back({{{source, all_shards_idx}, {target, all_shards_idx}}, + target, + reccnt, + type}); } else { m_tasks.push_back({{{source, all_shards_idx}}, target, reccnt, type}); } -- cgit v1.2.3
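The vantage-point partitioning step above (build_subtree() picks a random vantage point, caches every other record's distance to it in vp_ptr::dist, and then quickselects the remaining range around the median distance) can be illustrated with a minimal standalone sketch. The Point type, its Euclidean calc_distance(), and the use of std::nth_element below are illustrative stand-ins rather than part of the shard code; std::nth_element is simply the standard-library counterpart of the hand-rolled quickselect()/partition() pair.

#include <algorithm>
#include <cmath>
#include <cstdio>
#include <vector>

/* Illustrative stand-in for a record type providing the calc_distance()
 * member that the VPTree shard expects of its records. */
struct Point {
  double x, y;
  double calc_distance(const Point &other) const {
    return std::hypot(x - other.x, y - other.y);
  }
};

int main() {
  std::vector<Point> pts = {{0, 0}, {1, 1}, {2, 0}, {5, 5}, {6, 4}, {7, 7}};

  /* use the first element as the vantage point (the shard instead draws one
   * at random with gsl_rng_uniform_int and swaps it to the front) */
  const Point vp = pts.front();

  /* split the remaining records around their median distance to the vantage
   * point: [first, mid) ends up "inside" the radius, [mid, end) "outside" */
  auto first = pts.begin() + 1;
  auto mid = first + (pts.end() - first) / 2;
  std::nth_element(first, mid, pts.end(),
                   [&](const Point &a, const Point &b) {
                     return vp.calc_distance(a) < vp.calc_distance(b);
                   });

  /* the distance to the median element is what vpnode::radius records */
  double radius = vp.calc_distance(*mid);
  std::printf("partition radius: %.3f\n", radius);
  return 0;
}

Precomputing each distance once into vp_ptr::dist, as build_subtree() does before calling quickselect(), keeps the partition comparisons from re-invoking calc_distance(); the sketch recomputes distances inside the comparator purely for brevity. At query time the same radius drives the pruning in internal_search(): each side of the partition is visited only when the query's distance to the vantage point, widened or narrowed by the current k-th-nearest candidate distance, can still overlap that side.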