summaryrefslogtreecommitdiffstats
path: root/include/framework
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework')
-rw-r--r--include/framework/DynamicExtension.h13
-rw-r--r--include/framework/reconstruction/FixedShardCountPolicy.h66
-rw-r--r--include/framework/reconstruction/FloodL0Policy.h65
-rw-r--r--include/framework/structure/ExtensionStructure.h95
-rw-r--r--include/framework/structure/InternalLevel.h19
5 files changed, 229 insertions, 29 deletions
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index c35bb93..5a64243 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -249,6 +249,19 @@ public:
}
/**
+ * Get the number of non-empty shards within the index.
+ *
+ * @return The number of non-empty shards within the index
+ */
+ size_t get_shard_count() {
+ auto epoch = get_active_epoch();
+ auto s = epoch->get_structure()->get_shard_count();
+ end_job(epoch);
+
+ return s;
+ }
+
+ /**
* Get the number of bytes of memory allocated across the framework for
* storing records and associated index information (i.e., internal
* ISAM tree nodes). This includes memory that is allocated but
diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h
new file mode 100644
index 0000000..ec8e4e6
--- /dev/null
+++ b/include/framework/reconstruction/FixedShardCountPolicy.h
@@ -0,0 +1,66 @@
+/*
+ * include/framework/reconstruction/FixedShardCountPolicy.h
+ *
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+#include <cmath>
+
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/Epoch.h"
+#include "util/types.h"
+
+namespace de {
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+class FixedShardCountPolicy : public ReconstructionPolicy<ShardType, QueryType> {
+ typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+ LevelVector;
+
+public:
+ FixedShardCountPolicy(size_t buffer_size, size_t shard_count, size_t max_record_count)
+ : m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {}
+
+ ReconstructionVector
+ get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch,
+ size_t incoming_reccnt) const override {
+ ReconstructionVector reconstructions;
+ return reconstructions;
+
+ }
+
+ ReconstructionTask
+ get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override {
+
+ auto levels = epoch->get_structure()->get_level_vector();
+
+ if (levels.size() == 0) {
+ return ReconstructionTask{
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append};
+ }
+
+ ShardID last_shid = {0, (shard_index) (levels[0]->get_shard_count() - 1)};
+
+ if (levels[0]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) {
+ return ReconstructionTask{
+ {{buffer_shid, last_shid}}, 0, m_buffer_size, ReconstructionType::Merge};
+ } else {
+ return ReconstructionTask{
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append};
+ }
+ }
+
+private:
+ inline size_t capacity() const {
+ double bps = (double) m_max_reccnt / (double) m_buffer_size / m_shard_count;
+ return ceil(bps) * m_buffer_size;
+ }
+
+ size_t m_buffer_size;
+ size_t m_shard_count;
+ size_t m_max_reccnt;
+};
+} // namespace de
diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h
new file mode 100644
index 0000000..da4c297
--- /dev/null
+++ b/include/framework/reconstruction/FloodL0Policy.h
@@ -0,0 +1,65 @@
+/*
+ * include/framework/reconstruction/FloodL0Policy.h
+ *
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+#include <cmath>
+
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/Epoch.h"
+#include "util/types.h"
+
+namespace de {
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+class FloodL0Policy : public ReconstructionPolicy<ShardType, QueryType> {
+ typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+ LevelVector;
+
+public:
+ FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {}
+
+ ReconstructionVector
+ get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch,
+ size_t incoming_reccnt) const override {
+
+ ReconstructionVector reconstructions;
+ return reconstructions;
+
+ }
+
+ ReconstructionTask
+ get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override {
+ return ReconstructionTask{
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append};
+ }
+
+private:
+ level_index find_reconstruction_target(LevelVector &levels) const {
+ level_index target_level = invalid_level_idx;
+ size_t incoming_records = m_buffer_size;
+
+ for (level_index i = 0; i < (level_index)levels.size(); i++) {
+ if (levels[i]->get_record_count() + incoming_records < capacity(i)) {
+ target_level = i;
+ break;
+ }
+
+ incoming_records = levels[i]->get_record_count();
+ }
+
+ return target_level;
+ }
+
+ inline size_t capacity(level_index level) const {
+ return m_buffer_size * pow(m_scale_factor, level + 1);
+ }
+
+ size_t m_scale_factor;
+ size_t m_buffer_size;
+};
+} // namespace de
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 3bb8a0b..078c4a9 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -145,17 +145,32 @@ public:
return cnt;
}
- inline void perform_reconstruction(ReconstructionTask task) {
+ size_t get_shard_count() const {
+ size_t cnt = 0;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i]) {
+ cnt += m_levels[i]->get_nonempty_shard_count();
+ }
+ }
+
+ return cnt;
+ }
+
+ inline void perform_reconstruction(ReconstructionTask task,
+ BuffView *bv=nullptr) {
/* perform the reconstruction itself */
std::vector<const ShardType *> shards;
for (ShardID shid : task.sources) {
assert(shid.level_idx < (level_index) m_levels.size());
assert(shid.shard_idx >= -1);
- /* if unspecified, push all shards into the vector */
- if (shid.shard_idx == all_shards_idx) {
- for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count();
- i++) {
+ if (shid == buffer_shid) {
+ assert(bv);
+ ShardType *buffer_shard = new ShardType(std::move(*bv));
+ shards.push_back(buffer_shard);
+ } else if (shid.shard_idx == all_shards_idx) {
+ /* if unspecified, push all shards into the vector */
+ for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); i++) {
if (m_levels[shid.level_idx]->get_shard(i)) {
shards.push_back(m_levels[shid.level_idx]->get_shard(i));
}
@@ -171,7 +186,9 @@ public:
* Remove all of the shards processed by the operation
*/
for (ShardID shid : task.sources) {
- if (shid.shard_idx == all_shards_idx) {
+ if (shid == buffer_shid) {
+ continue;
+ } else if (shid.shard_idx == all_shards_idx) {
m_levels[shid.level_idx]->truncate();
} else {
m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
@@ -199,29 +216,49 @@ public:
* like that, we'll leave this as low priority.
*/
- /* insert the first level, if needed */
- if (m_levels.size() == 0) {
- m_levels.push_back(
- std::make_shared<InternalLevel<ShardType, QueryType>>(0));
- }
-
- ShardType *buffer_shard = new ShardType(std::move(buffer));
- if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) {
- m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard));
- } else {
- std::vector<const ShardType *> shards;
- for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count();
- i++) {
- if (m_levels[0]->get_shard(i)) {
- shards.push_back(m_levels[0]->get_shard(i));
- }
-
- shards.push_back(buffer_shard);
- ShardType *new_shard = new ShardType(shards);
- m_levels[0]->truncate();
- m_levels[0]->append(std::shared_ptr<ShardType>(new_shard));
- }
- }
+ // /* insert the first level, if needed */
+ // if (m_levels.size() == 0) {
+ // m_levels.push_back(
+ // std::make_shared<InternalLevel<ShardType, QueryType>>(0));
+ // }
+
+ perform_reconstruction(task, &buffer);
+
+ // ShardType *buffer_shard = new ShardType(std::move(buffer));
+ // if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) {
+ // m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard));
+ // } else if (task.type == ReconstructionType::Merge) {
+ // std::vector<const ShardType *> shards;
+ // for (size_t i=0; i<task.sources.size(); i++) {
+ // ShardID shid = task.sources[i];
+ // if (shid != buffer_shid) {
+ // shards.emplace_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx));
+ // }
+ // }
+
+ // shards.emplace_back(buffer_shard);
+ // ShardType *new_shard = new ShardType(shards);
+ // m_levels[0]->append(std::shared_ptr<ShardType>(new_shard));
+ // for (size_t i=0; i<task.sources.size(); i++) {
+ // ShardID shid = task.sources[i];
+ // if (shid != buffer_shid) {
+ // m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
+ // }
+ // }
+ // } else {
+ // std::vector<const ShardType *> shards;
+ // for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count();
+ // i++) {
+ // if (m_levels[0]->get_shard(i)) {
+ // shards.push_back(m_levels[0]->get_shard(i));
+ // }
+
+ // shards.push_back(buffer_shard);
+ // ShardType *new_shard = new ShardType(shards);
+ // m_levels[0]->truncate();
+ // m_levels[0]->append(std::shared_ptr<ShardType>(new_shard));
+ // }
+ // }
}
bool take_reference() {
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index c9d1749..5bc891b 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -166,6 +166,17 @@ public:
return (double)tscnt / (double)(tscnt + reccnt);
}
+ size_t get_nonempty_shard_count() const {
+ size_t cnt = 0;
+ for (size_t i = 0; i < m_shards.size(); i++) {
+ if (m_shards[i] && m_shards[i]->get_record_count() > 0) {
+ cnt += 1;
+ }
+ }
+
+ return cnt;
+ }
+
std::shared_ptr<InternalLevel> clone() const {
auto new_level = std::make_shared<InternalLevel>(m_level_no);
for (size_t i = 0; i < m_shards.size(); i++) {
@@ -185,6 +196,14 @@ public:
m_shards.emplace_back(shard);
}
+ const ShardType *get_shard(ShardID shid) const {
+ if (shid < m_shards.size()) {
+ return m_shards[shid].get();
+ }
+
+ return nullptr;
+ }
+
private:
ssize_t m_level_no;
std::vector<std::shared_ptr<ShardType>> m_shards;