From eb8dbaa770a57557d67c817c2839c64f536a6ce4 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 13 Sep 2023 16:22:03 -0400 Subject: Began re-architecting the project for concurrency support The project is now in a state where it builds, but it probably has a lot of bugs still. --- include/framework/InternalLevel.h | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'include/framework/InternalLevel.h') diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index ec8ffc4..983ec6a 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -19,6 +19,10 @@ #include "framework/MutableBuffer.h" namespace de { +template +class InternalLevel; + + template class InternalLevel { @@ -55,7 +59,7 @@ public: // WARNING: for leveling only. // assuming the base level is the level new level is merging into. (base_level is larger.) - static InternalLevel* merge_levels(InternalLevel* base_level, InternalLevel* new_level) { + static std::shared_ptr merge_levels(InternalLevel* base_level, InternalLevel* new_level) { assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); auto res = new InternalLevel(base_level->m_level_no, 1); res->m_shard_cnt = 1; @@ -64,7 +68,7 @@ public: shards[1] = new_level->m_shards[0]; res->m_shards[0] = new S(shards, 2); - return res; + return std::shared_ptr(res); } void append_buffer(Buffer* buffer) { -- cgit v1.2.3 From abc8605a51537fc7b35bb0d9b1da6c724c5c6973 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 18 Sep 2023 13:05:44 -0400 Subject: Moved individual merge task execution into the scheduler This change is made in anticipation of scheduling each task using a specific thread, and required some modification to the interface of ExtensionStructure. Namely, 1. ExtensionStructure now supports a get_merge_tasks() interface, which returns a list of the individual level merges that would need to be performed to complete a buffer flush of specified size. 2. merge_levels and merge_buffer have been promoted to the public interface, to allow their use within the scheduler. 3. merge_buffer has been modified to assume that the structure already can support a direct flush of the buffer into L0, it is now the responsibility of the caller to ensure that the necessary merges have already been completed prior to calling this method. Currently, preemptive tombstone compactions are non-functional, so some unit tests are failing. This will be fixed when the thread scheduling system is set up. --- include/framework/InternalLevel.h | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'include/framework/InternalLevel.h') diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index 983ec6a..b9866b8 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -87,6 +87,10 @@ public: } Shard *get_merged_shard() { + if (m_shard_cnt == 0) { + return nullptr; + } + Shard *shards[m_shard_cnt]; for (size_t i=0; i Date: Mon, 18 Sep 2023 16:37:30 -0400 Subject: The scheduler now spawns a seperate merge thread Merges are now executed from a seperate thread within the scheduler that wakes up via condition variables when new merge tasks are scheduled. In addition, tombstone limits are now enforced by the scheduler, with new merges being scheduled as needed. There are still a few tests failing, notably the zero tombstones in the last run invarient is not holding under tiering with tombstones. Need to look into that yet. --- include/framework/InternalLevel.h | 39 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 36 insertions(+), 3 deletions(-) (limited to 'include/framework/InternalLevel.h') diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index b9866b8..e67ae45 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -34,6 +34,7 @@ public: , m_shard_cnt(0) , m_shards(shard_cap, nullptr) , m_owns(shard_cap, true) + , m_pending_shard(nullptr) {} // Create a new memory level sharing the shards and repurposing it as previous level_no + 1 @@ -42,7 +43,9 @@ public: : m_level_no(level->m_level_no + 1) , m_shard_cnt(level->m_shard_cnt) , m_shards(level->m_shards.size(), nullptr) - , m_owns(level->m_owns.size(), true) { + , m_owns(level->m_owns.size(), true) + , m_pending_shard(nullptr) + { assert(m_shard_cnt == 1 && m_shards.size() == 1); for (size_t i=0; im_shards.data(), level->m_shard_cnt); + return; + } + m_shards[m_shard_cnt] = new S(level->m_shards.data(), level->m_shard_cnt); m_owns[m_shard_cnt] = true; ++m_shard_cnt; } + + void finalize() { + if (m_pending_shard) { + for (size_t i=0; i m_shards; + + Shard *m_pending_shard; + std::vector m_owns; InternalLevel *clone() { -- cgit v1.2.3 From 754372aeccb74815cbb16f32ceacb04b4c5aaba9 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Wed, 20 Sep 2023 14:03:23 -0400 Subject: Bugfixes for tiering Fixed a few issues that manifested during the tiering tests, 1) When a version is copied, it now contains copies of the levels, not just pointers (the levels themselves still hold pointers to the shards, though). 2) Ensure that tasks are scheduled with the correct timestamp, they were originally being scheduled backwards. The get_merge_tasks() method already returns them in the correct order, so reversing them again put it in the wrong order. --- include/framework/InternalLevel.h | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'include/framework/InternalLevel.h') diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index e67ae45..6cdac4e 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -106,6 +106,7 @@ public: for (size_t i=0; i m_owns; - InternalLevel *clone() { - auto new_level = new InternalLevel(m_level_no, m_shards.size()); + std::shared_ptr clone() { + auto new_level = std::make_shared(m_level_no, m_shards.size()); for (size_t i=0; im_shards[i] = m_shards[i]; new_level->m_owns[i] = true; m_owns[i] = false; } + + return new_level; } }; -- cgit v1.2.3 From 7c03d771475421c1d5a2bbc135242536af1a371c Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Mon, 25 Sep 2023 10:49:36 -0400 Subject: Re-structuring Project + scheduling updates This is a big one--probably should have split it apart, but I'm feeling lazy this morning. * Organized the mess of header files in include/framework by splitting them out into their own subdirectories, and renaming a few files to remove redundancies introduced by the directory structure. * Introduced a new framework/ShardRequirements.h header file for simpler shard development. This header simply contains the necessary includes from framework/* for creating shard files. This should help to remove structural dependencies from the framework file structure and shards, as well as centralizing the necessary framework files to make shard development easier. * Created a (currently dummy) SchedulerInterface, and make the scheduler implementation a template parameter of the dynamic extension for easier testing of various scheduling policies. There's still more work to be done to fully integrate the scheduler (queries, multiple buffers), but some more of the necessary framework code for this has been added as well. * Adjusted the Task interface setup for the scheduler. The task structures have been removed from ExtensionStructure and placed in their own header file. Additionally, I started experimenting with using std::variant, as opposed to inheritence, to implement subtype polymorphism on the Merge and Query tasks. The scheduler now has a general task queue that contains both, and std::variant, std::visit, and std::get are used to manipulate them without virtual functions. * Removed Alex.h, as it can't build anyway. There's a branch out there containing the Alex implementation stripped of the C++20 stuff. So there's no need to keep it here. --- include/framework/InternalLevel.h | 258 -------------------------------------- 1 file changed, 258 deletions(-) delete mode 100644 include/framework/InternalLevel.h (limited to 'include/framework/InternalLevel.h') diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h deleted file mode 100644 index 6cdac4e..0000000 --- a/include/framework/InternalLevel.h +++ /dev/null @@ -1,258 +0,0 @@ -/* - * include/framework/InternalLevel.h - * - * Copyright (C) 2023 Douglas Rumbaugh - * Dong Xie - * - * All rights reserved. Published under the Modified BSD License. - * - */ -#pragma once - -#include -#include - -#include "util/types.h" -#include "framework/ShardInterface.h" -#include "framework/QueryInterface.h" -#include "framework/RecordInterface.h" -#include "framework/MutableBuffer.h" - -namespace de { -template -class InternalLevel; - - - -template -class InternalLevel { - typedef S Shard; - typedef MutableBuffer Buffer; -public: - InternalLevel(ssize_t level_no, size_t shard_cap) - : m_level_no(level_no) - , m_shard_cnt(0) - , m_shards(shard_cap, nullptr) - , m_owns(shard_cap, true) - , m_pending_shard(nullptr) - {} - - // Create a new memory level sharing the shards and repurposing it as previous level_no + 1 - // WARNING: for leveling only. - InternalLevel(InternalLevel* level) - : m_level_no(level->m_level_no + 1) - , m_shard_cnt(level->m_shard_cnt) - , m_shards(level->m_shards.size(), nullptr) - , m_owns(level->m_owns.size(), true) - , m_pending_shard(nullptr) - { - assert(m_shard_cnt == 1 && m_shards.size() == 1); - - for (size_t i=0; im_owns[i] = false; - m_shards[i] = level->m_shards[i]; - } - } - - ~InternalLevel() { - for (size_t i=0; i merge_levels(InternalLevel* base_level, InternalLevel* new_level) { - assert(base_level->m_level_no > new_level->m_level_no || (base_level->m_level_no == 0 && new_level->m_level_no == 0)); - auto res = new InternalLevel(base_level->m_level_no, 1); - res->m_shard_cnt = 1; - Shard* shards[2]; - shards[0] = base_level->m_shards[0]; - shards[1] = new_level->m_shards[0]; - - res->m_shards[0] = new S(shards, 2); - return std::shared_ptr(res); - } - - void append_buffer(Buffer* buffer) { - if (m_shard_cnt == m_shards.size()) { - assert(m_pending_shard == nullptr); - m_pending_shard = new S(buffer); - return; - } - - m_shards[m_shard_cnt] = new S(buffer); - m_owns[m_shard_cnt] = true; - ++m_shard_cnt; - } - - void append_merged_shards(InternalLevel* level) { - if (m_shard_cnt == m_shards.size()) { - m_pending_shard = new S(level->m_shards.data(), level->m_shard_cnt); - return; - } - - m_shards[m_shard_cnt] = new S(level->m_shards.data(), level->m_shard_cnt); - m_owns[m_shard_cnt] = true; - - ++m_shard_cnt; - } - - - void finalize() { - if (m_pending_shard) { - for (size_t i=0; i> &shards, std::vector& shard_states, void *query_parms) { - for (size_t i=0; i= (ssize_t) shard_stop; i--) { - if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec, true); - if (res && res->is_tombstone()) { - return true; - } - } - } - return false; - } - - bool delete_record(const R &rec) { - if (m_shard_cnt == 0) return false; - - for (size_t i = 0; i < m_shards.size(); ++i) { - if (m_shards[i]) { - auto res = m_shards[i]->point_lookup(rec); - if (res) { - res->set_delete(); - return true; - } - } - } - - return false; - } - - Shard* get_shard(size_t idx) { - return m_shards[idx]; - } - - size_t get_shard_count() { - return m_shard_cnt; - } - - size_t get_record_count() { - size_t cnt = 0; - for (size_t i=0; iget_record_count(); - } - - return cnt; - } - - size_t get_tombstone_count() { - size_t res = 0; - for (size_t i = 0; i < m_shard_cnt; ++i) { - res += m_shards[i]->get_tombstone_count(); - } - return res; - } - - size_t get_aux_memory_usage() { - size_t cnt = 0; - for (size_t i=0; iget_aux_memory_usage(); - } - - return cnt; - } - - size_t get_memory_usage() { - size_t cnt = 0; - for (size_t i=0; iget_memory_usage(); - } - } - - return cnt; - } - - double get_tombstone_prop() { - size_t tscnt = 0; - size_t reccnt = 0; - for (size_t i=0; iget_tombstone_count(); - reccnt += (*m_shards[i])->get_record_count(); - } - } - - return (double) tscnt / (double) (tscnt + reccnt); - } - -private: - ssize_t m_level_no; - - size_t m_shard_cnt; - size_t m_shard_size_cap; - - std::vector m_shards; - - Shard *m_pending_shard; - - std::vector m_owns; - - std::shared_ptr clone() { - auto new_level = std::make_shared(m_level_no, m_shards.size()); - for (size_t i=0; im_shards[i] = m_shards[i]; - new_level->m_owns[i] = true; - m_owns[i] = false; - } - - return new_level; - } -}; - -} -- cgit v1.2.3