Diffstat (limited to 'include')
-rw-r--r--  include/framework/DynamicExtension.h               |  71
-rw-r--r--  include/framework/interface/Query.h                |   6
-rw-r--r--  include/framework/interface/Record.h               |  49
-rw-r--r--  include/framework/scheduling/Task.h                |   2
-rw-r--r--  include/framework/structure/BufferView.h           |  10
-rw-r--r--  include/framework/structure/ExtensionStructure.h   | 302
-rw-r--r--  include/framework/structure/InternalLevel.h        |  19
-rw-r--r--  include/framework/structure/MutableBuffer.h        |  13
-rw-r--r--  include/framework/util/Configuration.h             |   6
-rw-r--r--  include/query/irs.h                                |  85
-rw-r--r--  include/query/knn.h                                |  11
-rw-r--r--  include/query/pointlookup.h                        | 123
-rw-r--r--  include/query/rangecount.h                         |  60
-rw-r--r--  include/query/rangequery.h                         |   9
-rw-r--r--  include/query/wirs.h                               |  13
-rw-r--r--  include/query/wss.h                                |  13
-rw-r--r--  include/shard/Alias.h                              |   2
-rw-r--r--  include/shard/AugBTree.h                           |   2
-rw-r--r--  include/shard/FSTrie.h                             | 200
-rw-r--r--  include/shard/ISAMTree.h                           |  14
-rw-r--r--  include/shard/LoudsPatricia.h                      | 199
-rw-r--r--  include/shard/PGM.h                                | 150
-rw-r--r--  include/shard/TrieSpline.h                         | 185
-rw-r--r--  include/shard/VPTree.h                             |  51
-rw-r--r--  include/util/SortedMerge.h                         |   2
-rw-r--r--  include/util/types.h                               |  77
26 files changed, 1407 insertions, 267 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 7ea5370..e2e2784 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -54,6 +54,10 @@ public:
, m_next_core(0)
, m_epoch_cnt(0)
{
+ if constexpr (L == LayoutPolicy::BSM) {
+ assert(scale_factor == 2);
+ }
+
auto vers = new Structure(buffer_hwm, m_scale_factor, m_max_delete_prop);
m_current_epoch.store({new _Epoch(0, vers, m_buffer, 0), 0});
m_previous_epoch.store({nullptr, 0});
@@ -201,7 +205,7 @@ public:
*/
size_t get_memory_usage() {
auto epoch = get_active_epoch();
- auto t= epoch->get_buffer().get_memory_usage() + epoch->get_structure()->get_memory_usage();
+ auto t = m_buffer->get_memory_usage() + epoch->get_structure()->get_memory_usage();
end_job(epoch);
return t;
@@ -214,7 +218,7 @@ public:
*/
size_t get_aux_memory_usage() {
auto epoch = get_active_epoch();
- auto t = epoch->get_buffer().get_aux_memory_usage() + epoch->get_structure()->get_aux_memory_usage();
+ auto t = epoch->get_structure()->get_aux_memory_usage();
end_job(epoch);
return t;
@@ -487,10 +491,17 @@ private:
((DynamicExtension *) args->extension)->SetThreadAffinity();
Structure *vers = args->epoch->get_structure();
- for (ssize_t i=0; i<args->merges.size(); i++) {
- vers->reconstruction(args->merges[i].second, args->merges[i].first);
+ if constexpr (L == LayoutPolicy::BSM) {
+ if (args->merges.size() > 0) {
+ vers->reconstruction(args->merges[0]);
+ }
+ } else {
+ for (ssize_t i=0; i<args->merges.size(); i++) {
+ vers->reconstruction(args->merges[i].target, args->merges[i].sources[0]);
+ }
}
+
/*
* we'll grab the buffer AFTER doing the internal reconstruction, so we
* can flush as many records as possible in one go. The reconstruction
@@ -546,30 +557,34 @@ private:
std::vector<std::pair<ShardID, Shard*>> shards;
std::vector<void *> states = vers->get_query_states(shards, parms);
+ std::vector<R> results;
Q::process_query_states(parms, states, buffer_state);
- std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
- for (size_t i=0; i<query_results.size(); i++) {
- std::vector<Wrapped<R>> local_results;
- ShardID shid;
-
- if (i == 0) { /* process the buffer first */
- local_results = Q::buffer_query(buffer_state, parms);
- shid = INVALID_SHID;
- } else {
- local_results = Q::query(shards[i - 1].second, states[i - 1], parms);
- shid = shards[i - 1].first;
- }
+ do {
+ std::vector<std::vector<Wrapped<R>>> query_results(shards.size() + 1);
+ for (size_t i=0; i<query_results.size(); i++) {
+ std::vector<Wrapped<R>> local_results;
+ ShardID shid;
+
+ if (i == 0) { /* process the buffer first */
+ local_results = Q::buffer_query(buffer_state, parms);
+ shid = INVALID_SHID;
+ } else {
+ local_results = Q::query(shards[i - 1].second, states[i - 1], parms);
+ shid = shards[i - 1].first;
+ }
- query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer));
+ query_results[i] = std::move(filter_deletes(local_results, shid, vers, &buffer));
- if constexpr (Q::EARLY_ABORT) {
- if (query_results[i].size() > 0) break;
+ if constexpr (Q::EARLY_ABORT) {
+ if (query_results[i].size() > 0) break;
+ }
}
- }
+ Q::merge(query_results, parms, results);
+
+ } while (Q::repeat(parms, results, states, buffer_state));
- auto result = Q::merge(query_results, parms);
- args->result_set.set_value(std::move(result));
+ args->result_set.set_value(std::move(results));
((DynamicExtension *) args->extension)->end_job(epoch);
@@ -624,7 +639,7 @@ private:
static std::vector<Wrapped<R>> filter_deletes(std::vector<Wrapped<R>> &records, ShardID shid, Structure *vers, BufView *bview) {
if constexpr (Q::SKIP_DELETE_FILTER) {
- return records;
+ return std::move(records);
}
std::vector<Wrapped<R>> processed_records;
@@ -685,7 +700,12 @@ private:
return processed_records;
}
+#ifdef _GNU_SOURCE
void SetThreadAffinity() {
+ if constexpr (std::same_as<SCHED, SerialScheduler>) {
+ return;
+ }
+
int core = m_next_core.fetch_add(1) % m_core_cnt;
cpu_set_t mask;
CPU_ZERO(&mask);
@@ -707,6 +727,11 @@ private:
CPU_SET(core, &mask);
::sched_setaffinity(0, sizeof(mask), &mask);
}
+#else
+ void SetThreadAffinity() {
+
+ }
+#endif
void end_job(_Epoch *epoch) {
diff --git a/include/framework/interface/Query.h b/include/framework/interface/Query.h
index 3d487f0..577d6cd 100644
--- a/include/framework/interface/Query.h
+++ b/include/framework/interface/Query.h
@@ -13,17 +13,19 @@
namespace de{
template <typename Q, typename R, typename S>
-concept QueryInterface = requires(void *p, S *sh, std::vector<void*> &s, std::vector<std::vector<Wrapped<R>>> &rv, BufferView<R> *bv) {
+concept QueryInterface = requires(void *p, S *sh, std::vector<void*> &s, std::vector<std::vector<Wrapped<R>>> &rv, BufferView<R> *bv, std::vector<R> &resv) {
{Q::get_query_state(sh, p)} -> std::convertible_to<void*>;
{Q::get_buffer_query_state(bv, p)} -> std::convertible_to<void *>;
{Q::process_query_states(p, s, p)};
{Q::query(sh, p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>;
{Q::buffer_query(p, p)} -> std::convertible_to<std::vector<Wrapped<R>>>;
- {Q::merge(rv, p)} -> std::convertible_to<std::vector<R>>;
+ {Q::merge(rv, p, resv)};
{Q::delete_query_state(p)} -> std::same_as<void>;
{Q::delete_buffer_query_state(p)} -> std::same_as<void>;
+ {Q::repeat(p, resv, s, p)} -> std::same_as<bool>;
+
{Q::EARLY_ABORT} -> std::convertible_to<bool>;
{Q::SKIP_DELETE_FILTER} -> std::convertible_to<bool>;
};
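A compact sketch (not part of this commit) of the two members whose signatures changed: merge() now appends into a caller-supplied output vector instead of returning one, and the new repeat() hook lets a query ask the framework to run the whole buffer/shard pass again (the sampling queries later in this diff use it to top up short samples).

/* Sketch only: illustrative query members matching the revised concept. */
template <typename R>
struct ExampleQuery {
    /* previously: static std::vector<R> merge(results, parms) */
    static void merge(std::vector<std::vector<de::Wrapped<R>>> &results, void *parms,
                      std::vector<R> &output) {
        for (auto &shard_results : results) {
            for (auto &wrapped : shard_results) {
                output.emplace_back(wrapped.rec);
            }
        }
    }

    /* returning true makes the framework execute another query pass */
    static bool repeat(void *parms, std::vector<R> &results,
                       std::vector<void*> &states, void *buffer_state) {
        return false;
    }
};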
diff --git a/include/framework/interface/Record.h b/include/framework/interface/Record.h
index 5b9f307..19ccadd 100644
--- a/include/framework/interface/Record.h
+++ b/include/framework/interface/Record.h
@@ -47,13 +47,18 @@ concept AlexInterface = KVPInterface<R> && requires(R r) {
};
template<typename R>
-concept WrappedInterface = RecordInterface<R> && requires(R r, R s, bool b) {
+concept WrappedInterface = RecordInterface<R> && requires(R r, R s, bool b, int i) {
{r.header} -> std::convertible_to<uint32_t>;
r.rec;
{r.set_delete()};
{r.is_deleted()} -> std::convertible_to<bool>;
{r.set_tombstone(b)};
{r.is_tombstone()} -> std::convertible_to<bool>;
+ {r.set_timestamp(i)};
+ {r.get_timestamp()} -> std::convertible_to<uint32_t>;
+ {r.clear_timestamp()};
+ {r.is_visible()} -> std::convertible_to<bool>;
+ {r.set_visible()};
{r < s} -> std::convertible_to<bool>;
{r == s} ->std::convertible_to<bool>;
};
@@ -71,9 +76,29 @@ struct Wrapped {
return header & 2;
}
+ inline void set_visible() {
+ header |= 4;
+ }
+
+ inline bool is_visible() const {
+ return header & 4;
+ }
+
+ inline void set_timestamp(int ts) {
+ header |= (ts << 3);
+ }
+
+ inline int get_timestamp() const {
+ return header >> 3;
+ }
+
+ inline void clear_timestamp() {
+ header &= 7;
+ }
+
inline void set_tombstone(bool val=true) {
if (val) {
- header |= val;
+ header |= 1;
} else {
header &= 0;
}
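The accessors above pack several flags and an insertion timestamp into the single 32-bit header; the layout they imply (an illustration, not part of the diff) is:

/*
 * Wrapped<R> header layout implied by the accessors above:
 *
 *   bit 0      : tombstone flag      (set_tombstone / is_tombstone)
 *   bit 1      : delete flag         (set_delete / is_deleted)
 *   bit 2      : visibility flag     (set_visible / is_visible)
 *   bits 3..31 : insertion timestamp (set_timestamp / get_timestamp)
 */
de::Wrapped<de::Record<uint64_t, uint64_t>> w;
w.header = 0;
w.set_tombstone();    /* header == 0b0001 */
w.set_visible();      /* header == 0b0101 */
w.set_timestamp(9);   /* header == (9 << 3) | 0b0101 */
w.clear_timestamp();  /* header &= 7: flags kept, timestamp cleared */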
@@ -97,9 +122,8 @@ template <typename K, typename V>
struct Record {
K key;
V value;
- uint32_t header = 0;
- inline bool operator<(const Record& other) const {
+ inline bool operator<(const Record& other) const {
return key < other.key || (key == other.key && value < other.value);
}
@@ -108,6 +132,23 @@ struct Record {
}
};
+template<typename V>
+struct Record<const char*, V> {
+ const char* key;
+ V value;
+ size_t len;
+
+ inline bool operator<(const Record& other) const {
+ size_t n = std::min(len, other.len) + 1;
+ return strncmp(key, other.key, n) < 0;
+ }
+
+ inline bool operator==(const Record& other) const {
+ size_t n = std::min(len, other.len) + 1;
+ return strncmp(key, other.key, n) == 0;
+ }
+};
+
template <typename K, typename V, typename W>
struct WeightedRecord {
K key;
diff --git a/include/framework/scheduling/Task.h b/include/framework/scheduling/Task.h
index d5d4266..bd53090 100644
--- a/include/framework/scheduling/Task.h
+++ b/include/framework/scheduling/Task.h
@@ -26,7 +26,7 @@ namespace de {
template <RecordInterface R, ShardInterface<R> S, QueryInterface<R, S> Q, LayoutPolicy L>
struct ReconstructionArgs {
Epoch<R, S, Q, L> *epoch;
- std::vector<ReconstructionTask> merges;
+ ReconstructionVector merges;
std::promise<bool> result;
bool compaction;
void *extension;
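ReconstructionVector (and the richer ReconstructionTask it holds) replaces the pair-of-level-indexes task type removed from Configuration.h below; its definition is not part of this diff and presumably lives in the updated include/util/types.h. A rough declaration-only sketch of the interface the rest of the diff relies on, inferred from the call sites:

/*
 * Sketch only: inferred from usage in DynamicExtension.h and
 * ExtensionStructure.h, not the actual definitions.
 */
struct ReconstructionTask {
    std::vector<level_index> sources;
    level_index target;
    size_t reccnt = 0;

    void add_source(level_index source, size_t cnt) {
        sources.push_back(source);
        reccnt += cnt;
    }
};

class ReconstructionVector {
public:
    size_t size() const;
    ReconstructionTask operator[](size_t idx);

    void add_reconstruction(level_index source, level_index target, size_t reccnt);
    void add_reconstruction(ReconstructionTask task);
    ReconstructionTask remove_reconstruction(size_t idx);

    size_t get_total_reccnt() const;

private:
    std::vector<ReconstructionTask> m_tasks;
};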
diff --git a/include/framework/structure/BufferView.h b/include/framework/structure/BufferView.h
index 9e0872b..e95a799 100644
--- a/include/framework/structure/BufferView.h
+++ b/include/framework/structure/BufferView.h
@@ -20,7 +20,7 @@
namespace de {
-typedef std::_Bind<void (*(void*, long unsigned int))(void*, long unsigned int)> ReleaseFunction;
+typedef std::function<void(void)> ReleaseFunction;
template <RecordInterface R>
class BufferView {
@@ -112,6 +112,10 @@ public:
size_t get_record_count() {
return m_tail - m_head;
}
+
+ size_t get_capacity() {
+ return m_cap;
+ }
/*
* NOTE: This function returns an upper bound on the number
@@ -123,7 +127,7 @@ public:
}
Wrapped<R> *get(size_t i) {
- assert(i < get_record_count());
+ //assert(i < get_record_count());
return m_data + to_idx(i);
}
@@ -160,7 +164,7 @@ private:
bool m_active;
size_t to_idx(size_t i) {
- size_t idx = (m_start + i >= m_cap) ? i = (m_cap - m_start)
+ size_t idx = (m_start + i >= m_cap) ? i - (m_cap - m_start)
: m_start + i;
assert(idx < m_cap);
return idx;
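The corrected branch maps a logical offset into the circular buffer; a quick worked example of the fix:

/*
 * Worked example (illustrative values): m_cap = 100, m_start = 90, i = 15.
 * Since m_start + i = 105 >= m_cap, the access wraps around:
 *
 *   idx = i - (m_cap - m_start) = 15 - 10 = 5
 *
 * The old expression assigned i = (m_cap - m_start), which both clobbered i
 * and produced the same index for every wrapped access.
 */
size_t m_cap = 100, m_start = 90, i = 15;
size_t idx = (m_start + i >= m_cap) ? i - (m_cap - m_start) : m_start + i;
assert(idx == 5);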
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 4802bc1..b83674b 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -27,6 +27,16 @@ class ExtensionStructure {
typedef S Shard;
typedef BufferView<R> BuffView;
+ typedef struct {
+ size_t reccnt;
+ size_t reccap;
+
+ size_t shardcnt;
+ size_t shardcap;
+ } level_state;
+
+ typedef std::vector<level_state> state_vector;
+
public:
ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop)
: m_scale_factor(scale_factor)
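The new state vector gives planning code a cheap copy of the structure's shape to mutate, instead of touching the levels themselves; an illustrative snapshot (values made up, and the types are internal to ExtensionStructure) might look like:

/*
 * Illustrative m_current_state for a three-level structure under tiering
 * with scale_factor = 8. Planning routines copy this vector and simulate
 * flushes and reconstructions against the copy.
 */
state_vector scratch = {
    /*  reccnt,  reccap, shardcnt, shardcap */
    {    6000,    8000,    6,        8 },   /* level 0 */
    {   48000,   64000,    5,        8 },   /* level 1 */
    {  210000,  512000,    2,        8 },   /* level 2 */
};

The reccap values here are placeholders; the real capacities come from calc_level_record_capacity().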
@@ -59,6 +69,7 @@ public:
}
new_struct->m_refcnt = 0;
+ new_struct->m_current_state = m_current_state;
return new_struct;
}
@@ -95,8 +106,13 @@ public:
* takes a structure as input.
*/
inline bool flush_buffer(BuffView buffer) {
- assert(can_reconstruct_with(0, buffer.get_record_count()));
+ state_vector tmp = m_current_state;
+
+ if (tmp.size() == 0) {
+ grow(tmp);
+ }
+ assert(can_reconstruct_with(0, buffer.get_record_count(), tmp));
flush_buffer_into_l0(std::move(buffer));
return true;
@@ -200,8 +216,16 @@ public:
return m_levels;
}
- std::vector<ReconstructionTask> get_compaction_tasks() {
- std::vector<ReconstructionTask> tasks;
+ /*
+ * NOTE: This cannot be simulated, because tombstone cancellation is not
+ * cheaply predictable. It is possible that the worst case number could
+ * be used instead, to allow for prediction, but compaction isn't a
+ * major concern outside of sampling; at least for now. So I'm not
+ * going to focus too much time on it at the moment.
+ */
+ ReconstructionVector get_compaction_tasks() {
+ ReconstructionVector tasks;
+ state_vector scratch_state = m_current_state;
/* if the tombstone/delete invariant is satisfied, no need for compactions */
if (validate_tombstone_proportion()) {
@@ -219,14 +243,12 @@ public:
assert(violation_idx != -1);
- level_index base_level = find_reconstruction_target(violation_idx);
+ level_index base_level = find_reconstruction_target(violation_idx, scratch_state);
if (base_level == -1) {
- base_level = grow();
+ base_level = grow(scratch_state);
}
for (level_index i=base_level; i>0; i--) {
- ReconstructionTask task = {i-1, i};
-
/*
* The amount of storage required for the reconstruction accounts
* for the cost of storing the new records, along with the
@@ -239,13 +261,11 @@ public:
*/
size_t reccnt = m_levels[i - 1]->get_record_count();
if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_reconstruct_with(i, reccnt)) {
+ if (can_reconstruct_with(i, reccnt, scratch_state)) {
reccnt += m_levels[i]->get_record_count();
}
}
- //task.m_size = 2* reccnt * sizeof(R);
-
- tasks.push_back(task);
+            tasks.add_reconstruction(i-1, i, reccnt);
}
return tasks;
@@ -254,44 +274,53 @@ public:
/*
*
*/
- std::vector<ReconstructionTask> get_reconstruction_tasks(size_t buffer_reccnt) {
- std::vector<ReconstructionTask> reconstructions;
-
- /*
- * The buffer flush is not included so if that can be done without any
- * other change, just return an empty list.
+ ReconstructionVector get_reconstruction_tasks(size_t buffer_reccnt,
+ state_vector scratch_state={}) {
+ /*
+ * If no scratch state vector is provided, use a copy of the
+ * current one. The only time an empty vector could be used as
+ * *real* input to this function is when the current state is also
+         * empty, so this should work even in that case.
*/
- if (can_reconstruct_with(0, buffer_reccnt)) {
- return std::move(reconstructions);
+ if (scratch_state.size() == 0) {
+ scratch_state = m_current_state;
}
- level_index base_level = find_reconstruction_target(0);
- if (base_level == -1) {
- base_level = grow();
- }
-
- for (level_index i=base_level; i>0; i--) {
- ReconstructionTask task = {i-1, i};
-
+ ReconstructionVector reconstructions;
+ size_t LOOKAHEAD = 1;
+ for (size_t i=0; i<LOOKAHEAD; i++) {
/*
- * The amount of storage required for the reconstruction accounts
- * for the cost of storing the new records, along with the
- * cost of retaining the old records during the process
- * (hence the 2x multiplier).
- *
- * FIXME: currently does not account for the *actual* size
- * of the shards, only the storage for the records
- * themselves.
+ * If L0 cannot support a direct buffer flush, figure out what
+ * work must be done to free up space first. Otherwise, the
+ * reconstruction vector will be initially empty.
*/
- size_t reccnt = m_levels[i-1]->get_record_count();
- if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_reconstruct_with(i, reccnt)) {
- reccnt += m_levels[i]->get_record_count();
+ if (!can_reconstruct_with(0, buffer_reccnt, scratch_state)) {
+ auto local_recon = get_reconstruction_tasks_from_level(0, scratch_state);
+
+ /*
+ * for the first iteration, we need to do all of the
+                 * reconstructions, so use these to initialize the returned
+ * reconstruction list
+ */
+ if (i == 0) {
+ reconstructions = local_recon;
+ /*
+ * Quick sanity test of idea: if the next reconstruction
+ * would be larger than this one, steal the largest
+ * task from it and run it now instead.
+ */
+ } else if (local_recon.get_total_reccnt() > reconstructions.get_total_reccnt()) {
+ auto t = local_recon.remove_reconstruction(0);
+ reconstructions.add_reconstruction(t);
}
}
- //task.m_size = 2* reccnt * sizeof(R);
- reconstructions.push_back(task);
+ /* simulate the buffer flush in the scratch state */
+ scratch_state[0].reccnt += buffer_reccnt;
+ if (L == LayoutPolicy::TEIRING || scratch_state[0].shardcnt == 0) {
+ scratch_state[0].shardcnt += 1;
+ }
+
}
return std::move(reconstructions);
@@ -301,38 +330,109 @@ public:
/*
*
*/
- std::vector<ReconstructionTask> get_reconstruction_tasks_from_level(level_index source_level) {
- std::vector<ReconstructionTask> reconstructions;
+ ReconstructionVector get_reconstruction_tasks_from_level(level_index source_level, state_vector &scratch_state) {
+ ReconstructionVector reconstructions;
- level_index base_level = find_reconstruction_target(source_level);
+ /*
+ * Find the first level capable of sustaining a reconstruction from
+ * the level above it. If no such level exists, add a new one at
+ * the bottom of the structure.
+ */
+ level_index base_level = find_reconstruction_target(source_level, scratch_state);
if (base_level == -1) {
- base_level = grow();
+ base_level = grow(scratch_state);
+ }
+
+ if constexpr (L == LayoutPolicy::BSM) {
+ if (base_level == 0) {
+ return std::move(reconstructions);
+ }
+
+ ReconstructionTask task;
+ task.target = base_level;
+
+ size_t base_reccnt = 0;
+ for (level_index i=base_level; i>source_level; i--) {
+ auto recon_reccnt = scratch_state[i-1].reccnt;
+ base_reccnt += recon_reccnt;
+ scratch_state[i-1].reccnt = 0;
+ scratch_state[i-1].shardcnt = 0;
+ task.add_source(i-1, recon_reccnt);
+ }
+
+ reconstructions.add_reconstruction(task);
+ scratch_state[base_level].reccnt = base_reccnt;
+ scratch_state[base_level].shardcnt = 1;
+
+ return std::move(reconstructions);
}
+ /*
+ * Determine the full set of reconstructions necessary to open up
+ * space in the source level.
+ */
for (level_index i=base_level; i>source_level; i--) {
- ReconstructionTask task = {i - 1, i};
+ size_t recon_reccnt = scratch_state[i-1].reccnt;
+ size_t base_reccnt = recon_reccnt;
+
/*
- * The amount of storage required for the reconstruction accounts
- * for the cost of storing the new records, along with the
- * cost of retaining the old records during the process
- * (hence the 2x multiplier).
- *
- * FIXME: currently does not account for the *actual* size
- * of the shards, only the storage for the records
- * themselves.
+ * If using Leveling, the total reconstruction size will be the
+ * records in *both* base and target, because they will need to
+ * be merged (assuming that target isn't empty).
*/
- size_t reccnt = m_levels[i-1]->get_record_count();
if constexpr (L == LayoutPolicy::LEVELING) {
- if (can_reconstruct_with(i, reccnt)) {
- reccnt += m_levels[i]->get_record_count();
+ if (can_reconstruct_with(i, base_reccnt, scratch_state)) {
+ recon_reccnt += scratch_state[i].reccnt;
}
}
-// task.m_size = 2* reccnt * sizeof(R);
+ reconstructions.add_reconstruction(i-1, i, recon_reccnt);
+
+ /*
+ * The base level will be emptied and its records moved to
+ * the target.
+ */
+ scratch_state[i-1].reccnt = 0;
+ scratch_state[i-1].shardcnt = 0;
- reconstructions.push_back(task);
+ /*
+ * The target level will have the records from the base level
+ * added to it, and potentially gain a shard if the LayoutPolicy
+ * is tiering or the level currently lacks any shards at all.
+ */
+ scratch_state[i].reccnt += base_reccnt;
+ if (L == LayoutPolicy::TEIRING || scratch_state[i].shardcnt == 0) {
+ scratch_state[i].shardcnt += 1;
+ }
}
- return reconstructions;
+ return std::move(reconstructions);
+ }
+
+ inline void reconstruction(ReconstructionTask task) {
+ static_assert(L == LayoutPolicy::BSM);
+ std::vector<InternalLevel<R, Shard, Q>*> levels(task.sources.size());
+ for (size_t i=0; i<task.sources.size(); i++) {
+ levels[i] = m_levels[task.sources[i]].get();
+ }
+
+ auto new_level = InternalLevel<R, Shard, Q>::reconstruction(levels, task.target);
+ if (task.target >= m_levels.size()) {
+ m_current_state.push_back({new_level->get_record_count(), calc_level_record_capacity(task.target),
+ 1, 1});
+ m_levels.emplace_back(new_level);
+ } else {
+ m_current_state[task.target] = {new_level->get_record_count(), calc_level_record_capacity(task.target),
+ 1, 1};
+ m_levels[task.target] = new_level;
+ }
+
+ /* remove all of the levels that have been flattened */
+ for (size_t i=0; i<task.sources.size(); i++) {
+ m_levels[task.sources[i]] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(task.sources[i], 1));
+ m_current_state[task.sources[i]] = {0, calc_level_record_capacity(task.target), 0, 1};
+ }
+
+ return;
}
/*
@@ -342,6 +442,14 @@ public:
* tombstone ordering invariant may be violated.
*/
inline void reconstruction(level_index base_level, level_index incoming_level) {
+ size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+ if (base_level >= m_levels.size()) {
+ m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(base_level, shard_capacity)));
+ m_current_state.push_back({0, calc_level_record_capacity(base_level),
+ 0, shard_capacity});
+ }
+
if constexpr (L == LayoutPolicy::LEVELING) {
/* if the base level has a shard, merge the base and incoming together to make a new one */
if (m_levels[base_level]->get_shard_count() > 0) {
@@ -350,6 +458,7 @@ public:
} else {
m_levels[base_level] = m_levels[incoming_level];
}
+
} else {
m_levels[base_level]->append_level(m_levels[incoming_level].get());
m_levels[base_level]->finalize();
@@ -357,6 +466,14 @@ public:
/* place a new, empty level where the incoming level used to be */
m_levels[incoming_level] = std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(incoming_level, (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor));
+
+ /*
+ * Update the state vector to match the *real* state following
+ * the reconstruction
+ */
+ m_current_state[base_level] = {m_levels[base_level]->get_record_count(),
+ calc_level_record_capacity(base_level), m_levels[base_level]->get_shard_count(), shard_capacity};
+ m_current_state[incoming_level] = {0, calc_level_record_capacity(incoming_level), 0, shard_capacity};
}
bool take_reference() {
@@ -393,14 +510,27 @@ private:
std::vector<std::shared_ptr<InternalLevel<R, S, Q>>> m_levels;
+ /*
+     * The cached <reccnt, reccap, shardcnt, shardcap> state of each level in the
+ * structure. Record counts may be slightly inaccurate due to
+ * deletes.
+ */
+ state_vector m_current_state;
+
/*
- * Add a new level to the structure and return its index.
+ * Add a new level to the scratch state and return its index.
+ *
+ * IMPORTANT: This does _not_ add a level to the extension structure
+ * anymore. This is handled by the appropriate reconstruction and flush
+ * methods as needed. This function is for use in "simulated"
+ * reconstructions.
*/
- inline level_index grow() {
+ inline level_index grow(state_vector &scratch_state) {
level_index new_idx = m_levels.size();
- size_t new_shard_cnt = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+ size_t new_shard_cap = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
- m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(new_idx, new_shard_cnt)));
+ scratch_state.push_back({0, calc_level_record_capacity(new_idx),
+ 0, new_shard_cap});
return new_idx;
}
@@ -411,24 +541,36 @@ private:
* returns -1 if idx==0, and no such level exists, to simplify
* the logic of the first buffer flush.
*/
- inline level_index find_reconstruction_target(level_index idx) {
+ inline level_index find_reconstruction_target(level_index idx, state_vector &state) {
- if (idx == 0 && m_levels.size() == 0) return -1;
+ /*
+ * this handles the very first buffer flush, when the state vector
+ * is empty.
+ */
+ if (idx == 0 && state.size() == 0) return -1;
- size_t incoming_rec_cnt = get_level_record_count(idx);
- for (level_index i=idx+1; i<m_levels.size(); i++) {
- if (can_reconstruct_with(i, incoming_rec_cnt)) {
+ size_t incoming_rec_cnt = state[idx].reccnt;
+ for (level_index i=idx+1; i<state.size(); i++) {
+ if (can_reconstruct_with(i, incoming_rec_cnt, state)) {
return i;
}
- incoming_rec_cnt = get_level_record_count(i);
+            incoming_rec_cnt = state[i].reccnt;
}
return -1;
}
inline void flush_buffer_into_l0(BuffView buffer) {
- assert(m_levels[0]);
+ size_t shard_capacity = (L == LayoutPolicy::LEVELING) ? 1 : m_scale_factor;
+
+ if (m_levels.size() == 0) {
+ m_levels.emplace_back(std::shared_ptr<InternalLevel<R, Shard, Q>>(new InternalLevel<R, Shard, Q>(0, shard_capacity)));
+
+ m_current_state.push_back({0, calc_level_record_capacity(0),
+ 0, shard_capacity});
+ }
+
if constexpr (L == LayoutPolicy::LEVELING) {
// FIXME: Kludgey implementation due to interface constraints.
auto old_level = m_levels[0].get();
@@ -444,6 +586,10 @@ private:
} else {
m_levels[0]->append_buffer(std::move(buffer));
}
+
+ /* update the state vector */
+ m_current_state[0].reccnt = m_levels[0]->get_record_count();
+ m_current_state[0].shardcnt = m_levels[0]->get_shard_count();
}
/*
@@ -475,15 +621,17 @@ private:
* Determines if a level can sustain a reconstruction with incoming_rec_cnt
* additional records without exceeding its capacity.
*/
- inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt) {
- if (idx >= m_levels.size() || !m_levels[idx]) {
+ inline bool can_reconstruct_with(level_index idx, size_t incoming_rec_cnt, state_vector &state) {
+ if (idx >= state.size()) {
return false;
}
- if (L == LayoutPolicy::LEVELING) {
- return m_levels[idx]->get_record_count() + incoming_rec_cnt <= calc_level_record_capacity(idx);
- } else {
- return m_levels[idx]->get_shard_count() < m_scale_factor;
+ if constexpr (L == LayoutPolicy::LEVELING) {
+ return state[idx].reccnt + incoming_rec_cnt <= state[idx].reccap;
+ } else if constexpr (L == LayoutPolicy::BSM) {
+ return state[idx].reccnt == 0;
+ } else {
+ return state[idx].shardcnt < state[idx].shardcap;
}
/* unreachable */
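For concreteness, how the three branches above behave for a hypothetical level (values illustrative), given incoming_rec_cnt = 1500:

/*
 * Worked example: state[idx] = { reccnt = 6000, reccap = 8000,
 *                                shardcnt = 3,  shardcap = 8 }
 *
 *   LEVELING: 6000 + 1500 <= 8000  -> true  (merged level fits its record capacity)
 *   BSM:      6000 == 0            -> false (the target level must be empty)
 *   TEIRING:  3 < 8                -> true  (a shard slot is still free)
 */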
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index db38946..b962dcc 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -64,6 +64,21 @@ public:
return std::shared_ptr<InternalLevel>(res);
}
+ static std::shared_ptr<InternalLevel> reconstruction(std::vector<InternalLevel*> levels, size_t level_idx) {
+ std::vector<Shard *> shards;
+ for (auto level : levels) {
+ for (auto shard : level->m_shards) {
+ if (shard) shards.emplace_back(shard.get());
+ }
+ }
+
+ auto res = new InternalLevel(level_idx, 1);
+ res->m_shard_cnt = 1;
+ res->m_shards[0] = std::make_shared<S>(shards);
+
+ return std::shared_ptr<InternalLevel>(res);
+ }
+
/*
* Create a new shard combining the records from all of
* the shards in level, and append this new shard into
@@ -185,6 +200,10 @@ public:
}
Shard* get_shard(size_t idx) {
+ if (idx >= m_shard_cnt) {
+ return nullptr;
+ }
+
return m_shards[idx].get();
}
diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h
index 415c95a..7db3980 100644
--- a/include/framework/structure/MutableBuffer.h
+++ b/include/framework/structure/MutableBuffer.h
@@ -50,18 +50,19 @@ public:
, m_tail(0)
, m_head({0, 0})
, m_old_head({high_watermark, 0})
- , m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
+ //, m_data((Wrapped<R> *) psudb::sf_aligned_alloc(CACHELINE_SIZE, m_cap * sizeof(Wrapped<R>)))
+ , m_data(new Wrapped<R>[m_cap]())
, m_tombstone_filter(new psudb::BloomFilter<R>(BF_FPR, m_hwm, BF_HASH_FUNCS))
, m_tscnt(0)
, m_old_tscnt(0)
, m_active_head_advance(false)
{
assert(m_cap > m_hwm);
- assert(m_hwm > m_lwm);
+ assert(m_hwm >= m_lwm);
}
~MutableBuffer() {
- free(m_data);
+ delete[] m_data;
delete m_tombstone_filter;
}
@@ -76,16 +77,20 @@ public:
wrec.header = 0;
if (tombstone) wrec.set_tombstone();
+ // FIXME: because of the mod, it isn't correct to use `pos`
+ // as the ordering timestamp in the header anymore.
size_t pos = tail % m_cap;
m_data[pos] = wrec;
- m_data[pos].header |= (pos << 2);
+ m_data[pos].set_timestamp(pos);
if (tombstone) {
m_tscnt.fetch_add(1);
if (m_tombstone_filter) m_tombstone_filter->insert(rec);
}
+ m_data[pos].set_visible();
+
return 1;
}
diff --git a/include/framework/util/Configuration.h b/include/framework/util/Configuration.h
index 65ca181..4a4524a 100644
--- a/include/framework/util/Configuration.h
+++ b/include/framework/util/Configuration.h
@@ -35,7 +35,8 @@ static thread_local size_t sampling_bailouts = 0;
enum class LayoutPolicy {
LEVELING,
- TEIRING
+ TEIRING,
+ BSM
};
enum class DeletePolicy {
@@ -43,7 +44,4 @@ enum class DeletePolicy {
TAGGING
};
-typedef ssize_t level_index;
-typedef std::pair<level_index, level_index> ReconstructionTask;
-
}
diff --git a/include/query/irs.h b/include/query/irs.h
index e2d9325..879d070 100644
--- a/include/query/irs.h
+++ b/include/query/irs.h
@@ -40,7 +40,12 @@ struct BufferState {
size_t sample_size;
BufferView<R> *buffer;
+ psudb::Alias *alias;
+
BufferState(BufferView<R> *buffer) : buffer(buffer) {}
+ ~BufferState() {
+ delete alias;
+ }
};
template <RecordInterface R, ShardInterface<R> S, bool Rejection=true>
@@ -72,6 +77,7 @@ public:
res->cutoff = res->buffer->get_record_count();
res->sample_size = 0;
+ res->alias = nullptr;
if constexpr (Rejection) {
return res;
@@ -96,39 +102,51 @@ public:
std::vector<size_t> shard_sample_sizes(shard_states.size()+1, 0);
size_t buffer_sz = 0;
- std::vector<size_t> weights;
- if constexpr (Rejection) {
- weights.push_back((bs) ? bs->cutoff : 0);
- } else {
- weights.push_back((bs) ? bs->records.size() : 0);
+ /* for simplicity of static structure testing */
+ if (!bs) {
+ assert(shard_states.size() == 1);
+ auto state = (State<R> *) shard_states[0];
+ state->sample_size = p->sample_size;
+ return;
}
- size_t total_weight = 0;
- for (auto &s : shard_states) {
- auto state = (State<R> *) s;
- total_weight += state->total_weight;
- weights.push_back(state->total_weight);
- }
+ /* we only need to build the shard alias on the first call */
+ if (bs->alias == nullptr) {
+ std::vector<size_t> weights;
+ if constexpr (Rejection) {
+ weights.push_back((bs) ? bs->cutoff : 0);
+ } else {
+ weights.push_back((bs) ? bs->records.size() : 0);
+ }
- // if no valid records fall within the query range, just
- // set all of the sample sizes to 0 and bail out.
- if (total_weight == 0) {
- for (size_t i=0; i<shard_states.size(); i++) {
- auto state = (State<R> *) shard_states[i];
- state->sample_size = 0;
+ size_t total_weight = weights[0];
+ for (auto &s : shard_states) {
+ auto state = (State<R> *) s;
+ total_weight += state->total_weight;
+ weights.push_back(state->total_weight);
}
- return;
- }
+ // if no valid records fall within the query range, just
+ // set all of the sample sizes to 0 and bail out.
+ if (total_weight == 0) {
+ for (size_t i=0; i<shard_states.size(); i++) {
+ auto state = (State<R> *) shard_states[i];
+ state->sample_size = 0;
+ }
- std::vector<double> normalized_weights;
- for (auto w : weights) {
- normalized_weights.push_back((double) w / (double) total_weight);
+ return;
+ }
+
+ std::vector<double> normalized_weights;
+ for (auto w : weights) {
+ normalized_weights.push_back((double) w / (double) total_weight);
+ }
+
+ bs->alias = new psudb::Alias(normalized_weights);
}
- auto shard_alias = psudb::Alias(normalized_weights);
for (size_t i=0; i<p->sample_size; i++) {
- auto idx = shard_alias.get(p->rng);
+ auto idx = bs->alias->get(p->rng);
if (idx == 0) {
buffer_sz++;
} else {
@@ -198,16 +216,12 @@ public:
return result;
}
- static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) {
- std::vector<R> output;
-
+ static void merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
for (size_t i=0; i<results.size(); i++) {
for (size_t j=0; j<results[i].size(); j++) {
output.emplace_back(results[i][j].rec);
}
}
-
- return output;
}
static void delete_query_state(void *state) {
@@ -219,5 +233,18 @@ public:
auto s = (BufferState<R> *) state;
delete s;
}
+
+ static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
+ auto p = (Parms<R> *) parms;
+
+ if (results.size() < p->sample_size) {
+ auto q = *p;
+ q.sample_size -= results.size();
+ process_query_states(&q, states, buffer_state);
+ return true;
+ }
+
+ return false;
+ }
};
}}
diff --git a/include/query/knn.h b/include/query/knn.h
index 19dcf5c..a227293 100644
--- a/include/query/knn.h
+++ b/include/query/knn.h
@@ -111,10 +111,10 @@ public:
pq.pop();
}
- return results;
+ return std::move(results);
}
- static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) {
+ static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
Parms<R> *p = (Parms<R> *) parms;
R rec = p->point;
size_t k = p->k;
@@ -136,13 +136,12 @@ public:
}
}
- std::vector<R> output;
while (pq.size() > 0) {
output.emplace_back(*pq.peek().data);
pq.pop();
}
- return output;
+ return std::move(output);
}
static void delete_query_state(void *state) {
@@ -154,6 +153,10 @@ public:
auto s = (BufferState<R> *) state;
delete s;
}
+
+ static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
+ return false;
+ }
};
}}
diff --git a/include/query/pointlookup.h b/include/query/pointlookup.h
new file mode 100644
index 0000000..94c2bce
--- /dev/null
+++ b/include/query/pointlookup.h
@@ -0,0 +1,123 @@
+/*
+ * include/query/pointlookup.h
+ *
+ * Copyright (C) 2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * A query class for point lookup operations.
+ *
+ * TODO: Currently, this only supports point lookups for unique keys (which
+ * is the case for the trie that we're building this to use). It would be
+ * pretty straightforward to extend it to return *all* records that match
+ * the search_key (including tombstone cancellation--it's invertible) to
+ * support non-unique indexes, or at least those implementing
+ * lower_bound().
+ */
+#pragma once
+
+#include "framework/QueryRequirements.h"
+
+namespace de { namespace pl {
+
+template <RecordInterface R>
+struct Parms {
+ decltype(R::key) search_key;
+};
+
+template <RecordInterface R>
+struct State {
+};
+
+template <RecordInterface R>
+struct BufferState {
+ BufferView<R> *buffer;
+
+ BufferState(BufferView<R> *buffer)
+ : buffer(buffer) {}
+};
+
+template <KVPInterface R, ShardInterface<R> S>
+class Query {
+public:
+ constexpr static bool EARLY_ABORT=true;
+ constexpr static bool SKIP_DELETE_FILTER=true;
+
+ static void *get_query_state(S *shard, void *parms) {
+ return nullptr;
+ }
+
+ static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) {
+ auto res = new BufferState<R>(buffer);
+
+ return res;
+ }
+
+ static void process_query_states(void *query_parms, std::vector<void*> &shard_states, void* buffer_state) {
+ return;
+ }
+
+ static std::vector<Wrapped<R>> query(S *shard, void *q_state, void *parms) {
+ auto p = (Parms<R> *) parms;
+ auto s = (State<R> *) q_state;
+
+ std::vector<Wrapped<R>> result;
+
+ auto r = shard->point_lookup({p->search_key, 0});
+
+ if (r) {
+ result.push_back(*r);
+ }
+
+ return result;
+ }
+
+ static std::vector<Wrapped<R>> buffer_query(void *state, void *parms) {
+ auto p = (Parms<R> *) parms;
+ auto s = (BufferState<R> *) state;
+
+ std::vector<Wrapped<R>> records;
+ for (size_t i=0; i<s->buffer->get_record_count(); i++) {
+ auto rec = s->buffer->get(i);
+
+ if (rec->rec.key == p->search_key) {
+ records.push_back(*rec);
+ return records;
+ }
+ }
+
+ return records;
+ }
+
+ static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
+ for (auto r : results) {
+ if (r.size() > 0) {
+ if (r[0].is_deleted() || r[0].is_tombstone()) {
+ return output;
+ }
+
+ output.push_back(r[0].rec);
+ return output;
+ }
+ }
+
+ return output;
+ }
+
+ static void delete_query_state(void *state) {
+ auto s = (State<R> *) state;
+ delete s;
+ }
+
+ static void delete_buffer_query_state(void *state) {
+ auto s = (BufferState<R> *) state;
+ delete s;
+ }
+
+
+ static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
+ return false;
+ }
+};
+
+}}
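A short illustrative trace (not part of this diff) of how the class's flags interact for a key that was inserted and later deleted, assuming tombstone-based deletes:

/*
 * Lookup of key "k" after it has been deleted via a tombstone:
 *
 *   buffer_query()  -> the tombstone for "k" is found in the mutable buffer,
 *                      which holds the newest data and is queried before any shard
 *   EARLY_ABORT     -> the framework stops after this first non-empty result
 *   merge()         -> sees r[0].is_tombstone() and leaves the output empty
 *
 * Only when the newest matching version is a live record does merge() emit it.
 */
de::pl::Parms<de::Record<const char *, uint64_t>> parms;
parms.search_key = "k";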
diff --git a/include/query/rangecount.h b/include/query/rangecount.h
index 6c57809..5b95cdd 100644
--- a/include/query/rangecount.h
+++ b/include/query/rangecount.h
@@ -35,20 +35,14 @@ struct BufferState {
: buffer(buffer) {}
};
-template <KVPInterface R, ShardInterface<R> S>
+template <KVPInterface R, ShardInterface<R> S, bool FORCE_SCAN=false>
class Query {
public:
constexpr static bool EARLY_ABORT=false;
constexpr static bool SKIP_DELETE_FILTER=true;
static void *get_query_state(S *shard, void *parms) {
- auto res = new State<R>();
- auto p = (Parms<R> *) parms;
-
- res->start_idx = shard->get_lower_bound(p->lower_bound);
- res->stop_idx = shard->get_record_count();
-
- return res;
+ return nullptr;
}
static void* get_buffer_query_state(BufferView<R> *buffer, void *parms) {
@@ -74,37 +68,43 @@ public:
res.rec.value = 0; // tombstones
records.emplace_back(res);
+
+ auto start_idx = shard->get_lower_bound(p->lower_bound);
+ auto stop_idx = shard->get_lower_bound(p->upper_bound);
+
/*
* if the returned index is one past the end of the
* records for the PGM, then there are not records
* in the index falling into the specified range.
*/
- if (s->start_idx == shard->get_record_count()) {
+ if (start_idx == shard->get_record_count()) {
return records;
}
- auto ptr = shard->get_record_at(s->start_idx);
/*
* roll the pointer forward to the first record that is
* greater than or equal to the lower bound.
*/
- while(ptr < shard->get_data() + s->stop_idx && ptr->rec.key < p->lower_bound) {
- ptr++;
+ auto recs = shard->get_data();
+ while(start_idx < stop_idx && recs[start_idx].rec.key < p->lower_bound) {
+ start_idx++;
}
- while (ptr < shard->get_data() + s->stop_idx && ptr->rec.key <= p->upper_bound) {
- if (!ptr->is_deleted()) {
- if (ptr->is_tombstone()) {
- records[0].rec.value++;
- } else {
- records[0].rec.key++;
- }
- }
+ while (stop_idx < shard->get_record_count() && recs[stop_idx].rec.key <= p->upper_bound) {
+ stop_idx++;
+ }
+ size_t idx = start_idx;
+ size_t ts_cnt = 0;
- ptr++;
+ while (idx < stop_idx) {
+ ts_cnt += recs[idx].is_tombstone() * 2 + recs[idx].is_deleted();
+ idx++;
}
+ records[0].rec.key = idx - start_idx;
+ records[0].rec.value = ts_cnt;
+
return records;
}
@@ -119,8 +119,16 @@ public:
res.rec.value = 0; // tombstones
records.emplace_back(res);
+ size_t stop_idx;
+ if constexpr (FORCE_SCAN) {
+ stop_idx = s->buffer->get_capacity() / 2;
+ } else {
+ stop_idx = s->buffer->get_record_count();
+ }
+
for (size_t i=0; i<s->buffer->get_record_count(); i++) {
auto rec = s->buffer->get(i);
+
if (rec->rec.key >= p->lower_bound && rec->rec.key <= p->upper_bound
&& !rec->is_deleted()) {
if (rec->is_tombstone()) {
@@ -134,12 +142,10 @@ public:
return records;
}
- static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) {
-
+ static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
R res;
res.key = 0;
res.value = 0;
- std::vector<R> output;
output.emplace_back(res);
for (size_t i=0; i<results.size(); i++) {
@@ -152,14 +158,16 @@ public:
}
static void delete_query_state(void *state) {
- auto s = (State<R> *) state;
- delete s;
}
static void delete_buffer_query_state(void *state) {
auto s = (BufferState<R> *) state;
delete s;
}
+
+ static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
+ return false;
+ }
};
}}
diff --git a/include/query/rangequery.h b/include/query/rangequery.h
index 24b38ec..e0690e6 100644
--- a/include/query/rangequery.h
+++ b/include/query/rangequery.h
@@ -109,7 +109,7 @@ public:
return records;
}
- static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) {
+ static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
std::vector<Cursor<Wrapped<R>>> cursors;
cursors.reserve(results.size());
@@ -121,7 +121,7 @@ public:
for (size_t i = 0; i < tmp_n; ++i)
if (results[i].size() > 0){
auto base = results[i].data();
- cursors.emplace_back(Cursor{base, base + results[i].size(), 0, results[i].size()});
+ cursors.emplace_back(Cursor<Wrapped<R>>{base, base + results[i].size(), 0, results[i].size()});
assert(i == cursors.size() - 1);
total += results[i].size();
pq.push(cursors[i].ptr, tmp_n - i - 1);
@@ -133,7 +133,6 @@ public:
return std::vector<R>();
}
- std::vector<R> output;
output.reserve(total);
while (pq.size()) {
@@ -169,6 +168,10 @@ public:
auto s = (BufferState<R> *) state;
delete s;
}
+
+ static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
+ return false;
+ }
};
}}
diff --git a/include/query/wirs.h b/include/query/wirs.h
index ae82194..62b43f6 100644
--- a/include/query/wirs.h
+++ b/include/query/wirs.h
@@ -219,9 +219,7 @@ public:
return result;
}
- static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) {
- std::vector<R> output;
-
+ static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
for (size_t i=0; i<results.size(); i++) {
for (size_t j=0; j<results[i].size(); j++) {
output.emplace_back(results[i][j].rec);
@@ -240,5 +238,14 @@ public:
auto s = (BufferState<R> *) state;
delete s;
}
+
+ static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
+ auto p = (Parms<R> *) parms;
+
+ if (results.size() < p->sample_size) {
+ return true;
+ }
+ return false;
+ }
};
}}
diff --git a/include/query/wss.h b/include/query/wss.h
index 8797035..fb0b414 100644
--- a/include/query/wss.h
+++ b/include/query/wss.h
@@ -183,9 +183,7 @@ public:
return result;
}
- static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms) {
- std::vector<R> output;
-
+ static std::vector<R> merge(std::vector<std::vector<Wrapped<R>>> &results, void *parms, std::vector<R> &output) {
for (size_t i=0; i<results.size(); i++) {
for (size_t j=0; j<results[i].size(); j++) {
output.emplace_back(results[i][j].rec);
@@ -204,6 +202,15 @@ public:
auto s = (BufferState<R> *) state;
delete s;
}
+
+ static bool repeat(void *parms, std::vector<R> &results, std::vector<void*> states, void* buffer_state) {
+ auto p = (Parms<R> *) parms;
+
+ if (results.size() < p->sample_size) {
+ return true;
+ }
+ return false;
+ }
};
}}
diff --git a/include/shard/Alias.h b/include/shard/Alias.h
index 9275952..72147d7 100644
--- a/include/shard/Alias.h
+++ b/include/shard/Alias.h
@@ -148,7 +148,7 @@ public:
size_t get_memory_usage() {
- return m_alloc_size;
+ return 0;
}
size_t get_aux_memory_usage() {
diff --git a/include/shard/AugBTree.h b/include/shard/AugBTree.h
index 54931bd..c60cbcd 100644
--- a/include/shard/AugBTree.h
+++ b/include/shard/AugBTree.h
@@ -148,7 +148,7 @@ public:
}
size_t get_memory_usage() {
- return m_alloc_size + m_node_cnt * sizeof(AugBTreeNode<Wrapped<R>>);
+ return m_node_cnt * sizeof(AugBTreeNode<Wrapped<R>>);
}
size_t get_aux_memory_usage() {
diff --git a/include/shard/FSTrie.h b/include/shard/FSTrie.h
new file mode 100644
index 0000000..3783b38
--- /dev/null
+++ b/include/shard/FSTrie.h
@@ -0,0 +1,200 @@
+/*
+ * include/shard/FSTrie.h
+ *
+ * Copyright (C) 2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * A shard shim around the FSTrie learned index.
+ */
+#pragma once
+
+
+#include <vector>
+
+#include "framework/ShardRequirements.h"
+#include "fst.hpp"
+#include "util/SortedMerge.h"
+
+using psudb::CACHELINE_SIZE;
+using psudb::BloomFilter;
+using psudb::PriorityQueue;
+using psudb::queue_record;
+using psudb::byte;
+
+namespace de {
+
+template <KVPInterface R>
+class FSTrie {
+private:
+
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+ static_assert(std::is_same_v<K, const char*>, "FST requires const char* keys.");
+
+public:
+ FSTrie(BufferView<R> buffer)
+ : m_data(nullptr)
+ , m_reccnt(0)
+ , m_alloc_size(0)
+ {
+ m_data = new Wrapped<R>[buffer.get_record_count()]();
+ m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count();
+
+ size_t cnt = 0;
+ std::vector<std::string> keys;
+ keys.reserve(buffer.get_record_count());
+
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = new Wrapped<R>[buffer.get_record_count()]();
+ for (size_t i=0; i<buffer.get_record_count(); i++) {
+ temp_buffer[i] = *(buffer.get(i));
+ }
+
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
+
+ for (size_t i=0; i<buffer.get_record_count(); i++) {
+ if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || temp_buffer[i].rec.key == "") {
+ continue;
+ }
+
+ m_data[cnt] = temp_buffer[i];
+ m_data[cnt].clear_timestamp();
+
+ keys.push_back(std::string(m_data[cnt].rec.key));
+ cnt++;
+ }
+
+ m_reccnt = cnt;
+ if (m_reccnt > 0) {
+ m_fst = new fst::Trie(keys, true, 1);
+ }
+
+ delete[] temp_buffer;
+ }
+
+ FSTrie(std::vector<FSTrie*> &shards)
+ : m_data(nullptr)
+ , m_reccnt(0)
+ , m_alloc_size(0)
+ {
+ size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
+ auto cursors = build_cursor_vec<R, FSTrie>(shards, &attemp_reccnt, &tombstone_count);
+
+ m_data = new Wrapped<R>[attemp_reccnt]();
+ m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>);
+
+ std::vector<std::string> keys;
+ keys.reserve(attemp_reccnt);
+
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i=0; i<cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
+
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop(); pq.pop();
+ auto& cursor1 = cursors[now.version];
+ auto& cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
+ } else {
+ auto& cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") {
+ m_data[m_reccnt] = *cursor.ptr;
+ keys.push_back(std::string(m_data[m_reccnt].rec.key));
+
+ m_reccnt++;
+ }
+ pq.pop();
+
+ if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
+ }
+ }
+
+ if (m_reccnt > 0) {
+ m_fst = new fst::Trie(keys, true, 1);
+ }
+ }
+
+ ~FSTrie() {
+ delete[] m_data;
+ delete m_fst;
+ }
+
+ Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
+
+ auto idx = m_fst->exactSearch(rec.key);
+
+ if (idx == fst::kNotFound) {
+ return nullptr;
+ }
+
+ // FIXME: for convenience, I'm treating this Trie as a unique index
+ // for now, so no need to scan forward and/or check values. This
+ // also makes the point lookup query class a lot easier to make.
+ // Ultimately, though, we can support non-unique indexes with some
+ // extra work.
+
+ return m_data + idx;
+ }
+
+ Wrapped<R>* get_data() const {
+ return m_data;
+ }
+
+ size_t get_record_count() const {
+ return m_reccnt;
+ }
+
+ size_t get_tombstone_count() const {
+ return 0;
+ }
+
+ const Wrapped<R>* get_record_at(size_t idx) const {
+ if (idx >= m_reccnt) return nullptr;
+ return m_data + idx;
+ }
+
+
+ size_t get_memory_usage() {
+ return m_fst->getMemoryUsage();
+ }
+
+ size_t get_aux_memory_usage() {
+ return m_alloc_size;
+ }
+
+ size_t get_lower_bound(R &rec) {return 0;}
+ size_t get_upper_bound(R &rec) {return 0;}
+
+private:
+
+ Wrapped<R>* m_data;
+ size_t m_reccnt;
+ size_t m_alloc_size;
+ fst::Trie *m_fst;
+};
+}
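A small usage sketch (not from this commit) of probing the shard directly; the record type follows the const char* specialization added in Record.h, and constructing the shard itself is left to the framework:

/* Illustrative only: probing an FSTrie shard already built by the framework. */
typedef de::Record<const char *, uint64_t> Rec;

void probe(de::FSTrie<Rec> &shard) {
    Rec needle {"alphabet", 0, 8};   /* key, value, len */

    de::Wrapped<Rec> *hit = shard.point_lookup(needle);
    if (hit && !hit->is_tombstone() && !hit->is_deleted()) {
        uint64_t value = hit->rec.value;   /* live record found */
        (void) value;
    }
}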
diff --git a/include/shard/ISAMTree.h b/include/shard/ISAMTree.h
index 3763271..1cca506 100644
--- a/include/shard/ISAMTree.h
+++ b/include/shard/ISAMTree.h
@@ -51,7 +51,7 @@ constexpr static size_t LEAF_FANOUT = NODE_SZ / sizeof(R);
public:
ISAMTree(BufferView<R> buffer)
- : m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
+ : m_bf(nullptr)
, m_isam_nodes(nullptr)
, m_root(nullptr)
, m_reccnt(0)
@@ -59,19 +59,12 @@ public:
, m_internal_node_cnt(0)
, m_deleted_cnt(0)
, m_alloc_size(0)
- , m_data(nullptr)
{
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
buffer.get_record_count() *
sizeof(Wrapped<R>),
(byte**) &m_data);
- /*
- * without this, gcc seems to hoist the building of the array
- * _above_ its allocation under -O3, resulting in memfaults.
- */
- asm volatile ("" ::: "memory");
-
auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf);
m_reccnt = res.record_count;
m_tombstone_cnt = res.tombstone_count;
@@ -90,13 +83,12 @@ public:
, m_internal_node_cnt(0)
, m_deleted_cnt(0)
, m_alloc_size(0)
- , m_data(nullptr)
{
size_t attemp_reccnt = 0;
size_t tombstone_count = 0;
auto cursors = build_cursor_vec<R, ISAMTree>(shards, &attemp_reccnt, &tombstone_count);
- m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
+ m_bf = nullptr;
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
attemp_reccnt * sizeof(Wrapped<R>),
(byte **) &m_data);
@@ -149,7 +141,7 @@ public:
size_t get_memory_usage() {
- return m_alloc_size + m_internal_node_cnt * NODE_SZ;
+ return m_internal_node_cnt * NODE_SZ;
}
size_t get_aux_memory_usage() {
diff --git a/include/shard/LoudsPatricia.h b/include/shard/LoudsPatricia.h
new file mode 100644
index 0000000..3452839
--- /dev/null
+++ b/include/shard/LoudsPatricia.h
@@ -0,0 +1,199 @@
+/*
+ * include/shard/LoudsPatricia.h
+ *
+ * Copyright (C) 2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ * A shard shim around the LoudsPatricia learned index.
+ */
+#pragma once
+
+
+#include <vector>
+
+#include "framework/ShardRequirements.h"
+#include "louds-patricia.hpp"
+#include "util/SortedMerge.h"
+
+using psudb::CACHELINE_SIZE;
+using psudb::BloomFilter;
+using psudb::PriorityQueue;
+using psudb::queue_record;
+using psudb::byte;
+
+namespace de {
+
+template <KVPInterface R>
+class LoudsPatricia {
+private:
+
+ typedef decltype(R::key) K;
+ typedef decltype(R::value) V;
+    static_assert(std::is_same_v<K, const char*>, "LoudsPatricia requires const char* keys.");
+
+public:
+ LoudsPatricia(BufferView<R> buffer)
+ : m_data(nullptr)
+ , m_reccnt(0)
+ , m_alloc_size(0)
+ {
+ m_data = new Wrapped<R>[buffer.get_record_count()]();
+ m_alloc_size = sizeof(Wrapped<R>) * buffer.get_record_count();
+
+ m_louds = new louds::Patricia();
+
+ size_t cnt = 0;
+ std::vector<std::string> keys;
+ keys.reserve(buffer.get_record_count());
+
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = new Wrapped<R>[buffer.get_record_count()]();
+ for (size_t i=0; i<buffer.get_record_count(); i++) {
+ temp_buffer[i] = *(buffer.get(i));
+ }
+
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
+
+ for (size_t i=0; i<buffer.get_record_count(); i++) {
+ if (temp_buffer[i].is_deleted() || !temp_buffer[i].is_visible() || temp_buffer[i].rec.key == "") {
+ continue;
+ }
+
+ m_data[cnt] = temp_buffer[i];
+ m_data[cnt].clear_timestamp();
+
+ m_louds->add(std::string(m_data[cnt].rec.key));
+ cnt++;
+ }
+
+ m_reccnt = cnt;
+ if (m_reccnt > 0) {
+ m_louds->build();
+ }
+
+ delete[] temp_buffer;
+ }
+
+ LoudsPatricia(std::vector<LoudsPatricia*> &shards)
+ : m_data(nullptr)
+ , m_reccnt(0)
+ , m_alloc_size(0)
+ {
+ size_t attemp_reccnt = 0;
+ size_t tombstone_count = 0;
+ auto cursors = build_cursor_vec<R, LoudsPatricia>(shards, &attemp_reccnt, &tombstone_count);
+
+ m_data = new Wrapped<R>[attemp_reccnt]();
+ m_alloc_size = attemp_reccnt * sizeof(Wrapped<R>);
+
+ m_louds = new louds::Patricia();
+
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i=0; i<cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
+
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop(); pq.pop();
+ auto& cursor1 = cursors[now.version];
+ auto& cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
+ } else {
+ auto& cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted() && cursor.ptr->rec.key != "") {
+ m_data[m_reccnt] = *cursor.ptr;
+ m_louds->add(std::string(m_data[m_reccnt].rec.key));
+ m_reccnt++;
+ }
+ pq.pop();
+
+ if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
+ }
+ }
+
+ if (m_reccnt > 0) {
+ m_louds->build();
+ }
+ }
+
+ ~LoudsPatricia() {
+ delete[] m_data;
+ delete m_louds;
+ }
+
+ Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
+
+ auto idx = m_louds->lookup(std::string(rec.key));
+
+ if (idx == -1) {
+ return nullptr;
+ }
+
+ // FIXME: for convenience, I'm treating this Trie as a unique index
+ // for now, so no need to scan forward and/or check values. This
+ // also makes the point lookup query class a lot easier to make.
+ // Ultimately, though, we can support non-unique indexes with some
+ // extra work.
+ return m_data + idx;
+ }
+
+ Wrapped<R>* get_data() const {
+ return m_data;
+ }
+
+ size_t get_record_count() const {
+ return m_reccnt;
+ }
+
+ size_t get_tombstone_count() const {
+ return 0;
+ }
+
+ const Wrapped<R>* get_record_at(size_t idx) const {
+ if (idx >= m_reccnt) return nullptr;
+ return m_data + idx;
+ }
+
+
+ size_t get_memory_usage() {
+ return m_louds->size();
+ }
+
+ size_t get_aux_memory_usage() {
+ return m_alloc_size;
+ }
+
+ size_t get_lower_bound(R &rec) {return 0;}
+ size_t get_upper_bound(R &rec) {return 0;}
+
+private:
+
+ Wrapped<R>* m_data;
+ size_t m_reccnt;
+ size_t m_alloc_size;
+ louds::Patricia *m_louds;
+};
+}
diff --git a/include/shard/PGM.h b/include/shard/PGM.h
index e2752ef..509796b 100644
--- a/include/shard/PGM.h
+++ b/include/shard/PGM.h
@@ -39,8 +39,7 @@ private:
public:
PGM(BufferView<R> buffer)
- : m_data(nullptr)
- , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
+ : m_bf(nullptr)
, m_reccnt(0)
, m_tombstone_cnt(0)
, m_alloc_size(0) {
@@ -49,16 +48,63 @@ public:
buffer.get_record_count() *
sizeof(Wrapped<R>),
(byte**) &m_data);
- auto res = sorted_array_from_bufferview<R>(std::move(buffer), m_data, m_bf);
- m_reccnt = res.record_count;
- m_tombstone_cnt = res.tombstone_count;
- if (m_reccnt > 0) {
- std::vector<K> keys;
- for (size_t i=0; i<m_reccnt; i++) {
- keys.emplace_back(m_data[i].rec.key);
+ std::vector<K> keys;
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
+ buffer.get_record_count(),
+ sizeof(Wrapped<R>));
+ buffer.copy_to_buffer((byte *) temp_buffer);
+
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
+
+ merge_info info = {0, 0};
+
+ /*
+ * Iterate over the temporary buffer to process the records, copying
+ * them into buffer as needed
+ */
+ while (base < stop) {
+ if (!base->is_tombstone() && (base + 1 < stop)
+ && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
+ base += 2;
+ continue;
+ } else if (base->is_deleted()) {
+ base += 1;
+ continue;
+ }
+
+ // FIXME: this shouldn't be necessary, but the tagged record
+ // bypass doesn't seem to be working on this code-path, so this
+ // ensures that tagged records from the buffer can eventually be
+ // dropped. It should only need to be &= 1
+ base->header &= 3;
+ keys.emplace_back(base->rec.key);
+ m_data[info.record_count++] = *base;
+
+ if (base->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf){
+ m_bf->insert(base->rec);
+ }
}
+ base++;
+ }
+
+ free(temp_buffer);
+
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
+
+ if (m_reccnt > 0) {
m_pgm = pgm::PGMIndex<K, epsilon>(keys);
}
}
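
The buffer-path constructor above works on a single sorted run rather than a cursor merge, so cancellation reduces to checking adjacent entries: a live record immediately followed by its own tombstone is dropped together with that tombstone, and tag-deleted records are skipped. A self-contained sketch of that pass, assuming a simplified record type and key-only equality standing in for the full record comparison:

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    /* simplified stand-in for Wrapped<R> (hypothetical) */
    struct BufferRec {
        uint64_t key;
        bool tombstone;
        bool deleted;

        bool operator<(const BufferRec &other) const {
            /* sort by key; a record's tombstone orders directly after it */
            if (key != other.key) return key < other.key;
            return !tombstone && other.tombstone;
        }
    };

    /* single pass over a sorted run, mirroring the while (base < stop) loop above */
    static std::vector<BufferRec> filter_sorted_run(std::vector<BufferRec> recs) {
        std::sort(recs.begin(), recs.end());
        std::vector<BufferRec> out;
        for (size_t i = 0; i < recs.size(); ) {
            if (!recs[i].tombstone && i + 1 < recs.size() &&
                recs[i + 1].key == recs[i].key && recs[i + 1].tombstone) {
                i += 2;          /* record + matching tombstone cancel out */
            } else if (recs[i].deleted) {
                i += 1;          /* tag-deleted record: drop it */
            } else {
                out.push_back(recs[i]);
                i += 1;
            }
        }
        return out;
    }
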
@@ -74,21 +120,65 @@ public:
size_t tombstone_count = 0;
auto cursors = build_cursor_vec<R, PGM>(shards, &attemp_reccnt, &tombstone_count);
- m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
attemp_reccnt * sizeof(Wrapped<R>),
(byte **) &m_data);
+ std::vector<K> keys;
- auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
- m_reccnt = res.record_count;
- m_tombstone_cnt = res.tombstone_count;
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i=0; i<cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
- if (m_reccnt > 0) {
- std::vector<K> keys;
- for (size_t i=0; i<m_reccnt; i++) {
- keys.emplace_back(m_data[i].rec.key);
+ merge_info info = {0, 0};
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop(); pq.pop();
+ auto& cursor1 = cursors[now.version];
+ auto& cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
+ } else {
+ auto& cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted()) {
+ keys.emplace_back(cursor.ptr->rec.key);
+ m_data[info.record_count++] = *cursor.ptr;
+
+ /*
+ * if the record is a tombstone, increment the ts count and
+ * insert it into the bloom filter if one has been
+ * provided.
+ */
+ if (cursor.ptr->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf) {
+ m_bf->insert(cursor.ptr->rec);
+ }
+ }
+ }
+ pq.pop();
+
+ if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
}
+ }
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
+
+ if (m_reccnt > 0) {
m_pgm = pgm::PGMIndex<K, epsilon>(keys);
}
}
@@ -132,7 +222,7 @@ public:
size_t get_memory_usage() {
- return m_pgm.size_in_bytes() + m_alloc_size;
+ return m_pgm.size_in_bytes();
}
size_t get_aux_memory_usage() {
@@ -147,14 +237,16 @@ public:
return m_reccnt;
}
- // If the region to search is less than some pre-specified
- // amount, perform a linear scan to locate the record.
+ /*
+ * If the region to search is less than some pre-specified
+ * amount, perform a linear scan to locate the record.
+ */
if (bound.hi - bound.lo < 256) {
while (idx < bound.hi && m_data[idx].rec.key < key) {
idx++;
}
} else {
- // Otherwise, perform a binary search
+ /* Otherwise, perform a binary search */
idx = bound.lo;
size_t max = bound.hi;
@@ -169,10 +261,26 @@ public:
}
+ /*
+ * the upper bound returned by PGM is one past the end of the
+ * array. If we are at that point, we should just return "not found"
+ */
+ if (idx == m_reccnt) {
+ return idx;
+ }
+
+ /*
+ * We may have walked one past the actual lower bound, so check
+ * the index before the current one to see if it is the actual bound
+ */
if (m_data[idx].rec.key > key && idx > 0 && m_data[idx-1].rec.key <= key) {
return idx-1;
}
+ /*
+ * Otherwise, check idx. If it is a valid bound, then return it,
+ * otherwise return "not found".
+ */
return (m_data[idx].rec.key >= key) ? idx : m_reccnt;
}
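
The boundary handling added to get_lower_bound above reduces to three cases: report "not found" when the approximate search ran past the end of the array, step back one slot when the previous key is still <= the target, and otherwise accept the index only if its key is >= the target. A condensed sketch of just that fixup, under the assumption of a plain sorted array of integer keys rather than Wrapped<R> records:

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    /*
     * Given an index `idx` that an approximate search landed on, return
     * "not found" (n) if it ran off the end, step back one slot if the
     * previous key is still <= key, otherwise accept idx only if keys[idx]
     * is >= key. Mirrors the fixup in PGM::get_lower_bound above.
     */
    static size_t fixup_lower_bound(const std::vector<uint64_t> &keys,
                                    size_t idx, uint64_t key) {
        const size_t n = keys.size();
        if (idx == n) {
            return n;
        }
        if (keys[idx] > key && idx > 0 && keys[idx - 1] <= key) {
            return idx - 1;
        }
        return (keys[idx] >= key) ? idx : n;
    }
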
diff --git a/include/shard/TrieSpline.h b/include/shard/TrieSpline.h
index 2a432e8..581277e 100644
--- a/include/shard/TrieSpline.h
+++ b/include/shard/TrieSpline.h
@@ -36,39 +36,94 @@ private:
public:
TrieSpline(BufferView<R> buffer)
- : m_data(nullptr)
- , m_reccnt(0)
+ : m_reccnt(0)
, m_tombstone_cnt(0)
, m_alloc_size(0)
, m_max_key(0)
, m_min_key(0)
- , m_bf(new BloomFilter<R>(BF_FPR, buffer.get_tombstone_count(), BF_HASH_FUNCS))
+ , m_bf(nullptr)
{
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
buffer.get_record_count() *
sizeof(Wrapped<R>),
(byte**) &m_data);
- auto res = sorted_array_from_bufferview(std::move(buffer), m_data, m_bf);
- m_reccnt = res.record_count;
- m_tombstone_cnt = res.tombstone_count;
+ /*
+ * Copy the contents of the buffer view into a temporary buffer, and
+ * sort them. We still need to iterate over these temporary records to
+ * apply tombstone/deleted record filtering, as well as any possible
+ * per-record processing that is required by the shard being built.
+ */
+ auto temp_buffer = (Wrapped<R> *) psudb::sf_aligned_calloc(CACHELINE_SIZE,
+ buffer.get_record_count(),
+ sizeof(Wrapped<R>));
+ buffer.copy_to_buffer((byte *) temp_buffer);
+
+ auto base = temp_buffer;
+ auto stop = base + buffer.get_record_count();
+ std::sort(base, stop, std::less<Wrapped<R>>());
+
+ auto tmp_min_key = temp_buffer[0].rec.key;
+ auto tmp_max_key = temp_buffer[buffer.get_record_count() - 1].rec.key;
+ auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+
+ merge_info info = {0, 0};
+
+ m_min_key = tmp_max_key;
+ m_max_key = tmp_min_key;
+
+ /*
+ * Iterate over the temporary buffer to process the records, copying
+ * them into the shard's record array as needed
+ */
+ while (base < stop) {
+ if (!base->is_tombstone() && (base + 1 < stop)
+ && base->rec == (base + 1)->rec && (base + 1)->is_tombstone()) {
+ base += 2;
+ continue;
+ } else if (base->is_deleted()) {
+ base += 1;
+ continue;
+ }
- if (m_reccnt > 0) {
- m_min_key = m_data[0].rec.key;
- m_max_key = m_data[m_reccnt-1].rec.key;
+ // FIXME: this shouldn't be necessary, but the tagged record
+ // bypass doesn't seem to be working on this code-path, so this
+ // ensures that tagged records from the buffer can eventually be
+ // dropped. It should only need to be &= 1
+ base->header &= 3;
+ bldr.AddKey(base->rec.key);
+ m_data[info.record_count++] = *base;
+
+ if (base->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf){
+ m_bf->insert(base->rec);
+ }
+ }
- auto bldr = ts::Builder<K>(m_min_key, m_max_key, E);
- for (size_t i=0; i<m_reccnt; i++) {
- bldr.AddKey(m_data[i].rec.key);
+ if (base->rec.key < m_min_key) {
+ m_min_key = base->rec.key;
+ }
+
+ if (base->rec.key > m_max_key) {
+ m_max_key = base->rec.key;
}
+ base++;
+ }
+
+ free(temp_buffer);
+
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
+
+ if (m_reccnt > 50) {
m_ts = bldr.Finalize();
}
}
TrieSpline(std::vector<TrieSpline*> &shards)
- : m_data(nullptr)
- , m_reccnt(0)
+ : m_reccnt(0)
, m_tombstone_cnt(0)
, m_alloc_size(0)
, m_max_key(0)
@@ -79,24 +134,90 @@ public:
size_t tombstone_count = 0;
auto cursors = build_cursor_vec<R, TrieSpline>(shards, &attemp_reccnt, &tombstone_count);
- m_bf = new BloomFilter<R>(BF_FPR, tombstone_count, BF_HASH_FUNCS);
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
attemp_reccnt * sizeof(Wrapped<R>),
(byte **) &m_data);
- auto res = sorted_array_merge<R>(cursors, m_data, m_bf);
- m_reccnt = res.record_count;
- m_tombstone_cnt = res.tombstone_count;
+ // FIXME: For smaller cursor arrays, it may be more efficient to skip
+ // the priority queue and just do a scan.
+ PriorityQueue<Wrapped<R>> pq(cursors.size());
+ for (size_t i=0; i<cursors.size(); i++) {
+ pq.push(cursors[i].ptr, i);
+ }
- if (m_reccnt > 0) {
- m_min_key = m_data[0].rec.key;
- m_max_key = m_data[m_reccnt-1].rec.key;
+ auto tmp_max_key = shards[0]->m_max_key;
+ auto tmp_min_key = shards[0]->m_min_key;
- auto bldr = ts::Builder<K>(m_min_key, m_max_key, E);
- for (size_t i=0; i<m_reccnt; i++) {
- bldr.AddKey(m_data[i].rec.key);
+ for (size_t i=0; i<shards.size(); i++) {
+ if (shards[i]->m_max_key > tmp_max_key) {
+ tmp_max_key = shards[i]->m_max_key;
+ }
+
+ if (shards[i]->m_min_key < tmp_min_key) {
+ tmp_min_key = shards[i]->m_min_key;
}
+ }
+
+ auto bldr = ts::Builder<K>(tmp_min_key, tmp_max_key, E);
+
+ m_max_key = tmp_min_key;
+ m_min_key = tmp_max_key;
+
+ merge_info info = {0, 0};
+ while (pq.size()) {
+ auto now = pq.peek();
+ auto next = pq.size() > 1 ? pq.peek(1) : queue_record<Wrapped<R>>{nullptr, 0};
+ /*
+ * if the current record is not a tombstone, and the next record is
+ * a tombstone that matches the current one, then the current one
+ * has been deleted, and both it and its tombstone can be skipped
+ * over.
+ */
+ if (!now.data->is_tombstone() && next.data != nullptr &&
+ now.data->rec == next.data->rec && next.data->is_tombstone()) {
+
+ pq.pop(); pq.pop();
+ auto& cursor1 = cursors[now.version];
+ auto& cursor2 = cursors[next.version];
+ if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version);
+ if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version);
+ } else {
+ auto& cursor = cursors[now.version];
+ /* skip over records that have been deleted via tagging */
+ if (!cursor.ptr->is_deleted()) {
+ bldr.AddKey(cursor.ptr->rec.key);
+ m_data[info.record_count++] = *cursor.ptr;
+
+ /*
+ * if the record is a tombstone, increment the ts count and
+ * insert it into the bloom filter if one has been
+ * provided.
+ */
+ if (cursor.ptr->is_tombstone()) {
+ info.tombstone_count++;
+ if (m_bf) {
+ m_bf->insert(cursor.ptr->rec);
+ }
+ }
+
+ if (cursor.ptr->rec.key < m_min_key) {
+ m_min_key = cursor.ptr->rec.key;
+ }
+
+ if (cursor.ptr->rec.key > m_max_key) {
+ m_max_key = cursor.ptr->rec.key;
+ }
+ }
+ pq.pop();
+
+ if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version);
+ }
+ }
+
+ m_reccnt = info.record_count;
+ m_tombstone_cnt = info.tombstone_count;
+ if (m_reccnt > 50) {
m_ts = bldr.Finalize();
}
}
@@ -107,7 +228,7 @@ public:
}
Wrapped<R> *point_lookup(const R &rec, bool filter=false) {
- if (filter && !m_bf->lookup(rec)) {
+ if (filter && m_bf && !m_bf->lookup(rec)) {
return nullptr;
}
@@ -144,7 +265,7 @@ public:
size_t get_memory_usage() {
- return m_ts.GetSize() + m_alloc_size;
+ return m_ts.GetSize();
}
size_t get_aux_memory_usage() {
@@ -152,6 +273,18 @@ public:
}
size_t get_lower_bound(const K& key) const {
+ if (m_reccnt <= 50) {
+ size_t bd = m_reccnt;
+ for (size_t i=0; i<m_reccnt; i++) {
+ if (m_data[i].rec.key >= key) {
+ bd = i;
+ break;
+ }
+ }
+
+ return bd;
+ }
+
auto bound = m_ts.GetSearchBound(key);
size_t idx = bound.begin;
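
Between the constructors and get_lower_bound, the spline is now only built and consulted once a shard holds more than 50 records; smaller shards fall back to a direct scan. A rough sketch of that size-gated lookup pattern follows; the spline type is left as a template parameter rather than assuming the exact finalized type, and only the GetSearchBound(key).begin call visible in the diff is used:

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    static constexpr size_t SPLINE_CUTOFF = 50;

    /*
     * Shards at or below the cutoff never finalize a spline and are scanned
     * directly; larger shards use the spline's search bound as a start point.
     */
    template <typename SplineT>
    size_t lower_bound_idx(const std::vector<uint64_t> &keys,
                           const SplineT *spline, uint64_t key) {
        if (keys.size() <= SPLINE_CUTOFF || spline == nullptr) {
            for (size_t i = 0; i < keys.size(); i++) {
                if (keys[i] >= key) return i;
            }
            return keys.size();
        }

        size_t idx = spline->GetSearchBound(key).begin;
        while (idx < keys.size() && keys[idx] < key) {
            idx++;   /* refine within the approximate bound */
        }
        return idx;
    }
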
diff --git a/include/shard/VPTree.h b/include/shard/VPTree.h
index b342fe6..d5a2393 100644
--- a/include/shard/VPTree.h
+++ b/include/shard/VPTree.h
@@ -58,7 +58,7 @@ public:
sizeof(Wrapped<R>),
(byte**) &m_data);
- m_ptrs = new Wrapped<R>*[buffer.get_record_count()];
+ m_ptrs = new vp_ptr[buffer.get_record_count()];
size_t offset = 0;
m_reccnt = 0;
@@ -76,7 +76,7 @@ public:
rec->header &= 3;
m_data[m_reccnt] = *rec;
- m_ptrs[m_reccnt] = &m_data[m_reccnt];
+ m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
m_reccnt++;
}
@@ -97,7 +97,7 @@ public:
m_alloc_size = psudb::sf_aligned_alloc(CACHELINE_SIZE,
attemp_reccnt * sizeof(Wrapped<R>),
(byte **) &m_data);
- m_ptrs = new Wrapped<R>*[attemp_reccnt];
+ m_ptrs = new vp_ptr[attemp_reccnt];
// FIXME: will eventually need to figure out tombstones
// this one will likely require the multi-pass
@@ -110,7 +110,7 @@ public:
}
m_data[m_reccnt] = *shards[i]->get_record_at(j);
- m_ptrs[m_reccnt] = &m_data[m_reccnt];
+ m_ptrs[m_reccnt].ptr = &m_data[m_reccnt];
m_reccnt++;
}
}
@@ -139,8 +139,8 @@ public:
} else {
vpnode *node = m_root;
- while (!node->leaf && m_ptrs[node->start]->rec != rec) {
- if (rec.calc_distance((m_ptrs[node->start]->rec)) >= node->radius) {
+ while (!node->leaf && m_ptrs[node->start].ptr->rec != rec) {
+ if (rec.calc_distance((m_ptrs[node->start].ptr->rec)) >= node->radius) {
node = node->outside;
} else {
node = node->inside;
@@ -148,8 +148,8 @@ public:
}
for (size_t i=node->start; i<=node->stop; i++) {
- if (m_ptrs[i]->rec == rec) {
- return m_ptrs[i];
+ if (m_ptrs[i].ptr->rec == rec) {
+ return m_ptrs[i].ptr;
}
}
@@ -175,7 +175,7 @@ public:
}
size_t get_memory_usage() {
- return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(R*) + m_alloc_size;
+ return m_node_cnt * sizeof(vpnode) + m_reccnt * sizeof(vp_ptr);
}
size_t get_aux_memory_usage() {
@@ -191,8 +191,12 @@ public:
}
private:
+ struct vp_ptr {
+ Wrapped<R> *ptr;
+ double dist;
+ };
Wrapped<R>* m_data;
- Wrapped<R>** m_ptrs;
+ vp_ptr* m_ptrs;
std::unordered_map<R, size_t, RecordHash<R>> m_lookup_map;
size_t m_reccnt;
size_t m_tombstone_cnt;
@@ -260,6 +264,11 @@ private:
auto i = start + gsl_rng_uniform_int(rng, stop - start + 1);
swap(start, i);
+ /* for efficiency, pre-calculate each point's distance to this node's vantage point (m_ptrs[start]) */
+ for (size_t i=start+1; i<=stop; i++) {
+ m_ptrs[i].dist = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[i].ptr->rec);
+ }
+
/*
* partition elements based on their distance from the start,
* with those elements with distance falling below the median
@@ -267,14 +276,15 @@ private:
* the median in the right. This is easily done using QuickSelect.
*/
auto mid = (start + 1 + stop) / 2;
- quickselect(start + 1, stop, mid, m_ptrs[start], rng);
+ quickselect(start + 1, stop, mid, m_ptrs[start].ptr, rng);
/* Create a new node based on this partitioning */
vpnode *node = new vpnode();
node->start = start;
/* store the radius of the circle used for partitioning the node. */
- node->radius = m_ptrs[start]->rec.calc_distance(m_ptrs[mid]->rec);
+ node->radius = m_ptrs[start].ptr->rec.calc_distance(m_ptrs[mid].ptr->rec);
+ m_ptrs[start].dist = node->radius;
/* recursively construct the left and right subtrees */
node->inside = build_subtree(start + 1, mid-1, rng);
@@ -285,8 +295,6 @@ private:
return node;
}
- // TODO: The quickselect code can probably be generalized and moved out
- // to psudb-common instead.
void quickselect(size_t start, size_t stop, size_t k, Wrapped<R> *p, gsl_rng *rng) {
if (start == stop) return;
@@ -303,13 +311,16 @@ private:
// to psudb-common instead.
size_t partition(size_t start, size_t stop, Wrapped<R> *p, gsl_rng *rng) {
auto pivot = start + gsl_rng_uniform_int(rng, stop - start);
- double pivot_dist = p->rec.calc_distance(m_ptrs[pivot]->rec);
swap(pivot, stop);
size_t j = start;
for (size_t i=start; i<stop; i++) {
- if (p->rec.calc_distance(m_ptrs[i]->rec) < pivot_dist) {
+ if (m_ptrs[i].dist < m_ptrs[stop].dist) {
swap(j, i);
j++;
}
@@ -332,13 +343,13 @@ private:
if (node->leaf) {
for (size_t i=node->start; i<=node->stop; i++) {
- double d = point.calc_distance(m_ptrs[i]->rec);
+ double d = point.calc_distance(m_ptrs[i].ptr->rec);
if (d < *farthest) {
if (pq.size() == k) {
pq.pop();
}
- pq.push(m_ptrs[i]);
+ pq.push(m_ptrs[i].ptr);
if (pq.size() == k) {
*farthest = point.calc_distance(pq.peek().data->rec);
}
@@ -348,14 +359,14 @@ private:
return;
}
- double d = point.calc_distance(m_ptrs[node->start]->rec);
+ double d = point.calc_distance(m_ptrs[node->start].ptr->rec);
if (d < *farthest) {
if (pq.size() == k) {
auto t = pq.peek().data->rec;
pq.pop();
}
- pq.push(m_ptrs[node->start]);
+ pq.push(m_ptrs[node->start].ptr);
if (pq.size() == k) {
*farthest = point.calc_distance(pq.peek().data->rec);
}
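
The vp_ptr change above caches, for every entry in a node's range, its distance to that node's vantage point, so partition() compares pre-computed doubles instead of re-invoking calc_distance on each comparison. A standalone sketch of the partition step over such cached distances (hypothetical item type; the real code also randomizes the pivot and swaps the vantage point into position first):

    #include <cstddef>
    #include <utility>
    #include <vector>

    struct Item {
        double dist;      /* cached distance to the current vantage point */
        int    payload;
    };

    /*
     * Lomuto-style partition over [start, stop], assuming items[stop] already
     * holds the pivot; returns the pivot's final position.
     */
    static size_t partition_by_cached_dist(std::vector<Item> &items,
                                           size_t start, size_t stop) {
        size_t j = start;
        for (size_t i = start; i < stop; i++) {
            if (items[i].dist < items[stop].dist) {
                std::swap(items[i], items[j]);
                j++;
            }
        }
        std::swap(items[j], items[stop]);
        return j;
    }
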
diff --git a/include/util/SortedMerge.h b/include/util/SortedMerge.h
index 8a1e782..c149189 100644
--- a/include/util/SortedMerge.h
+++ b/include/util/SortedMerge.h
@@ -58,7 +58,7 @@ static std::vector<Cursor<Wrapped<R>>> build_cursor_vec(std::vector<S*> &shards,
for (size_t i = 0; i < shards.size(); ++i) {
if (shards[i]) {
auto base = shards[i]->get_data();
- cursors.emplace_back(Cursor{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
+ cursors.emplace_back(Cursor<Wrapped<R>>{base, base + shards[i]->get_record_count(), 0, shards[i]->get_record_count()});
*reccnt += shards[i]->get_record_count();
*tscnt += shards[i]->get_tombstone_count();
} else {
diff --git a/include/util/types.h b/include/util/types.h
index a13bd95..cf61412 100644
--- a/include/util/types.h
+++ b/include/util/types.h
@@ -19,6 +19,8 @@
#include <cstdint>
#include <cstdlib>
+#include <vector>
+#include <cassert>
namespace de {
@@ -69,4 +71,79 @@ struct ShardID {
/* A placeholder for an invalid shard--also used to indicate the mutable buffer */
const ShardID INVALID_SHID = {-1, -1};
+typedef ssize_t level_index;
+
+typedef struct ReconstructionTask {
+ std::vector<level_index> sources;
+ level_index target;
+ size_t reccnt = 0;
+
+ void add_source(level_index source, size_t cnt) {
+ sources.push_back(source);
+ reccnt += cnt;
+ }
+
+} ReconstructionTask;
+
+class ReconstructionVector {
+public:
+ ReconstructionVector()
+ : total_reccnt(0) {}
+
+ ~ReconstructionVector() = default;
+
+ ReconstructionTask operator[](size_t idx) {
+ return m_tasks[idx];
+ }
+
+ void add_reconstruction(level_index source, level_index target, size_t reccnt) {
+ m_tasks.push_back({{source}, target, reccnt});
+ total_reccnt += reccnt;
+ }
+
+ void add_reconstruction(ReconstructionTask task) {
+ m_tasks.push_back(task);
+ total_reccnt += task.reccnt;
+ }
+
+ ReconstructionTask remove_reconstruction(size_t idx) {
+ assert(idx < m_tasks.size());
+ auto task = m_tasks[idx];
+
+ m_tasks.erase(m_tasks.begin() + idx);
+ total_reccnt -= task.reccnt;
+
+ return task;
+ }
+
+ ReconstructionTask remove_smallest_reconstruction() {
+ size_t min_size = m_tasks[0].reccnt;
+ size_t idx = 0;
+ for (size_t i=1; i<m_tasks.size(); i++) {
+ if (m_tasks[i].reccnt < min_size) {
+ min_size = m_tasks[i].reccnt;
+ idx = i;
+ }
+ }
+
+ auto task = m_tasks[idx];
+ m_tasks.erase(m_tasks.begin() + idx);
+ total_reccnt -= task.reccnt;
+
+ return task;
+ }
+
+ size_t get_total_reccnt() {
+ return total_reccnt;
+ }
+
+ size_t size() {
+ return m_tasks.size();
+ }
+
+private:
+ std::vector<ReconstructionTask> m_tasks;
+ size_t total_reccnt;
+};
+
}
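
ReconstructionVector above is a small planning helper: tasks are appended either as single source-to-target moves or as multi-source tasks assembled with add_source, and the planner can later pull out the cheapest remaining task by record count. A hypothetical usage sketch (the include path, level indices, and record counts are invented for illustration):

    #include "util/types.h"

    int main() {
        de::ReconstructionVector plan;

        /* single-source reconstructions, tracked with their record counts */
        plan.add_reconstruction(/*source=*/0, /*target=*/1, /*reccnt=*/1000);
        plan.add_reconstruction(/*source=*/1, /*target=*/2, /*reccnt=*/8000);

        /* a multi-source task, assembled incrementally */
        de::ReconstructionTask big;
        big.target = 3;
        big.add_source(1, 8000);
        big.add_source(2, 64000);
        plan.add_reconstruction(big);

        /* drain the plan cheapest-first */
        while (plan.size() > 0) {
            auto task = plan.remove_smallest_reconstruction();
            (void) task;   /* hand off to a reconstruction worker here */
        }

        return 0;
    }
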