diff options
| author | Douglas Rumbaugh <dbr4@psu.edu> | 2025-01-16 13:18:37 -0500 |
|---|---|---|
| committer | Douglas Rumbaugh <dbr4@psu.edu> | 2025-01-16 13:18:37 -0500 |
| commit | 77589d4cc82b766d2cf16294fab98a57f6579cb4 (patch) | |
| tree | 0cc136d13c20021e0278b8b2ededf2652c27a84e | |
| parent | bac86504220da4c169089a3a1803e0a21f5acbc2 (diff) | |
| download | dynamic-extension-77589d4cc82b766d2cf16294fab98a57f6579cb4.tar.gz | |
Additional layout policies + more flexibility in buffer flushing
| -rw-r--r-- | CMakeLists.txt | 9 | ||||
| -rw-r--r-- | benchmarks/include/standard_benchmarks.h | 5 | ||||
| -rw-r--r-- | benchmarks/tail-latency/config_sweep.cpp | 49 | ||||
| -rw-r--r-- | benchmarks/tail-latency/fixed_shard_count.cpp | 101 | ||||
| -rw-r--r-- | benchmarks/vldb/ts_bench.cpp | 2 | ||||
| -rw-r--r-- | include/framework/DynamicExtension.h | 13 | ||||
| -rw-r--r-- | include/framework/reconstruction/FixedShardCountPolicy.h | 66 | ||||
| -rw-r--r-- | include/framework/reconstruction/FloodL0Policy.h | 65 | ||||
| -rw-r--r-- | include/framework/structure/ExtensionStructure.h | 95 | ||||
| -rw-r--r-- | include/framework/structure/InternalLevel.h | 19 |
10 files changed, 364 insertions, 60 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 786f765..c332448 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ cmake_minimum_required(VERSION 3.22) -set(CMAKE_C_COMPILER clang) -set(CMAKE_CXX_COMPILER clang++) +set(CMAKE_C_COMPILER gcc) +set(CMAKE_CXX_COMPILER g++) set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED True) @@ -290,6 +290,11 @@ if (tail_bench) target_link_libraries(config_sweep PUBLIC gsl pthread atomic) target_include_directories(config_sweep PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) target_link_options(config_sweep PUBLIC -mcx16) + + add_executable(fixed_shard_count ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/fixed_shard_count.cpp) + target_link_libraries(fixed_shard_count PUBLIC gsl pthread atomic) + target_include_directories(fixed_shard_count PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include) + target_link_options(fixed_shard_count PUBLIC -mcx16) endif() if (bench) diff --git a/benchmarks/include/standard_benchmarks.h b/benchmarks/include/standard_benchmarks.h index dfa6513..76423ab 100644 --- a/benchmarks/include/standard_benchmarks.h +++ b/benchmarks/include/standard_benchmarks.h @@ -22,6 +22,7 @@ #include "framework/reconstruction/LevelingPolicy.h" #include "framework/reconstruction/TieringPolicy.h" #include "framework/reconstruction/BSMPolicy.h" +#include "framework/reconstruction/FloodL0Policy.h" constexpr double delete_proportion = 0.05; static size_t g_deleted_records = 0; @@ -38,7 +39,9 @@ de::ReconstructionPolicy<S, Q> *get_policy(size_t scale_factor, size_t buffer_si recon = new de::LevelingPolicy<S, Q>(scale_factor, buffer_size); } else if (policy == 2) { recon = new de::BSMPolicy<S, Q>(buffer_size); - } + } else if (policy == 3) { + recon = new de::FloodL0Policy<S, Q>(buffer_size); + } return recon; } diff --git a/benchmarks/tail-latency/config_sweep.cpp b/benchmarks/tail-latency/config_sweep.cpp index ef84aa7..d973ee5 100644 --- a/benchmarks/tail-latency/config_sweep.cpp +++ b/benchmarks/tail-latency/config_sweep.cpp @@ -2,6 +2,7 @@ * */ +#include "framework/scheduling/FIFOScheduler.h" #define ENABLE_TIMER #define TS_TEST @@ -22,7 +23,7 @@ typedef de::Record<uint64_t, uint64_t> Rec; typedef de::TrieSpline<Rec> Shard; typedef de::rc::Query<Shard> Q; -typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, de::SerialScheduler> Ext; +typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, de::FIFOScheduler> Ext; typedef Q::Parameters QP; void usage(char *progname) { @@ -44,15 +45,15 @@ int main(int argc, char **argv) { auto data = read_sosd_file<Rec>(d_fname, n); auto queries = read_range_queries<QP>(q_fname, .0001); - std::vector<int> policies = {0, 1}; - std::vector<size_t> buffers = {4000, 8000, 12000, 16000, 20000}; - std::vector<size_t> sfs = {2, 4, 6, 8, 12}; + std::vector<int> policies = {3}; + std::vector<size_t> buffers = {8000, 16000, 32000}; + std::vector<size_t> sfs = {8}; for (size_t l=0; l<policies.size(); l++) { for (size_t j=0; j<buffers.size(); j++) { for (size_t k=0; k<sfs.size(); k++) { auto policy = get_policy<Shard, Q>(sfs[k], buffers[j], policies[l]); - auto extension = new Ext(policy, 8000); + auto extension = new Ext(policy, buffers[j]/4, buffers[j]); /* warmup structure w/ 10% of records */ size_t warmup = .1 * n; @@ -73,8 +74,23 @@ int main(int argc, char **argv) { } TIMER_STOP(); + //fprintf(stdout, "%ld\t%ld\t%d\t%ld\n", sfs[k], buffers[j], policies[l], TIMER_RESULT()); + } + + extension->await_next_epoch(); + + /* repeat the queries a bunch of times */ + for (size_t l=0; l<10; l++) { + for (size_t i=0; i<queries.size(); i++) { + TIMER_START(); + auto q = queries[i]; + auto res = extension->query(std::move(q)); + res.get(); + TIMER_STOP(); + fprintf(stdout, "%ld\t%ld\t%d\t%ld\n", sfs[k], buffers[j], policies[l], TIMER_RESULT()); } + } QP p = {0, 10000}; @@ -85,29 +101,6 @@ int main(int argc, char **argv) { delete extension; }}} - /* - std::vector<int64_t> query_latencies; - query_latencies.reserve(queries.size()); - for (size_t i=warmup; i<data.size(); i++) { - TIMER_START(); - auto q = queries[i]; - auto res = extension->query(std::move(q)); - res.get(); - TIMER_STOP(); - - query_latencies.push_back(TIMER_RESULT()); - } - - printf("here\n"); - - for (size_t i=0; i<insert_latencies.size(); i++) { - fprintf(stdout, "I\t%ld\n", insert_latencies[i]); - } - - for (size_t i=0; i<query_latencies.size(); i++) { - fprintf(stdout, "Q\t%ld\n", query_latencies[i]); - } - */ fflush(stderr); } diff --git a/benchmarks/tail-latency/fixed_shard_count.cpp b/benchmarks/tail-latency/fixed_shard_count.cpp new file mode 100644 index 0000000..fecf599 --- /dev/null +++ b/benchmarks/tail-latency/fixed_shard_count.cpp @@ -0,0 +1,101 @@ +/* + * + */ + +#define ENABLE_TIMER +#define TS_TEST + +#include <thread> + +#include "framework/DynamicExtension.h" +#include "framework/scheduling/FIFOScheduler.h" +#include "shard/TrieSpline.h" +#include "query/rangecount.h" +#include "framework/interface/Record.h" +#include "file_util.h" +#include "standard_benchmarks.h" + +#include "framework/reconstruction/FixedShardCountPolicy.h" + +#include <gsl/gsl_rng.h> + +#include "psu-util/timer.h" + + +typedef de::Record<uint64_t, uint64_t> Rec; +typedef de::TrieSpline<Rec> Shard; +typedef de::rc::Query<Shard> Q; +typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, de::FIFOScheduler> Ext; +typedef Q::Parameters QP; + +void usage(char *progname) { + fprintf(stderr, "%s reccnt datafile queryfile\n", progname); +} + +int main(int argc, char **argv) { + + if (argc < 4) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + size_t n = atol(argv[1]); + std::string d_fname = std::string(argv[2]); + std::string q_fname = std::string(argv[3]); + + + auto data = read_sosd_file<Rec>(d_fname, n); + auto queries = read_range_queries<QP>(q_fname, .0001); + + std::vector<size_t> shard_counts = {4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 4096*2}; + size_t buffer_size = 8000; + + for (size_t i=0; i<shard_counts.size(); i++) { + auto policy = new de::FixedShardCountPolicy<Shard,Q>(buffer_size, shard_counts[i], n); + auto extension = new Ext(policy, buffer_size / 4, buffer_size); + + /* warmup structure w/ 10% of records */ + size_t warmup = .1 * n; + for (size_t i=0; i<warmup; i++) { + while (!extension->insert(data[i])) { + usleep(1); + } + } + + extension->await_next_epoch(); + + TIMER_INIT(); + + TIMER_START(); + for (size_t i=warmup; i<data.size(); i++) { + while (!extension->insert(data[i])) { + usleep(1); + } + } + TIMER_STOP(); + + auto insert_tput = (size_t) ((double) (n - warmup) / (double) TIMER_RESULT() *1.0e9); + + extension->await_next_epoch(); + + /* repeat the queries a bunch of times */ + TIMER_START(); + for (size_t l=0; l<10; l++) { + for (size_t i=0; i<queries.size(); i++) { + auto q = queries[i]; + auto res = extension->query(std::move(q)); + res.get(); + } + } + TIMER_STOP(); + + auto query_lat = TIMER_RESULT() / 10*queries.size(); + + fprintf(stdout, "%ld\t%ld\t%ld\t%ld\n", shard_counts[i], extension->get_shard_count(), insert_tput, query_lat); + + delete extension; + } + + fflush(stderr); +} + diff --git a/benchmarks/vldb/ts_bench.cpp b/benchmarks/vldb/ts_bench.cpp index 8b0ee35..1182376 100644 --- a/benchmarks/vldb/ts_bench.cpp +++ b/benchmarks/vldb/ts_bench.cpp @@ -89,6 +89,8 @@ int main(int argc, char **argv) { fprintf(stdout, "%ld\t%ld\t%ld\t%ld\t%ld\n", insert_throughput, query_latency, ext_size, static_latency, static_size); + fprintf(stdout, "%ld\n", extension->get_height()); + gsl_rng_free(rng); delete extension; fflush(stderr); diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h index c35bb93..5a64243 100644 --- a/include/framework/DynamicExtension.h +++ b/include/framework/DynamicExtension.h @@ -249,6 +249,19 @@ public: } /** + * Get the number of non-empty shards within the index. + * + * @return The number of non-empty shards within the index + */ + size_t get_shard_count() { + auto epoch = get_active_epoch(); + auto s = epoch->get_structure()->get_shard_count(); + end_job(epoch); + + return s; + } + + /** * Get the number of bytes of memory allocated across the framework for * storing records and associated index information (i.e., internal * ISAM tree nodes). This includes memory that is allocated but diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h new file mode 100644 index 0000000..ec8e4e6 --- /dev/null +++ b/include/framework/reconstruction/FixedShardCountPolicy.h @@ -0,0 +1,66 @@ +/* + * include/framework/reconstruction/FixedShardCountPolicy.h + * + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * Distributed under the Modified BSD License. + * + */ +#pragma once +#include <cmath> + +#include "framework/reconstruction/ReconstructionPolicy.h" +#include "framework/scheduling/Epoch.h" +#include "util/types.h" + +namespace de { +template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> +class FixedShardCountPolicy : public ReconstructionPolicy<ShardType, QueryType> { + typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> + LevelVector; + +public: + FixedShardCountPolicy(size_t buffer_size, size_t shard_count, size_t max_record_count) + : m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {} + + ReconstructionVector + get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch, + size_t incoming_reccnt) const override { + ReconstructionVector reconstructions; + return reconstructions; + + } + + ReconstructionTask + get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override { + + auto levels = epoch->get_structure()->get_level_vector(); + + if (levels.size() == 0) { + return ReconstructionTask{ + {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append}; + } + + ShardID last_shid = {0, (shard_index) (levels[0]->get_shard_count() - 1)}; + + if (levels[0]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) { + return ReconstructionTask{ + {{buffer_shid, last_shid}}, 0, m_buffer_size, ReconstructionType::Merge}; + } else { + return ReconstructionTask{ + {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append}; + } + } + +private: + inline size_t capacity() const { + double bps = (double) m_max_reccnt / (double) m_buffer_size / m_shard_count; + return ceil(bps) * m_buffer_size; + } + + size_t m_buffer_size; + size_t m_shard_count; + size_t m_max_reccnt; +}; +} // namespace de diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h new file mode 100644 index 0000000..da4c297 --- /dev/null +++ b/include/framework/reconstruction/FloodL0Policy.h @@ -0,0 +1,65 @@ +/* + * include/framework/reconstruction/FloodL0Policy.h + * + * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu> + * Dong Xie <dongx@psu.edu> + * + * Distributed under the Modified BSD License. + * + */ +#pragma once +#include <cmath> + +#include "framework/reconstruction/ReconstructionPolicy.h" +#include "framework/scheduling/Epoch.h" +#include "util/types.h" + +namespace de { +template <ShardInterface ShardType, QueryInterface<ShardType> QueryType> +class FloodL0Policy : public ReconstructionPolicy<ShardType, QueryType> { + typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>> + LevelVector; + +public: + FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {} + + ReconstructionVector + get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch, + size_t incoming_reccnt) const override { + + ReconstructionVector reconstructions; + return reconstructions; + + } + + ReconstructionTask + get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override { + return ReconstructionTask{ + {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append}; + } + +private: + level_index find_reconstruction_target(LevelVector &levels) const { + level_index target_level = invalid_level_idx; + size_t incoming_records = m_buffer_size; + + for (level_index i = 0; i < (level_index)levels.size(); i++) { + if (levels[i]->get_record_count() + incoming_records < capacity(i)) { + target_level = i; + break; + } + + incoming_records = levels[i]->get_record_count(); + } + + return target_level; + } + + inline size_t capacity(level_index level) const { + return m_buffer_size * pow(m_scale_factor, level + 1); + } + + size_t m_scale_factor; + size_t m_buffer_size; +}; +} // namespace de diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h index 3bb8a0b..078c4a9 100644 --- a/include/framework/structure/ExtensionStructure.h +++ b/include/framework/structure/ExtensionStructure.h @@ -145,17 +145,32 @@ public: return cnt; } - inline void perform_reconstruction(ReconstructionTask task) { + size_t get_shard_count() const { + size_t cnt = 0; + for (size_t i = 0; i < m_levels.size(); i++) { + if (m_levels[i]) { + cnt += m_levels[i]->get_nonempty_shard_count(); + } + } + + return cnt; + } + + inline void perform_reconstruction(ReconstructionTask task, + BuffView *bv=nullptr) { /* perform the reconstruction itself */ std::vector<const ShardType *> shards; for (ShardID shid : task.sources) { assert(shid.level_idx < (level_index) m_levels.size()); assert(shid.shard_idx >= -1); - /* if unspecified, push all shards into the vector */ - if (shid.shard_idx == all_shards_idx) { - for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); - i++) { + if (shid == buffer_shid) { + assert(bv); + ShardType *buffer_shard = new ShardType(std::move(*bv)); + shards.push_back(buffer_shard); + } else if (shid.shard_idx == all_shards_idx) { + /* if unspecified, push all shards into the vector */ + for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); i++) { if (m_levels[shid.level_idx]->get_shard(i)) { shards.push_back(m_levels[shid.level_idx]->get_shard(i)); } @@ -171,7 +186,9 @@ public: * Remove all of the shards processed by the operation */ for (ShardID shid : task.sources) { - if (shid.shard_idx == all_shards_idx) { + if (shid == buffer_shid) { + continue; + } else if (shid.shard_idx == all_shards_idx) { m_levels[shid.level_idx]->truncate(); } else { m_levels[shid.level_idx]->delete_shard(shid.shard_idx); @@ -199,29 +216,49 @@ public: * like that, we'll leave this as low priority. */ - /* insert the first level, if needed */ - if (m_levels.size() == 0) { - m_levels.push_back( - std::make_shared<InternalLevel<ShardType, QueryType>>(0)); - } - - ShardType *buffer_shard = new ShardType(std::move(buffer)); - if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) { - m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard)); - } else { - std::vector<const ShardType *> shards; - for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count(); - i++) { - if (m_levels[0]->get_shard(i)) { - shards.push_back(m_levels[0]->get_shard(i)); - } - - shards.push_back(buffer_shard); - ShardType *new_shard = new ShardType(shards); - m_levels[0]->truncate(); - m_levels[0]->append(std::shared_ptr<ShardType>(new_shard)); - } - } + // /* insert the first level, if needed */ + // if (m_levels.size() == 0) { + // m_levels.push_back( + // std::make_shared<InternalLevel<ShardType, QueryType>>(0)); + // } + + perform_reconstruction(task, &buffer); + + // ShardType *buffer_shard = new ShardType(std::move(buffer)); + // if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) { + // m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard)); + // } else if (task.type == ReconstructionType::Merge) { + // std::vector<const ShardType *> shards; + // for (size_t i=0; i<task.sources.size(); i++) { + // ShardID shid = task.sources[i]; + // if (shid != buffer_shid) { + // shards.emplace_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx)); + // } + // } + + // shards.emplace_back(buffer_shard); + // ShardType *new_shard = new ShardType(shards); + // m_levels[0]->append(std::shared_ptr<ShardType>(new_shard)); + // for (size_t i=0; i<task.sources.size(); i++) { + // ShardID shid = task.sources[i]; + // if (shid != buffer_shid) { + // m_levels[shid.level_idx]->delete_shard(shid.shard_idx); + // } + // } + // } else { + // std::vector<const ShardType *> shards; + // for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count(); + // i++) { + // if (m_levels[0]->get_shard(i)) { + // shards.push_back(m_levels[0]->get_shard(i)); + // } + + // shards.push_back(buffer_shard); + // ShardType *new_shard = new ShardType(shards); + // m_levels[0]->truncate(); + // m_levels[0]->append(std::shared_ptr<ShardType>(new_shard)); + // } + // } } bool take_reference() { diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h index c9d1749..5bc891b 100644 --- a/include/framework/structure/InternalLevel.h +++ b/include/framework/structure/InternalLevel.h @@ -166,6 +166,17 @@ public: return (double)tscnt / (double)(tscnt + reccnt); } + size_t get_nonempty_shard_count() const { + size_t cnt = 0; + for (size_t i = 0; i < m_shards.size(); i++) { + if (m_shards[i] && m_shards[i]->get_record_count() > 0) { + cnt += 1; + } + } + + return cnt; + } + std::shared_ptr<InternalLevel> clone() const { auto new_level = std::make_shared<InternalLevel>(m_level_no); for (size_t i = 0; i < m_shards.size(); i++) { @@ -185,6 +196,14 @@ public: m_shards.emplace_back(shard); } + const ShardType *get_shard(ShardID shid) const { + if (shid < m_shards.size()) { + return m_shards[shid].get(); + } + + return nullptr; + } + private: ssize_t m_level_no; std::vector<std::shared_ptr<ShardType>> m_shards; |