diff options
| -rw-r--r-- | CMakeLists.txt | 2 | ||||
| -rw-r--r-- | benchmarks/tail-latency/standard_latency_dist.cpp | 13 | ||||
| -rw-r--r-- | include/framework/DynamicExtension.h | 71 | ||||
| -rw-r--r-- | include/framework/reconstruction/BackgroundTieringPolicy.h | 3 | ||||
| -rw-r--r-- | include/framework/scheduling/Version.h | 42 | ||||
| -rw-r--r-- | include/framework/structure/InternalLevel.h | 1 | ||||
| -rw-r--r-- | include/framework/structure/MutableBuffer.h | 15 |
7 files changed, 49 insertions, 98 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 106b91a..f0d8a86 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,7 +9,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) set(namespace "de") project("Practical Dynamic Extension" VERSION 0.1.0) -set(debug true) +set(debug false) set(tests True) set(bench false) set(vldb_bench false) diff --git a/benchmarks/tail-latency/standard_latency_dist.cpp b/benchmarks/tail-latency/standard_latency_dist.cpp index 5c96a73..7cce48c 100644 --- a/benchmarks/tail-latency/standard_latency_dist.cpp +++ b/benchmarks/tail-latency/standard_latency_dist.cpp @@ -70,13 +70,6 @@ int main(int argc, char **argv) { while (!extension->insert(data[j])) { usleep(1); } - if (j + 1 != extension->get_record_count()) { - fprintf(stderr, "[E] %ld\t%ld\n", j+1, extension->get_record_count()); - extension->print_structure(); - fflush(stderr); - fflush(stdout); - } - assert(j+1 == extension->get_record_count()); } extension->await_version(); @@ -86,14 +79,8 @@ int main(int argc, char **argv) { TIMER_START(); for (size_t j=warmup; j<data.size(); j++) { while (!extension->insert(data[j])) { - fprintf(stderr, "[B] %ld %ld\n", j, extension->get_record_count()); usleep(1); } - - if (j + 1 != extension->get_record_count()) { - fprintf(stderr, "[E] %ld\t%ld\n", j+1, extension->get_record_count()); - } - assert(j+1 == extension->get_record_count()); } TIMER_STOP(); diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 31eb138..e97dfab 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -440,8 +440,8 @@ private: extension->SetThreadAffinity(); - std::vector<reconstruction_results<ShardType>> reconstructions; size_t new_head = 0; + std::vector<reconstruction_results<ShardType>> reconstructions; /* * For "normal" flushes, the task vector should be empty, so this is @@ -455,7 +455,7 @@ private: * this code will be bypassed in that case. */ if (args->priority == ReconstructionPriority::FLUSH) { - fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id()); + // fprintf(stderr, "[I] Running flush (%ld)\n", args->version->get_id()); /* we first construct a shard from the buffer */ auto buffview = args->version->get_buffer(); @@ -467,8 +467,13 @@ private: flush_recon.new_shard = std::make_shared<ShardType>(std::move(buffview)); reconstructions.push_back(flush_recon); + + // fprintf(stderr, "[I] Buffer Head:\t%ld\n", new_head); + + /* advance the buffer head for a flush */ + args->version->advance_buffer_head(new_head); } else { - fprintf(stderr, "[I] Running background reconstruction\n"); + // fprintf(stderr, "[I] Running background reconstruction\n"); } /* perform all of the reconstructions */ @@ -484,7 +489,7 @@ private: */ if (args->version->get_id() == INVALID_VERSION) { args->version->set_id(extension->m_version_counter.fetch_add(1)); - fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id()); + // fprintf(stderr, "[I] Reconstruction version assigned (%ld)\n", args->version->get_id()); } /* wait for our opportunity to install the updates */ @@ -501,6 +506,11 @@ private: } } + /* grab the latest buffer head */ + if (args->priority == ReconstructionPriority::MAINT) { + args->version->set_head(extension->get_active_version()->get_head()); + } + /* advance the index to the newly finished version */ extension->install_new_version(args->version, args->initial_version); @@ -514,24 +524,13 @@ private: } for (auto level : locked_levels) { - fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, args->version->get_id()); + // fprintf(stderr, "\t[I] releasing lock on %ld (%ld)\n", level, args->version->get_id()); extension->m_lock_mngr.release_lock(level); } } if (args->priority == ReconstructionPriority::FLUSH) { - /* advance the buffer head for a flush */ - args->version->advance_buffer_head(new_head); - extension->m_lock_mngr.release_buffer_lock(); - - fprintf(stderr, "[I] Completed flush (%ld)\n", args->version->get_id()); - extension->print_structure(); - fflush(stdout); - } else { - fprintf(stderr, "[I] Completed background reconstruction (%ld)\n", args->version->get_id()); - extension->print_structure(); - fflush(stdout); } /* manually delete the argument object */ @@ -609,7 +608,7 @@ private: */ version_ptr create_version_flush(std::unique_ptr<StructureType> structure) { size_t version_id = m_version_counter.fetch_add(1); - fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id); + // fprintf(stderr, "[I] Flush version assigned (%ld)\n", version_id); auto active_version = get_active_version(); std::shared_ptr<VersionType> new_version = std::make_shared<VersionType>(version_id, std::move(structure), m_buffer.get(), active_version->get_buffer().get_head()); @@ -636,7 +635,7 @@ private: assert(new_version->get_structure()); assert(new_version->get_id() != INVALID_VERSION); - fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id()); + // fprintf(stderr, "[I] Installing version %ld\n", new_version->get_id()); /* wait until our turn to install the new version */ await_version(new_version->get_id() - 1); @@ -650,7 +649,7 @@ private: m_active_version.store(new_version); m_version_advance_cv.notify_all(); - fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id()); + // fprintf(stderr, "[I] Installed version %ld\n", new_version->get_id()); } StructureType *create_scratch_structure() { @@ -679,40 +678,22 @@ private: return; } - /* - * for "legacy" policies, without background reconstruction, we need - * a valid structure object as part of the version prior to determining - * the flush operations. Otherwise, the flush operation should only ever - * do the flush itself, and so no structure is needed at this point - * - * delaying obtaining the structure until later, when maintenance - * reconstructions are enabled, removes the need for flushes to reconcile - * differences in structure between when they are scheduled and when they - * finish - */ - std::unique_ptr<StructureType> structure = nullptr; - if (m_config.recon_maint_disabled) { - structure = std::unique_ptr<StructureType>(m_active_version.load()->get_structure()->copy()); - } - - auto new_version = create_version_flush(std::move(structure)); - - auto *args = new ReconstructionArgs<ShardType, QueryType>(); - args->version = new_version; - args->tasks = m_config.recon_policy->get_flush_tasks(args->version.get()); + args->version = create_version_flush(nullptr); + args->tasks = m_config.recon_policy->get_flush_tasks(get_active_version().get()); args->extension = this; args->priority = ReconstructionPriority::FLUSH; args->initial_version = INVALID_VERSION; - fprintf(stderr, "[I] Scheduling flush (%ld)\n", new_version->get_id()); + // fprintf(stderr, "[I] Scheduling flush (%ld)\n", args->version->get_id()); + /* * NOTE: args is deleted by the reconstruction job, so shouldn't be * freed here */ m_sched->schedule_job(reconstruction, m_buffer->get_high_watermark(), args, FLUSH); - fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", new_version->get_id()); + // fprintf(stderr, "[I] Finished scheduling flush (%ld)\n", args->version->get_id()); if (m_config.recon_enable_maint_on_flush) { schedule_maint_reconstruction(false); @@ -731,13 +712,13 @@ private: begin_reconstruction_scheduling(); } - fprintf(stderr, "[I] Scheduling maintenance\n"); + // fprintf(stderr, "[I] Scheduling maintenance\n"); auto active_version = m_active_version.load(); auto reconstructions = m_config.recon_policy->get_reconstruction_tasks(active_version.get(), m_lock_mngr); if (reconstructions.size() == 0) { - fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n"); + // fprintf(stderr, "[I] Maintenance contained no task--not scheduled\n"); } for (auto &recon : reconstructions) { @@ -786,7 +767,7 @@ private: if (m_buffer->is_at_high_watermark() && current_l0 > max_l0) { schedule_maint_reconstruction(true); - fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0); + // fprintf(stderr, "[I] Current L0: %ld\tMax L0:%ld\n", current_l0, max_l0); return 0; } diff --git a/include/framework/reconstruction/BackgroundTieringPolicy.h b/include/framework/reconstruction/BackgroundTieringPolicy.h index a1bbddc..5a82695 100644 --- a/include/framework/reconstruction/BackgroundTieringPolicy.h +++ b/include/framework/reconstruction/BackgroundTieringPolicy.h @@ -45,7 +45,6 @@ public: for (level_index i = target_level; i > source_level; i--) { if (lock_mngr.take_lock(i-1)) { - fprintf(stderr, "[I] Taking lock on %ld (%ld)\n", i-1, version->get_id()); ReconstructionVector recon; size_t target_reccnt = (i < (level_index)levels.size()) ? levels[i]->get_record_count() : 0; @@ -57,8 +56,6 @@ public: recon.add_reconstruction(shards, i, total_reccnt, ReconstructionType::Compact); reconstructions.push_back(recon); - } else { - fprintf(stderr, "[I] Failed to get lock on %ld (%ld)\n", i-1, version->get_id()); } } diff --git a/include/framework/scheduling/Version.h b/include/framework/scheduling/Version.h index fa677f2..4cd73ba 100644 --- a/include/framework/scheduling/Version.h +++ b/include/framework/scheduling/Version.h @@ -54,10 +54,6 @@ public: StructureType *get_mutable_structure() { return m_structure.get(); } bool set_structure(std::unique_ptr<StructureType> new_struct) { - if (m_structure) { - return false; - } - m_structure = std::move(new_struct); return true; } @@ -89,38 +85,18 @@ public: return m_buffer->advance_head(new_head); } - void merge_changes_from(Version *old, size_t version_id) { - /* - * for a maint reconstruction, the buffer head may have advanced - * during the reconstruction; we don't need to adjust the buffer - * for maintenance reconstructions, so we can simply "catch" the - * internal head index up to the current version. - */ - if (old->m_buffer_head > m_buffer_head) { - m_buffer_head = old->m_buffer_head; - } - - // FIXME: we should also ensure that we don't clobber anything - // in the event that multiple concurrent reconstructions affect - // the same levels. As it stands, two reconstructions *could* share - // source shards, resulting in some records being lost or duplicated. - // - // For the moment, I'm being careful to avoid this within the - // scheduling policy itself, and only forwarding changes to this - // version. - - /* using INVALID_VERSION disables shard reconcilliation */ - if (version_id == 0) { - return; - } + void update_shard_version(size_t version) { + m_structure->update_shard_version(version); + } - /* add any shards newer than version_id to this version */ - auto old_structure = old->get_structure(); - m_structure->merge_structure(old_structure, version_id); + size_t get_head() { + return m_buffer_head; } - void update_shard_version(size_t version) { - m_structure->update_shard_version(version); + + void set_head(size_t head) { + // fprintf(stderr, "[I] Updating buffer head of %ld to %ld\n", get_id(), head); + m_buffer_head = head; } private: diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index 7e8e87d..6e8b67e 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -200,7 +200,6 @@ public: void delete_shard(shard_index shard, bool log_delete=true) { size_t before = m_shards.size(); - fprintf(stderr, "[D]\tReconstruction deleting shard %ld %p\n", shard, m_shards[shard].first.get()); m_shards.erase(m_shards.begin() + shard); size_t after = m_shards.size(); assert( before > after); diff --git a/include/framework/structure/MutableBuffer.h b/include/framework/structure/MutableBuffer.h index 0197ecd..105f0f3 100644 --- a/include/framework/structure/MutableBuffer.h +++ b/include/framework/structure/MutableBuffer.h @@ -149,13 +149,22 @@ public: /* refuse to advance head while there is an old with one references */ if (m_old_head.load().refcnt > 0) { - // fprintf(stderr, "[W]: Refusing to advance head due to remaining - // reference counts\n"); + //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts\n"); return false; } m_active_head_advance.store(true); + if (m_old_head.load().refcnt > 0) { + //fprintf(stderr, "[W]: Refusing to advance head due to remaining reference counts [2]\n"); + m_active_head_advance.store(false); + return false; + } + + // fprintf(stderr, "[I] Advancing head pointer: %ld %ld %ld\n", m_old_head.load().head_idx, m_head.load().head_idx, new_head); + // fprintf(stderr, "[I] Refcnts: %ld %ld\n", m_old_head.load().refcnt, m_head.load().refcnt); + + buffer_head new_hd = {new_head, 0}; buffer_head cur_hd; @@ -179,6 +188,8 @@ public: buffer_head cur_hd, new_hd; bool head_acquired = false; + + //fprintf(stderr, "[I]: getting head %ld %ld %ld\n", target_head, m_old_head.load().head_idx, m_head.load().head_idx); do { if (m_old_head.load().head_idx == target_head) { cur_hd = m_old_head.load(); |