Diffstat (limited to 'include/framework')
26 files changed, 434 insertions, 396 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 6cfe72b..63264a0 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -70,7 +70,8 @@ public:
    * for various configuration parameters in the system. See
    * include/framework/util/Configuration.h for details.
    */
-  DynamicExtension(ConfType &&config, double insertion_rate=1.0) : m_config(std::move(config)) {
+  DynamicExtension(ConfType &&config, double insertion_rate = 1.0)
+      : m_config(std::move(config)) {
     m_buffer = std::make_unique<BufferType>(m_config.buffer_flush_trigger,
                                             m_config.buffer_size);
@@ -426,13 +427,10 @@ private:
   alignas(64) std::atomic<long> m_scheduled_records;

-  enum reconstruction_phase {
-    RECON_SCHEDULED = 1,
-    RECON_ENDED = -1
-  };
+  enum reconstruction_phase { RECON_SCHEDULED = 1, RECON_ENDED = -1 };

-
-  void update_insertion_delay(long runtime, size_t reccnt, reconstruction_phase phase) {
+  void update_insertion_delay(long runtime, size_t reccnt,
+                              reconstruction_phase phase) {

     if (!m_config.dynamic_ratelimiting) {
       return;
@@ -441,28 +439,19 @@ private:
     m_scheduled_reconstruction_time.fetch_add(runtime * phase);
     m_scheduled_records.fetch_add(reccnt * phase);

-    long time_per_record = (m_scheduled_records.load())
-                               ? m_scheduled_reconstruction_time.load() /
-                                     m_scheduled_records.load()
-                               : 0;
+    size_t total_reccnt = get_record_count();
+
+    long time_per_record =
+        (total_reccnt) ? m_scheduled_reconstruction_time.load() / total_reccnt
+                       : 0;

     long us = time_per_record / 1000;
-    long excess_ns = time_per_record - us*1000;
+    long excess_ns = time_per_record - us * 1000;

     m_stall_us.store(us);
     size_t records_to_1us = (excess_ns) ? 1000 / excess_ns : 0;
-    m_insertion_rate.store(1.0 - 1.0 / (double) records_to_1us );
-
-    fprintf(stderr, "[%d] Updated delay: %ld and %ld\n", phase, us, excess_ns);
-
-    if (runtime == 0) {
-      fprintf(stderr, "\t[W] Predicted runtime change of 0\n");
-    }
-
-    if (reccnt == 0) {
-      fprintf(stderr, "\t[W] Predicted reccnt change of 0\n");
-    }
+    m_insertion_rate.store(1.0 - 1.0 / (double)records_to_1us);
   }

   bool restart_query(QueryArgs<ShardType, QueryType, DynamicExtension> *args,
@@ -537,9 +526,9 @@ private:
    * this code will be bypassed in that case.
    */
   if (args->priority == ReconstructionPriority::FLUSH) {
-    #ifdef DE_PRINT_SHARD_COUNT
-    fprintf(stdout, "S\t%ld\n", extension->get_shard_count());
-    #endif
+#ifdef DE_PRINT_SHARD_COUNT
+    fprintf(stdout, "S\t%ld\n", extension->get_shard_count());
+#endif

     // fprintf(stderr, "[I] Running flush (%ld)\n", recon_id);
     // fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n",
@@ -694,9 +683,8 @@ private:
       extension->m_lock_mngr.release_lock(level, args->version->get_id());
     }

-    extension->update_insertion_delay(args->predicted_runtime,
-                                      args->tasks.get_total_reccnt(),
-                                      RECON_ENDED);
+    extension->update_insertion_delay(
+        args->predicted_runtime, args->tasks.get_total_reccnt(), RECON_ENDED);
   }

   if (args->priority == ReconstructionPriority::FLUSH) {
@@ -708,7 +696,6 @@ private:
   //                 args->version->get_id(), recon_id);
   //

-
   /* manually delete the argument object */
   delete args;
 }
@@ -937,23 +924,33 @@ private:
     args->initial_version = active_version->get_id();

     size_t level_idx = 0;
-    for (size_t i=0; i< recon.size(); i++) {
-      if (recon[i].target < (level_index) active_version->get_structure()->get_level_vector().size()) {
+    for (size_t i = 0; i < recon.size(); i++) {
+      if (recon[i].target < (level_index)active_version->get_structure()
+                                ->get_level_vector()
+                                .size()) {
         level_idx = recon[i].target;
-        args->predicted_runtime += active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt);
+        args->predicted_runtime +=
+            active_version->get_structure()
+                ->get_level_vector()[level_idx]
+                ->predict_reconstruction_time(recon[i].reccnt);
       } else {
         level_idx = recon[i].target - 1;
-        args->predicted_runtime += m_config.rt_level_scale * active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt);
+        args->predicted_runtime +=
+            m_config.rt_level_scale *
+            active_version->get_structure()
+                ->get_level_vector()[level_idx]
+                ->predict_reconstruction_time(recon[i].reccnt);
       }
     }

     if (args->predicted_runtime == 0) {
-      fprintf(stderr, "Runtime Prediction of 0 for Level %ld with %ld records\n", level_idx, args->tasks.get_total_reccnt());
+      fprintf(stderr,
+              "Runtime Prediction of 0 for Level %ld with %ld records\n",
+              level_idx, args->tasks.get_total_reccnt());
     }

     update_insertion_delay(args->predicted_runtime,
-                           args->tasks.get_total_reccnt(),
-                           RECON_SCHEDULED);
+                           args->tasks.get_total_reccnt(), RECON_SCHEDULED);

     m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(), args,
                           RECONSTRUCTION);
   }
@@ -1002,7 +999,7 @@ private:
     return m_buffer->append(rec, ts);
   }

-//#ifdef _GNU_SOURCE
+  //#ifdef _GNU_SOURCE
   void set_thread_affinity() {
     if constexpr (std::same_as<SchedType, SerialScheduler>) {
       return;
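A note on the arithmetic in update_insertion_delay(), since it is easy to misread: the average scheduled reconstruction time per record is split into a whole-microsecond stall (m_stall_us) and a sub-microsecond remainder that is spread probabilistically across inserts via m_insertion_rate. For example, 2,500 ns per record yields a 2 us stall per insert plus a 500 ns remainder, which is amortized by stalling one extra microsecond roughly every two records. Below is a standalone mirror of the computation; the names are hypothetical, and the zero-guard on records_to_1us is added in this sketch (the committed code divides unconditionally).

#include <cstddef>

/* Standalone mirror of update_insertion_delay()'s arithmetic; hypothetical
 * names, and the guard on records_to_1us is an addition of this sketch. */
struct DelayEstimate {
  long stall_us;         /* whole microseconds to stall per insert */
  double insertion_rate; /* fraction of inserts admitted without extra stall */
};

DelayEstimate estimate_delay(long scheduled_recon_time_ns,
                             size_t total_reccnt) {
  long time_per_record =
      (total_reccnt) ? scheduled_recon_time_ns / (long)total_reccnt : 0;

  long us = time_per_record / 1000;             /* whole us per record */
  long excess_ns = time_per_record - us * 1000; /* sub-us remainder */

  /* spread the remainder: ~1 extra us of stall every (1000/excess_ns) recs */
  size_t records_to_1us = (excess_ns) ? 1000 / excess_ns : 0;
  double rate = (records_to_1us) ? 1.0 - 1.0 / (double)records_to_1us : 1.0;

  return {us, rate};
}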
diff --git a/include/framework/QueryRequirements.h b/include/framework/QueryRequirements.h
index dcba67e..9f56e4a 100644
--- a/include/framework/QueryRequirements.h
+++ b/include/framework/QueryRequirements.h
@@ -1,7 +1,7 @@
 /*
  * include/framework/QueryRequirements.h
  *
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
  *
  * Distributed under the Modified BSD License.
  *
@@ -11,7 +11,7 @@
  */
 #pragma once

-#include "framework/structure/BufferView.h"
 #include "framework/interface/Record.h"
-#include "framework/interface/Shard.h"
+#include "framework/structure/BufferView.h"
 #include "framework/interface/Query.h"
+#include "framework/interface/Shard.h"
diff --git a/include/framework/ShardRequirements.h b/include/framework/ShardRequirements.h
index d054030..335c916 100644
--- a/include/framework/ShardRequirements.h
+++ b/include/framework/ShardRequirements.h
@@ -1,7 +1,7 @@
 /*
  * include/framework/ShardRequirements.h
  *
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
  *
  * Distributed under the Modified BSD License.
  *
@@ -11,7 +11,7 @@
  */
 #pragma once

-#include "framework/structure/BufferView.h"
+#include "framework/interface/Query.h"
 #include "framework/interface/Record.h"
 #include "framework/interface/Shard.h"
-#include "framework/interface/Query.h"
+#include "framework/structure/BufferView.h"
diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h
index 97a973d..4e9a3fb 100644
--- a/include/framework/interface/Query.h
+++ b/include/framework/interface/Query.h
@@ -12,7 +12,6 @@

 namespace de {

-
 /*
  *
  *
@@ -23,100 +22,99 @@ template <typename QUERY, typename SHARD,
           typename PARAMETERS = typename QUERY::Parameters,
           typename LOCAL = typename QUERY::LocalQuery,
           typename LOCAL_BUFFER = typename QUERY::LocalQueryBuffer>
-concept QueryInterface =
-    requires(PARAMETERS *parameters, LOCAL *local, LOCAL_BUFFER *buffer_query,
-             SHARD *shard, std::vector<LOCAL *> &local_queries,
-             std::vector<LOCAL_RESULT> &local_results, RESULT &result,
-             BufferView<typename SHARD::RECORD> *bv) {
-      /*
-       * Given a set of query parameters and a shard, return a local query
-       * object for that shard.
-       */
-      {
-        QUERY::local_preproc(shard, parameters)
-      } -> std::convertible_to<LOCAL *>;
+concept QueryInterface = requires(PARAMETERS *parameters, LOCAL *local,
+                                  LOCAL_BUFFER *buffer_query, SHARD *shard,
+                                  std::vector<LOCAL *> &local_queries,
+                                  std::vector<LOCAL_RESULT> &local_results,
+                                  RESULT &result,
+                                  BufferView<typename SHARD::RECORD> *bv) {
+  /*
+   * Given a set of query parameters and a shard, return a local query
+   * object for that shard.
+   */
+  { QUERY::local_preproc(shard, parameters) } -> std::convertible_to<LOCAL *>;

-      /*
-       * Given a set of query parameters and a buffer view, return a local
-       * query object for the buffer.
-       * NOTE: for interface reasons, the pointer to the buffer view MUST be
-       *       stored inside of the local query object. The future buffer
-       *       query routine will access the buffer by way of this pointer.
-       */
-      {
-        QUERY::local_preproc_buffer(bv, parameters)
-      } -> std::convertible_to<LOCAL_BUFFER *>;
+  /*
+   * Given a set of query parameters and a buffer view, return a local
+   * query object for the buffer.
+   * NOTE: for interface reasons, the pointer to the buffer view MUST be
+   *       stored inside of the local query object. The future buffer
+   *       query routine will access the buffer by way of this pointer.
+   */
+  {
+    QUERY::local_preproc_buffer(bv, parameters)
+  } -> std::convertible_to<LOCAL_BUFFER *>;

-      /*
-       * Given a full set of local queries, and the buffer query, make any
-       * necessary adjustments to the local queries in-place, to account for
-       * global information. If no additional processing is required, this
-       * function can be left empty.
-       */
-      { QUERY::distribute_query(parameters, local_queries, buffer_query) };
+  /*
+   * Given a full set of local queries, and the buffer query, make any
+   * necessary adjustments to the local queries in-place, to account for
+   * global information. If no additional processing is required, this
+   * function can be left empty.
+   */
+  {QUERY::distribute_query(parameters, local_queries, buffer_query)};
-      /*
-       * Answer the local query, defined by `local` against `shard` and return
-       * a vector of LOCAL_RESULT objects defining the query result.
-       */
-      { QUERY::local_query(shard, local) } -> std::convertible_to<LOCAL_RESULT>;
+  /*
+   * Answer the local query, defined by `local` against `shard` and return
+   * a vector of LOCAL_RESULT objects defining the query result.
+   */
+  { QUERY::local_query(shard, local) } -> std::convertible_to<LOCAL_RESULT>;

-      /*
-       * Answer the local query defined by `local` against the buffer (which
-       * should be accessed by a pointer inside of `local`) and return a vector
-       * of LOCAL_RESULT objects defining the query result.
-       */
-      {
-        QUERY::local_query_buffer(buffer_query)
-      } -> std::convertible_to<LOCAL_RESULT>;
+  /*
+   * Answer the local query defined by `local` against the buffer (which
+   * should be accessed by a pointer inside of `local`) and return a vector
+   * of LOCAL_RESULT objects defining the query result.
+   */
+  {
+    QUERY::local_query_buffer(buffer_query)
+  } -> std::convertible_to<LOCAL_RESULT>;

-      /*
-       * Process the local results from the buffer and all of the shards,
-       * stored in `local_results`, and insert the associated ResultType
-       * objects into the `result` vector, which represents the final result
-       * of the query. Updates to this vector are done in-place.
-       */
-      { QUERY::combine(local_results, parameters, result) };
+  /*
+   * Process the local results from the buffer and all of the shards,
+   * stored in `local_results`, and insert the associated ResultType
+   * objects into the `result` vector, which represents the final result
+   * of the query. Updates to this vector are done in-place.
+   */
+  {QUERY::combine(local_results, parameters, result)};

-      /*
-       * Process the post-combine `result` vector of ResultType objects,
-       * in the context of the global and local query parameters, to determine
-       * if the query should be repeated. If so, make any necessary adjustments
-       * to the local query objects and return True. Otherwise, return False.
-       *
-       * If no repetition is needed for a given problem type, simply return
-       * False immediately and the query will end.
-       */
-      {
-        QUERY::repeat(parameters, result, local_queries, buffer_query)
-      } -> std::same_as<bool>;
+  /*
+   * Process the post-combine `result` vector of ResultType objects,
+   * in the context of the global and local query parameters, to determine
+   * if the query should be repeated. If so, make any necessary adjustments
+   * to the local query objects and return True. Otherwise, return False.
+   *
+   * If no repetition is needed for a given problem type, simply return
+   * False immediately and the query will end.
+   */
+  {
+    QUERY::repeat(parameters, result, local_queries, buffer_query)
+  } -> std::same_as<bool>;

-      /*
-       * If this flag is True, then the query will immediately stop and return
-       * a result as soon as the first non-deleted LocalRecordType is found.
-       * Otherwise, every Shard and the buffer will be queried and the results
-       * merged, like normal.
-       *
-       * This is largely an optimization flag for use with point-lookup, or
-       * other single-record result queries
-       */
-      { QUERY::EARLY_ABORT } -> std::convertible_to<bool>;
+  /*
+   * If this flag is True, then the query will immediately stop and return
+   * a result as soon as the first non-deleted LocalRecordType is found.
+   * Otherwise, every Shard and the buffer will be queried and the results
+   * merged, like normal.
+   *
+   * This is largely an optimization flag for use with point-lookup, or
+   * other single-record result queries
+   */
+  { QUERY::EARLY_ABORT } -> std::convertible_to<bool>;

-      /*
-       * If false, the built-in delete filtering that the framework can
-       * apply to the local results, prior to calling combine, will be skipped.
-       * This general filtering can be inefficient, particularly for tombstone
-       * -based deletes, and so if a more efficient manual filtering can be
-       * performed, it is worth setting this to True and doing that filtering
-       * in the combine step.
-       *
-       * If deletes are not a consideration for your problem, it's also best
-       * to turn this off, as it'll avoid the framework making an extra pass
-       * over the local results prior to combining them.
-       *
-       * TODO: Temporarily disabling this, as we've dropped framework-level
-       *       delete filtering for the time being.
-       */
-      /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to<bool>; */
-    };
+  /*
+   * If false, the built-in delete filtering that the framework can
+   * apply to the local results, prior to calling combine, will be skipped.
+   * This general filtering can be inefficient, particularly for tombstone
+   * -based deletes, and so if a more efficient manual filtering can be
+   * performed, it is worth setting this to True and doing that filtering
+   * in the combine step.
+   *
+   * If deletes are not a consideration for your problem, it's also best
+   * to turn this off, as it'll avoid the framework making an extra pass
+   * over the local results prior to combining them.
+   *
+   * TODO: Temporarily disabling this, as we've dropped framework-level
+   *       delete filtering for the time being.
+   */
+  /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to<bool>; */
+};

 } // namespace de
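For readers implementing against this concept, the required surface is easier to see in a concrete skeleton. The following is a hypothetical point-lookup query shaped to match the requirements above; the Stub* types are stand-ins so the sketch is self-contained, and none of this is the framework's shipped code.

#include <optional>
#include <vector>

/* Stand-ins for framework types; the real Record/Shard/BufferView are richer. */
struct StubRecord { int key; int value; };

struct StubShard {
  using RECORD = StubRecord;
  std::optional<StubRecord> point_lookup(int /*key*/) const { return {}; }
};

struct StubBufferView { /* stands in for BufferView<StubRecord> */ };

/* Hypothetical point-lookup query shaped to satisfy QueryInterface. */
struct PointLookup {
  struct Parameters { int key; };
  struct LocalQuery { Parameters global; };
  struct LocalQueryBuffer { StubBufferView *bv; Parameters global; };

  using LocalResult = std::optional<StubRecord>;
  using Result = std::vector<StubRecord>;

  static constexpr bool EARLY_ABORT = true; /* single-record result query */

  static LocalQuery *local_preproc(StubShard *, Parameters *p) {
    return new LocalQuery{*p};
  }

  /* the buffer view pointer MUST be stored inside the local buffer query */
  static LocalQueryBuffer *local_preproc_buffer(StubBufferView *bv,
                                                Parameters *p) {
    return new LocalQueryBuffer{bv, *p};
  }

  /* a point lookup has no global information to distribute */
  static void distribute_query(Parameters *, std::vector<LocalQuery *> &,
                               LocalQueryBuffer *) {}

  static LocalResult local_query(StubShard *shard, LocalQuery *q) {
    return shard->point_lookup(q->global.key);
  }

  static LocalResult local_query_buffer(LocalQueryBuffer *) {
    return {}; /* a real version scans the records visible through bv */
  }

  static void combine(std::vector<LocalResult> &locals, Parameters *,
                      Result &result) {
    for (auto &r : locals) {
      if (r) { result.push_back(*r); return; } /* first hit wins */
    }
  }

  /* a point lookup never needs a second pass */
  static bool repeat(Parameters *, Result &, std::vector<LocalQuery *> &,
                     LocalQueryBuffer *) { return false; }
};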
diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h
index d76a6c8..0724bce 100644
--- a/include/framework/interface/Scheduler.h
+++ b/include/framework/interface/Scheduler.h
@@ -14,7 +14,7 @@ template <typename SchedType>
 concept SchedulerInterface =
     requires(SchedType s, size_t i, void *vp, de::Job j) {
       {SchedType(i, i)};
-      {s.schedule_job(j, i, vp, i)} -> std::convertible_to<void>;
+      { s.schedule_job(j, i, vp, i) } -> std::convertible_to<void>;
       {s.shutdown()};
       {s.print_statistics()};
     };
diff --git a/include/framework/interface/Shard.h b/include/framework/interface/Shard.h
index fb5ce1a..e3a2de4 100644
--- a/include/framework/interface/Shard.h
+++ b/include/framework/interface/Shard.h
@@ -14,8 +14,8 @@ namespace de {
 template <typename SHARD>
 concept ShardInterface = RecordInterface<typename SHARD::RECORD> &&
-    requires(SHARD shard, const std::vector<const SHARD *> &shard_vector, bool b,
-             BufferView<typename SHARD::RECORD> bv,
+    requires(SHARD shard, const std::vector<const SHARD *> &shard_vector,
+             bool b, BufferView<typename SHARD::RECORD> bv,
              typename SHARD::RECORD rec) {
       /* construct a shard from a vector of shards of the same type */
       {SHARD(shard_vector)};
@@ -52,7 +52,6 @@ concept ShardInterface = RecordInterface<typename SHARD::RECORD> &&
        * use only at the moment
        */
       { shard.get_aux_memory_usage() } -> std::convertible_to<size_t>;
-
     };

 template <typename SHARD>
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h
index 61f379e..db4c8d4 100644
--- a/include/framework/reconstruction/BSMPolicy.h
+++ b/include/framework/reconstruction/BSMPolicy.h
@@ -36,7 +36,8 @@ public:
     ReconstructionVector reconstructions;
     auto levels = version->get_structure()->get_level_vector();

-    level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count());
+    level_index target_level = find_reconstruction_target(
+        levels, version->get_structure()->get_record_count());
     assert(target_level != -1);

     level_index source_level = 0;
@@ -49,7 +50,8 @@ public:
       task.target = target_level;

       size_t reccnt = 0;
-      if (target_level < (ssize_t)levels.size() && levels[target_level]->get_record_count() > 0) {
+      if (target_level < (ssize_t)levels.size() &&
+          levels[target_level]->get_record_count() > 0) {
         task.sources.push_back({target_level, all_shards_idx});
         task.type = ReconstructionType::Merge;
       } else {
@@ -71,7 +73,8 @@ public:
   }

 private:
-  level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+  level_index find_reconstruction_target(LevelVector &levels,
+                                         size_t reccnt) const {
     level_index target_level = invalid_level_idx;

     for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -85,7 +88,8 @@ private:
   }

   inline size_t capacity(level_index level, size_t reccnt) const {
-    double base = std::ceil(m_scale_factor * std::pow<double>(std::log10(reccnt), m_size_modifier));
+    double base = std::ceil(
+        m_scale_factor * std::pow<double>(std::log10(reccnt), m_size_modifier));
     return m_buffer_size * (base - 1) * pow(base, level + 1);
   }
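BSMPolicy's capacity() makes level capacities depend on the total record count as well as the level number. A small numeric mirror of the formula, with illustrative parameter values (not taken from the framework):

#include <cmath>
#include <cstddef>
#include <cstdio>

/* Mirrors BSMPolicy::capacity() from the diff; parameters are made up. */
size_t bsm_capacity(size_t level, size_t reccnt, double scale_factor,
                    double size_modifier, size_t buffer_size) {
  double base = std::ceil(
      scale_factor * std::pow(std::log10((double)reccnt), size_modifier));
  return buffer_size * (size_t)(base - 1) * (size_t)std::pow(base, level + 1);
}

int main() {
  /* scale_factor = 2, size_modifier = 0 => base = ceil(2 * 1) = 2, so
   * capacity(level) = 8000 * (2 - 1) * 2^(level + 1). */
  for (size_t lvl = 0; lvl < 4; lvl++)
    std::printf("L%zu: %zu records\n", lvl,
                bsm_capacity(lvl, 1000000, 2.0, 0.0, 8000));
  /* prints 16000, 32000, 64000, 128000 */
}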
diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h
index 36556a2..1c000b9 100644
--- a/include/framework/reconstruction/BackgroundTieringPolicy.h
+++ b/include/framework/reconstruction/BackgroundTieringPolicy.h
@@ -16,16 +16,20 @@ namespace de {
 template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
-class BackgroundTieringPolicy : public ReconstructionPolicy<ShardType, QueryType> {
+class BackgroundTieringPolicy
+    : public ReconstructionPolicy<ShardType, QueryType> {
   typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
       LevelVector;

 public:
-  BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size, size_t size_modifier=0)
-      : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(size_modifier) {}
-
-  std::vector<ReconstructionVector> get_reconstruction_tasks(
-      const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+  BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size,
+                          size_t size_modifier = 0)
+      : m_scale_factor(scale_factor), m_buffer_size(buffer_size),
+        m_size_modifier(size_modifier) {}
+
+  std::vector<ReconstructionVector>
+  get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+                           LockManager &lock_mngr) const override {
     std::vector<ReconstructionVector> reconstructions;
     auto levels = version->get_structure()->get_level_vector();
@@ -34,7 +38,8 @@ public:
       return {};
     }

-    level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count());
+    level_index target_level = find_reconstruction_target(
+        levels, version->get_structure()->get_record_count());
     assert(target_level != -1);

     level_index source_level = 0;
@@ -44,19 +49,21 @@ public:
     }

     for (level_index i = target_level; i > source_level; i--) {
-      if (lock_mngr.take_lock(i-1, version->get_id())) {
+      if (lock_mngr.take_lock(i - 1, version->get_id())) {
         ReconstructionVector recon;
         size_t total_reccnt = levels[i - 1]->get_record_count();
         std::vector<ShardID> shards;
-        for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) {
-          shards.push_back({i-1, j});
+        for (ssize_t j = 0; j < (ssize_t)levels[i - 1]->get_shard_count();
+             j++) {
+          shards.push_back({i - 1, j});
         }

-        recon.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact);
+        recon.add_reconstruction(shards, i, total_reccnt,
+                                 ReconstructionType::Compact);
         reconstructions.push_back(recon);
       }
     }
-
+
     return reconstructions;
   }
@@ -68,7 +75,8 @@ public:
   }

 private:
-  level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+  level_index find_reconstruction_target(LevelVector &levels,
+                                         size_t reccnt) const {
     level_index target_level = invalid_level_idx;

     for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -81,7 +89,9 @@ private:
     return target_level;
   }

-  inline size_t capacity(size_t reccnt) const { return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); }
+  inline size_t capacity(size_t reccnt) const {
+    return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier);
+  }

   size_t m_scale_factor;
   size_t m_buffer_size;
diff --git a/include/framework/reconstruction/CompactOnFull.h b/include/framework/reconstruction/CompactOnFull.h
index f5e0400..f0b549d 100644
--- a/include/framework/reconstruction/CompactOnFull.h
+++ b/include/framework/reconstruction/CompactOnFull.h
@@ -21,11 +21,14 @@ class CompactOnFull : public ReconstructionPolicy<ShardType, QueryType> {
       LevelVector;

 public:
-  CompactOnFull(size_t scale_factor, size_t buffer_size, size_t size_modifier=0)
-      : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(size_modifier) {}
-
-  std::vector<ReconstructionVector> get_reconstruction_tasks(
-      const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+  CompactOnFull(size_t scale_factor, size_t buffer_size,
+                size_t size_modifier = 0)
+      : m_scale_factor(scale_factor), m_buffer_size(buffer_size),
+        m_size_modifier(size_modifier) {}
+
+  std::vector<ReconstructionVector>
+  get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+                           LockManager &lock_mngr) const override {
     std::vector<ReconstructionVector> reconstructions;
     auto levels = version->get_structure()->get_level_vector();
@@ -34,23 +37,24 @@ public:
       return {};
     }

-    for (level_index i=0; i < (ssize_t) levels.size(); i++) {
-      if (levels[i]->get_shard_count() >= m_scale_factor && lock_mngr.take_lock(i, version->get_id())) {
+    for (level_index i = 0; i < (ssize_t)levels.size(); i++) {
+      if (levels[i]->get_shard_count() >= m_scale_factor &&
+          lock_mngr.take_lock(i, version->get_id())) {
         ReconstructionVector recon;
         size_t total_reccnt = levels[i]->get_record_count();
         std::vector<ShardID> shards;
-        for (ssize_t j=0; j<(ssize_t) levels[i]->get_shard_count(); j++) {
-          shards.push_back({i,j});
+        for (ssize_t j = 0; j < (ssize_t)levels[i]->get_shard_count(); j++) {
+          shards.push_back({i, j});
         }

-        recon.add_reconstruction(shards, i+1, total_reccnt, ReconstructionType::Compact);
+        recon.add_reconstruction(shards, i + 1, total_reccnt,
+                                 ReconstructionType::Compact);
         reconstructions.push_back(recon);
       }
     }

-    return reconstructions;
-  }
-
+    return reconstructions;
+  }

   ReconstructionVector
   get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
@@ -60,7 +64,8 @@ public:
   }

 private:
-  level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+  level_index find_reconstruction_target(LevelVector &levels,
+                                         size_t reccnt) const {
     level_index target_level = invalid_level_idx;

     for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -73,7 +78,9 @@ private:
     return target_level;
   }

-  inline size_t capacity(size_t reccnt) const { return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); }
+  inline size_t capacity(size_t reccnt) const {
+    return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier);
+  }

   size_t m_scale_factor;
   size_t m_buffer_size;
diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h
index cc8dce4..52d9f39 100644
--- a/include/framework/reconstruction/FixedShardCountPolicy.h
+++ b/include/framework/reconstruction/FixedShardCountPolicy.h
@@ -16,16 +16,20 @@ namespace de {
 template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
-class FixedShardCountPolicy : public ReconstructionPolicy<ShardType, QueryType> {
+class FixedShardCountPolicy
+    : public ReconstructionPolicy<ShardType, QueryType> {
   typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
       LevelVector;

 public:
-  FixedShardCountPolicy(size_t buffer_size, size_t shard_count, size_t max_record_count)
-      : m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {}
+  FixedShardCountPolicy(size_t buffer_size, size_t shard_count,
+                        size_t max_record_count)
+      : m_buffer_size(buffer_size), m_shard_count(shard_count),
+        m_max_reccnt(max_record_count) {}

   std::vector<ReconstructionVector>
-  get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+  get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+                           LockManager &lock_mngr) const override {
     return {};
   }
@@ -37,26 +41,26 @@ public:

     /* if this is the very first flush, there won't be an L1 yet */
     if (levels.size() > 1 && levels[1]->get_shard_count() > 0) {
-      ShardID last_shid = {1, (shard_index) (levels[1]->get_shard_count() - 1)};
-      if (levels[1]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) {
-        auto task = ReconstructionTask {
-          {{0, 0}, last_shid}, 1, m_buffer_size,ReconstructionType::Merge
-        };
+      ShardID last_shid = {1, (shard_index)(levels[1]->get_shard_count() - 1)};
+      if (levels[1]->get_shard(last_shid.shard_idx)->get_record_count() +
+              m_buffer_size <=
+          capacity()) {
+        auto task = ReconstructionTask{
+            {{0, 0}, last_shid}, 1, m_buffer_size, ReconstructionType::Merge};
         v.add_reconstruction(task);
         return v;
       }
     }

-    auto task = ReconstructionTask {
-      {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append
-    };
+    auto task = ReconstructionTask{
+        {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append};
     v.add_reconstruction(task);
     return v;
   }

 private:
   inline size_t capacity() const {
-    double bps = (double) m_max_reccnt / (double) m_buffer_size / m_shard_count;
+    double bps = (double)m_max_reccnt / (double)m_buffer_size / m_shard_count;
     return ceil(bps) * m_buffer_size;
   }

diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h
index c0d29fe..7e38e02 100644
--- a/include/framework/reconstruction/FloodL0Policy.h
+++ b/include/framework/reconstruction/FloodL0Policy.h
@@ -24,7 +24,8 @@ public:
   FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {}

   std::vector<ReconstructionVector>
-  get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+  get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+                           LockManager &lock_mngr) const override {
     return {};
   }
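FixedShardCountPolicy's capacity() arithmetic is what holds the shard count near its target: each flush either merges into the newest L1 shard while it still fits, or appends a fresh shard. A worked example with illustrative numbers:

#include <cmath>
#include <cstddef>
#include <cstdio>

/* Mirrors FixedShardCountPolicy::capacity() from the diff; the numbers in
 * main() are illustrative, not from the framework. */
size_t shard_capacity(size_t max_reccnt, size_t buffer_size,
                      size_t shard_count) {
  /* buffers-per-shard, rounded up so shard_count shards cover max_reccnt */
  double bps = (double)max_reccnt / (double)buffer_size / shard_count;
  return (size_t)std::ceil(bps) * buffer_size;
}

int main() {
  /* 10M records, 8000-record buffers, 16 target shards:
   * bps = 10,000,000 / 8000 / 16 = 78.125 -> ceil -> 79 flushes per shard,
   * so each shard holds up to 79 * 8000 = 632,000 records. */
  std::printf("%zu\n", shard_capacity(10000000, 8000, 16));
}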
diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h
index 955bc02..3e0afaa 100644
--- a/include/framework/reconstruction/LevelingPolicy.h
+++ b/include/framework/reconstruction/LevelingPolicy.h
@@ -58,8 +58,9 @@ public:
           (target_level == 1)
               ? m_buffer_size + target_reccnt
               : levels[target_level - 1]->get_record_count() + target_reccnt;

-      auto type = (target_level >= (level_index) levels.size()) ? ReconstructionType::Append
-                                                                : ReconstructionType::Merge;
+      auto type = (target_level >= (level_index)levels.size())
+                      ? ReconstructionType::Append
+                      : ReconstructionType::Merge;

       reconstructions.add_reconstruction(target_level - 1, target_level,
                                          total_reccnt, type);
@@ -95,8 +96,9 @@ private:

   inline size_t capacity(level_index level, size_t reccnt) const {
     return m_buffer_size *
-           pow(m_scale_factor * std::ceil(std::pow<double>(std::log10(reccnt), m_size_modifier)), level);
-
+           pow(m_scale_factor * std::ceil(std::pow<double>(std::log10(reccnt),
+                                                           m_size_modifier)),
+               level);
   }

   size_t m_scale_factor;
diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h
index 41a2092..3c842b2 100644
--- a/include/framework/reconstruction/ReconstructionPolicy.h
+++ b/include/framework/reconstruction/ReconstructionPolicy.h
@@ -12,19 +12,22 @@
  */
 #pragma once

-#include "util/types.h"
-#include "framework/structure/ExtensionStructure.h"
-#include "framework/scheduling/Version.h"
 #include "framework/scheduling/LockManager.h"
+#include "framework/scheduling/Version.h"
+#include "framework/structure/ExtensionStructure.h"
+#include "util/types.h"

 namespace de {
-template<ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
 class ReconstructionPolicy {
   typedef ExtensionStructure<ShardType, QueryType> StructureType;

 public:
   ReconstructionPolicy() {}

-  virtual std::vector<ReconstructionVector> get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const = 0;
-  virtual ReconstructionVector get_flush_tasks(const Version<ShardType, QueryType> *version) const = 0;
- };
-}
+  virtual std::vector<ReconstructionVector>
+  get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+                           LockManager &lock_mngr) const = 0;
+  virtual ReconstructionVector
+  get_flush_tasks(const Version<ShardType, QueryType> *version) const = 0;
+};
+} // namespace de
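All of the policies in this directory implement the same two-virtual interface, which makes plugging in a custom strategy straightforward. A minimal, hypothetical do-nothing policy, shaped like the ones above (the ReconstructionTask aggregate mirrors FixedShardCountPolicy's flush task; this is a sketch, not shipped code):

#include "framework/reconstruction/ReconstructionPolicy.h"

namespace de {
/* Hypothetical policy: no background work; every flush appends the buffer
 * ({0, 0}) into level 1 as a new shard. */
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class NoopPolicy : public ReconstructionPolicy<ShardType, QueryType> {
public:
  NoopPolicy(size_t buffer_size) : m_buffer_size(buffer_size) {}

  std::vector<ReconstructionVector>
  get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
                           LockManager &lock_mngr) const override {
    return {}; /* never schedule background reconstructions */
  }

  ReconstructionVector
  get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
    ReconstructionVector v;
    v.add_reconstruction(ReconstructionTask{
        {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append});
    return v;
  }

private:
  size_t m_buffer_size;
};
} // namespace de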
diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h
index b1fcb49..c16c427 100644
--- a/include/framework/reconstruction/TieringPolicy.h
+++ b/include/framework/reconstruction/TieringPolicy.h
@@ -21,11 +21,13 @@ class TieringPolicy : public ReconstructionPolicy<ShardType, QueryType> {
       LevelVector;

 public:
-  TieringPolicy(size_t scale_factor, size_t buffer_size, double modifier=0)
-      : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(modifier) {}
+  TieringPolicy(size_t scale_factor, size_t buffer_size, double modifier = 0)
+      : m_scale_factor(scale_factor), m_buffer_size(buffer_size),
+        m_size_modifier(modifier) {}

-  std::vector<ReconstructionVector> get_reconstruction_tasks(
-      const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+  std::vector<ReconstructionVector>
+  get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+                           LockManager &lock_mngr) const override {
     return {};
   }
@@ -34,7 +36,8 @@ public:
     ReconstructionVector reconstructions;
     auto levels = version->get_structure()->get_level_vector();

-    level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count());
+    level_index target_level = find_reconstruction_target(
+        levels, version->get_structure()->get_record_count());
     assert(target_level != -1);

     level_index source_level = 0;
@@ -47,12 +50,13 @@ public:
       size_t total_reccnt = levels[i - 1]->get_record_count();
       std::vector<ShardID> shards;

-      for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) {
-        shards.push_back({i-1, j});
+      for (ssize_t j = 0; j < (ssize_t)levels[i - 1]->get_shard_count(); j++) {
+        shards.push_back({i - 1, j});
       }

       if (total_reccnt > 0 || shards.size() > 0) {
-        reconstructions.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact);
+        reconstructions.add_reconstruction(shards, i, total_reccnt,
+                                           ReconstructionType::Compact);
       }
     }

@@ -60,7 +64,8 @@ public:
   }

 private:
-  level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+  level_index find_reconstruction_target(LevelVector &levels,
+                                         size_t reccnt) const {
     level_index target_level = invalid_level_idx;

     for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -74,7 +79,8 @@ private:
   }

   inline size_t capacity(size_t reccnt) const {
-    return std::ceil((double) m_scale_factor * std::pow<double>(std::log10(reccnt), m_size_modifier));
+    return std::ceil((double)m_scale_factor *
+                     std::pow<double>(std::log10(reccnt), m_size_modifier));
   }

   size_t m_scale_factor;
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 7583727..a642b31 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -60,7 +60,9 @@ public:

   Structure *get_mutable_structure() { return m_structure; }

-  BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); }
+  BufView get_buffer() const {
+    return m_buffer->get_buffer_view(m_buffer_head);
+  }

   /*
    * Returns a new Epoch object that is a copy of this one. The new object
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 8fbe07c..ef6dde5 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -61,7 +61,8 @@ public:
     std::unique_lock<std::mutex> lk(m_cv_lock);
     m_stats.job_queued(ts, type, size);
-    m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv));
+    m_task_queue.push(
+        Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv));

     m_cv.notify_all();
   }
@@ -81,7 +82,6 @@ private:
   [[maybe_unused]] size_t m_memory_budget;
   size_t m_thrd_cnt;

-  std::atomic<size_t> m_counter;
   std::mutex m_cv_lock;
   std::condition_variable m_cv;
diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h
index a40cf7a..820b975 100644
--- a/include/framework/scheduling/LockManager.h
+++ b/include/framework/scheduling/LockManager.h
@@ -3,15 +3,15 @@
  */
 #pragma once

-#include <deque>
 #include <atomic>
 #include <cassert>
+#include <deque>

 namespace de {
 class LockManager {
 public:
-  LockManager(size_t levels=1) {
-    for (size_t i=0; i < levels; i++) {
+  LockManager(size_t levels = 1) {
+    for (size_t i = 0; i < levels; i++) {
       m_lks.emplace_back(false);
     }

@@ -20,9 +20,7 @@ public:

   ~LockManager() = default;

-  void add_lock() {
-    m_lks.emplace_back(false);
-  }
+  void add_lock() { m_lks.emplace_back(false); }

   void release_lock(size_t idx, size_t version) {
     if (idx < m_lks.size()) {
@@ -72,17 +70,13 @@ public:
     return false;
   }

-  bool is_buffer_locked() {
-    return m_buffer_lk.load();
-  }
+  bool is_buffer_locked() { return m_buffer_lk.load(); }

-  void release_buffer_lock() {
-    m_buffer_lk.store(false);
-  }
+  void release_buffer_lock() { m_buffer_lk.store(false); }

 private:
   std::deque<std::atomic<bool>> m_lks;
   std::atomic<bool> m_buffer_lk;
   std::atomic<size_t> m_last_unlocked_version;
 };
-}
+} // namespace de
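The take_lock()/release_lock() bodies are not part of this diff, so the sketch below is an assumed reading of the pattern the policies rely on: a per-level try-lock where only the caller that wins the CAS may schedule a reconstruction sourcing that level. The real interface also threads a version id through both calls, omitted here for brevity.

#include <atomic>
#include <cstddef>
#include <deque>

/* Assumed per-level try-lock, consistent with the std::deque<std::atomic<bool>>
 * member above; not a copy of the committed take_lock(). */
class LevelLocks {
public:
  LevelLocks(size_t levels = 1) : m_lks(levels) {}

  /* returns true iff this caller won the lock for `idx` */
  bool take_lock(size_t idx) {
    bool expected = false;
    return idx < m_lks.size() &&
           m_lks[idx].compare_exchange_strong(expected, true);
  }

  void release_lock(size_t idx) {
    if (idx < m_lks.size())
      m_lks[idx].store(false);
  }

private:
  std::deque<std::atomic<bool>> m_lks; /* deque: stable under add_lock() growth */
};

int main() {
  LevelLocks locks(3);
  if (locks.take_lock(1)) {
    /* ... schedule a reconstruction sourcing level 1 ... */
    locks.release_lock(1);
  }
}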
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
index b6ebe53..6a8ed58 100644
--- a/include/framework/scheduling/SerialScheduler.h
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -37,7 +37,8 @@ public:
     t(0);
   }

-  void shutdown() { /* intentionally left blank */ }
+  void shutdown() { /* intentionally left blank */
+  }

   void print_statistics() { m_stats.print_statistics(); }
   void print_query_time_data() { m_stats.print_query_time_data(); }
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 4529b2e..00ddbbb 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -14,9 +14,9 @@
 #pragma once

 #include <chrono>
+#include <condition_variable>
 #include <functional>
 #include <future>
-#include <condition_variable>

 #include "framework/scheduling/Version.h"
 #include "framework/scheduling/statistics.h"
@@ -24,11 +24,7 @@

 namespace de {

-enum class ReconstructionPriority {
-  FLUSH = 0,
-  CMPCT = 1,
-  MAINT = 2
-};
+enum class ReconstructionPriority { FLUSH = 0, CMPCT = 1, MAINT = 2 };

 template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
 struct ReconstructionArgs {
@@ -51,7 +47,8 @@ typedef std::function<void(void *)> Job;

 struct Task {
   Task(size_t size, size_t ts, Job job, void *args, size_t type = 0,
-       SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr, std::condition_variable *cv=nullptr)
+       SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr,
+       std::condition_variable *cv = nullptr)
       : m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type),
         m_stats(stats), m_lk(lk), m_cv(cv) {}
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index be54c84..bbcbe25 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -25,18 +25,17 @@ private:
   typedef BufferView<RecordType> BufferViewType;

 public:
-  Version(size_t vid = 0) : m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0) {}
-  Version(size_t number, std::unique_ptr<StructureType> structure, BufferType *buff,
-          size_t head)
+  Version(size_t number, std::unique_ptr<StructureType> structure,
+          BufferType *buff, size_t head)
       : m_buffer(buff), m_structure(std::move(structure)), m_id(number),
         m_buffer_head(head) {
-        if (m_buffer) {
-          m_buffer->take_head_reference(m_buffer_head);
-        }
-      }
+    if (m_buffer) {
+      m_buffer->take_head_reference(m_buffer_head);
+    }
+  }

   ~Version() {
     if (m_buffer) {
@@ -55,7 +54,7 @@ public:

   size_t get_id() const { return m_id; }

-  void set_id(size_t id) { m_id = id;}
+  void set_id(size_t id) { m_id = id; }

   const StructureType *get_structure() const { return m_structure.get(); }

@@ -107,10 +106,7 @@ public:
     m_structure->update_shard_version(version);
   }

-  size_t get_head() {
-    return m_buffer_head;
-  }
-
+  size_t get_head() { return m_buffer_head; }

   void set_buffer(BufferType *buffer, size_t head) {
     assert(m_buffer == nullptr);
diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h
index 6d9f9f0..8cb6dbd 100644
--- a/include/framework/scheduling/statistics.h
+++ b/include/framework/scheduling/statistics.h
@@ -16,12 +16,12 @@
 #include <atomic>
 #include <cassert>
 #include <chrono>
+#include <cmath>
 #include <cstdint>
 #include <cstdlib>
 #include <mutex>
 #include <unordered_map>
 #include <vector>
-#include <cmath>

 namespace de {

@@ -68,7 +68,6 @@ private:
                  queue_time)
           .count();
     }
-
   };

 public:
@@ -117,10 +116,10 @@ public:
     size_t worst_query = 0;
     size_t first_query = UINT64_MAX;

-
     for (auto &job : m_jobs) {
       if (job.second.type != 1) {
-        fprintf(stdout, "%ld %ld %ld %ld\n", job.second.id, job.second.size, job.second.runtime(), job.second.runtime() / (job.second.size));
+        fprintf(stdout, "%ld %ld %ld %ld\n", job.second.id, job.second.size,
+                job.second.runtime(), job.second.runtime() / (job.second.size));
       }

       if (job.first < first_query) {
@@ -149,7 +148,6 @@ public:
       query_cnt++;
     }

-
     int64_t average_queue_time = (query_cnt) ? total_queue_time / query_cnt : 0;
     int64_t average_runtime = (query_cnt) ? total_runtime / query_cnt : 0;

@@ -162,28 +160,37 @@ public:
         continue;
       }

-      queue_deviation_sum += std::pow(job.second.time_in_queue() - average_queue_time, 2);
-      runtime_deviation_sum += std::pow(job.second.runtime() - average_runtime, 2);
-
+      queue_deviation_sum +=
+          std::pow(job.second.time_in_queue() - average_queue_time, 2);
+      runtime_deviation_sum +=
+          std::pow(job.second.runtime() - average_runtime, 2);
     }

-    int64_t queue_stddev = (query_cnt) ? std::sqrt(queue_deviation_sum / query_cnt) : 0;
-    int64_t runtime_stddev = (query_cnt) ? std::sqrt(runtime_deviation_sum / query_cnt) : 0;
-
-
-    fprintf(stdout, "Query Count: %ld\tWorst Query: %ld\tFirst Query: %ld\n", query_cnt, worst_query, first_query);
-    fprintf(stdout, "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t Max Scheduling Delay: %ld\tStandard Deviation: %ld\n", average_queue_time, min_queue_time, max_queue_time, queue_stddev);
-    fprintf(stdout, "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query Latency: %ld\tStandard Deviation: %ld\n", average_runtime, min_runtime, max_runtime, runtime_stddev);
+    int64_t queue_stddev =
+        (query_cnt) ? std::sqrt(queue_deviation_sum / query_cnt) : 0;
+    int64_t runtime_stddev =
+        (query_cnt) ? std::sqrt(runtime_deviation_sum / query_cnt) : 0;
+
+    fprintf(stdout, "Query Count: %ld\tWorst Query: %ld\tFirst Query: %ld\n",
+            query_cnt, worst_query, first_query);
+    fprintf(stdout,
+            "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t "
+            "Max Scheduling Delay: %ld\tStandard Deviation: %ld\n",
+            average_queue_time, min_queue_time, max_queue_time, queue_stddev);
+    fprintf(stdout,
+            "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query "
+            "Latency: %ld\tStandard Deviation: %ld\n",
+            average_runtime, min_runtime, max_runtime, runtime_stddev);
   }

-
   void print_query_time_data() {
     std::unique_lock<std::mutex> lk(m_mutex);
     update_job_data_from_log();

     for (auto &job : m_jobs) {
       if (job.second.type == 1) {
-        fprintf(stdout, "%ld\t%ld\t%ld\n", job.second.time_in_queue(), job.second.runtime(), job.second.end_to_end_time());
+        fprintf(stdout, "%ld\t%ld\t%ld\n", job.second.time_in_queue(),
+                job.second.runtime(), job.second.end_to_end_time());
       }
     }
   }
@@ -211,18 +218,18 @@ private:
       auto &job = m_jobs.at(event.id);

       switch (event.type) {
-        case EventType::QUEUED:
-          job.queue_time = event.time;
-          break;
-        case EventType::FINISHED:
-          job.stop_time = event.time;
-          break;
-        case EventType::SCHEDULED:
-          job.scheduled_time = event.time;
-          break;
-        case EventType::STARTED:
-          job.start_time = event.time;
-          break;
+      case EventType::QUEUED:
+        job.queue_time = event.time;
+        break;
+      case EventType::FINISHED:
+        job.stop_time = event.time;
+        break;
+      case EventType::SCHEDULED:
+        job.scheduled_time = event.time;
+        break;
+      case EventType::STARTED:
+        job.start_time = event.time;
+        break;
       }
     }
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index a9fb12d..afe5dbb 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -46,10 +46,9 @@ public:

   BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail,
              size_t tombstone_cnt, psudb::BloomFilter<R> *filter)
-      : m_data(buffer), m_head(head), m_tail(tail),
-        m_start(m_head % cap), m_stop(m_tail % cap), m_cap(cap),
-        m_approx_ts_cnt(tombstone_cnt), m_tombstone_filter(filter),
-        m_active(true) {}
+      : m_data(buffer), m_head(head), m_tail(tail), m_start(m_head % cap),
+        m_stop(m_tail % cap), m_cap(cap), m_approx_ts_cnt(tombstone_cnt),
+        m_tombstone_filter(filter), m_active(true) {}

   ~BufferView() = default;

@@ -104,9 +103,7 @@ public:
    */
   size_t get_tombstone_count() { return m_approx_ts_cnt; }

-  Wrapped<R> *get(size_t i) {
-    return m_data + to_idx(i);
-  }
+  Wrapped<R> *get(size_t i) { return m_data + to_idx(i); }

   void copy_to_buffer(psudb::byte *buffer) {
     /* check if the region to be copied circles back to start. If so, do it in
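get() resolves a logical offset through to_idx(), whose body is not shown in this diff. The following is an assumed implementation, consistent with m_start being initialized to m_head % cap in the constructor above:

#include <cassert>
#include <cstddef>

/* Assumed circular indexing behind BufferView::get(); a sketch, not the
 * committed to_idx(). */
struct RingIndex {
  size_t start; /* physical index of logical record 0 (head % cap) */
  size_t cap;   /* physical capacity of the backing array */

  size_t to_idx(size_t i) const {
    assert(i < cap);
    return (start + i) % cap; /* wrap past the end of the array */
  }
};

int main() {
  RingIndex idx{7990, 8000};
  /* logical record 15 lives at physical slot 5 after wrapping */
  assert(idx.to_idx(15) == 5);
}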
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index bb8a480..e03c7ad 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -27,12 +27,14 @@ class ExtensionStructure {
   typedef BufferView<RecordType> BuffView;
   typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
       LevelVector;
+
 public:
-  ExtensionStructure(bool default_level=true) {
+  ExtensionStructure(bool default_level = true) {
     if (default_level)
-      m_levels.emplace_back(std::make_shared<InternalLevel<ShardType, QueryType>>(0));
+      m_levels.emplace_back(
+          std::make_shared<InternalLevel<ShardType, QueryType>>(0));
   }
-
+
   ~ExtensionStructure() = default;

   /*
@@ -162,34 +164,37 @@ public:
     return cnt;
   }

-
   /*
    * Perform the reconstruction described by task. If the resulting
    * reconstruction grows the structure (i.e., adds a level), returns
    * true. Otherwise, returns false.
    */
-  inline reconstruction_results<ShardType> perform_reconstruction(ReconstructionTask task) const {
+  inline reconstruction_results<ShardType>
+  perform_reconstruction(ReconstructionTask task) const {
    reconstruction_results<ShardType> result;
    result.target_level = task.target;

    /* if there is only one source, then we don't need to actually rebuild */
    if (task.sources.size() == 1) {
      auto shid = task.sources[0];
-      if (shid.shard_idx == all_shards_idx && m_levels[shid.level_idx]->get_shard_count() > 1) {
+      if (shid.shard_idx == all_shards_idx &&
+          m_levels[shid.level_idx]->get_shard_count() > 1) {
        /* there's more than one shard, so we need to do the reconstruction */
      } else {
-        auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx);
+        auto raw_shard_ptr =
+            m_levels[shid.level_idx]->get_shard(shid.shard_idx);
        assert(raw_shard_ptr);

        result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr);
-        result.new_shard = m_levels[shid.level_idx]->get_shard_ptr(shid.shard_idx).first;
+        result.new_shard =
+            m_levels[shid.level_idx]->get_shard_ptr(shid.shard_idx).first;
        return result;
      }
    }
-
-    std::vector<const ShardType*> shards;
+
+    std::vector<const ShardType *> shards;
    for (ShardID shid : task.sources) {
-      assert(shid.level_idx < (level_index) m_levels.size());
+      assert(shid.level_idx < (level_index)m_levels.size());
      assert(shid.shard_idx >= -1);

      auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx);
@@ -197,12 +202,13 @@ public:
      result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr);
    }

-
    auto start = std::chrono::high_resolution_clock::now();
    result.new_shard = std::make_shared<ShardType>(shards);
    auto stop = std::chrono::high_resolution_clock::now();

-    result.runtime = std::chrono::duration_cast<std::chrono::nanoseconds>(stop- start).count();
+    result.runtime =
+        std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start)
+            .count();
    result.reccnt = result.new_shard->get_record_count();

    return result;
@@ -221,11 +227,10 @@ public:
     return queries;
   }

-  size_t l0_size() const {
-    return m_levels[0]->get_shard_count();
-  }
+  size_t l0_size() const { return m_levels[0]->get_shard_count(); }

-  bool apply_reconstruction(reconstruction_results<ShardType> &recon, size_t version) {
+  bool apply_reconstruction(reconstruction_results<ShardType> &recon,
+                            size_t version) {
     bool res = append_shard(recon.new_shard, version, recon.target_level);
     m_levels[recon.target_level]->update_reconstruction_model(recon);
     delete_shards(recon.source_shards);
@@ -233,13 +238,15 @@ public:
     return res;
   }

-  bool append_shard(std::shared_ptr<ShardType> shard, size_t version, size_t level) {
+  bool append_shard(std::shared_ptr<ShardType> shard, size_t version,
+                    size_t level) {
     assert(level <= m_levels.size());

     auto rc = false;

     if (level == m_levels.size()) {
       /* grow the structure */
-      m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(level));
+      m_levels.push_back(
+          std::make_shared<InternalLevel<ShardType, QueryType>>(level));
       rc = true;
     }

@@ -248,12 +255,15 @@ public:
     return rc;
   }

-  void delete_shards(std::vector<std::pair<level_index, const ShardType*>> shards) {
-    for (size_t i=0; i<shards.size(); i++) {
-      assert(shards[i].first < (level_index) m_levels.size());
+  void
+  delete_shards(std::vector<std::pair<level_index, const ShardType *>> shards) {
+    for (size_t i = 0; i < shards.size(); i++) {
+      assert(shards[i].first < (level_index)m_levels.size());
       ssize_t shard_idx = -1;

-      for (size_t j=0; j<m_levels[shards[i].first]->get_shard_count(); j++) {
-        if (m_levels[shards[i].first]->get_shard_ptr(j).first.get() == shards[i].second) {
+      for (size_t j = 0; j < m_levels[shards[i].first]->get_shard_count();
+           j++) {
+        if (m_levels[shards[i].first]->get_shard_ptr(j).first.get() ==
+            shards[i].second) {
           shard_idx = j;
           break;
         }
@@ -262,7 +272,8 @@ public:
       if (shard_idx != -1) {
         m_levels[shards[i].first]->delete_shard(shard_idx);
       } else {
-        fprintf(stderr, "ERROR: failed to delete shard %ld\t%p\n", shards[i].first, shards[i].second);
+        fprintf(stderr, "ERROR: failed to delete shard %ld\t%p\n",
+                shards[i].first, shards[i].second);
         exit(EXIT_FAILURE);
       }
     }
@@ -270,51 +281,55 @@ public:

   LevelVector const &get_level_vector() const { return m_levels; }

-
-  /*
-   * Validate that no level in the structure exceeds its maximum tombstone
-   * capacity. This is used to trigger preemptive compactions at the end of
-   * the reconstruction process.
-   */
-  bool validate_tombstone_proportion(double max_delete_prop) const {
-    long double ts_prop;
-    for (size_t i = 0; i < m_levels.size(); i++) {
-      if (m_levels[i]) {
-        ts_prop = (long double)m_levels[i]->get_tombstone_count() /
-                  (long double)m_levels[i]->get_record_count();
-        if (ts_prop > (long double)max_delete_prop) {
-          return false;
-        }
+  /*
+   * Validate that no level in the structure exceeds its maximum tombstone
+   * capacity. This is used to trigger preemptive compactions at the end of
+   * the reconstruction process.
+   */
+  bool validate_tombstone_proportion(double max_delete_prop) const {
+    long double ts_prop;
+    for (size_t i = 0; i < m_levels.size(); i++) {
+      if (m_levels[i]) {
+        ts_prop = (long double)m_levels[i]->get_tombstone_count() /
+                  (long double)m_levels[i]->get_record_count();
+        if (ts_prop > (long double)max_delete_prop) {
+          return false;
         }
       }
-
-      return true;
     }

-    bool validate_tombstone_proportion(level_index level, double max_delete_prop) const {
-      long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) m_levels[level]->get_record_count();
-      return ts_prop <= (long double) max_delete_prop;
-    }
+    return true;
+  }

-    void print_structure(bool debug=false) const {
-      for (size_t i=0; i<m_levels.size(); i++) {
-        if (debug) {
-          fprintf(stdout, "[D] [%ld]:\t", i);
-        } else {
-          fprintf(stdout, "[%ld]:\t", i);
-        }
+  bool validate_tombstone_proportion(level_index level,
+                                     double max_delete_prop) const {
+    long double ts_prop = (long double)m_levels[level]->get_tombstone_count() /
+                          (long double)m_levels[level]->get_record_count();
+    return ts_prop <= (long double)max_delete_prop;
+  }

-        if (m_levels[i]) {
-          for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) {
-            fprintf(stdout, "(%ld, %ld, %p: %ld) ", j, m_levels[i]->get_shard_ptr(j).second, m_levels[i]->get_shard_ptr(j).first.get(), m_levels[i]->get_shard(j)->get_record_count());
-          }
-        } else {
-          fprintf(stdout, "[Empty]");
-        }
+  void print_structure(bool debug = false) const {
+    for (size_t i = 0; i < m_levels.size(); i++) {
+      if (debug) {
+        fprintf(stdout, "[D] [%ld]:\t", i);
+      } else {
+        fprintf(stdout, "[%ld]:\t", i);
+      }

-        fprintf(stdout, "\n");
+      if (m_levels[i]) {
+        for (size_t j = 0; j < m_levels[i]->get_shard_count(); j++) {
+          fprintf(stdout, "(%ld, %ld, %p: %ld) ", j,
+                  m_levels[i]->get_shard_ptr(j).second,
+                  m_levels[i]->get_shard_ptr(j).first.get(),
+                  m_levels[i]->get_shard(j)->get_record_count());
+        }
+      } else {
+        fprintf(stdout, "[Empty]");
       }
-    }
+
+      fprintf(stdout, "\n");
+    }
+  }

 private:
   LevelVector m_levels;
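Taken together, perform_reconstruction() and apply_reconstruction() form a two-phase protocol: build the replacement shard off to the side, then swap it in. A sketch of the call sequence (a hypothetical wrapper; the real call sites live in DynamicExtension.h):

#include "framework/structure/ExtensionStructure.h"

/* Sketch of the two-phase reconstruction flow; ShardType/QueryType stand for
 * any conforming pair, and version_id is supplied by the scheduler in the
 * real code path. */
template <de::ShardInterface S, de::QueryInterface<S> Q>
bool run_reconstruction(de::ExtensionStructure<S, Q> &structure,
                        de::ReconstructionTask task, size_t version_id) {
  /* Phase 1: build the new shard without mutating the structure; queries can
   * proceed against the old shards while this (long-running) step executes. */
  auto result = structure.perform_reconstruction(task);

  /* Phase 2: install the new shard, retire its sources, and feed the measured
   * runtime (result.runtime, nanoseconds) back into the level's
   * reconstruction-time model. Returns true if the structure grew a level. */
  return structure.apply_reconstruction(result, version_id);
}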
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 205dbf9..cf2b16a 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -15,11 +15,11 @@
  */
 #pragma once

+#include <algorithm>
+#include <deque>
 #include <future>
 #include <memory>
 #include <vector>
-#include <deque>
-#include <algorithm>

 #include "framework/interface/Query.h"
 #include "framework/interface/Shard.h"
@@ -34,7 +34,7 @@ class InternalLevel {
   typedef std::pair<std::shared_ptr<ShardType>, size_t> shard_ptr;

 public:
-  InternalLevel(ssize_t level_no) : m_level_no(level_no) { }
+  InternalLevel(ssize_t level_no) : m_level_no(level_no) {}

   ~InternalLevel() = default;

@@ -109,7 +109,7 @@ public:
       idx = 0;
     }

-    if (idx >= (ssize_t) m_shards.size()) {
+    if (idx >= (ssize_t)m_shards.size()) {
       return nullptr;
     }

@@ -183,7 +183,7 @@ public:
   }

   size_t get_nonempty_shard_count() const {
-    size_t cnt = 0; 
+    size_t cnt = 0;
     for (size_t i = 0; i < m_shards.size(); i++) {
       if (m_shards[i].first && m_shards[i].first->get_record_count() > 0) {
         cnt += 1;
@@ -199,7 +199,7 @@ public:
       new_level->append(m_shards[i].first, m_shards[i].second);
     }

-    for (auto itr=m_rt_window.begin(); itr < m_rt_window.end(); itr++) {
+    for (auto itr = m_rt_window.begin(); itr < m_rt_window.end(); itr++) {
       new_level->m_rt_window.push_front(*itr);
     }

@@ -208,23 +208,21 @@ public:

   void truncate() { m_shards.erase(m_shards.begin(), m_shards.end()); }

-  void delete_shard(shard_index shard, bool log_delete=true) {
+  void delete_shard(shard_index shard, bool log_delete = true) {
     size_t before = m_shards.size();
     m_shards.erase(m_shards.begin() + shard);
     size_t after = m_shards.size();

-    assert( before > after);
+    assert(before > after);
   }

-  void append(std::shared_ptr<ShardType> shard, size_t version=0) {
+  void append(std::shared_ptr<ShardType> shard, size_t version = 0) {
     m_shards.push_back({shard, version});
   }

-  void append(shard_ptr shard) {
-    m_shards.push_back(shard);
-  }
+  void append(shard_ptr shard) { m_shards.push_back(shard); }

   const shard_ptr get_shard_ptr(ssize_t idx) const {
-    if (idx >= 0 && idx < (ssize_t) m_shards.size()) {
+    if (idx >= 0 && idx < (ssize_t)m_shards.size()) {
       return m_shards[idx];
     } else if (idx == all_shards_idx && m_shards.size() == 1) {
       return m_shards[0];
@@ -247,7 +245,7 @@ public:
     size_t total = 0;
     for (auto rt : m_rt_window) {
       total += rt;
-    } 
+    }

     return total / m_rt_window.size();
   }
%ld\n", + // m_old_head.load().head_idx, m_head.load().head_idx, new_head); + // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, + // m_head.load().refcnt); buffer_head new_hd = {new_head, 1}; buffer_head cur_hd; @@ -163,7 +167,6 @@ public: return true; } - void set_low_watermark(size_t lwm) { assert(lwm < m_hwm); m_lwm = lwm; @@ -197,13 +200,9 @@ public: return m_cap - (m_tail.load() - m_old_head.load().head_idx); } - size_t debug_get_old_head() const { - return m_old_head.load().head_idx; - } + size_t debug_get_old_head() const { return m_old_head.load().head_idx; } - size_t debug_get_head() const { - return m_head.load().head_idx; - } + size_t debug_get_head() const { return m_head.load().head_idx; } bool take_head_reference(size_t target_head) { buffer_head cur_hd, new_hd; @@ -225,7 +224,7 @@ public: return head_acquired; } - + bool release_head_reference(size_t head) { buffer_head cur_hd, new_hd; bool head_released = false; @@ -261,8 +260,8 @@ private: buffer_head cur_hd, new_hd; bool head_acquired = false; - - //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx); + // fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, + // m_old_head.load().head_idx, m_head.load().head_idx); do { if (m_old_head.load().head_idx == target_head) { cur_hd = m_old_head.load(); @@ -279,7 +278,7 @@ private: return new_hd.head_idx; } - + ssize_t try_advance_tail() { size_t old_value = m_tail.load(); diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h index 3ae3492..e2484d5 100644 --- a/include/framework/util/Configuration.h +++ b/include/framework/util/Configuration.h @@ -8,48 +8,49 @@ */ #pragma once +#include "framework/interface/Scheduler.h" #include "framework/reconstruction/ReconstructionPolicy.h" #include "util/types.h" -#include "framework/interface/Scheduler.h" #include <cstdlib> namespace de { template <ShardInterface ShardType, QueryInterface<ShardType> QueryType, -DeletePolicy D, SchedulerInterface SchedType> + DeletePolicy D, SchedulerInterface SchedType> class DEConfiguration { - public: - DEConfiguration(std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy) - : recon_policy(std::move(recon_policy)) {} +public: + DEConfiguration( + std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy) + : recon_policy(std::move(recon_policy)) {} - std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy; + std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy; - /* buffer parameters */ - size_t buffer_count = 1; - size_t buffer_size = 8000; - size_t buffer_flush_trigger = buffer_size / 2; + /* buffer parameters */ + size_t buffer_count = 1; + size_t buffer_size = 8000; + size_t buffer_flush_trigger = buffer_size / 2; - /* reconstruction triggers */ - bool recon_enable_seek_trigger = false; - bool recon_enable_maint_on_flush = false; - bool recon_enable_delete_cmpct = false; - bool recon_maint_disabled = true; + /* reconstruction triggers */ + bool recon_enable_seek_trigger = false; + bool recon_enable_maint_on_flush = false; + bool recon_enable_delete_cmpct = false; + bool recon_maint_disabled = true; - size_t recon_l0_capacity = 0; /* 0 for unbounded */ - double maximum_delete_proportion = 1; + size_t recon_l0_capacity = 0; /* 0 for unbounded */ + double maximum_delete_proportion = 1; - /* resource management */ - size_t maximum_threads = 16; - size_t minimum_recon_threads = 1; - size_t 
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 3ae3492..e2484d5 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -8,48 +8,49 @@
  */
 #pragma once

+#include "framework/interface/Scheduler.h"
 #include "framework/reconstruction/ReconstructionPolicy.h"
 #include "util/types.h"
-#include "framework/interface/Scheduler.h"

 #include <cstdlib>

 namespace de {

 template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
-DeletePolicy D, SchedulerInterface SchedType>
+          DeletePolicy D, SchedulerInterface SchedType>
 class DEConfiguration {
- public:
-  DEConfiguration(std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy)
-    : recon_policy(std::move(recon_policy)) {}
+public:
+  DEConfiguration(
+      std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy)
+      : recon_policy(std::move(recon_policy)) {}

-   std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy;
+  std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy;

-   /* buffer parameters */
-   size_t buffer_count = 1;
-   size_t buffer_size = 8000;
-   size_t buffer_flush_trigger = buffer_size / 2;
+  /* buffer parameters */
+  size_t buffer_count = 1;
+  size_t buffer_size = 8000;
+  size_t buffer_flush_trigger = buffer_size / 2;

-   /* reconstruction triggers */
-   bool recon_enable_seek_trigger = false;
-   bool recon_enable_maint_on_flush = false;
-   bool recon_enable_delete_cmpct = false;
-   bool recon_maint_disabled = true;
+  /* reconstruction triggers */
+  bool recon_enable_seek_trigger = false;
+  bool recon_enable_maint_on_flush = false;
+  bool recon_enable_delete_cmpct = false;
+  bool recon_maint_disabled = true;

-   size_t recon_l0_capacity = 0; /* 0 for unbounded */
-   double maximum_delete_proportion = 1;
+  size_t recon_l0_capacity = 0; /* 0 for unbounded */
+  double maximum_delete_proportion = 1;

-   /* resource management */
-   size_t maximum_threads = 16;
-   size_t minimum_recon_threads = 1;
-   size_t minimum_query_threads = 4;
-   size_t maximum_memory_usage = 0; /* o for unbounded */
+  /* resource management */
+  size_t maximum_threads = 16;
+  size_t minimum_recon_threads = 1;
+  size_t minimum_query_threads = 4;
+  size_t maximum_memory_usage = 0; /* o for unbounded */

-   size_t physical_core_count = 6;
+  size_t physical_core_count = 6;

-   size_t buffer_flush_query_preemption_trigger = UINT64_MAX;
+  size_t buffer_flush_query_preemption_trigger = UINT64_MAX;

-   bool dynamic_ratelimiting = false;
-   size_t rt_level_scale = 1;
+  bool dynamic_ratelimiting = false;
+  size_t rt_level_scale = 1;
 };

 } // namespace de
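As a closing orientation point, a hypothetical wiring sketch showing how a reconstruction policy and these configuration defaults come together. The concrete template arguments to DEConfiguration (shard, query, delete policy, scheduler) depend on the instantiation and are left generic here:

#include <memory>
#include <utility>

#include "framework/reconstruction/TieringPolicy.h"
#include "framework/util/Configuration.h"

/* ConfType stands for a concrete DEConfiguration instantiation that a real
 * caller would spell out; this sketch only shows the field wiring. */
template <typename ShardType, typename QueryType, typename ConfType>
ConfType make_config() {
  auto policy = std::make_unique<de::TieringPolicy<ShardType, QueryType>>(
      /*scale_factor=*/8, /*buffer_size=*/8000);

  ConfType config(std::move(policy));
  config.buffer_flush_trigger = config.buffer_size / 2; /* shipped default */
  config.dynamic_ratelimiting = true; /* opt in to the insertion-delay logic */
  config.rt_level_scale = 1; /* runtime scaling for not-yet-built levels */
  return config;
}

/* A DynamicExtension is then constructed as in this commit's constructor:
 * DynamicExtension(std::move(config), 1.0). */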