Skip to content

Commit 490280f

Browse files
committed
Add benchmarks
1 parent 615dcd1 commit 490280f

File tree

11 files changed

+357
-26
lines changed

11 files changed

+357
-26
lines changed

benchmark/runner/ssb_benchmark.cpp

+121
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
#include <tbb/task_scheduler_init.h>
2+
3+
#include "benchmark/benchmark.h"
4+
#include "common/scoped_timer.h"
5+
#include "execution/execution_util.h"
6+
#include "execution/vm/module.h"
7+
#include "main/db_main.h"
8+
#include "settings/settings_manager.h"
9+
#include "test_util/tpch/workload.h"
10+
11+
namespace noisepage::tpch {
12+
class SSBBenchmark : public benchmark::Fixture {
13+
public:
14+
const bool print_exec_info_ = true;
15+
const double threshold_ = 0.1;
16+
const uint64_t min_iterations_per_query_ = 10;
17+
const uint64_t max_iterations_per_query_ = 10;
18+
const int32_t threads_ = tbb::task_scheduler_init::default_num_threads();
19+
const execution::vm::ExecutionMode mode_ = execution::vm::ExecutionMode::Interpret;
20+
std::unique_ptr<DBMain> db_main_;
21+
std::unique_ptr<tpch::Workload> ssb_workload_;
22+
23+
const std::string ssb_table_root_ = "/Users/dpatra/Research/NoisePage-Support/SSB/SF0.01/";
24+
const std::string ssb_database_name_ = "ssb_db";
25+
26+
void SetUp(const benchmark::State &state) final {
27+
execution::ExecutionUtil::InitTPL();
28+
29+
// Set up database
30+
std::unordered_map<settings::Param, settings::ParamInfo> param_map;
31+
settings::SettingsManager::ConstructParamMap(param_map);
32+
auto db_main_builder = DBMain::Builder().SetUseGC(true)
33+
.SetUseCatalog(true)
34+
.SetUseGCThread(true)
35+
.SetUseSettingsManager(true)
36+
.SetSettingsParameterMap(std::move(param_map));
37+
db_main_ = db_main_builder.Build();
38+
39+
// Set up metrics manager
40+
auto metrics_manager = db_main_->GetMetricsManager();
41+
metrics_manager->EnableMetric(metrics::MetricsComponent::EXECUTION_PIPELINE);
42+
metrics_manager->SetMetricSampleInterval(metrics::MetricsComponent::EXECUTION_PIPELINE, 0);
43+
44+
// Load the TPCH tables and compile the queries
45+
ssb_workload_ =
46+
std::make_unique<tpch::Workload>(common::ManagedPointer<DBMain>(db_main_), ssb_database_name_,
47+
ssb_table_root_, Workload::BenchmarkType::SSB, threads_);
48+
}
49+
50+
void TearDown(const benchmark::State &state) final {
51+
execution::ExecutionUtil::ShutdownTPL();
52+
// free db main here so we don't need to use the loggers anymore
53+
db_main_.reset();
54+
}
55+
};
56+
57+
// NOLINTNEXTLINE
58+
BENCHMARK_DEFINE_F(SSBBenchmark, StabilizeBenchmark)(benchmark::State &state) {
59+
// Run benchmark for each query independently
60+
auto num_queries = ssb_workload_->GetQueryNum();
61+
62+
for (auto _ : state) {
63+
// Overall totals
64+
uint64_t queries_run = 0, total_time = 0;
65+
for (uint32_t i = 0; i < num_queries; i++) {
66+
// Single query running totals
67+
double old_avg = 0, avg = 0;
68+
double total = 0;
69+
uint64_t iterations = 0;
70+
// Iterate at least until min_iterations_per_query and at most until max_iterations_per_query and until average
71+
// stabilizes
72+
while ((iterations < min_iterations_per_query_) ||
73+
((abs(avg - old_avg) > threshold_) && (iterations < max_iterations_per_query_))) {
74+
old_avg = avg;
75+
total += ssb_workload_->TimeQuery(i, mode_, print_exec_info_);
76+
iterations++;
77+
avg = total/iterations;
78+
}
79+
80+
if (print_exec_info_) {
81+
std::cout << ssb_workload_->GetQueryName(i) << " took " << iterations
82+
<< " iterations with an average execution time of " << avg << std::endl;
83+
}
84+
85+
queries_run += iterations;
86+
total_time += total;
87+
}
88+
state.SetIterationTime(total_time);
89+
state.SetItemsProcessed(queries_run);
90+
}
91+
92+
// Free the workload here so we don't need to use the loggers anymore
93+
ssb_workload_.reset();
94+
}
95+
96+
// NOLINTNEXTLINE
97+
BENCHMARK_DEFINE_F(SSBBenchmark, RuntimeBenchmark)(benchmark::State &state) {
98+
// Run benchmark for each query independently
99+
auto num_queries = ssb_workload_->GetQueryNum();
100+
101+
for (auto _ : state) {
102+
// Overall totals
103+
uint64_t queries_run = 0, total_time = 0;
104+
for (uint64_t iterations = 0; iterations < min_iterations_per_query_; iterations++) {
105+
// Iterate to min_iterations_per_query
106+
for (uint32_t i = 0; i < num_queries; i++) {
107+
total_time += ssb_workload_->TimeQuery(i, mode_, print_exec_info_);
108+
queries_run++;
109+
}
110+
}
111+
state.SetIterationTime(total_time);
112+
state.SetItemsProcessed(queries_run);
113+
}
114+
115+
// Free the workload here so we don't need to use the loggers anymore
116+
ssb_workload_.reset();
117+
}
118+
119+
// BENCHMARK_REGISTER_F(SSBBenchmark, StabilizeBenchmark)->Unit(benchmark::kMillisecond)->UseManualTime()->Iterations(1);
120+
BENCHMARK_REGISTER_F(SSBBenchmark, RuntimeBenchmark)->Unit(benchmark::kMillisecond)->UseManualTime()->Iterations(1);
121+
} // namespace terrier::runner

benchmark/runner/tpch_benchmark.cpp

+121
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
#include <tbb/task_scheduler_init.h>
2+
3+
#include "benchmark/benchmark.h"
4+
#include "common/scoped_timer.h"
5+
#include "execution/execution_util.h"
6+
#include "execution/vm/module.h"
7+
#include "main/db_main.h"
8+
#include "settings/settings_manager.h"
9+
#include "test_util/tpch/workload.h"
10+
11+
namespace noisepage::tpch {
12+
class TPCHBenchmark : public benchmark::Fixture {
13+
public:
14+
const bool print_exec_info_ = true;
15+
const double threshold_ = 0.1;
16+
const uint64_t min_iterations_per_query_ = 10;
17+
const uint64_t max_iterations_per_query_ = 10;
18+
const int32_t threads_ = tbb::task_scheduler_init::default_num_threads();
19+
const execution::vm::ExecutionMode mode_ = execution::vm::ExecutionMode::Interpret;
20+
21+
std::unique_ptr<DBMain> db_main_;
22+
std::unique_ptr<tpch::Workload> tpch_workload_;
23+
24+
const std::string tpch_table_root_ = "/Users/dpatra/Research/NoisePage-Support/TPCH/SF0.01/";
25+
const std::string tpch_database_name_ = "tpch_db";
26+
27+
void SetUp(const benchmark::State &state) final {
28+
execution::ExecutionUtil::InitTPL();
29+
30+
// Set up database
31+
std::unordered_map<settings::Param, settings::ParamInfo> param_map;
32+
settings::SettingsManager::ConstructParamMap(param_map);
33+
auto db_main_builder = DBMain::Builder().SetUseGC(true)
34+
.SetUseCatalog(true)
35+
.SetUseGCThread(true)
36+
.SetUseSettingsManager(true)
37+
.SetSettingsParameterMap(std::move(param_map));
38+
db_main_ = db_main_builder.Build();
39+
40+
// Set up metrics manager
41+
auto metrics_manager = db_main_->GetMetricsManager();
42+
metrics_manager->EnableMetric(metrics::MetricsComponent::EXECUTION_PIPELINE);
43+
metrics_manager->SetMetricSampleInterval(metrics::MetricsComponent::EXECUTION_PIPELINE, 0);
44+
45+
// Load the TPCH tables and compile the queries
46+
tpch_workload_ = std::make_unique<tpch::Workload>(common::ManagedPointer<DBMain>(db_main_), tpch_database_name_,
47+
tpch_table_root_, tpch::Workload::BenchmarkType::TPCH, threads_);
48+
}
49+
50+
void TearDown(const benchmark::State &state) final {
51+
execution::ExecutionUtil::ShutdownTPL();
52+
// free db main here so we don't need to use the loggers anymore
53+
db_main_.reset();
54+
}
55+
};
56+
57+
// NOLINTNEXTLINE
58+
BENCHMARK_DEFINE_F(TPCHBenchmark, StabilizeBenchmark)(benchmark::State &state) {
59+
// Run benchmark for each query independently
60+
auto num_queries = tpch_workload_->GetQueryNum();
61+
62+
for (auto _ : state) {
63+
// Overall totals
64+
uint64_t queries_run = 0, total_time = 0;
65+
for (uint32_t i = 0; i < num_queries; i++) {
66+
// Single query running totals
67+
double old_avg = 0, avg = 0;
68+
double total = 0;
69+
uint64_t iterations = 0;
70+
// Iterate at least until min_iterations_per_query and at most until max_iterations_per_query and until average
71+
// stabilizes
72+
while ((iterations < min_iterations_per_query_) ||
73+
((abs(avg - old_avg) > threshold_) && (iterations < max_iterations_per_query_))) {
74+
old_avg = avg;
75+
total += tpch_workload_->TimeQuery(i, mode_, print_exec_info_);
76+
iterations++;
77+
avg = total / iterations;
78+
}
79+
80+
if (print_exec_info_) {
81+
std::cout << tpch_workload_->GetQueryName(i) << " took " << iterations
82+
<< " iterations with an average execution time of " << avg << std::endl;
83+
}
84+
85+
queries_run += iterations;
86+
total_time += total;
87+
}
88+
state.SetIterationTime(total_time);
89+
state.SetItemsProcessed(queries_run);
90+
}
91+
92+
// Free the workload here so we don't need to use the loggers anymore
93+
tpch_workload_.reset();
94+
}
95+
96+
// NOLINTNEXTLINE
97+
BENCHMARK_DEFINE_F(TPCHBenchmark, RuntimeBenchmark)(benchmark::State &state) {
98+
// Run benchmark for each query independently
99+
auto num_queries = tpch_workload_->GetQueryNum();
100+
101+
for (auto _ : state) {
102+
// Overall totals
103+
uint64_t queries_run = 0, total_time = 0;
104+
for (uint64_t iterations = 0; iterations < min_iterations_per_query_; iterations++) {
105+
// Iterate to min_iterations_per_query
106+
for (uint32_t i = 0; i < num_queries; i++) {
107+
total_time += tpch_workload_->TimeQuery(i, mode_, print_exec_info_);
108+
queries_run++;
109+
}
110+
}
111+
state.SetIterationTime(total_time);
112+
state.SetItemsProcessed(queries_run);
113+
}
114+
115+
// Free the workload here so we don't need to use the loggers anymore
116+
tpch_workload_.reset();
117+
}
118+
119+
//BENCHMARK_REGISTER_F(TPCHBenchmark, StabilizeBenchmark)->Unit(benchmark::kMillisecond)->UseManualTime()->Iterations(1);
120+
BENCHMARK_REGISTER_F(TPCHBenchmark, RuntimeBenchmark)->Unit(benchmark::kMillisecond)->UseManualTime()->Iterations(1);
121+
} // namespace tpch

benchmark/runner/tpch_runner.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ BENCHMARK_DEFINE_F(TPCHRunner, Runner)(benchmark::State &state) {
6666
UNREACHABLE("Unimplemented Benchmark Type");
6767
}
6868
workload_ = std::make_unique<tpch::Workload>(common::ManagedPointer<DBMain>(db_main_), tpch_database_name_,
69-
table_root, type_);
69+
table_root, type_, total_num_threads_);
7070

7171
int8_t num_thread_start;
7272
uint32_t query_num_start, repeat_num;

src/execution/compiler/executable_query.cpp

+7-3
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,13 @@ void ExecutableQuery::Run(common::ManagedPointer<exec::ExecutionContext> exec_ct
160160
exec_ctx->SetPipelineOperatingUnits(GetPipelineOperatingUnits());
161161
exec_ctx->SetQueryId(query_id_);
162162

163-
// Now run through fragments.
164-
for (const auto &fragment : fragments_) {
165-
fragment->Run(query_state.get(), mode);
163+
double elapsed_ms;
164+
{
165+
util::ScopedTimer timer(&elapsed_ms);
166+
// Now run through fragments.
167+
for (const auto &fragment : fragments_) {
168+
fragment->Run(query_state.get(), mode);
169+
}
166170
}
167171

168172
// We do not currently re-use ExecutionContexts. However, this is unset to help ensure

src/execution/sql/aggregation_hash_table.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#include "execution/sql/aggregation_hash_table.h"
22

33
#include <tbb/parallel_for_each.h>
4-
#include <tbb/task_scheduler_init.h>
54

65
#include <algorithm>
76
#include <memory>
@@ -670,7 +669,7 @@ void AggregationHashTable::ExecuteParallelPartitionedScan(void *query_state, Thr
670669
util::Timer<std::milli> timer;
671670
timer.Start();
672671

673-
size_t num_threads = tbb::task_scheduler_init::default_num_threads();
672+
size_t num_threads = exec_ctx_->GetExecutionSettings().GetNumberOfParallelExecutionThreads();
674673
size_t num_tasks = nonempty_parts.size();
675674
size_t concurrent_estimate = std::min(num_threads, num_tasks);
676675
exec_ctx_->SetNumConcurrentEstimate(concurrent_estimate);

src/execution/sql/join_hash_table.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
#include <llvm/ADT/STLExtras.h>
44
#include <tbb/parallel_for_each.h>
5-
#include <tbb/task_scheduler_init.h>
65

76
#include <algorithm>
87
#include <limits>
@@ -593,7 +592,7 @@ void JoinHashTable::MergeParallel(ThreadStateContainer *thread_state_container,
593592
EXECUTION_LOG_TRACE("JHT: Estimated {} elements >= {} element parallel threshold. Using parallel merge.",
594593
num_elem_estimate, DEFAULT_MIN_SIZE_FOR_PARALLEL_MERGE);
595594

596-
size_t num_threads = tbb::task_scheduler_init::default_num_threads();
595+
size_t num_threads = exec_ctx_->GetExecutionSettings().GetNumberOfParallelExecutionThreads();
597596
size_t num_tasks = tl_join_tables.size();
598597
auto estimate = std::min(num_threads, num_tasks);
599598
exec_ctx_->SetNumConcurrentEstimate(estimate);

src/execution/sql/sorter.cpp

+2-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
#include <llvm/ADT/STLExtras.h>
44
#include <tbb/parallel_for_each.h>
5-
#include <tbb/task_scheduler_init.h>
65

76
#include <algorithm>
87
#include <queue>
@@ -234,9 +233,8 @@ void Sorter::SortParallel(ThreadStateContainer *thread_state_container, std::siz
234233
util::StageTimer<std::milli> timer;
235234
timer.EnterStage("Parallel Sort Thread-Local Instances");
236235

237-
tbb::task_scheduler_init sched;
238236
{
239-
size_t num_threads = tbb::task_scheduler_init::default_num_threads();
237+
size_t num_threads = exec_ctx_->GetExecutionSettings().GetNumberOfParallelExecutionThreads();
240238
size_t num_tasks = tl_sorters.size();
241239
size_t num_concurrent = std::min(num_threads, num_tasks);
242240
exec_ctx_->SetNumConcurrentEstimate(num_concurrent);
@@ -360,7 +358,7 @@ void Sorter::SortParallel(ThreadStateContainer *thread_state_container, std::siz
360358
};
361359

362360
{
363-
size_t num_threads = tbb::task_scheduler_init::default_num_threads();
361+
size_t num_threads = exec_ctx_->GetExecutionSettings().GetNumberOfParallelExecutionThreads();
364362
size_t num_tasks = merge_work.size();
365363
size_t concurrent = std::min(num_threads, num_tasks);
366364
exec_ctx_->SetNumConcurrentEstimate(concurrent);

test/include/test_util/tpch/workload.h

+29-4
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,39 @@ class Workload {
3939
enum class BenchmarkType : uint32_t { TPCH, SSB };
4040

4141
Workload(common::ManagedPointer<DBMain> db_main, const std::string &db_name, const std::string &table_root,
42-
enum BenchmarkType type);
42+
enum BenchmarkType type, int64_t threads);
4343

4444
/**
4545
* Function to invoke for a single worker thread to invoke the TPCH queries
46-
* @param worker_id 1-indexed thread id
47-
*/
46+
* @param execution_us_per_worker max execution time for single worker
47+
* @param avg_interval_us interval timing
48+
* @param query_num number of queries to run
49+
* @param mode execution mode
50+
* */
4851
void Execute(int8_t worker_id, uint64_t execution_us_per_worker, uint64_t avg_interval_us, uint32_t query_num,
4952
execution::vm::ExecutionMode mode);
50-
uint32_t GetQueryNum() { return query_and_plan_.size(); }
53+
54+
/**
55+
* Function to invoke a single TPCH query and collect runtime
56+
* @param query_ind index of query into query_and_plan_
57+
* @param avg_interval_us interval timing
58+
* @param mode execution mode
59+
* @param print_output boolean flag to determine whether timing output should be printed
60+
* @return time taken to run query
61+
*/
62+
uint64_t TimeQuery(int32_t query_ind, execution::vm::ExecutionMode mode, bool print_output = false);
63+
64+
/**
65+
* Function to get number of queries in plan
66+
* @return size of query plan vector
67+
*/
68+
uint32_t GetQueryNum() const { return query_and_plan_.size(); }
69+
70+
/**
71+
* Function to get number of queries in plan
72+
* @return size of query plan vector
73+
*/
74+
std::string GetQueryName(int32_t query_ind) const { return query_names_[query_ind]; }
5175

5276
private:
5377
void GenerateTables(execution::exec::ExecutionContext *exec_ctx, const std::string &dir_name,
@@ -67,6 +91,7 @@ class Workload {
6791
std::vector<
6892
std::tuple<std::unique_ptr<execution::compiler::ExecutableQuery>, std::unique_ptr<planner::AbstractPlanNode>>>
6993
query_and_plan_;
94+
std::vector<std::string> query_names_;
7095
};
7196

7297
} // namespace noisepage::tpch

0 commit comments

Comments
 (0)