summaryrefslogtreecommitdiffstats
path: root/include/framework/structure/InternalLevel.h
diff options
context:
space:
mode:
authorDouglas Rumbaugh <dbr4@psu.edu>2025-02-06 15:56:33 -0500
committerDouglas Rumbaugh <dbr4@psu.edu>2025-02-06 15:56:33 -0500
commitfd0e99e618319974320ed3fb49535aec501be1fb (patch)
treeba5015299ec58beeea152b46eff15a4778b4f20f /include/framework/structure/InternalLevel.h
parent5ee10c336581b68ba91c7048431809e3ccaf7e87 (diff)
downloaddynamic-extension-fd0e99e618319974320ed3fb49535aec501be1fb.tar.gz
Background compaction stuff
Diffstat (limited to 'include/framework/structure/InternalLevel.h')
-rw-r--r--include/framework/structure/InternalLevel.h76
1 files changed, 52 insertions, 24 deletions
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 5bc891b..37b2b40 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -15,6 +15,7 @@
*/
#pragma once
+#include <future>
#include <memory>
#include <vector>
@@ -28,6 +29,7 @@ template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
class InternalLevel {
typedef typename ShardType::RECORD RecordType;
typedef BufferView<RecordType> BuffView;
+ typedef std::pair<std::shared_ptr<ShardType>, size_t> shard_ptr;
public:
InternalLevel(ssize_t level_no) : m_level_no(level_no) {}
@@ -47,8 +49,8 @@ public:
std::vector<const ShardType *> shards;
for (auto shard : m_shards) {
- if (shard)
- shards.emplace_back(shard.get());
+ if (shard.first)
+ shards.emplace_back(shard.first.get());
}
return new ShardType(shards);
@@ -59,10 +61,10 @@ public:
std::vector<typename QueryType::LocalQuery *> &local_queries,
typename QueryType::Parameters *query_parms) const {
for (size_t i = 0; i < m_shards.size(); i++) {
- if (m_shards[i]) {
+ if (m_shards[i].first) {
auto local_query =
- QueryType::local_preproc(m_shards[i].get(), query_parms);
- shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].get()});
+ QueryType::local_preproc(m_shards[i].first.get(), query_parms);
+ shards.push_back({{m_level_no, (ssize_t)i}, m_shards[i].first.get()});
local_queries.emplace_back(local_query);
}
}
@@ -74,7 +76,7 @@ public:
for (int i = m_shards.size() - 1; i >= (ssize_t)shard_stop; i--) {
if (m_shards[i]) {
- auto res = m_shards[i]->point_lookup(rec, true);
+ auto res = m_shards[i].first->point_lookup(rec, true);
if (res && res->is_tombstone()) {
return true;
}
@@ -88,8 +90,8 @@ public:
return false;
for (size_t i = 0; i < m_shards.size(); ++i) {
- if (m_shards[i]) {
- auto res = m_shards[i]->point_lookup(rec);
+ if (m_shards[i].first) {
+ auto res = m_shards[i].first->point_lookup(rec);
if (res) {
res->set_delete();
return true;
@@ -105,7 +107,15 @@ public:
return nullptr;
}
- return m_shards[idx].get();
+ return m_shards[idx].first.get();
+ }
+
+ const size_t get_shard_version(size_t idx) const {
+ if (idx >= m_shards.size()) {
+ return 0;
+ }
+
+ return m_shards[idx].second;
}
size_t get_shard_count() const { return m_shards.size(); }
@@ -113,8 +123,8 @@ public:
size_t get_record_count() const {
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
- if (m_shards[i]) {
- cnt += m_shards[i]->get_record_count();
+ if (m_shards[i].first) {
+ cnt += m_shards[i].first->get_record_count();
}
}
@@ -124,8 +134,8 @@ public:
size_t get_tombstone_count() const {
size_t res = 0;
for (size_t i = 0; i < m_shards.size(); ++i) {
- if (m_shards[i]) {
- res += m_shards[i]->get_tombstone_count();
+ if (m_shards[i].first) {
+ res += m_shards[i].first->get_tombstone_count();
}
}
return res;
@@ -134,8 +144,8 @@ public:
size_t get_aux_memory_usage() const {
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
- if (m_shards[i]) {
- cnt += m_shards[i]->get_aux_memory_usage();
+ if (m_shards[i].first) {
+ cnt += m_shards[i].first->get_aux_memory_usage();
}
}
@@ -146,7 +156,7 @@ public:
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
- cnt += m_shards[i]->get_memory_usage();
+ cnt += m_shards[i].first->get_memory_usage();
}
}
@@ -158,8 +168,8 @@ public:
size_t reccnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
if (m_shards[i]) {
- tscnt += m_shards[i]->get_tombstone_count();
- reccnt += m_shards[i]->get_record_count();
+ tscnt += m_shards[i].first->get_tombstone_count();
+ reccnt += m_shards[i].first->get_record_count();
}
}
@@ -169,7 +179,7 @@ public:
size_t get_nonempty_shard_count() const {
size_t cnt = 0;
for (size_t i = 0; i < m_shards.size(); i++) {
- if (m_shards[i] && m_shards[i]->get_record_count() > 0) {
+ if (m_shards[i].first && m_shards[i].first->get_record_count() > 0) {
cnt += 1;
}
}
@@ -180,7 +190,7 @@ public:
std::shared_ptr<InternalLevel> clone() const {
auto new_level = std::make_shared<InternalLevel>(m_level_no);
for (size_t i = 0; i < m_shards.size(); i++) {
- new_level->append(m_shards[i]);
+ new_level->append(m_shards[i].first, m_shards[i].second);
}
return new_level;
@@ -192,21 +202,39 @@ public:
m_shards.erase(m_shards.begin() + shard);
}
- void append(std::shared_ptr<ShardType> shard) {
- m_shards.emplace_back(shard);
+ void append(std::shared_ptr<ShardType> shard, size_t version=0) {
+ m_shards.push_back({shard, version});
+ }
+
+ void append(shard_ptr shard) {
+ m_shards.push_back(shard);
}
const ShardType *get_shard(ShardID shid) const {
if (shid < m_shards.size()) {
- return m_shards[shid].get();
+ return m_shards[shid].first.get();
}
return nullptr;
}
+ const shard_ptr get_shard_ptr(size_t shid) const {
+ if (shid < m_shards.size()) {
+ return m_shards[shid];
+ }
+
+ return {nullptr, 0};
+ }
+
+ void set_shard_version(size_t idx, size_t version) {
+ if (idx < m_shards.size()) {
+ m_shards[idx].second = version;
+ }
+ }
+
private:
ssize_t m_level_no;
- std::vector<std::shared_ptr<ShardType>> m_shards;
+ std::vector<shard_ptr> m_shards;
};
} // namespace de