From 418e9b079e559c86f3a5b276f712ad2f5d66533c Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Tue, 9 May 2023 17:59:37 -0400 Subject: Ported over IRS with unit tests --- include/framework/MutableBuffer.h | 29 ++- include/shard/MemISAM.h | 363 ++++++++++++++++++++++++++++++++++++++ include/util/Cursor.h | 2 +- 3 files changed, 392 insertions(+), 2 deletions(-) create mode 100644 include/shard/MemISAM.h (limited to 'include') diff --git a/include/framework/MutableBuffer.h b/include/framework/MutableBuffer.h index b3acfee..42bc9a7 100644 --- a/include/framework/MutableBuffer.h +++ b/include/framework/MutableBuffer.h @@ -14,6 +14,7 @@ #include #include #include +#include #include "util/base.h" #include "util/bf_config.h" @@ -46,7 +47,32 @@ public: if (m_tombstone_filter) delete m_tombstone_filter; } - int append(const K& key, const V& value, W weight = 1, bool is_tombstone = false) { + template ::value>> + int append(const K& key, const V& value, bool is_tombstone = false) { + static_assert(std::is_same::value); + if (is_tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0; + + int32_t pos = 0; + if ((pos = try_advance_tail()) == -1) return 0; + + m_data[pos].key = key; + m_data[pos].value = value; + m_data[pos].header = ((pos << 2) | (is_tombstone ? 1 : 0)); + + if (is_tombstone) { + m_tombstonecnt.fetch_add(1); + if (m_tombstone_filter) m_tombstone_filter->insert(key); + } + + m_weight.fetch_add(1); + return 1; + } + + template ::value>> + int append(const K& key, const V& value, W_ weight=1, bool is_tombstone = false) { + static_assert(!std::is_same::value); if (is_tombstone && m_tombstonecnt + 1 > m_tombstone_cap) return 0; int32_t pos = 0; @@ -82,6 +108,7 @@ public: return 1; } + bool truncate() { m_tombstonecnt.store(0); m_reccnt.store(0); diff --git a/include/shard/MemISAM.h b/include/shard/MemISAM.h new file mode 100644 index 0000000..6d97f95 --- /dev/null +++ b/include/shard/MemISAM.h @@ -0,0 +1,363 @@ +/* + * include/shard/MemISAM.h + * + * Copyright (C) 2023 Douglas Rumbaugh + * Dong Xie + * + * All rights reserved. Published under the Modified BSD License. + * + */ +#pragma once + +#include +#include +#include +#include + +#include "framework/MutableBuffer.h" +#include "ds/PriorityQueue.h" +#include "util/Cursor.h" +#include "util/timer.h" + +namespace de { + +thread_local size_t mrun_cancelations = 0; + +template +class MemISAM { +private: +typedef Record Rec; + +constexpr static size_t inmem_isam_node_size = 256; +constexpr static size_t inmem_isam_fanout = inmem_isam_node_size / (sizeof(K) + sizeof(char*)); + +struct InMemISAMNode { + K keys[inmem_isam_fanout]; + char* child[inmem_isam_fanout]; +}; + +constexpr static size_t inmem_isam_leaf_fanout = inmem_isam_node_size / sizeof(Rec); +constexpr static size_t inmem_isam_node_keyskip = sizeof(K) * inmem_isam_fanout; + +static_assert(sizeof(InMemISAMNode) == inmem_isam_node_size, "node size does not match"); + + +public: + MemISAM(std::string data_fname, size_t record_cnt, size_t tombstone_cnt, BloomFilter *bf, bool tagging) + : m_reccnt(record_cnt), m_tombstone_cnt(tombstone_cnt), m_deleted_cnt(0), m_tagging(tagging) { + + // read the stored data file the file + size_t alloc_size = (record_cnt * sizeof(Rec)) + (CACHELINE_SIZE - (record_cnt * sizeof(Rec)) % CACHELINE_SIZE); + assert(alloc_size % CACHELINE_SIZE == 0); + m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + + FILE *file = fopen(data_fname.c_str(), "rb"); + assert(file); + auto res = fread(m_data, sizeof(Rec), m_reccnt, file); + assert (res == m_reccnt); + fclose(file); + + // We can't really persist the internal structure, as it uses + // pointers, which are invalidated by the move. So we'll just + // rebuild it. + this->build_internal_levels(); + + // rebuild the bloom filter + for (size_t i=0; iget_record_at(i); + if (rec->is_tombstone()) { + bf->insert(rec->key); + } + } + } + + MemISAM(MutableBuffer* buffer, BloomFilter* bf, bool tagging) + :m_reccnt(0), m_tombstone_cnt(0), m_isam_nodes(nullptr), m_deleted_cnt(0), m_tagging(tagging) { + + size_t alloc_size = (buffer->get_record_count() * sizeof(Rec)) + (CACHELINE_SIZE - (buffer->get_record_count() * sizeof(Rec)) % CACHELINE_SIZE); + assert(alloc_size % CACHELINE_SIZE == 0); + m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + + TIMER_INIT(); + + size_t offset = 0; + m_reccnt = 0; + TIMER_START(); + Rec* base = buffer->sorted_output(); + TIMER_STOP(); + + auto sort_time = TIMER_RESULT(); + Rec* stop = base + buffer->get_record_count(); + + TIMER_START(); + while (base < stop) { + if (!m_tagging) { + if (!base->is_tombstone() && (base + 1 < stop) + && base->match(base + 1) && (base + 1)->is_tombstone()) { + base += 2; + mrun_cancelations++; + continue; + } + } else if (base->get_delete_status()) { + base += 1; + continue; + } + + //Masking off the ts. + base->header &= 1; + m_data[m_reccnt++] = *base; + if (bf && base->is_tombstone()) { + ++m_tombstone_cnt; + bf->insert(base->key); + } + + base++; + } + TIMER_STOP(); + auto copy_time = TIMER_RESULT(); + + TIMER_START(); + if (m_reccnt > 0) { + build_internal_levels(); + } + TIMER_STOP(); + auto level_time = TIMER_RESULT(); + + fprintf(stdout, "%ld %ld %ld\n", sort_time, copy_time, level_time); + } + + MemISAM(MemISAM** runs, size_t len, BloomFilter* bf, bool tagging) + :m_reccnt(0), m_tombstone_cnt(0), m_deleted_cnt(0), m_isam_nodes(nullptr), m_tagging(tagging) { + std::vector> cursors; + cursors.reserve(len); + + PriorityQueue pq(len); + + size_t attemp_reccnt = 0; + + for (size_t i = 0; i < len; ++i) { + if (runs[i]) { + auto base = runs[i]->sorted_output(); + cursors.emplace_back(Cursor{base, base + runs[i]->get_record_count(), 0, runs[i]->get_record_count()}); + attemp_reccnt += runs[i]->get_record_count(); + pq.push(cursors[i].ptr, i); + } else { + cursors.emplace_back(Cursor{nullptr, nullptr, 0, 0}); + } + } + + size_t alloc_size = (attemp_reccnt * sizeof(Rec)) + (CACHELINE_SIZE - (attemp_reccnt * sizeof(Rec)) % CACHELINE_SIZE); + assert(alloc_size % CACHELINE_SIZE == 0); + m_data = (Rec*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + + size_t offset = 0; + + while (pq.size()) { + auto now = pq.peek(); + auto next = pq.size() > 1 ? pq.peek(1) : queue_record{nullptr, 0}; + if (!m_tagging && !now.data->is_tombstone() && next.data != nullptr && + now.data->match(next.data) && next.data->is_tombstone()) { + + pq.pop(); pq.pop(); + auto& cursor1 = cursors[now.version]; + auto& cursor2 = cursors[next.version]; + if (advance_cursor(cursor1)) pq.push(cursor1.ptr, now.version); + if (advance_cursor(cursor2)) pq.push(cursor2.ptr, next.version); + } else { + auto& cursor = cursors[now.version]; + if (!m_tagging || !cursor.ptr->get_delete_status()) { + m_data[m_reccnt++] = *cursor.ptr; + if (cursor.ptr->is_tombstone()) { + ++m_tombstone_cnt; + bf->insert(cursor.ptr->key); + } + } + pq.pop(); + + if (advance_cursor(cursor)) pq.push(cursor.ptr, now.version); + } + } + + if (m_reccnt > 0) { + build_internal_levels(); + } + } + + ~MemISAM() { + if (m_data) free(m_data); + if (m_isam_nodes) free(m_isam_nodes); + } + + Rec* sorted_output() const { + return m_data; + } + + size_t get_record_count() const { + return m_reccnt; + } + + size_t get_tombstone_count() const { + return m_tombstone_cnt; + } + + bool delete_record(const K& key, const V& val) { + size_t idx = get_lower_bound(key); + if (idx >= m_reccnt) { + return false; + } + + while (idx < m_reccnt && m_data[idx].lt(key, val)) ++idx; + + if (m_data[idx].match(key, val, false)) { + m_data[idx].set_delete_status(); + m_deleted_cnt++; + return true; + } + + return false; + } + + const Rec* get_record_at(size_t idx) const { + return (idx < m_reccnt) ? m_data + idx : nullptr; + } + + size_t get_lower_bound(const K& key) const { + const InMemISAMNode* now = m_root; + while (!is_leaf(reinterpret_cast(now))) { + const InMemISAMNode* next = nullptr; + for (size_t i = 0; i < inmem_isam_fanout - 1; ++i) { + if (now->child[i + 1] == nullptr || key <= now->keys[i]) { + next = reinterpret_cast(now->child[i]); + break; + } + } + + now = next ? next : reinterpret_cast(now->child[inmem_isam_fanout - 1]); + } + + const Rec* pos = reinterpret_cast(now); + while (pos < m_data + m_reccnt && pos->key < key) pos++; + + return pos - m_data; + } + + size_t get_upper_bound(const K& key) const { + const InMemISAMNode* now = m_root; + while (!is_leaf(reinterpret_cast(now))) { + const InMemISAMNode* next = nullptr; + for (size_t i = 0; i < inmem_isam_fanout - 1; ++i) { + if (now->child[i + 1] == nullptr || key < now->keys[i]) { + next = reinterpret_cast(now->child[i]); + break; + } + } + + now = next ? next : reinterpret_cast(now->child[inmem_isam_fanout - 1]); + } + + const Rec* pos = reinterpret_cast(now); + while (pos < m_data + m_reccnt && pos->key <= key) pos++; + + return pos - m_data; + } + + bool check_tombstone(const K& key, const V& val) const { + size_t idx = get_lower_bound(key); + if (idx >= m_reccnt) { + return false; + } + + Rec* ptr = m_data + idx; + + while (ptr < m_data + m_reccnt && ptr->lt(key, val)) ptr++; + return ptr->match(key, val, true); + } + + size_t get_memory_utilization() { + return m_reccnt * sizeof(Rec) + m_internal_node_cnt * inmem_isam_node_size; + } + + void persist_to_file(std::string data_fname) { + FILE *file = fopen(data_fname.c_str(), "wb"); + assert(file); + fwrite(m_data, sizeof(Rec), m_reccnt, file); + fclose(file); + } + +private: + void build_internal_levels() { + size_t n_leaf_nodes = m_reccnt / inmem_isam_leaf_fanout + (m_reccnt % inmem_isam_leaf_fanout != 0); + size_t level_node_cnt = n_leaf_nodes; + size_t node_cnt = 0; + do { + level_node_cnt = level_node_cnt / inmem_isam_fanout + (level_node_cnt % inmem_isam_fanout != 0); + node_cnt += level_node_cnt; + } while (level_node_cnt > 1); + + size_t alloc_size = (node_cnt * inmem_isam_node_size) + (CACHELINE_SIZE - (node_cnt * inmem_isam_node_size) % CACHELINE_SIZE); + assert(alloc_size % CACHELINE_SIZE == 0); + + m_isam_nodes = (InMemISAMNode*)std::aligned_alloc(CACHELINE_SIZE, alloc_size); + m_internal_node_cnt = node_cnt; + memset(m_isam_nodes, 0, node_cnt * inmem_isam_node_size); + + InMemISAMNode* current_node = m_isam_nodes; + + const Rec* leaf_base = m_data; + const Rec* leaf_stop = m_data + m_reccnt; + while (leaf_base < leaf_stop) { + size_t fanout = 0; + for (size_t i = 0; i < inmem_isam_fanout; ++i) { + auto rec_ptr = leaf_base + inmem_isam_leaf_fanout * i; + if (rec_ptr >= leaf_stop) break; + const Rec* sep_key = std::min(rec_ptr + inmem_isam_leaf_fanout - 1, leaf_stop - 1); + current_node->keys[i] = sep_key->key; + current_node->child[i] = (char*)rec_ptr; + ++fanout; + } + current_node++; + leaf_base += fanout * inmem_isam_leaf_fanout; + } + + auto level_start = m_isam_nodes; + auto level_stop = current_node; + auto current_level_node_cnt = level_stop - level_start; + while (current_level_node_cnt > 1) { + auto now = level_start; + while (now < level_stop) { + size_t child_cnt = 0; + for (size_t i = 0; i < inmem_isam_fanout; ++i) { + auto node_ptr = now + i; + ++child_cnt; + if (node_ptr >= level_stop) break; + current_node->keys[i] = node_ptr->keys[inmem_isam_fanout - 1]; + current_node->child[i] = (char*)node_ptr; + } + now += child_cnt; + current_node++; + } + level_start = level_stop; + level_stop = current_node; + current_level_node_cnt = level_stop - level_start; + } + + assert(current_level_node_cnt == 1); + m_root = level_start; + } + + bool is_leaf(const char* ptr) const { + return ptr >= (const char*)m_data && ptr < (const char*)(m_data + m_reccnt); + } + + // Members: sorted data, internal ISAM levels, reccnt; + Rec* m_data; + InMemISAMNode* m_isam_nodes; + InMemISAMNode* m_root; + size_t m_reccnt; + size_t m_tombstone_cnt; + size_t m_internal_node_cnt; + size_t m_deleted_cnt; + bool m_tagging; +}; + +} diff --git a/include/util/Cursor.h b/include/util/Cursor.h index b9093f4..2339800 100644 --- a/include/util/Cursor.h +++ b/include/util/Cursor.h @@ -14,7 +14,7 @@ #include "io/PagedFile.h" namespace de { -template +template struct Cursor { Record *ptr; const Record *end; -- cgit v1.2.3