summaryrefslogtreecommitdiffstats
path: root/include/framework/structure
diff options
context:
space:
mode:
Diffstat (limited to 'include/framework/structure')
-rw-r--r--include/framework/structure/ExtensionStructure.h72
-rw-r--r--include/framework/structure/InternalLevel.h76
2 files changed, 114 insertions, 34 deletions
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 60fb6c7..2bf7086 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -27,8 +27,9 @@ class ExtensionStructure {
typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
LevelVector;
public:
- ExtensionStructure() {
- m_levels.emplace_back(std::make_shared<InternalLevel<ShardType, QueryType>>(0));
+ ExtensionStructure(bool default_level=true) {
+ if (default_level)
+ m_levels.emplace_back(std::make_shared<InternalLevel<ShardType, QueryType>>(0));
}
~ExtensionStructure() = default;
@@ -49,7 +50,7 @@ public:
* need to be forwarded to the appropriate structures manually.
*/
ExtensionStructure<ShardType, QueryType> *copy() const {
- auto new_struct = new ExtensionStructure<ShardType, QueryType>();
+ auto new_struct = new ExtensionStructure<ShardType, QueryType>(false);
for (size_t i = 0; i < m_levels.size(); i++) {
new_struct->m_levels.push_back(m_levels[i]->clone());
}
@@ -158,13 +159,17 @@ public:
return cnt;
}
- inline void perform_reconstruction(ReconstructionTask task) {
+ inline void perform_reconstruction(ReconstructionTask task, size_t version=0) {
/* perform the reconstruction itself */
std::vector<const ShardType *> shards;
for (ShardID shid : task.sources) {
- assert(shid.level_idx < (level_index) m_levels.size());
+ assert(shid.level_idx <= (level_index) m_levels.size());
assert(shid.shard_idx >= -1);
+ if (shid.level_idx == (level_index) m_levels.size()) {
+ continue;
+ }
+
/* if unspecified, push all shards into the vector */
if (shid.shard_idx == all_shards_idx) {
for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count();
@@ -184,21 +189,27 @@ public:
* Remove all of the shards processed by the operation
*/
for (ShardID shid : task.sources) {
- if (shid.shard_idx == all_shards_idx) {
+ if (shid.level_idx == (level_index) m_levels.size()) {
+ continue;
+ } else if (shid.shard_idx == all_shards_idx) {
m_levels[shid.level_idx]->truncate();
} else if (shid != buffer_shid) {
m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
}
}
+ // fprintf(stderr, "Target: %ld\tLevels:%ld\n", task.target, m_levels.size());
+
/*
* Append the new shard to the target level
*/
if (task.target < (level_index)m_levels.size()) {
- m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard));
+ m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard), version);
+ // fprintf(stderr, "append (no growth)\n");
} else { /* grow the structure if needed */
m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(task.target));
- m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard));
+ m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard), version);
+ // fprintf(stderr, "grow and append\n");
}
}
@@ -219,8 +230,8 @@ public:
return m_levels[0]->get_shard_count();
}
- void append_l0(std::shared_ptr<ShardType> shard) {
- m_levels[0]->append(shard);
+ void append_l0(std::shared_ptr<ShardType> shard, size_t version) {
+ m_levels[0]->append(shard, version);
}
LevelVector const &get_level_vector() const { return m_levels; }
@@ -251,6 +262,47 @@ public:
return ts_prop <= (long double) max_delete_prop;
}
+ void print_structure() const {
+ for (size_t i=0; i<m_levels.size(); i++) {
+ fprintf(stdout, "[%ld]:\t", i);
+
+ if (m_levels[i]) {
+ for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) {
+ fprintf(stdout, "(%ld: %ld) ", j, m_levels[i]->get_shard(j)->get_record_count());
+ }
+ } else {
+ fprintf(stdout, "[Empty]");
+ }
+
+ fprintf(stdout, "\n");
+ }
+ }
+
+
+ void merge_structure(const ExtensionStructure* old_structure, size_t version_id = 0) {
+ assert(version_id > 0);
+
+ for (size_t i=0; i<old_structure->m_levels.size(); i++) {
+ for (size_t j=0; j<old_structure->m_levels[i]->get_shard_count(); j++) {
+ if (old_structure->m_levels[i]->get_shard_version(j) > version_id) {
+ m_levels[i]->append(old_structure->m_levels[i]->get_shard_ptr(j));
+ }
+ }
+ }
+ }
+
+ void update_shard_version(size_t version) {
+ assert(version != 0);
+
+ for (size_t i=0; i<m_levels.size(); i++) {
+ for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) {
+ if (m_levels[i]->get_shard_version(j) == 0) {
+ m_levels[i]->set_shard_version(j, version);
+ }
+ }
+ }
+ }
+
private:
LevelVector m_levels;
};
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 5bc891b..37b2b40 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -15,6 +15,7 @@
*/
#pragma once
+#include <future>
#include <memory>
#include <vector>
@@ -28,6 +29,7 @@ template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class InternalLevel {
typedef typename ShardType::RECORD RecordType;
typedef BufferView<RecordType> BuffView;
+ typedef std::pair<std::shared_ptr<ShardType>, size_t> shard_ptr;
public:
InternalLevel(ssize_t level_no) : m_level_no(level_no) {}
@@ -47,8 +49,8 @@ public:
std::vector<const ShardType *> shards;
for (auto shard : m_shards) {
- if (shard)
- shards.emplace_back(shard.get());
+ if (shard.first)
+ shards.emplace_back(shard.first.get());
}
return new ShardType(shards);
@@ -59,10 +61,10 @@ public:
std::vector<typename QueryType::LocalQuery *> &local_queries,
typename QueryType::Parameters *query_parms) const {
for (size_t i = 0; i < m_shards.size(); i++) {
- if (m_shards[i]) {
+ if (m_shards[i].first) {
auto local_query =
- QueryType::local_preproc(m_shards[i].get(), query_parms);
- shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].get()});
+ QueryType::local_preproc(m_shards[i].first.get(), query_parms);
+ shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].first.get()});
local_queries.emplace_back(local_query);
}
}
@@ -74,7 +76,7 @@ public:
for (int i = m_shards.size() - 1; i >= (ssize_t)shard_stop; i--) {
if (m_shards[i]) {
- auto res = m_shards[i]->point_lookup(rec, true);
+ auto res = m_shards[i].first->point_lookup(rec, true);
if (res && res->is_tombstone()) {
return true;
}
@@ -88,8 +90,8 @@ public:
return false;
for (size_t i = 0; i < m_shards.size(); ++i) {
- if (m_shards[i]) {
- auto res = m_shards[i]->point_lookup(rec);
+ if (m_shards[i].first) {
+ auto res = m_shards[i].first->point_lookup(rec);
if (res) {
res->set_delete();
return true;
@@ -105,7 +107,15 @@ public:
return nullptr;
}
- return m_shards[idx].get();
+ return m_shards[idx].first.get();
+ }
+
+ const size_t get_shard_version(size_t idx) const {
+ if (idx >= m_shards.size()) {
+ return 0;
+ }
+
+ return m_shards[idx].second;
}
size_t get_shard_count() const { return m_shards.size(); }
@@ -113,8 +123,8 @@ public:
size_t get_record_count() const {
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
- if (m_shards[i]) {
- cnt += m_shards[i]->get_record_count();
+ if (m_shards[i].first) {
+ cnt += m_shards[i].first->get_record_count();
}
}
@@ -124,8 +134,8 @@ public:
size_t get_tombstone_count() const {
size_t res = 0;
for (size_t i = 0; i < m_shards.size(); ++i) {
- if (m_shards[i]) {
- res += m_shards[i]->get_tombstone_count();
+ if (m_shards[i].first) {
+ res += m_shards[i].first->get_tombstone_count();
}
}
return res;
@@ -134,8 +144,8 @@ public:
size_t get_aux_memory_usage() const {
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
- if (m_shards[i]) {
- cnt += m_shards[i]->get_aux_memory_usage();
+ if (m_shards[i].first) {
+ cnt += m_shards[i].first->get_aux_memory_usage();
}
}
@@ -146,7 +156,7 @@ public:
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
- cnt += m_shards[i]->get_memory_usage();
+ cnt += m_shards[i].first->get_memory_usage();
}
}
@@ -158,8 +168,8 @@ public:
size_t reccnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
- tscnt += m_shards[i]->get_tombstone_count();
- reccnt += m_shards[i]->get_record_count();
+ tscnt += m_shards[i].first->get_tombstone_count();
+ reccnt += m_shards[i].first->get_record_count();
}
}
@@ -169,7 +179,7 @@ public:
size_t get_nonempty_shard_count() const {
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
- if (m_shards[i] && m_shards[i]->get_record_count() > 0) {
+ if (m_shards[i].first && m_shards[i].first->get_record_count() > 0) {
cnt += 1;
}
}
@@ -180,7 +190,7 @@ public:
std::shared_ptr<InternalLevel> clone() const {
auto new_level = std::make_shared<InternalLevel>(m_level_no);
for (size_t i = 0; i < m_shards.size(); i++) {
- new_level->append(m_shards[i]);
+ new_level->append(m_shards[i].first, m_shards[i].second);
}
return new_level;
@@ -192,21 +202,39 @@ public:
m_shards.erase(m_shards.begin() + shard);
}
- void append(std::shared_ptr<ShardType> shard) {
- m_shards.emplace_back(shard);
+ void append(std::shared_ptr<ShardType> shard, size_t version=0) {
+ m_shards.push_back({shard, version});
+ }
+
+ void append(shard_ptr shard) {
+ m_shards.push_back(shard);
}
const ShardType *get_shard(ShardID shid) const {
if (shid < m_shards.size()) {
- return m_shards[shid].get();
+ return m_shards[shid].first.get();
}
return nullptr;
}
+ const shard_ptr get_shard_ptr(size_t shid) const {
+ if (shid < m_shards.size()) {
+ return m_shards[shid];
+ }
+
+ return {nullptr, 0};
+ }
+
+ void set_shard_version(size_t idx, size_t version) {
+ if (idx < m_shards.size()) {
+ m_shards[idx].second = version;
+ }
+ }
+
private:
ssize_t m_level_no;
- std::vector<std::shared_ptr<ShardType>> m_shards;
+ std::vector<shard_ptr> m_shards;
};
} // namespace de