diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-12 14:10:11 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2024-01-12 14:10:11 -0500 |
| commit | aac0bb661af8fae38d3ce08d6078cb4d9dfcb575 (patch) | |
| tree | 347e0ce7f7e15f2610039f02b75d47cedf810cd6 /include/framework/structure/ExtensionStructure.h | |
| parent | c4514c2e62a711189cf3c914297885d97fb51a09 (diff) | |
| download | dynamic-extension-aac0bb661af8fae38d3ce08d6078cb4d9dfcb575.tar.gz | |
Initial integration of new buffering scheme into framework
It isn't working right now (lotsa test failures), but we're to the
debugging phase now.
Diffstat (limited to 'include/framework/structure/ExtensionStructure.h')
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 41 |
1 files changed, 12 insertions, 29 deletions
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 60016a0..ae566cb 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -10,28 +10,22 @@ #pragma once #include <atomic> -#include <numeric> #include <cstdio> #include <vector> -#include "framework/structure/MutableBuffer.h" +#include "framework/structure/BufferView.h" #include "framework/structure/InternalLevel.h" -#include "framework/interface/Shard.h" -#include "framework/interface/Query.h" -#include "framework/interface/Record.h" #include "framework/util/Configuration.h" -#include "framework/scheduling/Task.h" #include "psu-util/timer.h" -#include "psu-ds/Alias.h" namespace de { template <RecordInterface R, ShardInterface S, QueryInterface Q, LayoutPolicy L=LayoutPolicy::TEIRING> class ExtensionStructure { typedef S Shard; - typedef MutableBuffer<R> Buffer; + typedef BufferView<R> BuffView; public: ExtensionStructure(size_t buffer_size, size_t scale_factor, double max_delete_prop) @@ -96,14 +90,10 @@ public: * FIXME: arguably, this should be a method attached to the buffer that * takes a structure as input. */ - inline bool flush_buffer(Buffer *buffer) { - assert(can_reconstruct_with(0, buffer->get_record_count())); + inline bool flush_buffer(BuffView buffer) { + assert(can_reconstruct_with(0, buffer.get_record_count())); - // FIXME: this step makes an extra copy of the buffer, - // which could be avoided by adjusting the shard - // reconstruction process a bit, possibly. - buffer->start_flush(); - flush_buffer_into_l0(buffer); + flush_buffer_into_l0(std::move(buffer)); return true; } @@ -415,11 +405,11 @@ private: * returns -1 if idx==0, and no such level exists, to simplify * the logic of the first buffer flush. */ - inline level_index find_reconstruction_target(level_index idx, Buffer *buffer=nullptr) { + inline level_index find_reconstruction_target(level_index idx) { if (idx == 0 && m_levels.size() == 0) return -1; - size_t incoming_rec_cnt = get_level_record_count(idx, buffer); + size_t incoming_rec_cnt = get_level_record_count(idx); for (level_index i=idx+1; i<m_levels.size(); i++) { if (can_reconstruct_with(i, incoming_rec_cnt)) { return i; @@ -431,13 +421,13 @@ private: return -1; } - inline void flush_buffer_into_l0(Buffer *buffer) { + inline void flush_buffer_into_l0(BuffView buffer) { assert(m_levels[0]); if constexpr (L == LayoutPolicy::LEVELING) { // FIXME: Kludgey implementation due to interface constraints. auto old_level = m_levels[0].get(); auto temp_level = new InternalLevel<R, Shard, Q>(0, 1); - temp_level->append_buffer(buffer); + temp_level->append_buffer(std::move(buffer)); if (old_level->get_shard_count() > 0) { m_levels[0] = InternalLevel<R, Shard, Q>::reconstruction(old_level, temp_level); @@ -446,7 +436,7 @@ private: m_levels[0] = std::shared_ptr<InternalLevel<R, Shard, Q>>(temp_level); } } else { - m_levels[0]->append_buffer(buffer); + m_levels[0]->append_buffer(std::move(buffer)); } } @@ -469,16 +459,9 @@ private: } /* - * Returns the actual number of records present on a specified level. An - * index value of -1 indicates the memory table. Can optionally pass in - * a pointer to the memory table to use, if desired. Otherwise, there are - * no guarantees about which buffer will be accessed if level_index is -1. + * Returns the number of records present on a specified level. */ - inline size_t get_level_record_count(level_index idx, Buffer *buffer=nullptr) { - if (buffer) { - return buffer->get_record_count(); - } - + inline size_t get_level_record_count(level_index idx) { return (m_levels[idx]) ? m_levels[idx]->get_record_count() : 0; } |