summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-09-25 14:42:44 -0400
committerDouglas Rumbaugh <dbr4@psu.edu>2025-09-25 14:42:44 -0400
commitcf5f3bbb0cb58430ed68ad3ebfcefc009e553d71 (patch)
tree4c17bc3169ee195c236cea9c9efda0aef7488e3c /include/framework
parent826c1fff5accbaa6b415acc176a5acbeb5f691b6 (diff)
downloaddynamic-extension-cf5f3bbb0cb58430ed68ad3ebfcefc009e553d71.tar.gz
Code reformatting
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/DynamicExtension.h73
-rw-r--r--include/framework/QueryRequirements.h6
-rw-r--r--include/framework/ShardRequirements.h6
-rw-r--r--include/framework/interface/Query.h176
-rw-r--r--include/framework/interface/Scheduler.h2
-rw-r--r--include/framework/interface/Shard.h5
-rw-r--r--include/framework/reconstruction/BSMPolicy.h12
-rw-r--r--include/framework/reconstruction/BackgroundTieringPolicy.h38
-rw-r--r--include/framework/reconstruction/CompactOnFull.h37
-rw-r--r--include/framework/reconstruction/FixedShardCountPolicy.h30
-rw-r--r--include/framework/reconstruction/FloodL0Policy.h3
-rw-r--r--include/framework/reconstruction/LevelingPolicy.h10
-rw-r--r--include/framework/reconstruction/ReconstructionPolicy.h19
-rw-r--r--include/framework/reconstruction/TieringPolicy.h26
-rw-r--r--include/framework/scheduling/Epoch.h4
-rw-r--r--include/framework/scheduling/FIFOScheduler.h4
-rw-r--r--include/framework/scheduling/LockManager.h20
-rw-r--r--include/framework/scheduling/SerialScheduler.h3
-rw-r--r--include/framework/scheduling/Task.h11
-rw-r--r--include/framework/scheduling/Version.h20
-rw-r--r--include/framework/scheduling/statistics.h65
-rw-r--r--include/framework/structure/BufferView.h11
-rw-r--r--include/framework/structure/ExtensionStructure.h139
-rw-r--r--include/framework/structure/InternalLevel.h26
-rw-r--r--include/framework/structure/MutableBuffer.h31
-rw-r--r--include/framework/util/Configuration.h53
26 files changed, 434 insertions, 396 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 6cfe72b..63264a0 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -70,7 +70,8 @@ public:
* for various configuration parameters in the system. See
* include/framework/util/Configuration.h for details.
*/
- DynamicExtension(ConfType &&config, double insertion_rate=1.0) : m_config(std::move(config)) {
+ DynamicExtension(ConfType &&config, double insertion_rate = 1.0)
+ : m_config(std::move(config)) {
m_buffer = std::make_unique<BufferType>(m_config.buffer_flush_trigger,
m_config.buffer_size);
@@ -426,13 +427,10 @@ private:
alignas(64) std::atomic<long> m_scheduled_records;
- enum reconstruction_phase {
- RECON_SCHEDULED = 1,
- RECON_ENDED = -1
- };
+ enum reconstruction_phase { RECON_SCHEDULED = 1, RECON_ENDED = -1 };
-
- void update_insertion_delay(long runtime, size_t reccnt, reconstruction_phase phase) {
+ void update_insertion_delay(long runtime, size_t reccnt,
+ reconstruction_phase phase) {
if (!m_config.dynamic_ratelimiting) {
return;
@@ -441,28 +439,19 @@ private:
m_scheduled_reconstruction_time.fetch_add(runtime * phase);
m_scheduled_records.fetch_add(reccnt * phase);
- long time_per_record = (m_scheduled_records.load())
- ? m_scheduled_reconstruction_time.load() /
- m_scheduled_records.load()
- : 0;
+ size_t total_reccnt = get_record_count();
+
+ long time_per_record =
+ (total_reccnt) ? m_scheduled_reconstruction_time.load() / total_reccnt
+ : 0;
long us = time_per_record / 1000;
- long excess_ns = time_per_record - us*1000;
+ long excess_ns = time_per_record - us * 1000;
m_stall_us.store(us);
size_t records_to_1us = (excess_ns) ? 1000 / excess_ns : 0;
- m_insertion_rate.store(1.0 - 1.0 / (double) records_to_1us );
-
- fprintf(stderr, "[%d] Updated delay: %ld and %ld\n", phase, us, excess_ns);
-
- if (runtime == 0) {
- fprintf(stderr, "\t[W] Predicted runtime change of 0\n");
- }
-
- if (reccnt == 0) {
- fprintf(stderr, "\t[W] Predicted reccnt change of 0\n");
- }
+ m_insertion_rate.store(1.0 - 1.0 / (double)records_to_1us);
}
bool restart_query(QueryArgs<ShardType, QueryType, DynamicExtension> *args,
@@ -537,9 +526,9 @@ private:
* this code will be bypassed in that case.
*/
if (args->priority == ReconstructionPriority::FLUSH) {
- #ifdef DE_PRINT_SHARD_COUNT
- fprintf(stdout, "S\t%ld\n", extension->get_shard_count());
- #endif
+#ifdef DE_PRINT_SHARD_COUNT
+ fprintf(stdout, "S\t%ld\n", extension->get_shard_count());
+#endif
// fprintf(stderr, "[I] Running flush (%ld)\n", recon_id);
// fprintf(stderr, "[I]\t Assigned Version %ld (%ld)\n",
@@ -694,9 +683,8 @@ private:
extension->m_lock_mngr.release_lock(level, args->version->get_id());
}
- extension->update_insertion_delay(args->predicted_runtime,
- args->tasks.get_total_reccnt(),
- RECON_ENDED);
+ extension->update_insertion_delay(
+ args->predicted_runtime, args->tasks.get_total_reccnt(), RECON_ENDED);
}
if (args->priority == ReconstructionPriority::FLUSH) {
@@ -708,7 +696,6 @@ private:
// args->version->get_id(), recon_id);
//
-
/* manually delete the argument object */
delete args;
}
@@ -937,23 +924,33 @@ private:
args->initial_version = active_version->get_id();
size_t level_idx = 0;
- for (size_t i=0; i< recon.size(); i++) {
- if (recon[i].target < (level_index) active_version->get_structure()->get_level_vector().size()) {
+ for (size_t i = 0; i < recon.size(); i++) {
+ if (recon[i].target < (level_index)active_version->get_structure()
+ ->get_level_vector()
+ .size()) {
level_idx = recon[i].target;
- args->predicted_runtime += active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt);
+ args->predicted_runtime +=
+ active_version->get_structure()
+ ->get_level_vector()[level_idx]
+ ->predict_reconstruction_time(recon[i].reccnt);
} else {
level_idx = recon[i].target - 1;
- args->predicted_runtime += m_config.rt_level_scale * active_version->get_structure()->get_level_vector()[level_idx]->predict_reconstruction_time(recon[i].reccnt);
+ args->predicted_runtime +=
+ m_config.rt_level_scale *
+ active_version->get_structure()
+ ->get_level_vector()[level_idx]
+ ->predict_reconstruction_time(recon[i].reccnt);
}
}
if (args->predicted_runtime == 0) {
- fprintf(stderr, "Runtime Prediction of 0 for Level %ld with %ld records\n", level_idx, args->tasks.get_total_reccnt());
+ fprintf(stderr,
+ "Runtime Prediction of 0 for Level %ld with %ld records\n",
+ level_idx, args->tasks.get_total_reccnt());
}
update_insertion_delay(args->predicted_runtime,
- args->tasks.get_total_reccnt(),
- RECON_SCHEDULED);
+ args->tasks.get_total_reccnt(), RECON_SCHEDULED);
m_sched->schedule_job(reconstruction, args->tasks.get_total_reccnt(),
args, RECONSTRUCTION);
}
@@ -1002,7 +999,7 @@ private:
return m_buffer->append(rec, ts);
}
-//#ifdef _GNU_SOURCE
+ //#ifdef _GNU_SOURCE
void set_thread_affinity() {
if constexpr (std::same_as<SchedType, SerialScheduler>) {
return;
diff --git a/include/framework/QueryRequirements.h b/include/framework/QueryRequirements.h
index dcba67e..9f56e4a 100644
--- a/include/framework/QueryRequirements.h
+++ b/include/framework/QueryRequirements.h
@@ -1,7 +1,7 @@
/*
* include/framework/QueryRequirements.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -11,7 +11,7 @@
*/
#pragma once
-#include "framework/structure/BufferView.h"
#include "framework/interface/Record.h"
-#include "framework/interface/Shard.h"
+#include "framework/structure/BufferView.h"
#include "framework/interface/Query.h"
+#include "framework/interface/Shard.h"
diff --git a/include/framework/ShardRequirements.h b/include/framework/ShardRequirements.h
index d054030..335c916 100644
--- a/include/framework/ShardRequirements.h
+++ b/include/framework/ShardRequirements.h
@@ -1,7 +1,7 @@
/*
* include/framework/ShardRequirements.h
*
- * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Copyright (C) 2023 Douglas B. Rumbaugh <drumbaugh@psu.edu>
*
* Distributed under the Modified BSD License.
*
@@ -11,7 +11,7 @@
*/
#pragma once
-#include "framework/structure/BufferView.h"
+#include "framework/interface/Query.h"
#include "framework/interface/Record.h"
#include "framework/interface/Shard.h"
-#include "framework/interface/Query.h"
+#include "framework/structure/BufferView.h"
diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h
index 97a973d..4e9a3fb 100644
--- a/include/framework/interface/Query.h
+++ b/include/framework/interface/Query.h
@@ -12,7 +12,6 @@
namespace de {
-
/*
*
*
@@ -23,100 +22,99 @@ template <typename QUERY, typename SHARD,
typename PARAMETERS = typename QUERY::Parameters,
typename LOCAL = typename QUERY::LocalQuery,
typename LOCAL_BUFFER = typename QUERY::LocalQueryBuffer>
-concept QueryInterface =
- requires(PARAMETERS *parameters, LOCAL *local, LOCAL_BUFFER *buffer_query,
- SHARD *shard, std::vector<LOCAL *> &local_queries,
- std::vector<LOCAL_RESULT> &local_results, RESULT &result,
- BufferView<typename SHARD::RECORD> *bv) {
- /*
- * Given a set of query parameters and a shard, return a local query
- * object for that shard.
- */
- {
- QUERY::local_preproc(shard, parameters)
- } -> std::convertible_to<LOCAL *>;
+concept QueryInterface = requires(PARAMETERS *parameters, LOCAL *local,
+ LOCAL_BUFFER *buffer_query, SHARD *shard,
+ std::vector<LOCAL *> &local_queries,
+ std::vector<LOCAL_RESULT> &local_results,
+ RESULT &result,
+ BufferView<typename SHARD::RECORD> *bv) {
+ /*
+ * Given a set of query parameters and a shard, return a local query
+ * object for that shard.
+ */
+ { QUERY::local_preproc(shard, parameters) } -> std::convertible_to<LOCAL *>;
- /*
- * Given a set of query parameters and a buffer view, return a local
- * query object for the buffer.
- * NOTE: for interface reasons, the pointer to the buffer view MUST be
- * stored inside of the local query object. The future buffer
- * query routine will access the buffer by way of this pointer.
- */
- {
- QUERY::local_preproc_buffer(bv, parameters)
- } -> std::convertible_to<LOCAL_BUFFER *>;
+ /*
+ * Given a set of query parameters and a buffer view, return a local
+ * query object for the buffer.
+ * NOTE: for interface reasons, the pointer to the buffer view MUST be
+ * stored inside of the local query object. The future buffer
+ * query routine will access the buffer by way of this pointer.
+ */
+ {
+ QUERY::local_preproc_buffer(bv, parameters)
+ } -> std::convertible_to<LOCAL_BUFFER *>;
- /*
- * Given a full set of local queries, and the buffer query, make any
- * necessary adjustments to the local queries in-place, to account for
- * global information. If no additional processing is required, this
- * function can be left empty.
- */
- { QUERY::distribute_query(parameters, local_queries, buffer_query) };
+ /*
+ * Given a full set of local queries, and the buffer query, make any
+ * necessary adjustments to the local queries in-place, to account for
+ * global information. If no additional processing is required, this
+ * function can be left empty.
+ */
+ {QUERY::distribute_query(parameters, local_queries, buffer_query)};
- /*
- * Answer the local query, defined by `local` against `shard` and return
- * a vector of LOCAL_RESULT objects defining the query result.
- */
- { QUERY::local_query(shard, local) } -> std::convertible_to<LOCAL_RESULT>;
+ /*
+ * Answer the local query, defined by `local` against `shard` and return
+ * a vector of LOCAL_RESULT objects defining the query result.
+ */
+ { QUERY::local_query(shard, local) } -> std::convertible_to<LOCAL_RESULT>;
- /*
- * Answer the local query defined by `local` against the buffer (which
- * should be accessed by a pointer inside of `local`) and return a vector
- * of LOCAL_RESULT objects defining the query result.
- */
- {
- QUERY::local_query_buffer(buffer_query)
- } -> std::convertible_to<LOCAL_RESULT>;
+ /*
+ * Answer the local query defined by `local` against the buffer (which
+ * should be accessed by a pointer inside of `local`) and return a vector
+ * of LOCAL_RESULT objects defining the query result.
+ */
+ {
+ QUERY::local_query_buffer(buffer_query)
+ } -> std::convertible_to<LOCAL_RESULT>;
- /*
- * Process the local results from the buffer and all of the shards,
- * stored in `local_results`, and insert the associated ResultType
- * objects into the `result` vector, which represents the final result
- * of the query. Updates to this vector are done in-place.
- */
- { QUERY::combine(local_results, parameters, result) };
+ /*
+ * Process the local results from the buffer and all of the shards,
+ * stored in `local_results`, and insert the associated ResultType
+ * objects into the `result` vector, which represents the final result
+ * of the query. Updates to this vector are done in-place.
+ */
+ {QUERY::combine(local_results, parameters, result)};
- /*
- * Process the post-combine `result` vector of ResultType objects,
- * in the context of the global and local query parameters, to determine
- * if the query should be repeated. If so, make any necessary adjustments
- * to the local query objects and return True. Otherwise, return False.
- *
- * If no repetition is needed for a given problem type, simply return
- * False immediately and the query will end.
- */
- {
- QUERY::repeat(parameters, result, local_queries, buffer_query)
- } -> std::same_as<bool>;
+ /*
+ * Process the post-combine `result` vector of ResultType objects,
+ * in the context of the global and local query parameters, to determine
+ * if the query should be repeated. If so, make any necessary adjustments
+ * to the local query objects and return True. Otherwise, return False.
+ *
+ * If no repetition is needed for a given problem type, simply return
+ * False immediately and the query will end.
+ */
+ {
+ QUERY::repeat(parameters, result, local_queries, buffer_query)
+ } -> std::same_as<bool>;
- /*
- * If this flag is True, then the query will immediately stop and return
- * a result as soon as the first non-deleted LocalRecordType is found.
- * Otherwise, every Shard and the buffer will be queried and the results
- * merged, like normal.
- *
- * This is largely an optimization flag for use with point-lookup, or
- * other single-record result queries
- */
- { QUERY::EARLY_ABORT } -> std::convertible_to<bool>;
+ /*
+ * If this flag is True, then the query will immediately stop and return
+ * a result as soon as the first non-deleted LocalRecordType is found.
+ * Otherwise, every Shard and the buffer will be queried and the results
+ * merged, like normal.
+ *
+ * This is largely an optimization flag for use with point-lookup, or
+ * other single-record result queries
+ */
+ { QUERY::EARLY_ABORT } -> std::convertible_to<bool>;
- /*
- * If false, the built-in delete filtering that the framework can
- * apply to the local results, prior to calling combine, will be skipped.
- * This general filtering can be inefficient, particularly for tombstone
- * -based deletes, and so if a more efficient manual filtering can be
- * performed, it is worth setting this to True and doing that filtering
- * in the combine step.
- *
- * If deletes are not a consideration for your problem, it's also best
- * to turn this off, as it'll avoid the framework making an extra pass
- * over the local results prior to combining them.
- *
- * TODO: Temporarily disabling this, as we've dropped framework-level
- * delete filtering for the time being.
- */
- /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to<bool>; */
- };
+ /*
+ * If false, the built-in delete filtering that the framework can
+ * apply to the local results, prior to calling combine, will be skipped.
+ * This general filtering can be inefficient, particularly for tombstone
+ * -based deletes, and so if a more efficient manual filtering can be
+ * performed, it is worth setting this to True and doing that filtering
+ * in the combine step.
+ *
+ * If deletes are not a consideration for your problem, it's also best
+ * to turn this off, as it'll avoid the framework making an extra pass
+ * over the local results prior to combining them.
+ *
+ * TODO: Temporarily disabling this, as we've dropped framework-level
+ * delete filtering for the time being.
+ */
+ /* { QUERY::SKIP_DELETE_FILTER } -> std::convertible_to<bool>; */
+};
} // namespace de
diff --git a/include/framework/interface/Scheduler.h b/include/framework/interface/Scheduler.h
index d76a6c8..0724bce 100644
--- a/include/framework/interface/Scheduler.h
+++ b/include/framework/interface/Scheduler.h
@@ -14,7 +14,7 @@ template <typename SchedType>
concept SchedulerInterface = requires(SchedType s, size_t i, void *vp,
de::Job j) {
{SchedType(i, i)};
- {s.schedule_job(j, i, vp, i)} -> std::convertible_to<void>;
+ { s.schedule_job(j, i, vp, i) } -> std::convertible_to<void>;
{s.shutdown()};
{s.print_statistics()};
};
diff --git a/include/framework/interface/Shard.h b/include/framework/interface/Shard.h
index fb5ce1a..e3a2de4 100644
--- a/include/framework/interface/Shard.h
+++ b/include/framework/interface/Shard.h
@@ -14,8 +14,8 @@ namespace de {
template <typename SHARD>
concept ShardInterface = RecordInterface<typename SHARD::RECORD> &&
- requires(SHARD shard, const std::vector<const SHARD *> &shard_vector, bool b,
- BufferView<typename SHARD::RECORD> bv,
+ requires(SHARD shard, const std::vector<const SHARD *> &shard_vector,
+ bool b, BufferView<typename SHARD::RECORD> bv,
typename SHARD::RECORD rec) {
/* construct a shard from a vector of shards of the same type */
{SHARD(shard_vector)};
@@ -52,7 +52,6 @@ concept ShardInterface = RecordInterface<typename SHARD::RECORD> &&
* use only at the moment
*/
{ shard.get_aux_memory_usage() } -> std::convertible_to<size_t>;
-
};
template <typename SHARD>
diff --git a/include/framework/reconstruction/BSMPolicy.h b/include/framework/reconstruction/BSMPolicy.h
index 61f379e..db4c8d4 100644
--- a/include/framework/reconstruction/BSMPolicy.h
+++ b/include/framework/reconstruction/BSMPolicy.h
@@ -36,7 +36,8 @@ public:
ReconstructionVector reconstructions;
auto levels = version->get_structure()->get_level_vector();
- level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count());
+ level_index target_level = find_reconstruction_target(
+ levels, version->get_structure()->get_record_count());
assert(target_level != -1);
level_index source_level = 0;
@@ -49,7 +50,8 @@ public:
task.target = target_level;
size_t reccnt = 0;
- if (target_level < (ssize_t)levels.size() && levels[target_level]->get_record_count() > 0) {
+ if (target_level < (ssize_t)levels.size() &&
+ levels[target_level]->get_record_count() > 0) {
task.sources.push_back({target_level, all_shards_idx});
task.type = ReconstructionType::Merge;
} else {
@@ -71,7 +73,8 @@ public:
}
private:
- level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+ level_index find_reconstruction_target(LevelVector &levels,
+ size_t reccnt) const {
level_index target_level = invalid_level_idx;
for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -85,7 +88,8 @@ private:
}
inline size_t capacity(level_index level, size_t reccnt) const {
- double base = std::ceil(m_scale_factor * std::pow<double>(std::log10(reccnt), m_size_modifier));
+ double base = std::ceil(
+ m_scale_factor * std::pow<double>(std::log10(reccnt), m_size_modifier));
return m_buffer_size * (base - 1) * pow(base, level + 1);
}
diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h
index 36556a2..1c000b9 100644
--- a/include/framework/reconstruction/BackgroundTieringPolicy.h
+++ b/include/framework/reconstruction/BackgroundTieringPolicy.h
@@ -16,16 +16,20 @@
namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
-class BackgroundTieringPolicy : public ReconstructionPolicy<ShardType, QueryType> {
+class BackgroundTieringPolicy
+ : public ReconstructionPolicy<ShardType, QueryType> {
typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
LevelVector;
public:
- BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size, size_t size_modifier=0)
- : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(size_modifier) {}
-
- std::vector<ReconstructionVector> get_reconstruction_tasks(
- const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ BackgroundTieringPolicy(size_t scale_factor, size_t buffer_size,
+ size_t size_modifier = 0)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size),
+ m_size_modifier(size_modifier) {}
+
+ std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
std::vector<ReconstructionVector> reconstructions;
auto levels = version->get_structure()->get_level_vector();
@@ -34,7 +38,8 @@ public:
return {};
}
- level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count());
+ level_index target_level = find_reconstruction_target(
+ levels, version->get_structure()->get_record_count());
assert(target_level != -1);
level_index source_level = 0;
@@ -44,19 +49,21 @@ public:
}
for (level_index i = target_level; i > source_level; i--) {
- if (lock_mngr.take_lock(i-1, version->get_id())) {
+ if (lock_mngr.take_lock(i - 1, version->get_id())) {
ReconstructionVector recon;
size_t total_reccnt = levels[i - 1]->get_record_count();
std::vector<ShardID> shards;
- for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) {
- shards.push_back({i-1, j});
+ for (ssize_t j = 0; j < (ssize_t)levels[i - 1]->get_shard_count();
+ j++) {
+ shards.push_back({i - 1, j});
}
- recon.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact);
+ recon.add_reconstruction(shards, i, total_reccnt,
+ ReconstructionType::Compact);
reconstructions.push_back(recon);
}
}
-
+
return reconstructions;
}
@@ -68,7 +75,8 @@ public:
}
private:
- level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+ level_index find_reconstruction_target(LevelVector &levels,
+ size_t reccnt) const {
level_index target_level = invalid_level_idx;
for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -81,7 +89,9 @@ private:
return target_level;
}
- inline size_t capacity(size_t reccnt) const { return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); }
+ inline size_t capacity(size_t reccnt) const {
+ return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier);
+ }
size_t m_scale_factor;
size_t m_buffer_size;
diff --git a/include/framework/reconstruction/CompactOnFull.h b/include/framework/reconstruction/CompactOnFull.h
index f5e0400..f0b549d 100644
--- a/include/framework/reconstruction/CompactOnFull.h
+++ b/include/framework/reconstruction/CompactOnFull.h
@@ -21,11 +21,14 @@ class CompactOnFull : public ReconstructionPolicy<ShardType, QueryType> {
LevelVector;
public:
- CompactOnFull(size_t scale_factor, size_t buffer_size, size_t size_modifier=0)
- : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(size_modifier) {}
-
- std::vector<ReconstructionVector> get_reconstruction_tasks(
- const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ CompactOnFull(size_t scale_factor, size_t buffer_size,
+ size_t size_modifier = 0)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size),
+ m_size_modifier(size_modifier) {}
+
+ std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
std::vector<ReconstructionVector> reconstructions;
auto levels = version->get_structure()->get_level_vector();
@@ -34,23 +37,24 @@ public:
return {};
}
- for (level_index i=0; i < (ssize_t) levels.size(); i++) {
- if (levels[i]->get_shard_count() >= m_scale_factor && lock_mngr.take_lock(i, version->get_id())) {
+ for (level_index i = 0; i < (ssize_t)levels.size(); i++) {
+ if (levels[i]->get_shard_count() >= m_scale_factor &&
+ lock_mngr.take_lock(i, version->get_id())) {
ReconstructionVector recon;
size_t total_reccnt = levels[i]->get_record_count();
std::vector<ShardID> shards;
- for (ssize_t j=0; j<(ssize_t) levels[i]->get_shard_count(); j++) {
- shards.push_back({i,j});
+ for (ssize_t j = 0; j < (ssize_t)levels[i]->get_shard_count(); j++) {
+ shards.push_back({i, j});
}
- recon.add_reconstruction(shards, i+1, total_reccnt, ReconstructionType::Compact);
+ recon.add_reconstruction(shards, i + 1, total_reccnt,
+ ReconstructionType::Compact);
reconstructions.push_back(recon);
}
}
- return reconstructions;
- }
-
+ return reconstructions;
+ }
ReconstructionVector
get_flush_tasks(const Version<ShardType, QueryType> *version) const override {
@@ -60,7 +64,8 @@ public:
}
private:
- level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+ level_index find_reconstruction_target(LevelVector &levels,
+ size_t reccnt) const {
level_index target_level = invalid_level_idx;
for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -73,7 +78,9 @@ private:
return target_level;
}
- inline size_t capacity(size_t reccnt) const { return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier); }
+ inline size_t capacity(size_t reccnt) const {
+ return m_scale_factor * std::pow(std::log(reccnt), m_size_modifier);
+ }
size_t m_scale_factor;
size_t m_buffer_size;
diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h
index cc8dce4..52d9f39 100644
--- a/include/framework/reconstruction/FixedShardCountPolicy.h
+++ b/include/framework/reconstruction/FixedShardCountPolicy.h
@@ -16,16 +16,20 @@
namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
-class FixedShardCountPolicy : public ReconstructionPolicy<ShardType, QueryType> {
+class FixedShardCountPolicy
+ : public ReconstructionPolicy<ShardType, QueryType> {
typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
LevelVector;
public:
- FixedShardCountPolicy(size_t buffer_size, size_t shard_count, size_t max_record_count)
- : m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {}
+ FixedShardCountPolicy(size_t buffer_size, size_t shard_count,
+ size_t max_record_count)
+ : m_buffer_size(buffer_size), m_shard_count(shard_count),
+ m_max_reccnt(max_record_count) {}
std::vector<ReconstructionVector>
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
return {};
}
@@ -37,26 +41,26 @@ public:
/* if this is the very first flush, there won't be an L1 yet */
if (levels.size() > 1 && levels[1]->get_shard_count() > 0) {
- ShardID last_shid = {1, (shard_index) (levels[1]->get_shard_count() - 1)};
- if (levels[1]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) {
- auto task = ReconstructionTask {
- {{0, 0}, last_shid}, 1, m_buffer_size,ReconstructionType::Merge
- };
+ ShardID last_shid = {1, (shard_index)(levels[1]->get_shard_count() - 1)};
+ if (levels[1]->get_shard(last_shid.shard_idx)->get_record_count() +
+ m_buffer_size <=
+ capacity()) {
+ auto task = ReconstructionTask{
+ {{0, 0}, last_shid}, 1, m_buffer_size, ReconstructionType::Merge};
v.add_reconstruction(task);
return v;
}
}
- auto task = ReconstructionTask {
- {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append
- };
+ auto task = ReconstructionTask{
+ {{0, 0}}, 1, m_buffer_size, ReconstructionType::Append};
v.add_reconstruction(task);
return v;
}
private:
inline size_t capacity() const {
- double bps = (double) m_max_reccnt / (double) m_buffer_size / m_shard_count;
+ double bps = (double)m_max_reccnt / (double)m_buffer_size / m_shard_count;
return ceil(bps) * m_buffer_size;
}
diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h
index c0d29fe..7e38e02 100644
--- a/include/framework/reconstruction/FloodL0Policy.h
+++ b/include/framework/reconstruction/FloodL0Policy.h
@@ -24,7 +24,8 @@ public:
FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {}
std::vector<ReconstructionVector>
- get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
return {};
}
diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h
index 955bc02..3e0afaa 100644
--- a/include/framework/reconstruction/LevelingPolicy.h
+++ b/include/framework/reconstruction/LevelingPolicy.h
@@ -58,8 +58,9 @@ public:
(target_level == 1)
? m_buffer_size + target_reccnt
: levels[target_level - 1]->get_record_count() + target_reccnt;
- auto type = (target_level >= (level_index) levels.size()) ? ReconstructionType::Append
- : ReconstructionType::Merge;
+ auto type = (target_level >= (level_index)levels.size())
+ ? ReconstructionType::Append
+ : ReconstructionType::Merge;
reconstructions.add_reconstruction(target_level - 1, target_level,
total_reccnt, type);
@@ -95,8 +96,9 @@ private:
inline size_t capacity(level_index level, size_t reccnt) const {
return m_buffer_size *
- pow(m_scale_factor * std::ceil(std::pow<double>(std::log10(reccnt), m_size_modifier)), level);
-
+ pow(m_scale_factor * std::ceil(std::pow<double>(std::log10(reccnt),
+ m_size_modifier)),
+ level);
}
size_t m_scale_factor;
diff --git a/include/framework/reconstruction/ReconstructionPolicy.h b/include/framework/reconstruction/ReconstructionPolicy.h
index 41a2092..3c842b2 100644
--- a/include/framework/reconstruction/ReconstructionPolicy.h
+++ b/include/framework/reconstruction/ReconstructionPolicy.h
@@ -12,19 +12,22 @@
*/
#pragma once
-#include "util/types.h"
-#include "framework/structure/ExtensionStructure.h"
-#include "framework/scheduling/Version.h"
#include "framework/scheduling/LockManager.h"
+#include "framework/scheduling/Version.h"
+#include "framework/structure/ExtensionStructure.h"
+#include "util/types.h"
namespace de {
-template<ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class ReconstructionPolicy {
typedef ExtensionStructure<ShardType, QueryType> StructureType;
public:
ReconstructionPolicy() {}
- virtual std::vector<ReconstructionVector> get_reconstruction_tasks(const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const = 0;
- virtual ReconstructionVector get_flush_tasks(const Version<ShardType, QueryType> *version) const = 0;
- };
-}
+ virtual std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const = 0;
+ virtual ReconstructionVector
+ get_flush_tasks(const Version<ShardType, QueryType> *version) const = 0;
+};
+} // namespace de
diff --git a/include/framework/reconstruction/TieringPolicy.h b/include/framework/reconstruction/TieringPolicy.h
index b1fcb49..c16c427 100644
--- a/include/framework/reconstruction/TieringPolicy.h
+++ b/include/framework/reconstruction/TieringPolicy.h
@@ -21,11 +21,13 @@ class TieringPolicy : public ReconstructionPolicy<ShardType, QueryType> {
LevelVector;
public:
- TieringPolicy(size_t scale_factor, size_t buffer_size, double modifier=0)
- : m_scale_factor(scale_factor), m_buffer_size(buffer_size), m_size_modifier(modifier) {}
+ TieringPolicy(size_t scale_factor, size_t buffer_size, double modifier = 0)
+ : m_scale_factor(scale_factor), m_buffer_size(buffer_size),
+ m_size_modifier(modifier) {}
- std::vector<ReconstructionVector> get_reconstruction_tasks(
- const Version<ShardType, QueryType> *version, LockManager &lock_mngr) const override {
+ std::vector<ReconstructionVector>
+ get_reconstruction_tasks(const Version<ShardType, QueryType> *version,
+ LockManager &lock_mngr) const override {
return {};
}
@@ -34,7 +36,8 @@ public:
ReconstructionVector reconstructions;
auto levels = version->get_structure()->get_level_vector();
- level_index target_level = find_reconstruction_target(levels, version->get_structure()->get_record_count());
+ level_index target_level = find_reconstruction_target(
+ levels, version->get_structure()->get_record_count());
assert(target_level != -1);
level_index source_level = 0;
@@ -47,12 +50,13 @@ public:
size_t total_reccnt = levels[i - 1]->get_record_count();
std::vector<ShardID> shards;
- for (ssize_t j=0; j<(ssize_t)levels[i-1]->get_shard_count(); j++) {
- shards.push_back({i-1, j});
+ for (ssize_t j = 0; j < (ssize_t)levels[i - 1]->get_shard_count(); j++) {
+ shards.push_back({i - 1, j});
}
if (total_reccnt > 0 || shards.size() > 0) {
- reconstructions.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact);
+ reconstructions.add_reconstruction(shards, i, total_reccnt,
+ ReconstructionType::Compact);
}
}
@@ -60,7 +64,8 @@ public:
}
private:
- level_index find_reconstruction_target(LevelVector &levels, size_t reccnt) const {
+ level_index find_reconstruction_target(LevelVector &levels,
+ size_t reccnt) const {
level_index target_level = invalid_level_idx;
for (level_index i = 1; i < (level_index)levels.size(); i++) {
@@ -74,7 +79,8 @@ private:
}
inline size_t capacity(size_t reccnt) const {
- return std::ceil((double) m_scale_factor * std::pow<double>(std::log10(reccnt), m_size_modifier));
+ return std::ceil((double)m_scale_factor *
+ std::pow<double>(std::log10(reccnt), m_size_modifier));
}
size_t m_scale_factor;
diff --git a/include/framework/scheduling/Epoch.h b/include/framework/scheduling/Epoch.h
index 7583727..a642b31 100644
--- a/include/framework/scheduling/Epoch.h
+++ b/include/framework/scheduling/Epoch.h
@@ -60,7 +60,9 @@ public:
Structure *get_mutable_structure() { return m_structure; }
- BufView get_buffer() const { return m_buffer->get_buffer_view(m_buffer_head); }
+ BufView get_buffer() const {
+ return m_buffer->get_buffer_view(m_buffer_head);
+ }
/*
* Returns a new Epoch object that is a copy of this one. The new object
diff --git a/include/framework/scheduling/FIFOScheduler.h b/include/framework/scheduling/FIFOScheduler.h
index 8fbe07c..ef6dde5 100644
--- a/include/framework/scheduling/FIFOScheduler.h
+++ b/include/framework/scheduling/FIFOScheduler.h
@@ -61,7 +61,8 @@ public:
std::unique_lock<std::mutex> lk(m_cv_lock);
m_stats.job_queued(ts, type, size);
- m_task_queue.push(Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv));
+ m_task_queue.push(
+ Task(size, ts, job, args, type, &m_stats, nullptr, &m_cv));
m_cv.notify_all();
}
@@ -81,7 +82,6 @@ private:
[[maybe_unused]] size_t m_memory_budget;
size_t m_thrd_cnt;
-
std::atomic<size_t> m_counter;
std::mutex m_cv_lock;
std::condition_variable m_cv;
diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h
index a40cf7a..820b975 100644
--- a/include/framework/scheduling/LockManager.h
+++ b/include/framework/scheduling/LockManager.h
@@ -3,15 +3,15 @@
*/
#pragma once
-#include <deque>
#include <atomic>
#include <cassert>
+#include <deque>
namespace de {
class LockManager {
public:
- LockManager(size_t levels=1) {
- for (size_t i=0; i < levels; i++) {
+ LockManager(size_t levels = 1) {
+ for (size_t i = 0; i < levels; i++) {
m_lks.emplace_back(false);
}
@@ -20,9 +20,7 @@ public:
~LockManager() = default;
- void add_lock() {
- m_lks.emplace_back(false);
- }
+ void add_lock() { m_lks.emplace_back(false); }
void release_lock(size_t idx, size_t version) {
if (idx < m_lks.size()) {
@@ -72,17 +70,13 @@ public:
return false;
}
- bool is_buffer_locked() {
- return m_buffer_lk.load();
- }
+ bool is_buffer_locked() { return m_buffer_lk.load(); }
- void release_buffer_lock() {
- m_buffer_lk.store(false);
- }
+ void release_buffer_lock() { m_buffer_lk.store(false); }
private:
std::deque<std::atomic<bool>> m_lks;
std::atomic<bool> m_buffer_lk;
std::atomic<size_t> m_last_unlocked_version;
};
-}
+} // namespace de
diff --git a/include/framework/scheduling/SerialScheduler.h b/include/framework/scheduling/SerialScheduler.h
index b6ebe53..6a8ed58 100644
--- a/include/framework/scheduling/SerialScheduler.h
+++ b/include/framework/scheduling/SerialScheduler.h
@@ -37,7 +37,8 @@ public:
t(0);
}
- void shutdown() { /* intentionally left blank */ }
+ void shutdown() { /* intentionally left blank */
+ }
void print_statistics() { m_stats.print_statistics(); }
void print_query_time_data() { m_stats.print_query_time_data(); }
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index 4529b2e..00ddbbb 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -14,9 +14,9 @@
#pragma once
#include <chrono>
+#include <condition_variable>
#include <functional>
#include <future>
-#include <condition_variable>
#include "framework/scheduling/Version.h"
#include "framework/scheduling/statistics.h"
@@ -24,11 +24,7 @@
namespace de {
-enum class ReconstructionPriority {
- FLUSH = 0,
- CMPCT = 1,
- MAINT = 2
-};
+enum class ReconstructionPriority { FLUSH = 0, CMPCT = 1, MAINT = 2 };
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
struct ReconstructionArgs {
@@ -51,7 +47,8 @@ typedef std::function<void(void *)> Job;
struct Task {
Task(size_t size, size_t ts, Job job, void *args, size_t type = 0,
- SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr, std::condition_variable *cv=nullptr)
+ SchedulerStatistics *stats = nullptr, std::mutex *lk = nullptr,
+ std::condition_variable *cv = nullptr)
: m_job(job), m_size(size), m_timestamp(ts), m_args(args), m_type(type),
m_stats(stats), m_lk(lk), m_cv(cv) {}
diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h
index be54c84..bbcbe25 100644
--- a/include/framework/scheduling/Version.h
+++ b/include/framework/scheduling/Version.h
@@ -25,18 +25,17 @@ private:
typedef BufferView<RecordType> BufferViewType;
public:
-
Version(size_t vid = 0)
: m_buffer(nullptr), m_structure(nullptr), m_id(vid), m_buffer_head(0) {}
- Version(size_t number, std::unique_ptr<StructureType> structure, BufferType *buff,
- size_t head)
+ Version(size_t number, std::unique_ptr<StructureType> structure,
+ BufferType *buff, size_t head)
: m_buffer(buff), m_structure(std::move(structure)), m_id(number),
m_buffer_head(head) {
- if (m_buffer) {
- m_buffer->take_head_reference(m_buffer_head);
- }
- }
+ if (m_buffer) {
+ m_buffer->take_head_reference(m_buffer_head);
+ }
+ }
~Version() {
if (m_buffer) {
@@ -55,7 +54,7 @@ public:
size_t get_id() const { return m_id; }
- void set_id(size_t id) { m_id = id;}
+ void set_id(size_t id) { m_id = id; }
const StructureType *get_structure() const { return m_structure.get(); }
@@ -107,10 +106,7 @@ public:
m_structure->update_shard_version(version);
}
- size_t get_head() {
- return m_buffer_head;
- }
-
+ size_t get_head() { return m_buffer_head; }
void set_buffer(BufferType *buffer, size_t head) {
assert(m_buffer == nullptr);
diff --git a/include/framework/scheduling/statistics.h b/include/framework/scheduling/statistics.h
index 6d9f9f0..8cb6dbd 100644
--- a/include/framework/scheduling/statistics.h
+++ b/include/framework/scheduling/statistics.h
@@ -16,12 +16,12 @@
#include <atomic>
#include <cassert>
#include <chrono>
+#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <mutex>
#include <unordered_map>
#include <vector>
-#include <cmath>
namespace de {
@@ -68,7 +68,6 @@ private:
queue_time)
.count();
}
-
};
public:
@@ -117,10 +116,10 @@ public:
size_t worst_query = 0;
size_t first_query = UINT64_MAX;
-
for (auto &job : m_jobs) {
if (job.second.type != 1) {
- fprintf(stdout, "%ld %ld %ld %ld\n", job.second.id, job.second.size, job.second.runtime(), job.second.runtime() / (job.second.size));
+ fprintf(stdout, "%ld %ld %ld %ld\n", job.second.id, job.second.size,
+ job.second.runtime(), job.second.runtime() / (job.second.size));
}
if (job.first < first_query) {
@@ -149,7 +148,6 @@ public:
query_cnt++;
}
-
int64_t average_queue_time = (query_cnt) ? total_queue_time / query_cnt : 0;
int64_t average_runtime = (query_cnt) ? total_runtime / query_cnt : 0;
@@ -162,28 +160,37 @@ public:
continue;
}
- queue_deviation_sum += std::pow(job.second.time_in_queue() - average_queue_time, 2);
- runtime_deviation_sum += std::pow(job.second.runtime() - average_runtime, 2);
-
+ queue_deviation_sum +=
+ std::pow(job.second.time_in_queue() - average_queue_time, 2);
+ runtime_deviation_sum +=
+ std::pow(job.second.runtime() - average_runtime, 2);
}
- int64_t queue_stddev = (query_cnt) ? std::sqrt(queue_deviation_sum / query_cnt) : 0;
- int64_t runtime_stddev = (query_cnt) ? std::sqrt(runtime_deviation_sum / query_cnt) : 0;
-
-
- fprintf(stdout, "Query Count: %ld\tWorst Query: %ld\tFirst Query: %ld\n", query_cnt, worst_query, first_query);
- fprintf(stdout, "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t Max Scheduling Delay: %ld\tStandard Deviation: %ld\n", average_queue_time, min_queue_time, max_queue_time, queue_stddev);
- fprintf(stdout, "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query Latency: %ld\tStandard Deviation: %ld\n", average_runtime, min_runtime, max_runtime, runtime_stddev);
+ int64_t queue_stddev =
+ (query_cnt) ? std::sqrt(queue_deviation_sum / query_cnt) : 0;
+ int64_t runtime_stddev =
+ (query_cnt) ? std::sqrt(runtime_deviation_sum / query_cnt) : 0;
+
+ fprintf(stdout, "Query Count: %ld\tWorst Query: %ld\tFirst Query: %ld\n",
+ query_cnt, worst_query, first_query);
+ fprintf(stdout,
+ "Average Query Scheduling Delay: %ld\t Min Scheduling Delay: %ld\t "
+ "Max Scheduling Delay: %ld\tStandard Deviation: %ld\n",
+ average_queue_time, min_queue_time, max_queue_time, queue_stddev);
+ fprintf(stdout,
+ "Average Query Latency: %ld\t\t Min Query Latency: %ld\t Max Query "
+ "Latency: %ld\tStandard Deviation: %ld\n",
+ average_runtime, min_runtime, max_runtime, runtime_stddev);
}
-
void print_query_time_data() {
std::unique_lock<std::mutex> lk(m_mutex);
update_job_data_from_log();
for (auto &job : m_jobs) {
if (job.second.type == 1) {
- fprintf(stdout, "%ld\t%ld\t%ld\n", job.second.time_in_queue(), job.second.runtime(), job.second.end_to_end_time());
+ fprintf(stdout, "%ld\t%ld\t%ld\n", job.second.time_in_queue(),
+ job.second.runtime(), job.second.end_to_end_time());
}
}
}
@@ -211,18 +218,18 @@ private:
auto &job = m_jobs.at(event.id);
switch (event.type) {
- case EventType::QUEUED:
- job.queue_time = event.time;
- break;
- case EventType::FINISHED:
- job.stop_time = event.time;
- break;
- case EventType::SCHEDULED:
- job.scheduled_time = event.time;
- break;
- case EventType::STARTED:
- job.start_time = event.time;
- break;
+ case EventType::QUEUED:
+ job.queue_time = event.time;
+ break;
+ case EventType::FINISHED:
+ job.stop_time = event.time;
+ break;
+ case EventType::SCHEDULED:
+ job.scheduled_time = event.time;
+ break;
+ case EventType::STARTED:
+ job.start_time = event.time;
+ break;
}
}
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index a9fb12d..afe5dbb 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -46,10 +46,9 @@ public:
BufferView(Wrapped<R> *buffer, size_t cap, size_t head, size_t tail,
size_t tombstone_cnt, psudb::BloomFilter<R> *filter)
- : m_data(buffer), m_head(head), m_tail(tail),
- m_start(m_head % cap), m_stop(m_tail % cap), m_cap(cap),
- m_approx_ts_cnt(tombstone_cnt), m_tombstone_filter(filter),
- m_active(true) {}
+ : m_data(buffer), m_head(head), m_tail(tail), m_start(m_head % cap),
+ m_stop(m_tail % cap), m_cap(cap), m_approx_ts_cnt(tombstone_cnt),
+ m_tombstone_filter(filter), m_active(true) {}
~BufferView() = default;
@@ -104,9 +103,7 @@ public:
*/
size_t get_tombstone_count() { return m_approx_ts_cnt; }
- Wrapped<R> *get(size_t i) {
- return m_data + to_idx(i);
- }
+ Wrapped<R> *get(size_t i) { return m_data + to_idx(i); }
void copy_to_buffer(psudb::byte *buffer) {
/* check if the region to be copied circles back to start. If so, do it in
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index bb8a480..e03c7ad 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -27,12 +27,14 @@ class ExtensionStructure {
typedef BufferView<RecordType> BuffView;
typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
LevelVector;
+
public:
- ExtensionStructure(bool default_level=true) {
+ ExtensionStructure(bool default_level = true) {
if (default_level)
- m_levels.emplace_back(std::make_shared<InternalLevel<ShardType, QueryType>>(0));
+ m_levels.emplace_back(
+ std::make_shared<InternalLevel<ShardType, QueryType>>(0));
}
-
+
~ExtensionStructure() = default;
/*
@@ -162,34 +164,37 @@ public:
return cnt;
}
-
/*
* Perform the reconstruction described by task. If the resulting
* reconstruction grows the structure (i.e., adds a level), returns
* true. Otherwise, returns false.
*/
- inline reconstruction_results<ShardType> perform_reconstruction(ReconstructionTask task) const {
+ inline reconstruction_results<ShardType>
+ perform_reconstruction(ReconstructionTask task) const {
reconstruction_results<ShardType> result;
result.target_level = task.target;
/* if there is only one source, then we don't need to actually rebuild */
if (task.sources.size() == 1) {
auto shid = task.sources[0];
- if (shid.shard_idx == all_shards_idx && m_levels[shid.level_idx]->get_shard_count() > 1) {
+ if (shid.shard_idx == all_shards_idx &&
+ m_levels[shid.level_idx]->get_shard_count() > 1) {
/* there's more than one shard, so we need to do the reconstruction */
} else {
- auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx);
+ auto raw_shard_ptr =
+ m_levels[shid.level_idx]->get_shard(shid.shard_idx);
assert(raw_shard_ptr);
result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr);
- result.new_shard = m_levels[shid.level_idx]->get_shard_ptr(shid.shard_idx).first;
+ result.new_shard =
+ m_levels[shid.level_idx]->get_shard_ptr(shid.shard_idx).first;
return result;
}
}
-
- std::vector<const ShardType*> shards;
+
+ std::vector<const ShardType *> shards;
for (ShardID shid : task.sources) {
- assert(shid.level_idx < (level_index) m_levels.size());
+ assert(shid.level_idx < (level_index)m_levels.size());
assert(shid.shard_idx >= -1);
auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx);
@@ -197,12 +202,13 @@ public:
result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr);
}
-
auto start = std::chrono::high_resolution_clock::now();
result.new_shard = std::make_shared<ShardType>(shards);
auto stop = std::chrono::high_resolution_clock::now();
- result.runtime = std::chrono::duration_cast<std::chrono::nanoseconds>(stop- start).count();
+ result.runtime =
+ std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start)
+ .count();
result.reccnt = result.new_shard->get_record_count();
return result;
@@ -221,11 +227,10 @@ public:
return queries;
}
- size_t l0_size() const {
- return m_levels[0]->get_shard_count();
- }
+ size_t l0_size() const { return m_levels[0]->get_shard_count(); }
- bool apply_reconstruction(reconstruction_results<ShardType> &recon, size_t version) {
+ bool apply_reconstruction(reconstruction_results<ShardType> &recon,
+ size_t version) {
bool res = append_shard(recon.new_shard, version, recon.target_level);
m_levels[recon.target_level]->update_reconstruction_model(recon);
delete_shards(recon.source_shards);
@@ -233,13 +238,15 @@ public:
return res;
}
- bool append_shard(std::shared_ptr<ShardType> shard, size_t version, size_t level) {
+ bool append_shard(std::shared_ptr<ShardType> shard, size_t version,
+ size_t level) {
assert(level <= m_levels.size());
auto rc = false;
if (level == m_levels.size()) {
/* grow the structure */
- m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(level));
+ m_levels.push_back(
+ std::make_shared<InternalLevel<ShardType, QueryType>>(level));
rc = true;
}
@@ -248,12 +255,15 @@ public:
return rc;
}
- void delete_shards(std::vector<std::pair<level_index, const ShardType*>> shards) {
- for (size_t i=0; i<shards.size(); i++) {
- assert(shards[i].first < (level_index) m_levels.size());
+ void
+ delete_shards(std::vector<std::pair<level_index, const ShardType *>> shards) {
+ for (size_t i = 0; i < shards.size(); i++) {
+ assert(shards[i].first < (level_index)m_levels.size());
ssize_t shard_idx = -1;
- for (size_t j=0; j<m_levels[shards[i].first]->get_shard_count(); j++) {
- if (m_levels[shards[i].first]->get_shard_ptr(j).first.get() == shards[i].second) {
+ for (size_t j = 0; j < m_levels[shards[i].first]->get_shard_count();
+ j++) {
+ if (m_levels[shards[i].first]->get_shard_ptr(j).first.get() ==
+ shards[i].second) {
shard_idx = j;
break;
}
@@ -262,7 +272,8 @@ public:
if (shard_idx != -1) {
m_levels[shards[i].first]->delete_shard(shard_idx);
} else {
- fprintf(stderr, "ERROR: failed to delete shard %ld\t%p\n", shards[i].first, shards[i].second);
+ fprintf(stderr, "ERROR: failed to delete shard %ld\t%p\n",
+ shards[i].first, shards[i].second);
exit(EXIT_FAILURE);
}
}
@@ -270,51 +281,55 @@ public:
LevelVector const &get_level_vector() const { return m_levels; }
-
- /*
- * Validate that no level in the structure exceeds its maximum tombstone
- * capacity. This is used to trigger preemptive compactions at the end of
- * the reconstruction process.
- */
- bool validate_tombstone_proportion(double max_delete_prop) const {
- long double ts_prop;
- for (size_t i = 0; i < m_levels.size(); i++) {
- if (m_levels[i]) {
- ts_prop = (long double)m_levels[i]->get_tombstone_count() /
- (long double)m_levels[i]->get_record_count();
- if (ts_prop > (long double)max_delete_prop) {
- return false;
- }
+ /*
+ * Validate that no level in the structure exceeds its maximum tombstone
+ * capacity. This is used to trigger preemptive compactions at the end of
+ * the reconstruction process.
+ */
+ bool validate_tombstone_proportion(double max_delete_prop) const {
+ long double ts_prop;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i]) {
+ ts_prop = (long double)m_levels[i]->get_tombstone_count() /
+ (long double)m_levels[i]->get_record_count();
+ if (ts_prop > (long double)max_delete_prop) {
+ return false;
}
}
-
- return true;
}
- bool validate_tombstone_proportion(level_index level, double max_delete_prop) const {
- long double ts_prop = (long double) m_levels[level]->get_tombstone_count() / (long double) m_levels[level]->get_record_count();
- return ts_prop <= (long double) max_delete_prop;
- }
+ return true;
+ }
- void print_structure(bool debug=false) const {
- for (size_t i=0; i<m_levels.size(); i++) {
- if (debug) {
- fprintf(stdout, "[D] [%ld]:\t", i);
- } else {
- fprintf(stdout, "[%ld]:\t", i);
- }
+ bool validate_tombstone_proportion(level_index level,
+ double max_delete_prop) const {
+ long double ts_prop = (long double)m_levels[level]->get_tombstone_count() /
+ (long double)m_levels[level]->get_record_count();
+ return ts_prop <= (long double)max_delete_prop;
+ }
- if (m_levels[i]) {
- for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) {
- fprintf(stdout, "(%ld, %ld, %p: %ld) ", j, m_levels[i]->get_shard_ptr(j).second, m_levels[i]->get_shard_ptr(j).first.get(), m_levels[i]->get_shard(j)->get_record_count());
- }
- } else {
- fprintf(stdout, "[Empty]");
- }
+ void print_structure(bool debug = false) const {
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (debug) {
+ fprintf(stdout, "[D] [%ld]:\t", i);
+ } else {
+ fprintf(stdout, "[%ld]:\t", i);
+ }
- fprintf(stdout, "\n");
+ if (m_levels[i]) {
+ for (size_t j = 0; j < m_levels[i]->get_shard_count(); j++) {
+ fprintf(stdout, "(%ld, %ld, %p: %ld) ", j,
+ m_levels[i]->get_shard_ptr(j).second,
+ m_levels[i]->get_shard_ptr(j).first.get(),
+ m_levels[i]->get_shard(j)->get_record_count());
+ }
+ } else {
+ fprintf(stdout, "[Empty]");
}
- }
+
+ fprintf(stdout, "\n");
+ }
+ }
private:
LevelVector m_levels;
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 205dbf9..cf2b16a 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -15,11 +15,11 @@
*/
#pragma once
+#include <algorithm>
+#include <deque>
#include <future>
#include <memory>
#include <vector>
-#include <deque>
-#include <algorithm>
#include "framework/interface/Query.h"
#include "framework/interface/Shard.h"
@@ -34,7 +34,7 @@ class InternalLevel {
typedef std::pair<std::shared_ptr<ShardType>, size_t> shard_ptr;
public:
- InternalLevel(ssize_t level_no) : m_level_no(level_no) { }
+ InternalLevel(ssize_t level_no) : m_level_no(level_no) {}
~InternalLevel() = default;
@@ -109,7 +109,7 @@ public:
idx = 0;
}
- if (idx >= (ssize_t) m_shards.size()) {
+ if (idx >= (ssize_t)m_shards.size()) {
return nullptr;
}
@@ -183,7 +183,7 @@ public:
}
size_t get_nonempty_shard_count() const {
- size_t cnt = 0;
+ size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i].first && m_shards[i].first->get_record_count() > 0) {
cnt += 1;
@@ -199,7 +199,7 @@ public:
new_level->append(m_shards[i].first, m_shards[i].second);
}
- for (auto itr=m_rt_window.begin(); itr < m_rt_window.end(); itr++) {
+ for (auto itr = m_rt_window.begin(); itr < m_rt_window.end(); itr++) {
new_level->m_rt_window.push_front(*itr);
}
@@ -208,23 +208,21 @@ public:
void truncate() { m_shards.erase(m_shards.begin(), m_shards.end()); }
- void delete_shard(shard_index shard, bool log_delete=true) {
+ void delete_shard(shard_index shard, bool log_delete = true) {
size_t before = m_shards.size();
m_shards.erase(m_shards.begin() + shard);
size_t after = m_shards.size();
- assert( before > after);
+ assert(before > after);
}
- void append(std::shared_ptr<ShardType> shard, size_t version=0) {
+ void append(std::shared_ptr<ShardType> shard, size_t version = 0) {
m_shards.push_back({shard, version});
}
- void append(shard_ptr shard) {
- m_shards.push_back(shard);
- }
+ void append(shard_ptr shard) { m_shards.push_back(shard); }
const shard_ptr get_shard_ptr(ssize_t idx) const {
- if (idx >= 0 && idx < (ssize_t) m_shards.size()) {
+ if (idx >= 0 && idx < (ssize_t)m_shards.size()) {
return m_shards[idx];
} else if (idx == all_shards_idx && m_shards.size() == 1) {
return m_shards[0];
@@ -247,7 +245,7 @@ public:
size_t total = 0;
for (auto rt : m_rt_window) {
total += rt;
- }
+ }
return total / m_rt_window.size();
}
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 7357915..156b718 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -97,7 +97,9 @@ public:
return true;
}
- size_t get_record_count() const { return m_tail.load() - m_head.load().head_idx; }
+ size_t get_record_count() const {
+ return m_tail.load() - m_head.load().head_idx;
+ }
size_t get_capacity() const { return m_cap; }
@@ -139,14 +141,16 @@ public:
m_active_head_advance.store(true);
if (m_old_head.load().refcnt > 0) {
- //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts [2]\n");
+ // fprintf(stderr, "[W]: Refusing to advance head due to remaining
+ // reference counts [2]\n");
m_active_head_advance.store(false);
return false;
}
- // fprintf(stderr, "[I] Advancing head pointer: %ld %ld %ld\n", m_old_head.load().head_idx, m_head.load().head_idx, new_head);
- // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, m_head.load().refcnt);
-
+ // fprintf(stderr, "[I] Advancing head pointer: %ld %ld %ld\n",
+ // m_old_head.load().head_idx, m_head.load().head_idx, new_head);
+ // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt,
+ // m_head.load().refcnt);
buffer_head new_hd = {new_head, 1};
buffer_head cur_hd;
@@ -163,7 +167,6 @@ public:
return true;
}
-
void set_low_watermark(size_t lwm) {
assert(lwm < m_hwm);
m_lwm = lwm;
@@ -197,13 +200,9 @@ public:
return m_cap - (m_tail.load() - m_old_head.load().head_idx);
}
- size_t debug_get_old_head() const {
- return m_old_head.load().head_idx;
- }
+ size_t debug_get_old_head() const { return m_old_head.load().head_idx; }
- size_t debug_get_head() const {
- return m_head.load().head_idx;
- }
+ size_t debug_get_head() const { return m_head.load().head_idx; }
bool take_head_reference(size_t target_head) {
buffer_head cur_hd, new_hd;
@@ -225,7 +224,7 @@ public:
return head_acquired;
}
-
+
bool release_head_reference(size_t head) {
buffer_head cur_hd, new_hd;
bool head_released = false;
@@ -261,8 +260,8 @@ private:
buffer_head cur_hd, new_hd;
bool head_acquired = false;
-
- //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx);
+ // fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head,
+ // m_old_head.load().head_idx, m_head.load().head_idx);
do {
if (m_old_head.load().head_idx == target_head) {
cur_hd = m_old_head.load();
@@ -279,7 +278,7 @@ private:
return new_hd.head_idx;
}
-
+
ssize_t try_advance_tail() {
size_t old_value = m_tail.load();
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 3ae3492..e2484d5 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -8,48 +8,49 @@
*/
#pragma once
+#include "framework/interface/Scheduler.h"
#include "framework/reconstruction/ReconstructionPolicy.h"
#include "util/types.h"
-#include "framework/interface/Scheduler.h"
#include <cstdlib>
namespace de {
template <ShardInterface ShardType, QueryInterface<ShardType> QueryType,
-DeletePolicy D, SchedulerInterface SchedType>
+ DeletePolicy D, SchedulerInterface SchedType>
class DEConfiguration {
- public:
- DEConfiguration(std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy)
- : recon_policy(std::move(recon_policy)) {}
+public:
+ DEConfiguration(
+ std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy)
+ : recon_policy(std::move(recon_policy)) {}
- std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy;
+ std::unique_ptr<ReconstructionPolicy<ShardType, QueryType>> recon_policy;
- /* buffer parameters */
- size_t buffer_count = 1;
- size_t buffer_size = 8000;
- size_t buffer_flush_trigger = buffer_size / 2;
+ /* buffer parameters */
+ size_t buffer_count = 1;
+ size_t buffer_size = 8000;
+ size_t buffer_flush_trigger = buffer_size / 2;
- /* reconstruction triggers */
- bool recon_enable_seek_trigger = false;
- bool recon_enable_maint_on_flush = false;
- bool recon_enable_delete_cmpct = false;
- bool recon_maint_disabled = true;
+ /* reconstruction triggers */
+ bool recon_enable_seek_trigger = false;
+ bool recon_enable_maint_on_flush = false;
+ bool recon_enable_delete_cmpct = false;
+ bool recon_maint_disabled = true;
- size_t recon_l0_capacity = 0; /* 0 for unbounded */
- double maximum_delete_proportion = 1;
+ size_t recon_l0_capacity = 0; /* 0 for unbounded */
+ double maximum_delete_proportion = 1;
- /* resource management */
- size_t maximum_threads = 16;
- size_t minimum_recon_threads = 1;
- size_t minimum_query_threads = 4;
- size_t maximum_memory_usage = 0; /* o for unbounded */
+ /* resource management */
+ size_t maximum_threads = 16;
+ size_t minimum_recon_threads = 1;
+ size_t minimum_query_threads = 4;
+ size_t maximum_memory_usage = 0; /* o for unbounded */
- size_t physical_core_count = 6;
+ size_t physical_core_count = 6;
- size_t buffer_flush_query_preemption_trigger = UINT64_MAX;
+ size_t buffer_flush_query_preemption_trigger = UINT64_MAX;
- bool dynamic_ratelimiting = false;
- size_t rt_level_scale = 1;
+ bool dynamic_ratelimiting = false;
+ size_t rt_level_scale = 1;
};
} // namespace de