summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt9
-rw-r--r--benchmarks/include/standard_benchmarks.h5
-rw-r--r--benchmarks/tail-latency/config_sweep.cpp49
-rw-r--r--benchmarks/tail-latency/fixed_shard_count.cpp101
-rw-r--r--benchmarks/vldb/ts_bench.cpp2
-rw-r--r--include/framework/DynamicExtension.h13
-rw-r--r--include/framework/reconstruction/FixedShardCountPolicy.h66
-rw-r--r--include/framework/reconstruction/FloodL0Policy.h65
-rw-r--r--include/framework/structure/ExtensionStructure.h95
-rw-r--r--include/framework/structure/InternalLevel.h19
10 files changed, 364 insertions, 60 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 786f765..c332448 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.22)
-set(CMAKE_C_COMPILER clang)
-set(CMAKE_CXX_COMPILER clang++)
+set(CMAKE_C_COMPILER gcc)
+set(CMAKE_CXX_COMPILER g++)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)
@@ -290,6 +290,11 @@ if (tail_bench)
target_link_libraries(config_sweep PUBLIC gsl pthread atomic)
target_include_directories(config_sweep PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include)
target_link_options(config_sweep PUBLIC -mcx16)
+
+ add_executable(fixed_shard_count ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/fixed_shard_count.cpp)
+ target_link_libraries(fixed_shard_count PUBLIC gsl pthread atomic)
+ target_include_directories(fixed_shard_count PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include)
+ target_link_options(fixed_shard_count PUBLIC -mcx16)
endif()
if (bench)
diff --git a/benchmarks/include/standard_benchmarks.h b/benchmarks/include/standard_benchmarks.h
index dfa6513..76423ab 100644
--- a/benchmarks/include/standard_benchmarks.h
+++ b/benchmarks/include/standard_benchmarks.h
@@ -22,6 +22,7 @@
#include "framework/reconstruction/LevelingPolicy.h"
#include "framework/reconstruction/TieringPolicy.h"
#include "framework/reconstruction/BSMPolicy.h"
+#include "framework/reconstruction/FloodL0Policy.h"
constexpr double delete_proportion = 0.05;
static size_t g_deleted_records = 0;
@@ -38,7 +39,9 @@ de::ReconstructionPolicy<S, Q> *get_policy(size_t scale_factor, size_t buffer_si
recon = new de::LevelingPolicy<S, Q>(scale_factor, buffer_size);
} else if (policy == 2) {
recon = new de::BSMPolicy<S, Q>(buffer_size);
- }
+ } else if (policy == 3) {
+ recon = new de::FloodL0Policy<S, Q>(buffer_size);
+ }
return recon;
}
diff --git a/benchmarks/tail-latency/config_sweep.cpp b/benchmarks/tail-latency/config_sweep.cpp
index ef84aa7..d973ee5 100644
--- a/benchmarks/tail-latency/config_sweep.cpp
+++ b/benchmarks/tail-latency/config_sweep.cpp
@@ -2,6 +2,7 @@
*
*/
+#include "framework/scheduling/FIFOScheduler.h"
#define ENABLE_TIMER
#define TS_TEST
@@ -22,7 +23,7 @@
typedef de::Record<uint64_t, uint64_t> Rec;
typedef de::TrieSpline<Rec> Shard;
typedef de::rc::Query<Shard> Q;
-typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, de::SerialScheduler> Ext;
+typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, de::FIFOScheduler> Ext;
typedef Q::Parameters QP;
void usage(char *progname) {
@@ -44,15 +45,15 @@ int main(int argc, char **argv) {
auto data = read_sosd_file<Rec>(d_fname, n);
auto queries = read_range_queries<QP>(q_fname, .0001);
- std::vector<int> policies = {0, 1};
- std::vector<size_t> buffers = {4000, 8000, 12000, 16000, 20000};
- std::vector<size_t> sfs = {2, 4, 6, 8, 12};
+ std::vector<int> policies = {3};
+ std::vector<size_t> buffers = {8000, 16000, 32000};
+ std::vector<size_t> sfs = {8};
for (size_t l=0; l<policies.size(); l++) {
for (size_t j=0; j<buffers.size(); j++) {
for (size_t k=0; k<sfs.size(); k++) {
auto policy = get_policy<Shard, Q>(sfs[k], buffers[j], policies[l]);
- auto extension = new Ext(policy, 8000);
+ auto extension = new Ext(policy, buffers[j]/4, buffers[j]);
/* warmup structure w/ 10% of records */
size_t warmup = .1 * n;
@@ -73,8 +74,23 @@ int main(int argc, char **argv) {
}
TIMER_STOP();
+ //fprintf(stdout, "%ld\t%ld\t%d\t%ld\n", sfs[k], buffers[j], policies[l], TIMER_RESULT());
+ }
+
+ extension->await_next_epoch();
+
+ /* repeat the queries a bunch of times */
+ for (size_t l=0; l<10; l++) {
+ for (size_t i=0; i<queries.size(); i++) {
+ TIMER_START();
+ auto q = queries[i];
+ auto res = extension->query(std::move(q));
+ res.get();
+ TIMER_STOP();
+
fprintf(stdout, "%ld\t%ld\t%d\t%ld\n", sfs[k], buffers[j], policies[l], TIMER_RESULT());
}
+ }
QP p = {0, 10000};
@@ -85,29 +101,6 @@ int main(int argc, char **argv) {
delete extension;
}}}
- /*
- std::vector<int64_t> query_latencies;
- query_latencies.reserve(queries.size());
- for (size_t i=warmup; i<data.size(); i++) {
- TIMER_START();
- auto q = queries[i];
- auto res = extension->query(std::move(q));
- res.get();
- TIMER_STOP();
-
- query_latencies.push_back(TIMER_RESULT());
- }
-
- printf("here\n");
-
- for (size_t i=0; i<insert_latencies.size(); i++) {
- fprintf(stdout, "I\t%ld\n", insert_latencies[i]);
- }
-
- for (size_t i=0; i<query_latencies.size(); i++) {
- fprintf(stdout, "Q\t%ld\n", query_latencies[i]);
- }
- */
fflush(stderr);
}
diff --git a/benchmarks/tail-latency/fixed_shard_count.cpp b/benchmarks/tail-latency/fixed_shard_count.cpp
new file mode 100644
index 0000000..fecf599
--- /dev/null
+++ b/benchmarks/tail-latency/fixed_shard_count.cpp
@@ -0,0 +1,101 @@
+/*
+ *
+ */
+
+#define ENABLE_TIMER
+#define TS_TEST
+
+#include <thread>
+
+#include "framework/DynamicExtension.h"
+#include "framework/scheduling/FIFOScheduler.h"
+#include "shard/TrieSpline.h"
+#include "query/rangecount.h"
+#include "framework/interface/Record.h"
+#include "file_util.h"
+#include "standard_benchmarks.h"
+
+#include "framework/reconstruction/FixedShardCountPolicy.h"
+
+#include <gsl/gsl_rng.h>
+
+#include "psu-util/timer.h"
+
+
+typedef de::Record<uint64_t, uint64_t> Rec;
+typedef de::TrieSpline<Rec> Shard;
+typedef de::rc::Query<Shard> Q;
+typedef de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, de::FIFOScheduler> Ext;
+typedef Q::Parameters QP;
+
+void usage(char *progname) {
+ fprintf(stderr, "%s reccnt datafile queryfile\n", progname);
+}
+
+int main(int argc, char **argv) {
+
+ if (argc < 4) {
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+
+ size_t n = atol(argv[1]);
+ std::string d_fname = std::string(argv[2]);
+ std::string q_fname = std::string(argv[3]);
+
+
+ auto data = read_sosd_file<Rec>(d_fname, n);
+ auto queries = read_range_queries<QP>(q_fname, .0001);
+
+ std::vector<size_t> shard_counts = {4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 4096*2};
+ size_t buffer_size = 8000;
+
+ for (size_t i=0; i<shard_counts.size(); i++) {
+ auto policy = new de::FixedShardCountPolicy<Shard,Q>(buffer_size, shard_counts[i], n);
+ auto extension = new Ext(policy, buffer_size / 4, buffer_size);
+
+ /* warmup structure w/ 10% of records */
+ size_t warmup = .1 * n;
+ for (size_t j=0; j<warmup; j++) {
+ while (!extension->insert(data[j])) {
+ usleep(1);
+ }
+ }
+
+ extension->await_next_epoch();
+
+ TIMER_INIT();
+
+ TIMER_START();
+ for (size_t j=warmup; j<data.size(); j++) {
+ while (!extension->insert(data[j])) {
+ usleep(1);
+ }
+ }
+ TIMER_STOP();
+
+ auto insert_tput = (size_t) ((double) (n - warmup) / (double) TIMER_RESULT() *1.0e9);
+
+ extension->await_next_epoch();
+
+ /* repeat the queries a bunch of times */
+ TIMER_START();
+ for (size_t l=0; l<10; l++) {
+ for (size_t q_idx=0; q_idx<queries.size(); q_idx++) {
+ auto q = queries[q_idx];
+ auto res = extension->query(std::move(q));
+ res.get();
+ }
+ }
+ TIMER_STOP();
+
+ /* mean per-query latency: total time over all 10 passes divided by total query count */
+ auto query_lat = TIMER_RESULT() / (10 * queries.size());
+
+ fprintf(stdout, "%ld\t%ld\t%ld\t%ld\n", shard_counts[i], extension->get_shard_count(), insert_tput, query_lat);
+
+ delete extension;
+ }
+
+ fflush(stderr);
+}
+
diff --git a/benchmarks/vldb/ts_bench.cpp b/benchmarks/vldb/ts_bench.cpp
index 8b0ee35..1182376 100644
--- a/benchmarks/vldb/ts_bench.cpp
+++ b/benchmarks/vldb/ts_bench.cpp
@@ -89,6 +89,8 @@ int main(int argc, char **argv) {
fprintf(stdout, "%ld\t%ld\t%ld\t%ld\t%ld\n", insert_throughput, query_latency, ext_size, static_latency, static_size);
+ fprintf(stdout, "%ld\n", extension->get_height());
+
gsl_rng_free(rng);
delete extension;
fflush(stderr);
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index c35bb93..5a64243 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -249,6 +249,19 @@ public:
}
/**
+ * Get the number of non-empty shards within the index.
+ *
+ * @return The number of non-empty shards within the index
+ */
+ size_t get_shard_count() {
+ auto epoch = get_active_epoch();
+ auto s = epoch->get_structure()->get_shard_count();
+ end_job(epoch);
+
+ return s;
+ }
+
+ /**
* Get the number of bytes of memory allocated across the framework for
* storing records and associated index information (i.e., internal
* ISAM tree nodes). This includes memory that is allocated but
diff --git a/include/framework/reconstruction/FixedShardCountPolicy.h b/include/framework/reconstruction/FixedShardCountPolicy.h
new file mode 100644
index 0000000..ec8e4e6
--- /dev/null
+++ b/include/framework/reconstruction/FixedShardCountPolicy.h
@@ -0,0 +1,66 @@
+/*
+ * include/framework/reconstruction/FixedShardCountPolicy.h
+ *
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+#include <cmath>
+
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/Epoch.h"
+#include "util/types.h"
+
+namespace de {
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+class FixedShardCountPolicy : public ReconstructionPolicy<ShardType, QueryType> {
+ typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+ LevelVector;
+
+public:
+ FixedShardCountPolicy(size_t buffer_size, size_t shard_count, size_t max_record_count)
+ : m_buffer_size(buffer_size), m_shard_count(shard_count), m_max_reccnt(max_record_count) {}
+
+ ReconstructionVector
+ get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch,
+ size_t incoming_reccnt) const override {
+ ReconstructionVector reconstructions;
+ return reconstructions;
+
+ }
+
+ ReconstructionTask
+ get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override {
+
+ auto levels = epoch->get_structure()->get_level_vector();
+ /* append a fresh shard when L0 is absent OR present but empty (guards the -1 shard index below) */
+ if (levels.size() == 0 || levels[0]->get_shard_count() == 0) {
+ return ReconstructionTask{
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append};
+ }
+
+ ShardID last_shid = {0, (shard_index) (levels[0]->get_shard_count() - 1)};
+
+ if (levels[0]->get_shard(last_shid.shard_idx)->get_record_count() + m_buffer_size <= capacity()) {
+ return ReconstructionTask{
+ {{buffer_shid, last_shid}}, 0, m_buffer_size, ReconstructionType::Merge};
+ } else {
+ return ReconstructionTask{
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append};
+ }
+ }
+
+private:
+ inline size_t capacity() const {
+ double bps = (double) m_max_reccnt / (double) m_buffer_size / m_shard_count;
+ return ceil(bps) * m_buffer_size;
+ }
+
+ size_t m_buffer_size;
+ size_t m_shard_count;
+ size_t m_max_reccnt;
+};
+} // namespace de
diff --git a/include/framework/reconstruction/FloodL0Policy.h b/include/framework/reconstruction/FloodL0Policy.h
new file mode 100644
index 0000000..da4c297
--- /dev/null
+++ b/include/framework/reconstruction/FloodL0Policy.h
@@ -0,0 +1,65 @@
+/*
+ * include/framework/reconstruction/FloodL0Policy.h
+ *
+ * Copyright (C) 2023-2024 Douglas B. Rumbaugh <drumbaugh@psu.edu>
+ * Dong Xie <dongx@psu.edu>
+ *
+ * Distributed under the Modified BSD License.
+ *
+ */
+#pragma once
+#include <cmath>
+
+#include "framework/reconstruction/ReconstructionPolicy.h"
+#include "framework/scheduling/Epoch.h"
+#include "util/types.h"
+
+namespace de {
+template <ShardInterface ShardType, QueryInterface<ShardType> QueryType>
+class FloodL0Policy : public ReconstructionPolicy<ShardType, QueryType> {
+ typedef std::vector<std::shared_ptr<InternalLevel<ShardType, QueryType>>>
+ LevelVector;
+
+public:
+ FloodL0Policy(size_t buffer_size) : m_buffer_size(buffer_size) {}
+
+ ReconstructionVector
+ get_reconstruction_tasks(const Epoch<ShardType, QueryType> *epoch,
+ size_t incoming_reccnt) const override {
+
+ ReconstructionVector reconstructions;
+ return reconstructions;
+
+ }
+
+ ReconstructionTask
+ get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override {
+ return ReconstructionTask{
+ {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Append};
+ }
+
+private:
+ level_index find_reconstruction_target(LevelVector &levels) const {
+ level_index target_level = invalid_level_idx;
+ size_t incoming_records = m_buffer_size;
+
+ for (level_index i = 0; i < (level_index)levels.size(); i++) {
+ if (levels[i]->get_record_count() + incoming_records < capacity(i)) {
+ target_level = i;
+ break;
+ }
+
+ incoming_records = levels[i]->get_record_count();
+ }
+
+ return target_level;
+ }
+
+ inline size_t capacity(level_index level) const {
+ return m_buffer_size * pow(m_scale_factor, level + 1);
+ }
+
+ size_t m_scale_factor = 2; /* FIXME: never set by the ctor but read in capacity(); confirm intended value */
+ size_t m_buffer_size;
+};
+} // namespace de
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 3bb8a0b..078c4a9 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -145,17 +145,32 @@ public:
return cnt;
}
- inline void perform_reconstruction(ReconstructionTask task) {
+ size_t get_shard_count() const {
+ size_t cnt = 0;
+ for (size_t i = 0; i < m_levels.size(); i++) {
+ if (m_levels[i]) {
+ cnt += m_levels[i]->get_nonempty_shard_count();
+ }
+ }
+
+ return cnt;
+ }
+
+ inline void perform_reconstruction(ReconstructionTask task,
+ BuffView *bv=nullptr) {
/* perform the reconstruction itself */
std::vector<const ShardType *> shards;
for (ShardID shid : task.sources) {
assert(shid.level_idx < (level_index) m_levels.size());
assert(shid.shard_idx >= -1);
- /* if unspecified, push all shards into the vector */
- if (shid.shard_idx == all_shards_idx) {
- for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count();
- i++) {
+ if (shid == buffer_shid) {
+ assert(bv);
+ ShardType *buffer_shard = new ShardType(std::move(*bv));
+ shards.push_back(buffer_shard);
+ } else if (shid.shard_idx == all_shards_idx) {
+ /* if unspecified, push all shards into the vector */
+ for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); i++) {
if (m_levels[shid.level_idx]->get_shard(i)) {
shards.push_back(m_levels[shid.level_idx]->get_shard(i));
}
@@ -171,7 +186,9 @@ public:
* Remove all of the shards processed by the operation
*/
for (ShardID shid : task.sources) {
- if (shid.shard_idx == all_shards_idx) {
+ if (shid == buffer_shid) {
+ continue;
+ } else if (shid.shard_idx == all_shards_idx) {
m_levels[shid.level_idx]->truncate();
} else {
m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
@@ -199,29 +216,49 @@ public:
* like that, we'll leave this as low priority.
*/
- /* insert the first level, if needed */
- if (m_levels.size() == 0) {
- m_levels.push_back(
- std::make_shared<InternalLevel<ShardType, QueryType>>(0));
- }
-
- ShardType *buffer_shard = new ShardType(std::move(buffer));
- if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) {
- m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard));
- } else {
- std::vector<const ShardType *> shards;
- for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count();
- i++) {
- if (m_levels[0]->get_shard(i)) {
- shards.push_back(m_levels[0]->get_shard(i));
- }
-
- shards.push_back(buffer_shard);
- ShardType *new_shard = new ShardType(shards);
- m_levels[0]->truncate();
- m_levels[0]->append(std::shared_ptr<ShardType>(new_shard));
- }
- }
+ // /* insert the first level, if needed */
+ // if (m_levels.size() == 0) {
+ // m_levels.push_back(
+ // std::make_shared<InternalLevel<ShardType, QueryType>>(0));
+ // }
+
+ perform_reconstruction(task, &buffer);
+
+ // ShardType *buffer_shard = new ShardType(std::move(buffer));
+ // if (task.type == ReconstructionType::Append || m_levels[0]->get_shard_count() == 0) {
+ // m_levels[0]->append(std::shared_ptr<ShardType>(buffer_shard));
+ // } else if (task.type == ReconstructionType::Merge) {
+ // std::vector<const ShardType *> shards;
+ // for (size_t i=0; i<task.sources.size(); i++) {
+ // ShardID shid = task.sources[i];
+ // if (shid != buffer_shid) {
+ // shards.emplace_back(m_levels[shid.level_idx]->get_shard(shid.shard_idx));
+ // }
+ // }
+
+ // shards.emplace_back(buffer_shard);
+ // ShardType *new_shard = new ShardType(shards);
+ // m_levels[0]->append(std::shared_ptr<ShardType>(new_shard));
+ // for (size_t i=0; i<task.sources.size(); i++) {
+ // ShardID shid = task.sources[i];
+ // if (shid != buffer_shid) {
+ // m_levels[shid.level_idx]->delete_shard(shid.shard_idx);
+ // }
+ // }
+ // } else {
+ // std::vector<const ShardType *> shards;
+ // for (level_index i = 0; i < (level_index)m_levels[0]->get_shard_count();
+ // i++) {
+ // if (m_levels[0]->get_shard(i)) {
+ // shards.push_back(m_levels[0]->get_shard(i));
+ // }
+
+ // shards.push_back(buffer_shard);
+ // ShardType *new_shard = new ShardType(shards);
+ // m_levels[0]->truncate();
+ // m_levels[0]->append(std::shared_ptr<ShardType>(new_shard));
+ // }
+ // }
}
bool take_reference() {
diff --git a/include/framework/structure/InternalLevel.h b/include/framework/structure/InternalLevel.h
index c9d1749..5bc891b 100644
--- a/include/framework/structure/InternalLevel.h
+++ b/include/framework/structure/InternalLevel.h
@@ -166,6 +166,17 @@ public:
return (double)tscnt / (double)(tscnt + reccnt);
}
+ size_t get_nonempty_shard_count() const {
+ size_t cnt = 0;
+ for (size_t i = 0; i < m_shards.size(); i++) {
+ if (m_shards[i] && m_shards[i]->get_record_count() > 0) {
+ cnt += 1;
+ }
+ }
+
+ return cnt;
+ }
+
std::shared_ptr<InternalLevel> clone() const {
auto new_level = std::make_shared<InternalLevel>(m_level_no);
for (size_t i = 0; i < m_shards.size(); i++) {
@@ -185,6 +196,14 @@ public:
m_shards.emplace_back(shard);
}
+ const ShardType *get_shard(ShardID shid) const {
+ if (shid.shard_idx >= 0 && (size_t) shid.shard_idx < m_shards.size()) {
+ return m_shards[shid.shard_idx].get();
+ }
+
+ return nullptr;
+ }
+
private:
ssize_t m_level_no;
std::vector<std::shared_ptr<ShardType>> m_shards;