summaryrefslogtreecommitdiffstats
path: root/benchmarks/tail-latency/selectivity_sweep.cpp
blob: 0fc0f42b6b9c96566e5c3dbc83582a1fa6c87430 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
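/*
 * Selectivity sweep benchmark for range-count queries over a
 * DynamicExtension of ISAM-tree shards (SerialScheduler, tombstone deletes).
 *
 * Usage: <program> reccnt
 *
 * For each policy id and each `sfs` value, the benchmark loads records from
 * the "unif" data file. It inserts 10% of them as warmup, times the
 * remaining inserts, and then times repeated range-count queries at each
 * configured selectivity. Each configuration prints one tab-separated row
 * to stdout:
 *   policy  sfs  reccnt  insert_throughput  (selectivity  query_latency)...
 */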

#include "framework/scheduling/SerialScheduler.h"
#include "framework/util/Configuration.h"
#include "util/types.h"
#define ENABLE_TIMER
#define TS_TEST

#include <cstdio>
#include <cstdlib>
#include <memory>
#include <thread>
#include <unistd.h>

#include "framework/DynamicExtension.h"
#include "framework/scheduling/FIFOScheduler.h"
#include "shard/ISAMTree.h"
#include "query/rangecount.h"
#include "framework/interface/Record.h"
#include "file_util.h"
#include "standard_benchmarks.h"

#include "framework/reconstruction/FixedShardCountPolicy.h"

#include <gsl/gsl_rng.h>

#include "psu-util/timer.h"


/*
 * Type aliases for the configuration under test: 64-bit key/value records,
 * ISAM-tree shards, range-count queries, tombstone deletes, and the serial
 * scheduler.
 */
using Rec = de::Record<uint64_t, uint64_t>;
using Shard = de::ISAMTree<Rec>;
using Q = de::rc::Query<Shard>;
using Ext = de::DynamicExtension<Shard, Q, de::DeletePolicy::TOMBSTONE, de::SerialScheduler>;
using QP = Q::Parameters;
using Conf = de::DEConfiguration<Shard, Q, de::DeletePolicy::TOMBSTONE, de::SerialScheduler>;

/*
 * Write the expected command-line form of this benchmark to stderr.
 *
 * progname: name the program was invoked as (callers pass argv[0]).
 */
void usage(char *progname)
{
    /* single line of the form "<progname> reccnt" */
    fprintf(stderr,
            "%s reccnt\n",
            progname);
}

int main(int argc, char **argv) {

    if (argc < 2) {
        usage(argv[0]);
        exit(EXIT_FAILURE);
    }

    size_t n = atol(argv[1]);
    std::string d_fname = "unif";
    
    auto data = read_sosd_file<Rec>(d_fname, n);

    std::vector<std::vector<QP>> query_sets;
    std::vector<double> selectivities = {0.0000001}; //, 0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1, .25};

    for (auto sel: selectivities) {
        query_sets.push_back(generate_uniform_range_queries<QP>(100, n, sel));
    }

    std::vector<size_t> sfs = {2, 4, 8, 16, 32, 64, 128}; 
    size_t buffer_size = 8000;
    std::vector<size_t> policies = {0, 1};

    for (auto pol: policies) {
    for (size_t i=0; i<sfs.size(); i++) {
        auto policy = get_policy<Shard, Q>(sfs[i], buffer_size, pol, n);
        auto config = Conf(std::move(policy));
        config.recon_enable_maint_on_flush = false;
        config.recon_maint_disabled = true;
        config.buffer_flush_trigger = 4000;
        config.maximum_threads = 8;
        
        auto extension = new Ext(std::move(config));

        /* warmup structure w/ 10% of records */
        size_t warmup = .1 * n;
        for (size_t j=0; j<warmup; j++) {
            while (!extension->insert(data[j])) {
                usleep(1);
            }
        }

        extension->await_version();

        // fprintf(stderr, "\n[I] Running Insertion Benchmark\n");

        TIMER_INIT();

        TIMER_START();
        for (size_t j=warmup; j<data.size(); j++) {
            while (!extension->insert(data[j])) {
                usleep(1);
                fprintf(stderr, "insert blocked %ld\r", j);
            }
        }
        TIMER_STOP();
        auto total_insert_lat = TIMER_RESULT();

        // extension->print_structure();
        // fflush(stdout);

        // fprintf(stderr, "\n[I] Finished running insertion benchmark\n");
        extension->await_version();

        // fprintf(stderr, "[I] Running query benchmark\n");
        
        /* repeat the queries a bunch of times */

        auto insert_throughput =  (size_t) ((double) (n - warmup) / (double) total_insert_lat *1.0e9);
        fprintf(stdout, "%ld\t%ld\t%ld\t%ld\t", pol, sfs[i], n, insert_throughput);

        size_t total = 0;
        for (size_t l=0; l<query_sets.size(); l++) {
            TIMER_START();
            for (size_t f=0; f<query_sets[l].size()*10; f++) {
                auto q = query_sets[l][f%10];
                auto res = extension->query(std::move(q));
                total += res.get();
            }
            TIMER_STOP();
            auto query_latency = (TIMER_RESULT()) / (10*query_sets[l].size());
            fprintf(stdout, "%lf\t%ld\t", selectivities[l], query_latency);
        }

        fprintf(stdout, "\n");
        fprintf(stderr, "%ld\n", total);
        fflush(stdout);

        // extension->print_structure();
        delete extension;
    }
    }

    fflush(stderr);
}