-rw-r--r--  benchmarks/tail-latency/standard_latency_dist.cpp |  12
-rw-r--r--  include/framework/DynamicExtension.h              |  96
-rw-r--r--  include/framework/scheduling/LockManager.h        |  14
-rw-r--r--  include/framework/structure/ExtensionStructure.h  | 132
-rw-r--r--  include/framework/structure/InternalLevel.h       |   6
-rw-r--r--  include/util/types.h                              |   9
6 files changed, 131 insertions(+), 138 deletions(-)
diff --git a/benchmarks/tail-latency/standard_latency_dist.cpp b/benchmarks/tail-latency/standard_latency_dist.cpp
index 34949d4..5c96a73 100644
--- a/benchmarks/tail-latency/standard_latency_dist.cpp
+++ b/benchmarks/tail-latency/standard_latency_dist.cpp
@@ -70,6 +70,13 @@ int main(int argc, char **argv) {
while (!extension->insert(data[j])) {
usleep(1);
}
+ if (j + 1 != extension->get_record_count()) {
+ fprintf(stderr, "[E] %ld\t%ld\n", j+1, extension->get_record_count());
+ extension->print_structure();
+ fflush(stderr);
+ fflush(stdout);
+ }
+ assert(j+1 == extension->get_record_count());
}
extension->await_version();
@@ -82,6 +89,11 @@ int main(int argc, char **argv) {
fprintf(stderr, "[B] %ld %ld\n", j, extension->get_record_count());
usleep(1);
}
+
+ if (j + 1 != extension->get_record_count()) {
+ fprintf(stderr, "[E] %ld\t%ld\n", j+1, extension->get_record_count());
+ }
+ assert(j+1 == extension->get_record_count());
}
TIMER_STOP();
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index c2a59ea..31eb138 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -351,8 +351,8 @@ public:
}
/* versions signal on m_version_advance_cv when they activate */
+ std::unique_lock lk(m_version_advance_mtx);
while (m_active_version.load()->get_id() < vid) {
- std::unique_lock lk(m_version_advance_mtx);
m_version_advance_cv.wait(lk);
}
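
The await_version() fix above closes a lost-wakeup window: the predicate was previously evaluated before the mutex was held, so a notify_all() landing between the check and the wait() could be missed and the waiter would block forever. A minimal, self-contained sketch of the corrected pattern (the names here are simplified stand-ins for the framework's members):

    #include <atomic>
    #include <condition_variable>
    #include <cstddef>
    #include <mutex>

    std::atomic<size_t> active_version{0};
    std::mutex version_mtx;
    std::condition_variable version_cv;

    /* waiter: the predicate is only ever checked while holding the lock */
    void await_version(size_t vid) {
      std::unique_lock lk(version_mtx);
      version_cv.wait(lk, [&] { return active_version.load() >= vid; });
    }

    /* installer: publish under the same lock, then wake all waiters */
    void install_version(size_t vid) {
      {
        std::lock_guard lk(version_mtx);
        active_version.store(vid);
      }
      version_cv.notify_all();
    }
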
@@ -402,8 +402,6 @@ private:
std::condition_variable m_version_advance_cv;
std::mutex m_version_advance_mtx;
- std::atomic<bool> m_flush_in_progress = false;
-
LockManager m_lock_mngr;
alignas(64) std::atomic<bool> m_scheduling_reconstruction;
@@ -436,13 +434,15 @@ private:
}
}
- size_t m_flush_cnt = 0;
-
static void reconstruction(void *arguments) {
auto args = (ReconstructionArgs<ShardType, QueryType> *)arguments;
auto extension = (DynamicExtension *)args->extension;
extension->SetThreadAffinity();
+
+ std::vector<reconstruction_results<ShardType>> reconstructions;
+ size_t new_head = 0;
+
/*
* For "normal" flushes, the task vector should be empty, so this is
* all that will happen. Piggybacking internal reconstructions off
@@ -456,48 +456,26 @@ private:
*/
if (args->priority == ReconstructionPriority::FLUSH) {
fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id());
- assert(extension->m_flush_in_progress.load());
+
/* we first construct a shard from the buffer */
auto buffview = args->version->get_buffer();
assert(buffview.get_tail() != buffview.get_head());
- size_t new_head = buffview.get_tail();
-
- auto new_shard = std::make_shared<ShardType>(std::move(buffview));
-
-
- /*
- * Flushes already know their version id. To avoid needing to
- * do any update reconciliation between structures, they wait
- * until the version directly preceeding them has been installed,
- * and only then take a copy of the structure.
- */
- extension->await_version(args->version->get_id() - 1);
+ new_head = buffview.get_tail();
- if (extension->m_config.recon_maint_disabled) {
- assert(args->version->get_mutable_structure());
- args->version->get_mutable_structure()->append_l0(std::move(new_shard), args->version->get_id());
- } else {
- assert(!args->version->get_mutable_structure());
- auto structure = std::unique_ptr<StructureType>(extension->get_active_version()->get_structure()->copy());
+ reconstruction_results<ShardType> flush_recon;
+ flush_recon.target_level = 0;
+ flush_recon.new_shard = std::make_shared<ShardType>(std::move(buffview));
- /* add the newly created shard to the structure copy */
- structure->append_l0(std::move(new_shard), args->version->get_id());
-
- /* set this version's structure to the newly created one */
- args->version->set_structure(std::move(structure));
- }
-
- args->version->advance_buffer_head(new_head);
+ reconstructions.push_back(flush_recon);
} else {
fprintf(stderr, "[I] Running background reconstruction\n");
}
/* perform all of the reconstructions */
- StructureType *structure = args->version->get_mutable_structure();
+ auto structure = args->version->get_structure();
+
for (size_t i = 0; i < args->tasks.size(); i++) {
- if (structure->perform_reconstruction(args->tasks[i])) {
- extension->m_lock_mngr.add_lock();
- }
+ reconstructions.emplace_back(structure->perform_reconstruction(args->tasks[i]));
}
/*
@@ -508,6 +486,20 @@ private:
args->version->set_id(extension->m_version_counter.fetch_add(1));
fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id());
}
+
+ /* wait for our opportunity to install the updates */
+ extension->await_version(args->version->get_id() - 1);
+
+ /* get a fresh copy of the structure and apply our updates */
+  args->version->set_structure(std::unique_ptr<StructureType>(extension->get_active_version()->get_structure()->copy()));
+
+  for (const auto &recon : reconstructions) {
+ auto grow = args->version->get_mutable_structure()->append_shard(recon.new_shard, args->version->get_id(), recon.target_level);
+ args->version->get_mutable_structure()->delete_shards(recon.source_shards);
+ if (grow) {
+ extension->m_lock_mngr.add_lock();
+ }
+ }
/* advance the index to the newly finished version */
extension->install_new_version(args->version, args->initial_version);
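
The rewritten reconstruction body above replaces eager in-place mutation with a gather-then-apply protocol: every reconstruction (including the flush) first runs read-only against the current structure, its outputs are buffered in `reconstructions`, and only after `await_version()` returns does the thread copy the freshly active structure and replay the buffered shard additions and deletions onto that copy. A runnable toy of the same copy-then-apply idea, with a plain vector-of-levels standing in for StructureType and integer "shards" whose merge is a sum:

    #include <cstdio>
    #include <utility>
    #include <vector>

    using Level = std::vector<int>;
    using Structure = std::vector<Level>;

    struct ReconResult {
      int new_shard;
      size_t target_level;
      std::vector<std::pair<size_t, int>> sources; /* (level, shard id) */
    };

    /* planning phase: read-only, mirrors perform_reconstruction() */
    ReconResult plan_merge(const Structure &s, size_t src_level) {
      ReconResult r{0, src_level + 1, {}};
      for (int shard : s[src_level]) {
        r.new_shard += shard;
        r.sources.emplace_back(src_level, shard);
      }
      return r;
    }

    /* apply phase: mirrors append_shard(); returns true on growth */
    bool append_shard(Structure &s, int shard, size_t level) {
      bool grew = false;
      if (level == s.size()) { s.emplace_back(); grew = true; }
      s[level].push_back(shard);
      return grew;
    }

    int main() {
      Structure active = {{1, 2}};           /* L0 holds shards 1 and 2 */
      ReconResult r = plan_merge(active, 0); /* gather: no mutation yet */

      Structure next = active;               /* fresh copy of the active */
      bool grew = append_shard(next, r.new_shard, r.target_level);
      next[0].clear();                       /* drop the consumed shards */

      printf("grew=%d, merged shard=%d\n", grew, next[1][0]);
      return 0;
    }
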
@@ -526,10 +518,16 @@ private:
extension->m_lock_mngr.release_lock(level);
}
}
-
+
if (args->priority == ReconstructionPriority::FLUSH) {
- extension->m_flush_in_progress.store(false);
+ /* advance the buffer head for a flush */
+ args->version->advance_buffer_head(new_head);
+
+ extension->m_lock_mngr.release_buffer_lock();
+
fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id());
+ extension->print_structure();
+ fflush(stdout);
} else {
fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id());
extension->print_structure();
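
Note the reordering at the end of the flush path: `advance_buffer_head()` now runs only after the new version is installed, so flushed records stay reachable through the buffer until the shard built from them is part of the active structure (the brief overlap where both copies exist is presumably resolved by version snapshots on the read path). The record-count assertions added to the benchmark above check exactly this invariant. A toy timeline of the ordering, assuming visibility means "in the buffer at or past the head, or in an installed shard":

    #include <cassert>
    #include <cstddef>

    int main() {
      size_t inserted = 100;    /* total records the benchmark inserted */
      size_t head = 0;          /* buffer holds [head, inserted)        */
      size_t shard_records = 0; /* records in installed shards          */

      /* wrong order: setting head = inserted before install would
       * leave a window where the visible count drops to zero */

      /* patched order: install the flushed shard first ... */
      shard_records += inserted - head;
      assert(shard_records + (inserted - head) >= inserted);

      /* ... and only then retire the flushed records from the buffer */
      head = inserted;
      assert(shard_records + (inserted - head) == inserted);
      return 0;
    }
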
@@ -638,29 +636,18 @@ private:
assert(new_version->get_structure());
assert(new_version->get_id() != INVALID_VERSION);
- fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
+ fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id());
+
/* wait until our turn to install the new version */
await_version(new_version->get_id() - 1);
- auto old = get_active_version();
-
- new_version->merge_changes_from(old.get(), old_active_version_id);
- new_version->update_shard_version(new_version->get_id());
-
+ auto lk = std::unique_lock(m_version_advance_mtx);
/*
* Only one version can have a given number, so we are safe to
* directly assign here--nobody else is going to change it out from
* under us.
*/
m_active_version.store(new_version);
-
- /*
- * My understanding is that you don't *really* need this mutex for
- * safety in modern C++ when sending the signal. But I'll grab it
- * anyway out of an abundance of caution. I doubt this will be a
- * major bottleneck.
- */
- auto lk = std::unique_lock(m_version_advance_mtx);
m_version_advance_cv.notify_all();
fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id());
@@ -687,14 +674,11 @@ private:
void schedule_flush() {
begin_reconstruction_scheduling();
- bool old = m_flush_in_progress.load();
- if (old || !m_flush_in_progress.compare_exchange_strong(old, true)) {
+ if (!m_lock_mngr.take_buffer_lock()) {
end_reconstruction_scheduling();
- m_version_advance_cv.notify_all();
return;
}
-
/*
* for "legacy" policies, without background reconstruction, we need
* a valid structure object as part of the version prior to determining
diff --git a/include/framework/scheduling/LockManager.h b/include/framework/scheduling/LockManager.h
index 91ed778..fcc79d1 100644
--- a/include/framework/scheduling/LockManager.h
+++ b/include/framework/scheduling/LockManager.h
@@ -46,7 +46,21 @@ public:
return false;
}
+ bool take_buffer_lock() {
+ bool old = m_buffer_lk.load();
+ if (!old) {
+ return m_buffer_lk.compare_exchange_strong(old, true);
+ }
+
+ return false;
+ }
+
+ void release_buffer_lock() {
+ m_buffer_lk.store(false);
+ }
+
private:
std::deque<std::atomic<bool>> m_lks;
+  std::atomic<bool> m_buffer_lk = false;
};
}
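
take_buffer_lock()/release_buffer_lock() move the flush-gating flag that DynamicExtension previously managed by hand (`m_flush_in_progress`) into the LockManager, using the same load-then-CAS idiom as the level locks. A self-contained sketch of the mechanism; note that the member must be explicitly initialized, since a default-constructed std::atomic<bool> holds an indeterminate value before C++20:

    #include <atomic>
    #include <cstdio>

    struct BufferLock {
      std::atomic<bool> held{false}; /* must not be left uninitialized */

      /* returns true iff this caller won the flag */
      bool take() {
        bool expected = false;
        return held.compare_exchange_strong(expected, true);
      }
      void release() { held.store(false); }
    };

    int main() {
      BufferLock lk;
      if (lk.take()) {
        std::printf("flush scheduled\n"); /* ... run the flush ... */
        lk.release();
      } else {
        std::printf("flush already in progress, skipping\n");
      }
      return 0;
    }
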
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index fa713af..521e68b 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -55,6 +55,8 @@ public:
new_struct->m_levels.push_back(m_levels[i]->clone());
}
+ new_struct->m_deleted_shards = m_deleted_shards;
+
return new_struct;
}
@@ -159,65 +161,29 @@ public:
return cnt;
}
+
/*
- * Perform the reconstruction described by task. If the resulting
- * reconstruction grows the structure (i.e., adds a level), returns
- * true. Otherwise, returns false.
+ * Perform the reconstruction described by task and return a
+ * reconstruction_results object describing the new shard, the
+ * source shards it consumed, and its target level. The structure
+ * itself is left unmodified; the caller later applies the changes
+ * via append_shard() and delete_shards().
*/
- inline bool perform_reconstruction(ReconstructionTask task, size_t version=0) {
- /* perform the reconstruction itself */
+ inline reconstruction_results<ShardType> perform_reconstruction(ReconstructionTask task) const {
+ reconstruction_results<ShardType> result;
+ result.target_level = task.target;
+
std::vector<const ShardType *> shards;
for (ShardID shid : task.sources) {
- assert(shid.level_idx <= (level_index) m_levels.size());
+ assert(shid.level_idx < (level_index) m_levels.size());
assert(shid.shard_idx >= -1);
- if (shid.level_idx == (level_index) m_levels.size()) {
- continue;
- }
-
- /* if unspecified, push all shards into the vector */
- if (shid.shard_idx == all_shards_idx) {
- for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count();
- i++) {
- if (m_levels[shid.level_idx]->get_shard(i)) {
- shards.push_back(m_levels[shid.level_idx]->get_shard(i));
- }
- }
- } else {
- shards.push_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx));
- }
+ auto raw_shard_ptr = m_levels[shid.level_idx]->get_shard(shid.shard_idx);
+ shards.push_back(raw_shard_ptr);
+ result.source_shards.emplace_back(shid.level_idx, raw_shard_ptr);
}
- auto new_shard = new ShardType(shards);
+ result.new_shard = std::make_shared<ShardType>(shards);
- /*
- * Remove all of the shards processed by the operation
- */
- for (ShardID shid : task.sources) {
- if (shid.level_idx == (level_index) m_levels.size()) {
- continue;
- } else if (shid.shard_idx == all_shards_idx) {
- m_levels[shid.level_idx]->truncate();
- } else if (shid != buffer_shid) {
- m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
- }
- }
-
- // fprintf(stderr, "Target: %ld\tLevels:%ld\n", task.target, m_levels.size());
-
- /*
- * Append the new shard to the target level
- */
- if (task.target < (level_index)m_levels.size()) {
- m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard), version);
- return false;
- // fprintf(stderr, "append (no growth)\n");
- } else { /* grow the structure if needed */
- m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(task.target));
- m_levels[task.target]->append(std::shared_ptr<ShardType>(new_shard), version);
- return true;
- // fprintf(stderr, "grow and append\n");
- }
+ return result;
}
std::vector<typename QueryType::LocalQuery *>
@@ -237,8 +203,36 @@ public:
return m_levels[0]->get_shard_count();
}
- void append_l0(std::shared_ptr<ShardType> shard, size_t version) {
- m_levels[0]->append(shard, version);
+ bool append_shard(std::shared_ptr<ShardType> shard, size_t version, size_t level) {
+ assert(level <= m_levels.size());
+ auto rc = false;
+
+ if (level == m_levels.size()) {
+ /* grow the structure */
+ m_levels.push_back(std::make_shared<InternalLevel<ShardType, QueryType>>(level));
+ rc = true;
+ }
+
+ m_levels[level]->append(shard, version);
+
+ return rc;
+ }
+
+ void delete_shards(std::vector<std::pair<level_index, const ShardType*>> shards) {
+ for (size_t i=0; i<shards.size(); i++) {
+ assert(shards[i].first < (level_index) m_levels.size());
+ ssize_t shard_idx = -1;
+ for (size_t j=0; j<m_levels[shards[i].first]->get_shard_count(); j++) {
+ if (m_levels[shards[i].first]->get_shard_ptr(j).first.get() == shards[i].second) {
+ shard_idx = j;
+ break;
+ }
+ }
+
+ if (shard_idx != -1) {
+ m_levels[shards[i].first]->delete_shard(shard_idx);
+ }
+ }
}
LevelVector const &get_level_vector() const { return m_levels; }
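
append_shard() generalizes the old append_l0() to arbitrary levels and absorbs the grow-on-demand logic that perform_reconstruction() used to carry, while delete_shards() maps each recorded (level, raw pointer) pair back to an index with a linear scan, making deletion of a shard that has already disappeared a harmless no-op. A runnable toy of the pointer-identity deletion, assuming shared_ptr-owned shards as in InternalLevel:

    #include <cassert>
    #include <memory>
    #include <vector>

    struct Shard { int id; };
    using Level = std::vector<std::shared_ptr<Shard>>;

    /* erase the shard identified by raw pointer, if still present;
     * a shard that was already removed is silently skipped */
    void delete_shard_by_ptr(Level &level, const Shard *target) {
      for (size_t j = 0; j < level.size(); j++) {
        if (level[j].get() == target) {
          level.erase(level.begin() + j);
          return;
        }
      }
    }

    int main() {
      Level l0 = {std::make_shared<Shard>(Shard{1}),
                  std::make_shared<Shard>(Shard{2})};
      const Shard *victim = l0[0].get();

      delete_shard_by_ptr(l0, victim);
      assert(l0.size() == 1 && l0[0]->id == 2);

      delete_shard_by_ptr(l0, victim); /* second delete: no-op */
      assert(l0.size() == 1);
      return 0;
    }
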
@@ -269,13 +263,17 @@ public:
return ts_prop <= (long double) max_delete_prop;
}
- void print_structure() const {
+ void print_structure(bool debug=false) const {
for (size_t i=0; i<m_levels.size(); i++) {
- fprintf(stdout, "[%ld]:\t", i);
+ if (debug) {
+ fprintf(stdout, "[D] [%ld]:\t", i);
+ } else {
+ fprintf(stdout, "[%ld]:\t", i);
+ }
if (m_levels[i]) {
for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) {
- fprintf(stdout, "(%ld: %ld) ", j, m_levels[i]->get_shard(j)->get_record_count());
+ fprintf(stdout, "(%ld, %ld, %p: %ld) ", j, m_levels[i]->get_shard_ptr(j).second, m_levels[i]->get_shard_ptr(j).first.get(), m_levels[i]->get_shard(j)->get_record_count());
}
} else {
fprintf(stdout, "[Empty]");
@@ -285,37 +283,9 @@ public:
}
}
-
- void merge_structure(const ExtensionStructure* old_structure, size_t version_id = 0) {
- assert(version_id > 0);
-
- for (size_t i=0; i<old_structure->m_levels.size(); i++) {
- if (m_levels.size() <= i) {
- m_levels.push_back(old_structure->m_levels[i]);
- } else {
- for (size_t j=0; j<old_structure->m_levels[i]->get_shard_count(); j++) {
- if (old_structure->m_levels[i]->get_shard_version(j) > version_id) {
- m_levels[i]->append(old_structure->m_levels[i]->get_shard_ptr(j));
- }
- }
- }
- }
- }
-
- void update_shard_version(size_t version) {
- assert(version != 0);
-
- for (size_t i=0; i<m_levels.size(); i++) {
- for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) {
- if (m_levels[i]->get_shard_version(j) == 0) {
- m_levels[i]->set_shard_version(j, version);
- }
- }
- }
- }
-
private:
LevelVector m_levels;
+ std::vector<ShardType *> m_deleted_shards;
};
} // namespace de
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index 37b2b40..7e8e87d 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -198,8 +198,12 @@ public:
void truncate() { m_shards.erase(m_shards.begin(), m_shards.end()); }
- void delete_shard(shard_index shard) {
+ void delete_shard(shard_index shard, bool log_delete=true) {
+ size_t before = m_shards.size();
+    if (log_delete) {
+      fprintf(stderr, "[D]\tReconstruction deleting shard %ld %p\n", shard, m_shards[shard].first.get());
+    }
    m_shards.erase(m_shards.begin() + shard);
+    size_t after = m_shards.size();
+    assert(before > after);
}
void append(std::shared_ptr<ShardType> shard, size_t version=0) {
diff --git a/include/util/types.h b/include/util/types.h
index 084bf4b..6e8fd69 100644
--- a/include/util/types.h
+++ b/include/util/types.h
@@ -21,6 +21,7 @@
#include <cstdint>
#include <cstdlib>
#include <vector>
+#include <memory>
namespace de {
@@ -81,6 +82,14 @@ enum class ReconstructionType {
Compact /* the merging of shards on one level */
};
+
+template <typename ShardType>
+struct reconstruction_results {
+ std::shared_ptr<ShardType> new_shard;
+ std::vector<std::pair<level_index, const ShardType *>> source_shards;
+ size_t target_level;
+};
+
typedef struct ReconstructionTask {
std::vector<ShardID> sources = {};
level_index target = 0;
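
The reconstruction_results struct introduced here is the value type that ferries a finished reconstruction from the read-only planning phase to the apply phase in DynamicExtension::reconstruction(). A minimal sketch of how its fields are populated (level_index is aliased to long and a stub shard type is used for self-containment; in the framework both come from the real headers):

    #include <cstddef>
    #include <memory>
    #include <utility>
    #include <vector>

    using level_index = long; /* stand-in for the framework's typedef */

    struct StubShard { size_t record_count; };

    template <typename ShardType>
    struct reconstruction_results {
      std::shared_ptr<ShardType> new_shard;
      std::vector<std::pair<level_index, const ShardType *>> source_shards;
      size_t target_level;
    };

    int main() {
      auto s0 = std::make_shared<StubShard>(StubShard{100});
      auto s1 = std::make_shared<StubShard>(StubShard{200});

      /* result of merging the two level-0 shards into level 1 */
      reconstruction_results<StubShard> r;
      r.new_shard = std::make_shared<StubShard>(
          StubShard{s0->record_count + s1->record_count});
      r.source_shards = {{0, s0.get()}, {0, s1.get()}};
      r.target_level = 1;

      return r.new_shard->record_count == 300 ? 0 : 1;
    }
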