diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-02-04 17:47:52 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-02-04 17:47:52 -0500 |
| commit | d054b21a66318e096a809c9f94bc8659acfacfd8 (patch) | |
| tree | 62bc7f9e41dae257a7613e8c7e9bb080d0815dd0 | |
| parent | 7370b4d1412da47f70c54107adf498fa6be2cfc4 (diff) | |
| download | dynamic-extension-d054b21a66318e096a809c9f94bc8659acfacfd8.tar.gz | |
Fixed a bug with leveling layout policy
| -rw-r--r-- | CMakeLists.txt | 6 | ||||
| -rw-r--r-- | benchmarks/tail-latency/standard_latency_dist.cpp | 23 | ||||
| -rw-r--r-- | include/framework/DynamicExtension.h | 9 | ||||
| -rw-r--r-- | include/framework/reconstruction/LevelingPolicy.h | 2 | ||||
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 22 | ||||
| -rw-r--r-- | include/util/types.h | 6 |
6 files changed, 55 insertions, 13 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index f423ba1..a7db1b0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -315,6 +315,12 @@ if (tail_bench) target_link_libraries(isam_buffer PUBLIC gsl pthread atomic) target_include_directories(isam_buffer PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) target_link_options(isam_buffer PUBLIC -mcx16) + + + add_executable(vptree_buffer ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/vptree_buffer.cpp) + target_link_libraries(vptree_buffer PUBLIC gsl pthread atomic) + target_include_directories(vptree_buffer PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) + target_link_options(vptree_buffer PUBLIC -mcx16) endif() if (bench) diff --git a/benchmarks/tail-latency/standard_latency_dist.cpp b/benchmarks/tail-latency/standard_latency_dist.cpp index 3929b56..cb3459f 100644 --- a/benchmarks/tail-latency/standard_latency_dist.cpp +++ b/benchmarks/tail-latency/standard_latency_dist.cpp @@ -46,12 +46,13 @@ int main(int argc, char **argv) { auto data = read_sosd_file<Rec>(d_fname, n); auto queries = read_range_queries<QP>(q_fname, .0001); - std::vector<size_t> sfs = {2, 4, 8, 16, 32, 64, 128, 256, 512, 1024}; + std::vector<size_t> sfs = {2, 4, 8}; //, 4, 8, 16, 32, 64, 128, 256, 512, 1024}; size_t buffer_size = 8000; std::vector<size_t> policies = {0, 1}; + for (auto pol: policies) { for (size_t i=0; i<sfs.size(); i++) { - auto policy = get_policy<Shard, Q>(sfs[i], buffer_size, 0); + auto policy = get_policy<Shard, Q>(sfs[i], buffer_size, pol); auto extension = new Ext(policy, buffer_size / 4, buffer_size); /* warmup structure w/ 10% of records */ @@ -66,34 +67,36 @@ int main(int argc, char **argv) { TIMER_INIT(); + TIMER_START(); for (size_t j=warmup; j<data.size(); j++) { - TIMER_START(); while (!extension->insert(data[j])) { usleep(1); } - TIMER_STOP(); - - fprintf(stdout, "I\t%ld\t%ld\n", sfs[i], TIMER_RESULT()); } + TIMER_STOP(); + + size_t insert_tput = (double) (n - warmup) / (double) (TIMER_RESULT()) * 1e9; extension->await_next_epoch(); size_t total = 0; + TIMER_START(); /* repeat the queries a bunch of times */ for (size_t l=0; l<10; l++) { for (size_t j=0; j<queries.size(); j++) { - TIMER_START(); auto q = queries[j]; auto res = extension->query(std::move(q)); total += res.get(); - TIMER_STOP(); - fprintf(stdout, "Q\t%ld\t%ld\n", sfs[i], TIMER_RESULT()); } } + TIMER_STOP(); - fprintf(stdout, "S\t%ld\t%ld\t%ld\n", sfs[i], extension->get_shard_count(), total); + size_t query_lat = (double) TIMER_RESULT() / (10*queries.size()); + + fprintf(stdout, "S\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\n", pol, sfs[i], extension->get_height(), extension->get_shard_count(), total, insert_tput, query_lat); delete extension; } + } fflush(stderr); } diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index 5a64243..1327559 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -296,6 +296,14 @@ public: return t; } + + + void print_structure() { + auto epoch = get_active_epoch(); + epoch->get_structure()->print_structure(); + end_job(epoch); + } + /** * Create a new single Shard object containing all of the records * within the framework (buffer and shards). @@ -707,6 +715,7 @@ private: return m_buffer->append(rec, ts); } + //#ifdef _GNU_SOURCE #if 0 void SetThreadAffinity() { diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h index add28ba..89aa94b 100644 --- a/include/framework/reconstruction/LevelingPolicy.h +++ b/include/framework/reconstruction/LevelingPolicy.h @@ -54,7 +54,7 @@ public: ReconstructionTask get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override { return ReconstructionTask{ - {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge}; + {{buffer_shid}, {0, all_shards_idx}}, 0, m_buffer_size, ReconstructionType::Merge}; } private: diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 078c4a9..29a45e5 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -161,13 +161,15 @@ public: /* perform the reconstruction itself */ std::vector<const ShardType *> shards; for (ShardID shid : task.sources) { - assert(shid.level_idx < (level_index) m_levels.size()); + assert(shid.level_idx <= (level_index) m_levels.size()); assert(shid.shard_idx >= -1); if (shid == buffer_shid) { assert(bv); ShardType *buffer_shard = new ShardType(std::move(*bv)); shards.push_back(buffer_shard); + } else if (shid.level_idx == (level_index) m_levels.size()) { + continue; } else if (shid.shard_idx == all_shards_idx) { /* if unspecified, push all shards into the vector */ for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); i++) { @@ -188,6 +190,8 @@ public: for (ShardID shid : task.sources) { if (shid == buffer_shid) { continue; + } else if (shid.level_idx == (level_index) m_levels.size()) { + continue; } else if (shid.shard_idx == all_shards_idx) { m_levels[shid.level_idx]->truncate(); } else { @@ -315,6 +319,22 @@ public: return ts_prop <= (long double) max_delete_prop; } + void print_structure() const { + for (size_t i=0; i<m_levels.size(); i++) { + fprintf(stdout, "[%ld]:\t", i); + + if (m_levels[i]) { + for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) { + fprintf(stdout, "(%ld: %ld) ", j, m_levels[i]->get_shard(j)->get_record_count()); + } + } else { + fprintf(stdout, "[Empty]"); + } + + fprintf(stdout, "\n"); + } + } + private: std::atomic<size_t> m_refcnt; LevelVector m_levels; diff --git a/include/util/types.h b/include/util/types.h index e67c486..72b3d29 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -111,7 +111,11 @@ public: void add_reconstruction(level_index source, level_index target, size_t reccnt, ReconstructionType type) { - m_tasks.push_back({{{source, all_shards_idx}}, target, reccnt}); + if (type == ReconstructionType::Merge) { + m_tasks.push_back({{{source, all_shards_idx}, {target, all_shards_idx}}, target, reccnt}); + } else { + m_tasks.push_back({{{source, all_shards_idx}}, target, reccnt}); + } total_reccnt += reccnt; } |