summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt6
-rw-r--r--benchmarks/tail-latency/standard_latency_dist.cpp23
-rw-r--r--include/framework/DynamicExtension.h9
-rw-r--r--include/framework/reconstruction/LevelingPolicy.h2
-rw-r--r--include/framework/structure/ExtensionStructure.h22
-rw-r--r--include/util/types.h6
6 files changed, 55 insertions, 13 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f423ba1..a7db1b0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -315,6 +315,12 @@ if (tail_bench)
target_link_libraries(isam_buffer PUBLIC gsl pthread atomic)
target_include_directories(isam_buffer PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include)
target_link_options(isam_buffer PUBLIC -mcx16)
+
+
+ add_executable(vptree_buffer ${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/tail-latency/vptree_buffer.cpp)
+ target_link_libraries(vptree_buffer PUBLIC gsl pthread atomic)
+ target_include_directories(vptree_buffer PRIVATE include external external/m-tree/cpp external/PGM-index/include external/PLEX/include benchmarks/include external/psudb-common/cpp/include)
+ target_link_options(vptree_buffer PUBLIC -mcx16)
endif()
if (bench)
diff --git a/benchmarks/tail-latency/standard_latency_dist.cpp b/benchmarks/tail-latency/standard_latency_dist.cpp
index 3929b56..cb3459f 100644
--- a/benchmarks/tail-latency/standard_latency_dist.cpp
+++ b/benchmarks/tail-latency/standard_latency_dist.cpp
@@ -46,12 +46,13 @@ int main(int argc, char **argv) {
auto data = read_sosd_file<Rec>(d_fname, n);
auto queries = read_range_queries<QP>(q_fname, .0001);
- std::vector<size_t> sfs = {2, 4, 8, 16, 32, 64, 128, 256, 512, 1024};
+ std::vector<size_t> sfs = {2, 4, 8}; //, 4, 8, 16, 32, 64, 128, 256, 512, 1024};
size_t buffer_size = 8000;
std::vector<size_t> policies = {0, 1};
+ for (auto pol: policies) {
for (size_t i=0; i<sfs.size(); i++) {
- auto policy = get_policy<Shard, Q>(sfs[i], buffer_size, 0);
+ auto policy = get_policy<Shard, Q>(sfs[i], buffer_size, pol);
auto extension = new Ext(policy, buffer_size / 4, buffer_size);
/* warmup structure w/ 10% of records */
@@ -66,34 +67,36 @@ int main(int argc, char **argv) {
TIMER_INIT();
+ TIMER_START();
for (size_t j=warmup; j<data.size(); j++) {
- TIMER_START();
while (!extension->insert(data[j])) {
usleep(1);
}
- TIMER_STOP();
-
- fprintf(stdout, "I\t%ld\t%ld\n", sfs[i], TIMER_RESULT());
}
+ TIMER_STOP();
+
+ size_t insert_tput = (double) (n - warmup) / (double) (TIMER_RESULT()) * 1e9;
extension->await_next_epoch();
size_t total = 0;
+ TIMER_START();
/* repeat the queries a bunch of times */
for (size_t l=0; l<10; l++) {
for (size_t j=0; j<queries.size(); j++) {
- TIMER_START();
auto q = queries[j];
auto res = extension->query(std::move(q));
total += res.get();
- TIMER_STOP();
- fprintf(stdout, "Q\t%ld\t%ld\n", sfs[i], TIMER_RESULT());
}
}
+ TIMER_STOP();
- fprintf(stdout, "S\t%ld\t%ld\t%ld\n", sfs[i], extension->get_shard_count(), total);
+ size_t query_lat = (double) TIMER_RESULT() / (10*queries.size());
+
+ fprintf(stdout, "S\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\t%ld\n", pol, sfs[i], extension->get_height(), extension->get_shard_count(), total, insert_tput, query_lat);
delete extension;
}
+ }
fflush(stderr);
}
diff --git a/include/framework/DynamicExtension.h b/include/framework/DynamicExtension.h
index 5a64243..1327559 100644
--- a/include/framework/DynamicExtension.h
+++ b/include/framework/DynamicExtension.h
@@ -296,6 +296,14 @@ public:
return t;
}
+
+
+ void print_structure() {
+ auto epoch = get_active_epoch();
+ epoch->get_structure()->print_structure();
+ end_job(epoch);
+ }
+
/**
* Create a new single Shard object containing all of the records
* within the framework (buffer and shards).
@@ -707,6 +715,7 @@ private:
return m_buffer->append(rec, ts);
}
+
//#ifdef _GNU_SOURCE
#if 0
void SetThreadAffinity() {
diff --git a/include/framework/reconstruction/LevelingPolicy.h b/include/framework/reconstruction/LevelingPolicy.h
index add28ba..89aa94b 100644
--- a/include/framework/reconstruction/LevelingPolicy.h
+++ b/include/framework/reconstruction/LevelingPolicy.h
@@ -54,7 +54,7 @@ public:
ReconstructionTask
get_flush_task(const Epoch<ShardType, QueryType> *epoch) const override {
return ReconstructionTask{
- {{buffer_shid}}, 0, m_buffer_size, ReconstructionType::Merge};
+ {{buffer_shid}, {0, all_shards_idx}}, 0, m_buffer_size, ReconstructionType::Merge};
}
private:
diff --git a/include/framework/structure/ExtensionStructure.h b/include/framework/structure/ExtensionStructure.h
index 078c4a9..29a45e5 100644
--- a/include/framework/structure/ExtensionStructure.h
+++ b/include/framework/structure/ExtensionStructure.h
@@ -161,13 +161,15 @@ public:
/* perform the reconstruction itself */
std::vector<const ShardType *> shards;
for (ShardID shid : task.sources) {
- assert(shid.level_idx < (level_index) m_levels.size());
+ assert(shid.level_idx <= (level_index) m_levels.size());
assert(shid.shard_idx >= -1);
if (shid == buffer_shid) {
assert(bv);
ShardType *buffer_shard = new ShardType(std::move(*bv));
shards.push_back(buffer_shard);
+ } else if (shid.level_idx == (level_index) m_levels.size()) {
+ continue;
} else if (shid.shard_idx == all_shards_idx) {
/* if unspecified, push all shards into the vector */
for (size_t i = 0; i < m_levels[shid.level_idx]->get_shard_count(); i++) {
@@ -188,6 +190,8 @@ public:
for (ShardID shid : task.sources) {
if (shid == buffer_shid) {
continue;
+ } else if (shid.level_idx == (level_index) m_levels.size()) {
+ continue;
} else if (shid.shard_idx == all_shards_idx) {
m_levels[shid.level_idx]->truncate();
} else {
@@ -315,6 +319,22 @@ public:
return ts_prop <= (long double) max_delete_prop;
}
+ void print_structure() const {
+ for (size_t i=0; i<m_levels.size(); i++) {
+ fprintf(stdout, "[%ld]:\t", i);
+
+ if (m_levels[i]) {
+ for (size_t j=0; j<m_levels[i]->get_shard_count(); j++) {
+ fprintf(stdout, "(%ld: %ld) ", j, m_levels[i]->get_shard(j)->get_record_count());
+ }
+ } else {
+ fprintf(stdout, "[Empty]");
+ }
+
+ fprintf(stdout, "\n");
+ }
+ }
+
private:
std::atomic<size_t> m_refcnt;
LevelVector m_levels;
diff --git a/include/util/types.h b/include/util/types.h
index e67c486..72b3d29 100644
--- a/include/util/types.h
+++ b/include/util/types.h
@@ -111,7 +111,11 @@ public:
void add_reconstruction(level_index source, level_index target,
size_t reccnt, ReconstructionType type) {
- m_tasks.push_back({{{source, all_shards_idx}}, target, reccnt});
+ if (type == ReconstructionType::Merge) {
+ m_tasks.push_back({{{source, all_shards_idx}, {target, all_shards_idx}}, target, reccnt});
+ } else {
+ m_tasks.push_back({{{source, all_shards_idx}}, target, reccnt});
+ }
total_reccnt += reccnt;
}