diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2023-09-18 16:37:30 -0400 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2023-09-18 16:37:30 -0400 |
| commit | 6e30f576ca9d11d1901f4877315e97f84d15b1e1 (patch) | |
| tree | aec980d3e8b0c655a1c38c7c06b314cf10db4f94 /include/framework/InternalLevel.h | |
| parent | abc8605a51537fc7b35bb0d9b1da6c724c5c6973 (diff) | |
| download | dynamic-extension-6e30f576ca9d11d1901f4877315e97f84d15b1e1.tar.gz | |
The scheduler now spawns a separate merge thread
Merges are now executed from a separate thread within the scheduler that
wakes up via condition variables when new merge tasks are scheduled. In
addition, tombstone limits are now enforced by the scheduler, with new
merges being scheduled as needed.
There are still a few tests failing; notably, the zero-tombstones-in-the-
last-run invariant is not holding under tiering with tombstones. Need
to look into that yet.
Diffstat (limited to 'include/framework/InternalLevel.h')
| -rw-r--r-- | include/framework/InternalLevel.h | 39 |
1 file changed, 36 insertions, 3 deletions
diff --git a/include/framework/InternalLevel.h b/include/framework/InternalLevel.h index b9866b8..e67ae45 100644 --- a/include/framework/InternalLevel.h +++ b/include/framework/InternalLevel.h @@ -34,6 +34,7 @@ public: , m_shard_cnt(0) , m_shards(shard_cap, nullptr) , m_owns(shard_cap, true) + , m_pending_shard(nullptr) {} // Create a new memory level sharing the shards and repurposing it as previous level_no + 1 @@ -42,7 +43,9 @@ public: : m_level_no(level->m_level_no + 1) , m_shard_cnt(level->m_shard_cnt) , m_shards(level->m_shards.size(), nullptr) - , m_owns(level->m_owns.size(), true) { + , m_owns(level->m_owns.size(), true) + , m_pending_shard(nullptr) + { assert(m_shard_cnt == 1 && m_shards.size() == 1); for (size_t i=0; i<m_shards.size(); i++) { @@ -55,6 +58,8 @@ public: for (size_t i=0; i<m_shards.size(); i++) { if (m_owns[i]) delete m_shards[i]; } + + delete m_pending_shard; } // WARNING: for leveling only. @@ -72,20 +77,45 @@ public: } void append_buffer(Buffer* buffer) { - assert(m_shard_cnt < m_shards.size()); + if (m_shard_cnt == m_shards.size()) { + assert(m_pending_shard == nullptr); + m_pending_shard = new S(buffer); + return; + } + m_shards[m_shard_cnt] = new S(buffer); m_owns[m_shard_cnt] = true; ++m_shard_cnt; } void append_merged_shards(InternalLevel* level) { - assert(m_shard_cnt < m_shards.size()); + if (m_shard_cnt == m_shards.size()) { + m_pending_shard = new S(level->m_shards.data(), level->m_shard_cnt); + return; + } + m_shards[m_shard_cnt] = new S(level->m_shards.data(), level->m_shard_cnt); m_owns[m_shard_cnt] = true; ++m_shard_cnt; } + + void finalize() { + if (m_pending_shard) { + for (size_t i=0; i<m_shards.size(); i++) { + if (m_owns[i]) { + delete m_shards[i]; + m_owns[i] = false; + } + } + + m_shards[0] = m_pending_shard; + m_owns[0] = true; + m_pending_shard = nullptr; + } + } + Shard *get_merged_shard() { if (m_shard_cnt == 0) { return nullptr; @@ -206,6 +236,9 @@ private: size_t m_shard_size_cap; std::vector<Shard*> m_shards; + 
+ Shard *m_pending_shard; + std::vector<bool> m_owns; InternalLevel *clone() { |