From ff000799c3254f52e0beabbe9c62d10c3fc4178e Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 15 May 2023 16:48:56 -0400 Subject: Record format generalization Currently, tombstone counting is bugged. But the rest of it appears to be working. --- include/shard/MemISAM.h | 72 ++++++++++++++++---------------- include/shard/WIRS.h | 108 ++++++++++++++++++++++++++---------------------- 2 files changed, 94 insertions(+), 86 deletions(-) (limited to 'include/shard') diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h index 6d97f95..dd2fd85 100644 --- a/include/shard/MemISAM.h +++ b/include/shard/MemISAM.h @@ -23,10 +23,11 @@ namespace de { thread_local size_t mrun_cancelations = 0; -template +template class MemISAM { private: -typedef Record Rec; +typedef decltype(R::key) K; +typedef decltype(R::value) V; constexpr static size_t inmem_isam_node_size = 256; constexpr static size_t inmem_isam_fanout = inmem_isam_node_size / (sizeof(K) + sizeof(char*)); @@ -36,24 +37,23 @@ struct InMemISAMNode { char* child[inmem_isam_fanout]; }; -constexpr static size_t inmem_isam_leaf_fanout = inmem_isam_node_size / sizeof(Rec); +constexpr static size_t inmem_isam_leaf_fanout = inmem_isam_node_size / sizeof(R); constexpr static size_t inmem_isam_node_keyskip = sizeof(K) * inmem_isam_fanout; static_assert(sizeof(InMemISAMNode) == inmem_isam_node_size, "node size does not match"); - public: MemISAM(std::string data_fname, size_t record_cnt, size_t tombstone_cnt, BloomFilter *bf, bool tagging) : m_reccnt(record_cnt), m_tombstone_cnt(tombstone_cnt), m_deleted_cnt(0), m_tagging(tagging) { // read the stored data file the file - size_t alloc_size = (record_cnt * sizeof(Rec)) + (CACHELINE_SIZE - (record_cnt * sizeof(Rec)) % CACHELINE_SIZE); + size_t alloc_size = (record_cnt * sizeof(R)) + (CACHELINE_SIZE - (record_cnt * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); - m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + m_data = (R*) std::aligned_alloc(CACHELINE_SIZE, alloc_size); FILE *file = fopen(data_fname.c_str(), "rb"); assert(file); - auto res = fread(m_data, sizeof(Rec), m_reccnt, file); + auto res = fread(m_data, sizeof(R), m_reccnt, file); assert (res == m_reccnt); fclose(file); @@ -71,34 +71,34 @@ public: } } - MemISAM(MutableBuffer* buffer, BloomFilter* bf, bool tagging) + MemISAM(MutableBuffer* buffer, BloomFilter* bf, bool tagging) :m_reccnt(0), m_tombstone_cnt(0), m_isam_nodes(nullptr), m_deleted_cnt(0), m_tagging(tagging) { - size_t alloc_size = (buffer->get_record_count() * sizeof(Rec)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Rec)) % CACHELINE_SIZE); + size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); - m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); TIMER_INIT(); size_t offset = 0; m_reccnt = 0; TIMER_START(); - Rec* base = buffer->sorted_output(); + R* base = buffer->sorted_output(); TIMER_STOP(); auto sort_time = TIMER_RESULT(); - Rec* stop = base + buffer->get_record_count(); + R* stop = base + buffer->get_record_count(); TIMER_START(); while (base < stop) { if (!m_tagging) { if (!base->is_tombstone() && (base + 1 < stop) - && base->match(base + 1) && (base + 1)->is_tombstone()) { + && *base == *(base + 1) && (base + 1)->is_tombstone()) { base += 2; mrun_cancelations++; continue; } - } else if (base->get_delete_status()) { + } else if (base->is_deleted()) { base += 1; continue; } @@ -123,15 +123,15 @@ public: TIMER_STOP(); auto level_time = TIMER_RESULT(); - fprintf(stdout, "%ld %ld %ld\n", sort_time, copy_time, level_time); + //fprintf(stdout, "%ld %ld %ld\n", sort_time, copy_time, level_time); } MemISAM(MemISAM** runs, size_t len, BloomFilter* bf, bool tagging) :m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_isam_nodes(nullptr), m_tagging(tagging) { - std::vector> cursors; + std::vector> cursors; cursors.reserve(len); - PriorityQueue pq(len); + PriorityQueue pq(len); size_t attemp_reccnt = 0; @@ -142,21 +142,21 @@ public: attemp_reccnt += runs[i]->get_record_count(); pq.push(cursors[i].ptr, i); } else { - cursors.emplace_back(Cursor{nullptr, nullptr, 0, 0}); + cursors.emplace_back(Cursor{nullptr, nullptr, 0, 0}); } } - size_t alloc_size = (attemp_reccnt * sizeof(Rec)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Rec)) % CACHELINE_SIZE); + size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); - m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); size_t offset = 0; while (pq.size()) { auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record{nullptr, 0}; + auto next = pq.size() > 1 ? pq.peek(1) : queue_record{nullptr, 0}; if (!m_tagging && !now.data->is_tombstone() && next.data != nullptr && - now.data->match(next.data) && next.data->is_tombstone()) { + *now.data == *next.data && next.data->is_tombstone()) { pq.pop(); pq.pop(); auto& cursor1 = cursors[now.version]; @@ -165,7 +165,7 @@ public: if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); } else { auto& cursor = cursors[now.version]; - if (!m_tagging || !cursor.ptr->get_delete_status()) { + if (!m_tagging || !cursor.ptr->is_deleted()) { m_data[m_reccnt++] = *cursor.ptr; if (cursor.ptr->is_tombstone()) { ++m_tombstone_cnt; @@ -188,7 +188,7 @@ public: if (m_isam_nodes) free(m_isam_nodes); } - Rec* sorted_output() const { + R* sorted_output() const { return m_data; } @@ -208,7 +208,7 @@ public: while (idx < m_reccnt && m_data[idx].lt(key, val)) ++idx; - if (m_data[idx].match(key, val, false)) { + if (m_data[idx] == R {key, val}) { m_data[idx].set_delete_status(); m_deleted_cnt++; return true; @@ -217,7 +217,7 @@ public: return false; } - const Rec* get_record_at(size_t idx) const { + const R* get_record_at(size_t idx) const { return (idx < m_reccnt) ? m_data + idx : nullptr; } @@ -235,7 +235,7 @@ public: now = next ? next : reinterpret_cast(now->child[inmem_isam_fanout - 1]); } - const Rec* pos = reinterpret_cast(now); + const R* pos = reinterpret_cast(now); while (pos < m_data + m_reccnt && pos->key < key) pos++; return pos - m_data; @@ -255,7 +255,7 @@ public: now = next ? next : reinterpret_cast(now->child[inmem_isam_fanout - 1]); } - const Rec* pos = reinterpret_cast(now); + const R* pos = reinterpret_cast(now); while (pos < m_data + m_reccnt && pos->key <= key) pos++; return pos - m_data; @@ -267,20 +267,20 @@ public: return false; } - Rec* ptr = m_data + idx; + R* ptr = m_data + idx; while (ptr < m_data + m_reccnt && ptr->lt(key, val)) ptr++; - return ptr->match(key, val, true); + return *ptr == R {key, val} && ptr->is_tombstone(); } size_t get_memory_utilization() { - return m_reccnt * sizeof(Rec) + m_internal_node_cnt * inmem_isam_node_size; + return m_reccnt * sizeof(R) + m_internal_node_cnt * inmem_isam_node_size; } void persist_to_file(std::string data_fname) { FILE *file = fopen(data_fname.c_str(), "wb"); assert(file); - fwrite(m_data, sizeof(Rec), m_reccnt, file); + fwrite(m_data, sizeof(R), m_reccnt, file); fclose(file); } @@ -303,14 +303,14 @@ private: InMemISAMNode* current_node = m_isam_nodes; - const Rec* leaf_base = m_data; - const Rec* leaf_stop = m_data + m_reccnt; + const R* leaf_base = m_data; + const R* leaf_stop = m_data + m_reccnt; while (leaf_base < leaf_stop) { size_t fanout = 0; for (size_t i = 0; i < inmem_isam_fanout; ++i) { auto rec_ptr = leaf_base + inmem_isam_leaf_fanout * i; if (rec_ptr >= leaf_stop) break; - const Rec* sep_key = std::min(rec_ptr + inmem_isam_leaf_fanout - 1, leaf_stop - 1); + const R* sep_key = std::min(rec_ptr + inmem_isam_leaf_fanout - 1, leaf_stop - 1); current_node->keys[i] = sep_key->key; current_node->child[i] = (char*)rec_ptr; ++fanout; @@ -350,7 +350,7 @@ private: } // Members: sorted data, internal ISAM levels, reccnt; - Rec* m_data; + R* m_data; InMemISAMNode* m_isam_nodes; InMemISAMNode* m_root; size_t m_reccnt; diff --git a/include/shard/WIRS.h b/include/shard/WIRS.h index 39337bf..41766b9 100644 --- a/include/shard/WIRS.h +++ b/include/shard/WIRS.h @@ -12,6 +12,7 @@ #include #include #include +#include #include "ds/PriorityQueue.h" #include "util/Cursor.h" @@ -24,19 +25,26 @@ namespace de { thread_local size_t wirs_cancelations = 0; -template +template class WIRS { private: + + typedef decltype(R::key) K; + typedef decltype(R::value) V; + typedef decltype(R::weight) W; + + template struct wirs_node { - struct wirs_node *left, *right; + struct wirs_node *left, *right; K low, high; W weight; Alias* alias; }; + template struct WIRSState { W tot_weight; - std::vector nodes; + std::vector*> nodes; Alias* top_level_alias; ~WIRSState() { @@ -45,13 +53,13 @@ private: }; public: - WIRS(MutableBuffer* buffer, BloomFilter* bf, bool tagging) + WIRS(MutableBuffer* buffer, BloomFilter* bf, bool tagging) : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0), m_tagging(tagging), m_root(nullptr) { - size_t alloc_size = (buffer->get_record_count() * sizeof(Record)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Record)) % CACHELINE_SIZE); + size_t alloc_size = (buffer->get_record_count() * sizeof(R)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); - m_data = (Record*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); size_t offset = 0; m_reccnt = 0; @@ -61,13 +69,13 @@ public: while (base < stop) { if (!m_tagging) { if (!(base->is_tombstone()) && (base + 1) < stop) { - if (base->match(base + 1) && (base + 1)->is_tombstone()) { + if (*base == *(base + 1) && (base + 1)->is_tombstone()) { base += 2; wirs_cancelations++; continue; } } - } else if (base->get_delete_status()) { + } else if (base->is_deleted()) { base += 1; continue; } @@ -92,10 +100,10 @@ public: WIRS(WIRS** shards, size_t len, BloomFilter* bf, bool tagging) : m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_total_weight(0), m_rejection_cnt(0), m_ts_check_cnt(0), m_tagging(tagging), m_root(nullptr) { - std::vector> cursors; + std::vector> cursors; cursors.reserve(len); - PriorityQueue pq(len); + PriorityQueue pq(len); size_t attemp_reccnt = 0; @@ -106,28 +114,28 @@ public: attemp_reccnt += shards[i]->get_record_count(); pq.push(cursors[i].ptr, i); } else { - cursors.emplace_back(Cursor{nullptr, nullptr, 0, 0}); + cursors.emplace_back(Cursor{nullptr, nullptr, 0, 0}); } } - size_t alloc_size = (attemp_reccnt * sizeof(Record)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Record)) % CACHELINE_SIZE); + size_t alloc_size = (attemp_reccnt * sizeof(R)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(R)) % CACHELINE_SIZE); assert(alloc_size % CACHELINE_SIZE == 0); - m_data = (Record*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + m_data = (R*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); while (pq.size()) { auto now = pq.peek(); - auto next = pq.size() > 1 ? pq.peek(1) : queue_record{nullptr, 0}; + auto next = pq.size() > 1 ? pq.peek(1) : queue_record{nullptr, 0}; if (!m_tagging && !now.data->is_tombstone() && next.data != nullptr && - now.data->match(next.data) && next.data->is_tombstone()) { + *now.data == *next.data && next.data->is_tombstone()) { pq.pop(); pq.pop(); auto& cursor1 = cursors[now.version]; auto& cursor2 = cursors[next.version]; - if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); - if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); } else { auto& cursor = cursors[now.version]; - if (!m_tagging || !cursor.ptr->get_delete_status()) { + if (!m_tagging || !cursor.ptr->is_deleted()) { m_data[m_reccnt++] = *cursor.ptr; m_total_weight += cursor.ptr->weight; if (bf && cursor.ptr->is_tombstone()) { @@ -137,7 +145,7 @@ public: } pq.pop(); - if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); } } @@ -155,16 +163,16 @@ public: free_tree(m_root); } - bool delete_record(const K& key, const V& val) { - size_t idx = get_lower_bound(key); + bool delete_record(const R& rec) { + size_t idx = get_lower_bound(rec.key); if (idx >= m_reccnt) { return false; } - while (idx < m_reccnt && m_data[idx].lt(key, val)) ++idx; + while (idx < m_reccnt && m_data[idx] < rec) ++idx; - if (m_data[idx].match(key, val, false)) { - m_data[idx].set_delete_status(); + if (m_data[idx] == R {rec.key, rec.val} && !m_data[idx].is_tombstone()) { + m_data[idx].set_delete(); m_deleted_cnt++; return true; } @@ -172,7 +180,7 @@ public: return false; } - void free_tree(struct wirs_node* node) { + void free_tree(struct wirs_node* node) { if (node) { delete node->alias; free_tree(node->left); @@ -181,7 +189,7 @@ public: } } - Record* sorted_output() const { + R* sorted_output() const { return m_data; } @@ -193,19 +201,19 @@ public: return m_tombstone_cnt; } - const Record* get_record_at(size_t idx) const { + const R* get_record_at(size_t idx) const { if (idx >= m_reccnt) return nullptr; return m_data + idx; } // low - high -> decompose to a set of nodes. // Build Alias across the decomposed nodes. - WIRSState* get_sample_shard_state(const K& lower_key, const K& upper_key) { - WIRSState* res = new WIRSState(); + WIRSState* get_sample_shard_state(const K& lower_key, const K& upper_key) { + auto res = new WIRSState(); // Simulate a stack to unfold recursion. double tot_weight = 0.0; - struct wirs_node* st[64] = {0}; + struct wirs_node* st[64] = {0}; st[0] = m_root; size_t top = 1; while(top > 0) { @@ -231,15 +239,15 @@ public: } static void delete_state(void *state) { - auto s = (WIRSState *) state; + WIRSState *s = (WIRSState *) state; delete s; } // returns the number of records sampled // NOTE: This operation returns records strictly between the lower and upper bounds, not // including them. - size_t get_samples(void* shard_state, std::vector> &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) { - WIRSState *state = (WIRSState *) shard_state; + size_t get_samples(void* shard_state, std::vector &result_set, const K& lower_key, const K& upper_key, size_t sample_sz, gsl_rng *rng) { + WIRSState *state = (WIRSState *) shard_state; if (sample_sz == 0) { return 0; } @@ -295,30 +303,30 @@ public: auto ptr = m_data + get_lower_bound(key); - while (ptr < m_data + m_reccnt && ptr->lt(key, val)) { + while (ptr < m_data + m_reccnt && *ptr < R {key, val}) { ptr ++; } - bool result = (m_tagging) ? ptr->get_delete_status() - : ptr->match(key, val, true); + bool result = (m_tagging) ? ptr->is_deleted() + : *ptr == R {key, val} && ptr->is_tombstone(); m_rejection_cnt += result; return result; } - bool check_tombstone(const K& key, const V& val) { + bool check_tombstone(const R& rec) { m_ts_check_cnt++; - size_t idx = get_lower_bound(key); + size_t idx = get_lower_bound(rec.key); if (idx >= m_reccnt) { return false; } - auto ptr = m_data + get_lower_bound(key); + auto ptr = m_data + get_lower_bound(rec.key); - while (ptr < m_data + m_reccnt && ptr->lt(key, val)) { + while (ptr < m_data + m_reccnt && *ptr < rec) { ptr ++; } - bool result = ptr->match(key, val, true); + bool result = *ptr == rec && ptr->is_tombstone(); m_rejection_cnt += result; return result; @@ -340,21 +348,21 @@ public: private: - bool covered_by(struct wirs_node* node, const K& lower_key, const K& upper_key) { + bool covered_by(struct wirs_node* node, const K& lower_key, const K& upper_key) { auto low_index = node->low * m_group_size; auto high_index = std::min((node->high + 1) * m_group_size - 1, m_reccnt - 1); return lower_key < m_data[low_index].key && m_data[high_index].key < upper_key; } - bool intersects(struct wirs_node* node, const K& lower_key, const K& upper_key) { + bool intersects(struct wirs_node* node, const K& lower_key, const K& upper_key) { auto low_index = node->low * m_group_size; auto high_index = std::min((node->high + 1) * m_group_size - 1, m_reccnt - 1); return lower_key < m_data[high_index].key && m_data[low_index].key < upper_key; } - struct wirs_node* construct_wirs_node(const std::vector& weights, size_t low, size_t high) { + struct wirs_node* construct_wirs_node(const std::vector& weights, size_t low, size_t high) { if (low == high) { - return new wirs_node{nullptr, nullptr, low, high, weights[low], new Alias({1.0})}; + return new wirs_node{nullptr, nullptr, low, high, weights[low], new Alias({1.0})}; } else if (low > high) return nullptr; std::vector node_weights; @@ -370,9 +378,9 @@ private: size_t mid = (low + high) / 2; - return new wirs_node{construct_wirs_node(weights, low, mid), - construct_wirs_node(weights, mid + 1, high), - low, high, sum, new Alias(node_weights)}; + return new wirs_node{construct_wirs_node(weights, low, mid), + construct_wirs_node(weights, mid + 1, high), + low, high, sum, new Alias(node_weights)}; } @@ -410,9 +418,9 @@ private: m_root = construct_wirs_node(weights, 0, n_groups-1); } - Record* m_data; + R* m_data; std::vector m_alias; - wirs_node* m_root; + wirs_node* m_root; bool m_tagging; W m_total_weight; size_t m_reccnt; -- cgit v1.2.3