From 77589d4cc82b766d2cf16294fab98a57f6579cb4 Mon Sep 17 00:00:00 2001 From: Douglas Rumbaugh Date: Thu, 16 Jan 2025 13:18:37 -0500 Subject: Additional layout policies + more flexibility in buffer flushing --- CMakeLists.txt | 9 +- benchmarks/include/standard_benchmarks.h | 5 +- benchmarks/tail-latency/config_sweep.cpp | 49 +++++----- benchmarks/tail-latency/fixed_shard_count.cpp | 101 +++++++++++++++++++++ benchmarks/vldb/ts_bench.cpp | 2 + include/framework/DynamicExtension.h | 13 +++ .../reconstruction/FixedShardCountPolicy.h | 66 ++++++++++++++ include/framework/reconstruction/FloodL0Policy.h | 65 +++++++++++++ include/framework/structure/ExtensionStructure.h | 95 +++++++++++++------ include/framework/structure/InternalLevel.h | 19 ++++ 10 files changed, 364 insertions(+), 60 deletions(-) create mode 100644 benchmarks/tail-latency/fixed_shard_count.cpp create mode 100644 include/framework/reconstruction/FixedShardCountPolicy.h create mode 100644 include/framework/reconstruction/FloodL0Policy.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 786f765..c332448 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ cmake_minimum_required(VERSION 3.22) -set(CMAKE_C_COMPILER clang) -set(CMAKE_CXX_COMPILER clang++) +set(CMAKE_C_COMPILER gcc) +set(CMAKE_CXX_COMPILER g++) set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED True) @@ -290,6 +290,11 @@ if (tail_bench) target_link_libraries(config_sweep PUBLIC gsl pthread atomic) target_include_directories(config_sweep PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) target_link_options(config_sweep PUBLIC -mcx16) + + add_executable(fixed_shard_count ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/fixed_shard_count.cpp) + target_link_libraries(fixed_shard_count PUBLIC gsl pthread atomic) + target_include_directories(fixed_shard_count PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) + target_link_options(fixed_shard_count PUBLIC -mcx16) endif() if (bench) diff --git a/benchmarks/include/standard_benchmarks.h b/benchmarks/include/standard_benchmarks.h index dfa6513..76423ab 100644 --- a/benchmarks/include/standard_benchmarks.h +++ b/benchmarks/include/standard_benchmarks.h @@ -22,6 +22,7 @@ #include "framework/reconstruction/LevelingPolicy.h" #include "framework/reconstruction/TieringPolicy.h" #include "framework/reconstruction/BSMPolicy.h" +#include "framework/reconstruction/FloodL0Policy.h" constexpr double delete_proportion = 0.05; static size_t g_deleted_records = 0; @@ -38,7 +39,9 @@ de::ReconstructionPolicy *get_policy(size_t scale_factor, size_t buffer_si recon = new de::LevelingPolicy(scale_factor, buffer_size); } else if (policy == 2) { recon = new de::BSMPolicy(buffer_size); - } + } else if (policy == 3) { + recon = new de::FloodL0Policy(buffer_size); + } return recon; } diff --git a/benchmarks/tail-latency/config_sweep.cpp b/benchmarks/tail-latency/config_sweep.cpp index ef84aa7..d973ee5 100644 --- a/benchmarks/tail-latency/config_sweep.cpp +++ b/benchmarks/tail-latency/config_sweep.cpp @@ -2,6 +2,7 @@ * */ +#include "framework/scheduling/FIFOScheduler.h" #define ENABLE_TIMER #define TS_TEST @@ -22,7 +23,7 @@ typedef de::Record Rec; typedef de::TrieSpline Shard; typedef de::rc::Query Q; -typedef de::DynamicExtension Ext; +typedef de::DynamicExtension Ext; typedef Q::Parameters QP; void usage(char *progname) { @@ -44,15 +45,15 @@ int main(int argc, char **argv) { auto data = read_sosd_file(d_fname, n); auto queries = read_range_queries(q_fname, .0001); - std::vector policies = {0, 1}; - std::vector buffers = {4000, 8000, 12000, 16000, 20000}; - std::vector sfs = {2, 4, 6, 8, 12}; + std::vector policies = {3}; + std::vector buffers = {8000, 16000, 32000}; + std::vector sfs = {8}; for (size_t l=0; l(sfs[k], buffers[j], policies[l]); - auto extension = new Ext(policy, 8000); + auto extension = new Ext(policy, buffers[j]/4, buffers[j]); /* warmup structure w/ 10% of records */ size_t warmup = .1 * n; @@ -73,8 +74,23 @@ int main(int argc, char **argv) { } TIMER_STOP(); + //fprintf(stdout, "%ld\t%ld\t%d\t%ld\n", sfs[k], buffers[j], policies[l], TIMER_RESULT()); + } + + extension->await_next_epoch(); + + /* repeat the queries a bunch of times */ + for (size_t l=0; l<10; l++) { + for (size_t i=0; iquery(std::move(q)); + res.get(); + TIMER_STOP(); + fprintf(stdout, "%ld\t%ld\t%d\t%ld\n", sfs[k], buffers[j], policies[l], TIMER_RESULT()); } + } QP p = {0, 10000}; @@ -85,29 +101,6 @@ int main(int argc, char **argv) { delete extension; }}} - /* - std::vector query_latencies; - query_latencies.reserve(queries.size()); - for (size_t i=warmup; iquery(std::move(q)); - res.get(); - TIMER_STOP(); - - query_latencies.push_back(TIMER_RESULT()); - } - - printf("here\n"); - - for (size_t i=0; i + +#include "framework/DynamicExtension.h" +#include "framework/scheduling/FIFOScheduler.h" +#include "shard/TrieSpline.h" +#include "query/rangecount.h" +#include "framework/interface/Record.h" +#include "file_util.h" +#include "standard_benchmarks.h" + +#include "framework/reconstruction/FixedShardCountPolicy.h" + +#include + +#include "psu-util/timer.h" + + +typedef de::Record Rec; +typedef de::TrieSpline Shard; +typedef de::rc::Query Q; +typedef de::DynamicExtension Ext; +typedef Q::Parameters QP; + +void usage(char *progname) { + fprintf(stderr, "%s reccnt datafile queryfile\n", progname); +} + +int main(int argc, char **argv) { + + if (argc < 4) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + size_t n = atol(argv[1]); + std::string d_fname = std::string(argv[2]); + std::string q_fname = std::string(argv[3]); + + + auto data = read_sosd_file(d_fname, n); + auto queries = read_range_queries(q_fname, .0001); + + std::vector shard_counts = {4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 4096*2}; + size_t buffer_size = 8000; + + for (size_t i=0; i(buffer_size, shard_counts[i], n); + auto extension = new Ext(policy, buffer_size / 4, buffer_size); + + /* warmup structure w/ 10% of records */ + size_t warmup = .1 * n; + for (size_t i=0; iinsert(data[i])) { + usleep(1); + } + } + + extension->await_next_epoch(); + + TIMER_INIT(); + + TIMER_START(); + for (size_t i=warmup; iinsert(data[i])) { + usleep(1); + } + } + TIMER_STOP(); + + auto insert_tput = (size_t) ((double) (n - warmup) / (double) TIMER_RESULT() *1.0e9); + + extension->await_next_epoch(); + + /* repeat the queries a bunch of times */ + TIMER_START(); + for (size_t l=0; l<10; l++) { + for (size_t i=0; iquery(std::move(q)); + res.get(); + } + } + TIMER_STOP(); + + auto query_lat = TIMER_RESULT() / 10*queries.size(); + + fprintf(stdout, "%ld\t%ld\t%ld\t%ld\n", shard_counts[i], extension->get_shard_count(), insert_tput, query_lat); + + delete extension; + } + + fflush(stderr); +} + diff --git a/benchmarks/vldb/ts_bench.cpp b/benchmarks/vldb/ts_bench.cpp index 8b0ee35..1182376 100644 --- a/benchmarks/vldb/ts_bench.cpp +++ b/benchmarks/vldb/ts_bench.cpp @@ -89,6 +89,8 @@ int main(int argc, char **argv) { fprintf(stdout, "%ld\t%ld\t%ld\t%ld\t%ld\n", insert_throughput, query_latency, ext_size, static_latency, static_size); + fprintf(stdout, "%ld\n", extension->get_height()); + gsl_rng_free(rng); delete extension; fflush(stderr); diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index c35bb93..5a64243 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -248,6 +248,19 @@ public: return t; } + /** + * Get the number of non-empty shards within the index. + * + * @return The number of non-empty shards within the index + */ + size_t get_shard_count() { + auto epoch = get_active_epoch(); + auto s = epoch->get_structure()->get_shard_count(); + end_job(epoch); + + return s; + } + /** * Get the number of bytes of memory allocated across the framework for * storing records and associated index information (i.e., internal diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h new file mode 100644 index 0000000..ec8e4e6 --- /dev/null +++ b/include/framework/reconstruction/FixedShardCountPolicy.h @@ -0,0 +1,66 @@ +/* + * include/framework/reconstruction/FixedShardCountPolicy.h + * + * Copyright (C) 2023-2024 Douglas B. Rumbaugh + * Dong Xie + * + * Distributed under the Modified BSD License. + * + */ +#pragma once +#include + +#include "framework/reconstruction/ReconstructionPolicy.h" +#include "framework/scheduling/Epoch.h" +#include "util/types.h" + +namespace de { +template QueryType> +class FixedShardCountPolicy : public ReconstructionPolicy { + typedef std::vector>> + LevelVector; + +public: + FixedShardCountPolicy(size_t buffer_size, size_t shard_count, size_t max_record_count) + : m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {} + + ReconstructionVector + get_reconstruction_tasks(const Epoch *epoch, + size_t incoming_reccnt) const override { + ReconstructionVector reconstructions; + return reconstructions; + + } + + ReconstructionTask + get_flush_task(const Epoch *epoch) const override { + + auto levels = epoch->get_structure()->get_level_vector(); + + if (levels.size() == 0) { + return ReconstructionTask{ + {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append}; + } + + ShardID last_shid = {0, (shard_index) (levels[0]->get_shard_count() - 1)}; + + if (levels[0]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) { + return ReconstructionTask{ + {{buffer_shid, last_shid}}, 0, m_buffer_size, ReconstructionType::Merge}; + } else { + return ReconstructionTask{ + {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append}; + } + } + +private: + inline size_t capacity() const { + double bps = (double) m_max_reccnt / (double) m_buffer_size / m_shard_count; + return ceil(bps) * m_buffer_size; + } + + size_t m_buffer_size; + size_t m_shard_count; + size_t m_max_reccnt; +}; +} // namespace de diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h new file mode 100644 index 0000000..da4c297 --- /dev/null +++ b/include/framework/reconstruction/FloodL0Policy.h @@ -0,0 +1,65 @@ +/* + * include/framework/reconstruction/FloodL0Policy.h + * + * Copyright (C) 2023-2024 Douglas B. Rumbaugh + * Dong Xie + * + * Distributed under the Modified BSD License. + * + */ +#pragma once +#include + +#include "framework/reconstruction/ReconstructionPolicy.h" +#include "framework/scheduling/Epoch.h" +#include "util/types.h" + +namespace de { +template QueryType> +class FloodL0Policy : public ReconstructionPolicy { + typedef std::vector>> + LevelVector; + +public: + FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {} + + ReconstructionVector + get_reconstruction_tasks(const Epoch *epoch, + size_t incoming_reccnt) const override { + + ReconstructionVector reconstructions; + return reconstructions; + + } + + ReconstructionTask + get_flush_task(const Epoch *epoch) const override { + return ReconstructionTask{ + {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append}; + } + +private: + level_index find_reconstruction_target(LevelVector &levels) const { + level_index target_level = invalid_level_idx; + size_t incoming_records = m_buffer_size; + + for (level_index i = 0; i < (level_index)levels.size(); i++) { + if (levels[i]->get_record_count() + incoming_records < capacity(i)) { + target_level = i; + break; + } + + incoming_records = levels[i]->get_record_count(); + } + + return target_level; + } + + inline size_t capacity(level_index level) const { + return m_buffer_size * pow(m_scale_factor, level + 1); + } + + size_t m_scale_factor; + size_t m_buffer_size; +}; +} // namespace de diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 3bb8a0b..078c4a9 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -145,17 +145,32 @@ public: return cnt; } - inline void perform_reconstruction(ReconstructionTask task) { + size_t get_shard_count() const { + size_t cnt = 0; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + cnt += m_levels[i]->get_nonempty_shard_count(); + } + } + + return cnt; + } + + inline void perform_reconstruction(ReconstructionTask task, + BuffView *bv=nullptr) { /* perform the reconstruction itself */ std::vector shards; for (ShardID shid : task.sources) { assert(shid.level_idx < (level_index) m_levels.size()); assert(shid.shard_idx >= -1); - /* if unspecified, push all shards into the vector */ - if (shid.shard_idx == all_shards_idx) { - for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); - i++) { + if (shid == buffer_shid) { + assert(bv); + ShardType *buffer_shard = new ShardType(std::move(*bv)); + shards.push_back(buffer_shard); + } else if (shid.shard_idx == all_shards_idx) { + /* if unspecified, push all shards into the vector */ + for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); i++) { if (m_levels[shid.level_idx]->get_shard(i)) { shards.push_back(m_levels[shid.level_idx]->get_shard(i)); } @@ -171,7 +186,9 @@ public: * Remove all of the shards processed by the operation */ for (ShardID shid : task.sources) { - if (shid.shard_idx == all_shards_idx) { + if (shid == buffer_shid) { + continue; + } else if (shid.shard_idx == all_shards_idx) { m_levels[shid.level_idx]->truncate(); } else { m_levels[shid.level_idx]->delete_shard(shid.shard_idx); @@ -199,29 +216,49 @@ public: * like that, we'll leave this as low priority. */ - /* insert the first level, if needed */ - if (m_levels.size() == 0) { - m_levels.push_back( - std::make_shared>(0)); - } - - ShardType *buffer_shard = new ShardType(std::move(buffer)); - if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) { - m_levels[0]->append(std::shared_ptr(buffer_shard)); - } else { - std::vector shards; - for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count(); - i++) { - if (m_levels[0]->get_shard(i)) { - shards.push_back(m_levels[0]->get_shard(i)); - } - - shards.push_back(buffer_shard); - ShardType *new_shard = new ShardType(shards); - m_levels[0]->truncate(); - m_levels[0]->append(std::shared_ptr(new_shard)); - } - } + // /* insert the first level, if needed */ + // if (m_levels.size() == 0) { + // m_levels.push_back( + // std::make_shared>(0)); + // } + + perform_reconstruction(task, &buffer); + + // ShardType *buffer_shard = new ShardType(std::move(buffer)); + // if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) { + // m_levels[0]->append(std::shared_ptr(buffer_shard)); + // } else if (task.type == ReconstructionType::Merge) { + // std::vector shards; + // for (size_t i=0; iget_shard(shid.shard_idx)); + // } + // } + + // shards.emplace_back(buffer_shard); + // ShardType *new_shard = new ShardType(shards); + // m_levels[0]->append(std::shared_ptr(new_shard)); + // for (size_t i=0; idelete_shard(shid.shard_idx); + // } + // } + // } else { + // std::vector shards; + // for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count(); + // i++) { + // if (m_levels[0]->get_shard(i)) { + // shards.push_back(m_levels[0]->get_shard(i)); + // } + + // shards.push_back(buffer_shard); + // ShardType *new_shard = new ShardType(shards); + // m_levels[0]->truncate(); + // m_levels[0]->append(std::shared_ptr(new_shard)); + // } + // } } bool take_reference() { diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index c9d1749..5bc891b 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -166,6 +166,17 @@ public: return (double)tscnt / (double)(tscnt + reccnt); } + size_t get_nonempty_shard_count() const { + size_t cnt = 0; + for (size_t i = 0; i < m_shards.size(); i++) { + if (m_shards[i] && m_shards[i]->get_record_count() > 0) { + cnt += 1; + } + } + + return cnt; + } + std::shared_ptr clone() const { auto new_level = std::make_shared(m_level_no); for (size_t i = 0; i < m_shards.size(); i++) { @@ -185,6 +196,14 @@ public: m_shards.emplace_back(shard); } + const ShardType *get_shard(ShardID shid) const { + if (shid < m_shards.size()) { + return m_shards[shid].get(); + } + + return nullptr; + } + private: ssize_t m_level_no; std::vector> m_shards; -- cgit v1.2.3