diff options
Diffstat (limited to 'include/framework')
| -rw-r--r-- | include/framework/DynamicExtension.h | 13 | ||||
| -rw-r--r-- | include/framework/reconstruction/FixedShardCountPolicy.h | 66 | ||||
| -rw-r--r-- | include/framework/reconstruction/FloodL0Policy.h | 65 | ||||
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 95 | ||||
| -rw-r--r-- | include/framework/structure/InternalLevel.h | 19 |
5 files changed, 229 insertions, 29 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index c35bb93..5a64243 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -249,6 +249,19 @@ public: } /** + * Get the number of non-empty shards within the index. + * + * @return The number of non-empty shards within the index + */ + size_t get_shard_count() { + auto epoch = get_active_epoch(); + auto s = epoch->get_structure()->get_shard_count(); + end_job(epoch); + + return s; + } + + /** * Get the number of bytes of memory allocated across the framework for * storing records and associated index information (i.e., internal * ISAM tree nodes). This includes memory that is allocated but diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h new file mode 100644 index 0000000..ec8e4e6 --- /dev/null +++ b/include/framework/reconstruction/FixedShardCountPolicy.h @@ -0,0 +1,66 @@ +/* + * include/framework/reconstruction/FixedShardCountPolicy.h + * + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * Distributed under the Modified BSD License. + * + */ +#pragma once +#include <cmath> + +#include "framework/reconstruction/ReconstructionPolicy.h" +#include "framework/scheduling/Epoch.h" +#include "util/types.h" + +namespace de { +template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> +class FixedShardCountPolicy : public ReconstructionPolicy<ShardType, QueryType> { + typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> + LevelVector; + +public: + FixedShardCountPolicy(size_t buffer_size, size_t shard_count, size_t max_record_count) + : m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {} + + ReconstructionVector + get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch, + size_t incoming_reccnt) const override { + ReconstructionVector reconstructions; + return reconstructions; + + } + + ReconstructionTask + get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override { + + auto levels = epoch->get_structure()->get_level_vector(); + + if (levels.size() == 0) { + return ReconstructionTask{ + {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append}; + } + + ShardID last_shid = {0, (shard_index) (levels[0]->get_shard_count() - 1)}; + + if (levels[0]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) { + return ReconstructionTask{ + {{buffer_shid, last_shid}}, 0, m_buffer_size, ReconstructionType::Merge}; + } else { + return ReconstructionTask{ + {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append}; + } + } + +private: + inline size_t capacity() const { + double bps = (double) m_max_reccnt / (double) m_buffer_size / m_shard_count; + return ceil(bps) * m_buffer_size; + } + + size_t m_buffer_size; + size_t m_shard_count; + size_t m_max_reccnt; +}; +} // namespace de diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h new file mode 100644 index 0000000..da4c297 --- /dev/null +++ b/include/framework/reconstruction/FloodL0Policy.h @@ -0,0 +1,65 @@ +/* + * include/framework/reconstruction/FloodL0Policy.h + * + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * Distributed under the Modified BSD License. + * + */ +#pragma once +#include <cmath> + +#include "framework/reconstruction/ReconstructionPolicy.h" +#include "framework/scheduling/Epoch.h" +#include "util/types.h" + +namespace de { +template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> +class FloodL0Policy : public ReconstructionPolicy<ShardType, QueryType> { + typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> + LevelVector; + +public: + FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {} + + ReconstructionVector + get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch, + size_t incoming_reccnt) const override { + + ReconstructionVector reconstructions; + return reconstructions; + + } + + ReconstructionTask + get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override { + return ReconstructionTask{ + {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append}; + } + +private: + level_index find_reconstruction_target(LevelVector &levels) const { + level_index target_level = invalid_level_idx; + size_t incoming_records = m_buffer_size; + + for (level_index i = 0; i < (level_index)levels.size(); i++) { + if (levels[i]->get_record_count() + incoming_records < capacity(i)) { + target_level = i; + break; + } + + incoming_records = levels[i]->get_record_count(); + } + + return target_level; + } + + inline size_t capacity(level_index level) const { + return m_buffer_size * pow(m_scale_factor, level + 1); + } + + size_t m_scale_factor; + size_t m_buffer_size; +}; +} // namespace de diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 3bb8a0b..078c4a9 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -145,17 +145,32 @@ public: return cnt; } - inline void perform_reconstruction(ReconstructionTask task) { + size_t get_shard_count() const { + size_t cnt = 0; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + cnt += m_levels[i]->get_nonempty_shard_count(); + } + } + + return cnt; + } + + inline void perform_reconstruction(ReconstructionTask task, + BuffView *bv=nullptr) { /* perform the reconstruction itself */ std::vector<const ShardType *> shards; for (ShardID shid : task.sources) { assert(shid.level_idx < (level_index) m_levels.size()); assert(shid.shard_idx >= -1); - /* if unspecified, push all shards into the vector */ - if (shid.shard_idx == all_shards_idx) { - for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); - i++) { + if (shid == buffer_shid) { + assert(bv); + ShardType *buffer_shard = new ShardType(std::move(*bv)); + shards.push_back(buffer_shard); + } else if (shid.shard_idx == all_shards_idx) { + /* if unspecified, push all shards into the vector */ + for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); i++) { if (m_levels[shid.level_idx]->get_shard(i)) { shards.push_back(m_levels[shid.level_idx]->get_shard(i)); } @@ -171,7 +186,9 @@ public: * Remove all of the shards processed by the operation */ for (ShardID shid : task.sources) { - if (shid.shard_idx == all_shards_idx) { + if (shid == buffer_shid) { + continue; + } else if (shid.shard_idx == all_shards_idx) { m_levels[shid.level_idx]->truncate(); } else { m_levels[shid.level_idx]->delete_shard(shid.shard_idx); @@ -199,29 +216,49 @@ public: * like that, we'll leave this as low priority. */ - /* insert the first level, if needed */ - if (m_levels.size() == 0) { - m_levels.push_back( - std::make_shared<InternalLevel<ShardType, QueryType>>(0)); - } - - ShardType *buffer_shard = new ShardType(std::move(buffer)); - if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) { - m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard)); - } else { - std::vector<const ShardType *> shards; - for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count(); - i++) { - if (m_levels[0]->get_shard(i)) { - shards.push_back(m_levels[0]->get_shard(i)); - } - - shards.push_back(buffer_shard); - ShardType *new_shard = new ShardType(shards); - m_levels[0]->truncate(); - m_levels[0]->append(std::shared_ptr<ShardType>(new_shard)); - } - } + // /* insert the first level, if needed */ + // if (m_levels.size() == 0) { + // m_levels.push_back( + // std::make_shared<InternalLevel<ShardType, QueryType>>(0)); + // } + + perform_reconstruction(task, &buffer); + + // ShardType *buffer_shard = new ShardType(std::move(buffer)); + // if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) { + // m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard)); + // } else if (task.type == ReconstructionType::Merge) { + // std::vector<const ShardType *> shards; + // for (size_t i=0; i<task.sources.size(); i++) { + // ShardID shid = task.sources[i]; + // if (shid != buffer_shid) { + // shards.emplace_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx)); + // } + // } + + // shards.emplace_back(buffer_shard); + // ShardType *new_shard = new ShardType(shards); + // m_levels[0]->append(std::shared_ptr<ShardType>(new_shard)); + // for (size_t i=0; i<task.sources.size(); i++) { + // ShardID shid = task.sources[i]; + // if (shid != buffer_shid) { + // m_levels[shid.level_idx]->delete_shard(shid.shard_idx); + // } + // } + // } else { + // std::vector<const ShardType *> shards; + // for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count(); + // i++) { + // if (m_levels[0]->get_shard(i)) { + // shards.push_back(m_levels[0]->get_shard(i)); + // } + + // shards.push_back(buffer_shard); + // ShardType *new_shard = new ShardType(shards); + // m_levels[0]->truncate(); + // m_levels[0]->append(std::shared_ptr<ShardType>(new_shard)); + // } + // } } bool take_reference() { diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index c9d1749..5bc891b 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -166,6 +166,17 @@ public: return (double)tscnt / (double)(tscnt + reccnt); } + size_t get_nonempty_shard_count() const { + size_t cnt = 0; + for (size_t i = 0; i < m_shards.size(); i++) { + if (m_shards[i] && m_shards[i]->get_record_count() > 0) { + cnt += 1; + } + } + + return cnt; + } + std::shared_ptr<InternalLevel> clone() const { auto new_level = std::make_shared<InternalLevel>(m_level_no); for (size_t i = 0; i < m_shards.size(); i++) { @@ -185,6 +196,14 @@ public: m_shards.emplace_back(shard); } + const ShardType *get_shard(ShardID shid) const { + if (shid < m_shards.size()) { + return m_shards[shid].get(); + } + + return nullptr; + } + private: ssize_t m_level_no; std::vector<std::shared_ptr<ShardType>> m_shards; |