Skip to content
This repository has been archived by the owner on Feb 20, 2023. It is now read-only.

Commit

Permalink
Pilot Preparation (#1309)
Browse files Browse the repository at this point in the history
  • Loading branch information
rabbit721 authored Dec 31, 2020
1 parent 7182657 commit a84af21
Show file tree
Hide file tree
Showing 22 changed files with 966 additions and 1 deletion.
5 changes: 5 additions & 0 deletions src/include/common/error/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace noisepage {
#define EXECUTION_EXCEPTION(msg, code) ExecutionException(msg, __FILE__, __LINE__, (code))
#define BINDER_EXCEPTION(msg, code) BinderException(msg, __FILE__, __LINE__, (code))
#define SETTINGS_EXCEPTION(msg, code) SettingsException(msg, __FILE__, __LINE__, (code))
#define PILOT_EXCEPTION(msg, code) PilotException(msg, __FILE__, __LINE__, (code))

/**
* Exception types
Expand All @@ -40,6 +41,7 @@ enum class ExceptionType : uint8_t {
CATALOG,
CONVERSION,
MESSENGER,
PILOT,
NETWORK,
PARSER,
SETTINGS,
Expand Down Expand Up @@ -98,6 +100,8 @@ class Exception : public std::runtime_error {
return "Optimizer";
case ExceptionType::EXECUTION:
return "Execution";
case ExceptionType::PILOT:
return "Pilot";
default:
return "Unknown exception type";
}
Expand Down Expand Up @@ -166,6 +170,7 @@ DEFINE_EXCEPTION(AbortException, ExceptionType::EXECUTION);
DEFINE_EXCEPTION_WITH_ERRCODE(ExecutionException, ExceptionType::EXECUTION);
DEFINE_EXCEPTION_WITH_ERRCODE(BinderException, ExceptionType::BINDER);
DEFINE_EXCEPTION_WITH_ERRCODE(SettingsException, ExceptionType::SETTINGS);
DEFINE_EXCEPTION_WITH_ERRCODE(PilotException, ExceptionType::PILOT);

/**
* Specialized Parser exception since we want a cursor position to get more verbose output
Expand Down
2 changes: 2 additions & 0 deletions src/include/execution/compiler/executable_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace noisepage {
namespace selfdriving {
class PipelineOperatingUnits;
class PilotUtil;
} // namespace selfdriving

namespace execution {
Expand Down Expand Up @@ -198,6 +199,7 @@ class ExecutableQuery {
// MiniRunners needs to set query_identifier and pipeline_operating_units_.
friend class noisepage::runner::MiniRunners;
friend class noisepage::runner::MiniRunners_SEQ0_OutputRunners_Benchmark;
friend class noisepage::selfdriving::PilotUtil;
friend class noisepage::runner::MiniRunners_SEQ10_0_IndexInsertRunners_Benchmark;
};

Expand Down
27 changes: 27 additions & 0 deletions src/include/loggers/selfdriving_logger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "loggers/loggers_util.h"

#ifdef NOISEPAGE_USE_LOGGING

namespace noisepage::selfdriving {
/** Logger object that the SELFDRIVING_LOG_* macros below forward to. Defined in a source file. */
extern common::SanctionedSharedPtr<spdlog::logger>::Ptr selfdriving_logger;

/**
 * Declared here and defined elsewhere.
 * NOTE(review): presumably creates/registers selfdriving_logger and must run before the macros are used — confirm.
 */
void InitSelfDrivingLogger();
}  // namespace noisepage::selfdriving

// Each macro expands to a single expression WITHOUT a trailing semicolon, exactly like the
// disabled variants in the #else branch. The call site supplies the semicolon, so
//   if (cond) SELFDRIVING_LOG_INFO("..."); else ...
// compiles identically whether or not logging is enabled, and no stray empty statement is produced.
#define SELFDRIVING_LOG_TRACE(...) ::noisepage::selfdriving::selfdriving_logger->trace(__VA_ARGS__)
#define SELFDRIVING_LOG_DEBUG(...) ::noisepage::selfdriving::selfdriving_logger->debug(__VA_ARGS__)
#define SELFDRIVING_LOG_INFO(...) ::noisepage::selfdriving::selfdriving_logger->info(__VA_ARGS__)
#define SELFDRIVING_LOG_WARN(...) ::noisepage::selfdriving::selfdriving_logger->warn(__VA_ARGS__)
#define SELFDRIVING_LOG_ERROR(...) ::noisepage::selfdriving::selfdriving_logger->error(__VA_ARGS__)

#else

// Logging compiled out: every macro becomes a no-op expression and its arguments are discarded unevaluated.
#define SELFDRIVING_LOG_TRACE(...) ((void)0)
#define SELFDRIVING_LOG_DEBUG(...) ((void)0)
#define SELFDRIVING_LOG_INFO(...) ((void)0)
#define SELFDRIVING_LOG_WARN(...) ((void)0)
#define SELFDRIVING_LOG_ERROR(...) ((void)0)

#endif
41 changes: 41 additions & 0 deletions src/include/main/db_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "network/postgres/postgres_protocol_interpreter.h"
#include "optimizer/statistics/stats_storage.h"
#include "self_driving/model_server/model_server_manager.h"
#include "self_driving/pilot/pilot.h"
#include "self_driving/pilot/pilot_thread.h"
#include "settings/settings_manager.h"
#include "settings/settings_param.h"
#include "storage/garbage_collector_thread.h"
Expand Down Expand Up @@ -432,6 +434,19 @@ class DBMain {
std::make_unique<modelserver::ModelServerManager>(model_server_path_, messenger_layer->GetMessenger());
}

std::unique_ptr<selfdriving::PilotThread> pilot_thread = DISABLED;
std::unique_ptr<selfdriving::Pilot> pilot = DISABLED;
if (use_pilot_thread_) {
NOISEPAGE_ASSERT(model_server_enable_, "Pilot requires model server manager.");
pilot = std::make_unique<selfdriving::Pilot>(
model_save_path_, common::ManagedPointer(catalog_layer->GetCatalog()),
common::ManagedPointer(metrics_thread), common::ManagedPointer(model_server_manager),
common::ManagedPointer(settings_manager), common::ManagedPointer(stats_storage),
common::ManagedPointer(txn_layer->GetTransactionManager()), workload_forecast_interval_);
pilot_thread = std::make_unique<selfdriving::PilotThread>(
common::ManagedPointer(pilot), std::chrono::microseconds{pilot_interval_}, pilot_planning_);
}

db_main->settings_manager_ = std::move(settings_manager);
db_main->metrics_manager_ = std::move(metrics_manager);
db_main->metrics_thread_ = std::move(metrics_thread);
Expand All @@ -446,6 +461,8 @@ class DBMain {
db_main->execution_layer_ = std::move(execution_layer);
db_main->traffic_cop_ = std::move(traffic_cop);
db_main->network_layer_ = std::move(network_layer);
db_main->pilot_thread_ = std::move(pilot_thread);
db_main->pilot_ = std::move(pilot);
db_main->model_server_manager_ = std::move(model_server_manager);
db_main->messenger_layer_ = std::move(messenger_layer);

Expand Down Expand Up @@ -768,6 +785,11 @@ class DBMain {
uint64_t wal_persist_threshold_ = static_cast<uint64_t>(1 << 20);
bool use_logging_ = false;
bool use_gc_ = false;
bool use_pilot_thread_ = false;
bool pilot_planning_ = false;
uint64_t pilot_interval_ = 1e7;
uint64_t workload_forecast_interval_ = 1e7;
std::string model_save_path_;
bool use_catalog_ = false;
bool create_default_database_ = true;
uint64_t block_store_size_ = 1e5;
Expand Down Expand Up @@ -823,8 +845,13 @@ class DBMain {

use_metrics_ = settings_manager->GetBool(settings::Param::metrics);
use_metrics_thread_ = settings_manager->GetBool(settings::Param::use_metrics_thread);
use_pilot_thread_ = settings_manager->GetBool(settings::Param::use_pilot_thread);
pilot_planning_ = settings_manager->GetBool(settings::Param::pilot_planning);

gc_interval_ = settings_manager->GetInt(settings::Param::gc_interval);
pilot_interval_ = settings_manager->GetInt64(settings::Param::pilot_interval);
workload_forecast_interval_ = settings_manager->GetInt64(settings::Param::workload_forecast_interval);
model_save_path_ = settings_manager->GetString(settings::Param::model_save_path);

uds_file_directory_ = settings_manager->GetString(settings::Param::uds_file_directory);
// TODO(WAN): open an issue for handling settings.
Expand Down Expand Up @@ -939,6 +966,18 @@ class DBMain {
return common::ManagedPointer(gc_thread_);
}

/**
 * Accessor for the Pilot component owned by this DBMain.
 * @return ManagedPointer wrapping pilot_; it is nullptr if the component is disabled
 */
common::ManagedPointer<selfdriving::Pilot> GetPilot() const {
  return common::ManagedPointer<selfdriving::Pilot>(pilot_);
}

/**
 * Accessor for the PilotThread component owned by this DBMain.
 * @return ManagedPointer wrapping pilot_thread_; it is nullptr if the component is disabled
 */
common::ManagedPointer<selfdriving::PilotThread> GetPilotThread() const {
  return common::ManagedPointer<selfdriving::PilotThread>(pilot_thread_);
}

/**
* @return ManagedPointer to the component, can be nullptr if disabled
*/
Expand Down Expand Up @@ -986,6 +1025,8 @@ class DBMain {
std::unique_ptr<ExecutionLayer> execution_layer_;
std::unique_ptr<trafficcop::TrafficCop> traffic_cop_;
std::unique_ptr<NetworkLayer> network_layer_;
std::unique_ptr<selfdriving::PilotThread> pilot_thread_;
std::unique_ptr<selfdriving::Pilot> pilot_;
std::unique_ptr<modelserver::ModelServerManager> model_server_manager_;
std::unique_ptr<MessengerLayer> messenger_layer_;
};
Expand Down
4 changes: 4 additions & 0 deletions src/include/metrics/pipeline_metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
#include "self_driving/modeling/operating_unit_util.h"
#include "transaction/transaction_defs.h"

// Forward declaration so that selfdriving::PilotUtil can be named in a friend declaration in this header.
namespace noisepage::selfdriving {
class PilotUtil;
}  // namespace noisepage::selfdriving
namespace noisepage::metrics {

/**
Expand Down Expand Up @@ -83,6 +86,7 @@ class PipelineMetricRawData : public AbstractRawData {

private:
friend class PipelineMetric;
friend class selfdriving::PilotUtil;
FRIEND_TEST(MetricsTests, PipelineCSVTest);
struct PipelineData;

Expand Down
50 changes: 50 additions & 0 deletions src/include/self_driving/forecast/workload_forecast.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#pragma once

#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "parser/expression/constant_value_expression.h"
#include "self_driving/forecast/workload_forecast_segment.h"

namespace noisepage::selfdriving {

/**
* Represents a forecasted workload: predicted queries passed in by the Pilot are broken into
* segments by their associated timestamps, and each query is executed while extracting pipeline features.
* All state is private; PilotUtil is granted access through a friend declaration.
*/
class WorkloadForecast {
public:
/**
* Constructor for WorkloadForecast
* @param forecast_interval Interval used to partition the queries into segments
* (the unit is not stated in this header — TODO confirm against the caller)
*/
explicit WorkloadForecast(uint64_t forecast_interval);

private:
// PilotUtil is granted access to the private members below.
friend class PilotUtil;

// NOTE(review): the three helpers below are only declared here. Judging by their names they fill
// the query maps and forecast_segments_ — confirm against the definitions in the source file.
void LoadQueryTrace();
void LoadQueryText();
void CreateSegments();

// Ordered multimap from a timestamp to a query id; several entries may share one timestamp.
std::multimap<uint64_t, execution::query_id_t> query_timestamp_to_id_;
// Per query id: a list of parameter sets, each parameter set being a list of constant values.
std::unordered_map<execution::query_id_t, std::vector<std::vector<parser::ConstantValueExpression>>>
query_id_to_params_;
// Per query id: the type ids of its parameters.
std::unordered_map<execution::query_id_t, std::vector<type::TypeId>> query_id_to_param_types_;
// Lookup between query id and query text, kept in both directions.
std::unordered_map<execution::query_id_t, std::string> query_id_to_text_;
std::unordered_map<std::string, execution::query_id_t> query_text_to_id_;
// Per query id: a database oid stored as a raw uint64_t.
std::unordered_map<execution::query_id_t, uint64_t> query_id_to_dboid_;
// Defaults to 5. NOTE(review): presumably the number of parameter samples kept per query — confirm.
uint64_t num_sample_{5};

// Segments of the forecast, together with a separately tracked segment count (starts at 0).
std::vector<WorkloadForecastSegment> forecast_segments_;
uint64_t num_forecast_segment_{0};
// No in-class default; expected to be set from the constructor argument.
uint64_t forecast_interval_;
// Defaults to 10000000. NOTE(review): unit is not visible here — confirm where it is consumed.
uint64_t optimizer_timeout_{10000000};
};

} // namespace noisepage::selfdriving
26 changes: 26 additions & 0 deletions src/include/self_driving/forecast/workload_forecast_segment.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include <queue>
#include <tuple>
#include <unordered_map>
#include <utility>

#include "execution/exec_defs.h"

namespace noisepage::selfdriving {
/**
* One segment (time interval) of a workload forecast.
* Contains the query ids predicted to fall in this interval and the number of executions of each.
*/
class WorkloadForecastSegment {
public:
/**
* Constructor for WorkloadForecastSegment
* @param id_to_num_exec Map from query id to the number of executions of that query in this interval
* (taken by value)
*/
explicit WorkloadForecastSegment(std::unordered_map<execution::query_id_t, uint64_t> id_to_num_exec);

private:
// Map from query id to the number of executions of that query in this interval.
std::unordered_map<execution::query_id_t, uint64_t> id_to_num_exec_;
};

} // namespace noisepage::selfdriving
15 changes: 15 additions & 0 deletions src/include/self_driving/modeling/operating_unit.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,21 @@ class ExecutionOperatingUnitFeature {
num_loops_(other.num_loops_),
num_concurrent_(other.num_concurrent_) {}

/**
 * Collects the 7 numeric features of this operating unit feature into a vector of doubles.
 * The order is fixed: num_rows, key_size, num_keys, cardinality, mem factor (from GetMemFactor()),
 * num_loops, num_concurrent.
 * @return vector holding the 7 features in the order listed above
 */
std::vector<double> GetAllAttributes() const {
  std::vector<double> all_attributes;
  // Exactly seven values are appended below; reserving up front avoids repeated regrowth.
  all_attributes.reserve(7);
  all_attributes.push_back(num_rows_);
  all_attributes.push_back(key_size_);
  all_attributes.push_back(num_keys_);
  all_attributes.push_back(cardinality_);
  all_attributes.push_back(GetMemFactor());
  all_attributes.push_back(num_loops_);
  all_attributes.push_back(num_concurrent_);
  return all_attributes;
}

/** @return The ID of the translator for this ExecutionOperatingUnitFeature. */
execution::translator_id_t GetTranslatorId() const { return translator_id_; }

Expand Down
106 changes: 106 additions & 0 deletions src/include/self_driving/pilot/pilot.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#pragma once

#include <fstream>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include "catalog/catalog.h"
#include "common/action_context.h"
#include "common/macros.h"
#include "common/managed_pointer.h"
#include "execution/exec_defs.h"
#include "self_driving/forecast/workload_forecast.h"

// Forward declarations only; no definitions are pulled in here. Most of these types are referred to
// by the Pilot class below solely through common::ManagedPointer.
namespace noisepage {
namespace messenger {
class Messenger;
}  // namespace messenger

namespace metrics {
class MetricsThread;
}  // namespace metrics

namespace modelserver {
class ModelServerManager;
}  // namespace modelserver

namespace optimizer {
class StatsStorage;
}  // namespace optimizer

namespace settings {
class SettingsManager;
}  // namespace settings

namespace transaction {
class TransactionManager;
}  // namespace transaction

} // namespace noisepage

namespace noisepage::selfdriving {
class PilotUtil;

/**
* The pilot processes the query trace predictions by executing them and extracting pipeline features.
* It keeps non-owning ManagedPointers to the components passed to its constructor and owns a WorkloadForecast.
*/
class Pilot {
protected:
/** Name of the environment variable to be set as the absolute path of the build directory */
static constexpr const char *BUILD_ABS_PATH = "BUILD_ABS_PATH";

public:
/**
* Constructor for Pilot
* @param model_save_path model save path
* @param catalog catalog
* @param metrics_thread metrics thread for metrics manager
* @param model_server_manager model server manager
* @param settings_manager settings manager
* @param stats_storage stats_storage
* @param txn_manager transaction manager
* @param workload_forecast_interval Interval used in the forecaster
*/
Pilot(std::string model_save_path, common::ManagedPointer<catalog::Catalog> catalog,
common::ManagedPointer<metrics::MetricsThread> metrics_thread,
common::ManagedPointer<modelserver::ModelServerManager> model_server_manager,
common::ManagedPointer<settings::SettingsManager> settings_manager,
common::ManagedPointer<optimizer::StatsStorage> stats_storage,
common::ManagedPointer<transaction::TransactionManager> txn_manager, uint64_t workload_forecast_interval);

/**
* Performs the Pilot logic: loads and executes the predicted queries while extracting pipeline features
*/
void PerformPlanning();

private:
/**
* WorkloadForecast object performing the query execution and feature gathering
*/
std::unique_ptr<selfdriving::WorkloadForecast> forecast_;

/**
* Setter callback that intentionally does nothing (empty body); used when setting bool values for flags
*/
static void EmptySetterCallback(common::ManagedPointer<common::ActionContext> action_context UNUSED_ATTRIBUTE) {}

// NOTE(review): only declared here; presumably runs the queries held by forecast_ — confirm against the definition.
void ExecuteForecast();

// Values and non-owning component pointers; each has a same-named constructor parameter.
std::string model_save_path_;
common::ManagedPointer<catalog::Catalog> catalog_;
common::ManagedPointer<metrics::MetricsThread> metrics_thread_;
common::ManagedPointer<modelserver::ModelServerManager> model_server_manager_;
common::ManagedPointer<settings::SettingsManager> settings_manager_;
common::ManagedPointer<optimizer::StatsStorage> stats_storage_;
common::ManagedPointer<transaction::TransactionManager> txn_manager_;
// In-class default of 10000000. NOTE(review): presumably overwritten from the constructor argument — confirm.
uint64_t workload_forecast_interval_{10000000};
// PilotUtil is granted access to the private members of Pilot.
friend class noisepage::selfdriving::PilotUtil;
};

} // namespace noisepage::selfdriving
Loading

0 comments on commit a84af21

Please sign in to comment.