diff --git a/src/include/common/error/exception.h b/src/include/common/error/exception.h index c39ab2e3d5..ab70e7c1b9 100644 --- a/src/include/common/error/exception.h +++ b/src/include/common/error/exception.h @@ -29,6 +29,7 @@ namespace noisepage { #define EXECUTION_EXCEPTION(msg, code) ExecutionException(msg, __FILE__, __LINE__, (code)) #define BINDER_EXCEPTION(msg, code) BinderException(msg, __FILE__, __LINE__, (code)) #define SETTINGS_EXCEPTION(msg, code) SettingsException(msg, __FILE__, __LINE__, (code)) +#define PILOT_EXCEPTION(msg, code) PilotException(msg, __FILE__, __LINE__, (code)) /** * Exception types @@ -40,6 +41,7 @@ enum class ExceptionType : uint8_t { CATALOG, CONVERSION, MESSENGER, + PILOT, NETWORK, PARSER, SETTINGS, @@ -98,6 +100,8 @@ class Exception : public std::runtime_error { return "Optimizer"; case ExceptionType::EXECUTION: return "Execution"; + case ExceptionType::PILOT: + return "Pilot"; default: return "Unknown exception type"; } @@ -166,6 +170,7 @@ DEFINE_EXCEPTION(AbortException, ExceptionType::EXECUTION); DEFINE_EXCEPTION_WITH_ERRCODE(ExecutionException, ExceptionType::EXECUTION); DEFINE_EXCEPTION_WITH_ERRCODE(BinderException, ExceptionType::BINDER); DEFINE_EXCEPTION_WITH_ERRCODE(SettingsException, ExceptionType::SETTINGS); +DEFINE_EXCEPTION_WITH_ERRCODE(PilotException, ExceptionType::PILOT); /** * Specialized Parser exception since we want a cursor position to get more verbose output diff --git a/src/include/execution/compiler/executable_query.h b/src/include/execution/compiler/executable_query.h index 324132d5d4..ebef7e2bb1 100644 --- a/src/include/execution/compiler/executable_query.h +++ b/src/include/execution/compiler/executable_query.h @@ -14,6 +14,7 @@ namespace noisepage { namespace selfdriving { class PipelineOperatingUnits; +class PilotUtil; } // namespace selfdriving namespace execution { @@ -198,6 +199,7 @@ class ExecutableQuery { // MiniRunners needs to set query_identifier and pipeline_operating_units_. friend class noisepage::runner::MiniRunners; friend class noisepage::runner::MiniRunners_SEQ0_OutputRunners_Benchmark; + friend class noisepage::selfdriving::PilotUtil; friend class noisepage::runner::MiniRunners_SEQ10_0_IndexInsertRunners_Benchmark; }; diff --git a/src/include/loggers/selfdriving_logger.h b/src/include/loggers/selfdriving_logger.h new file mode 100644 index 0000000000..45d90f2129 --- /dev/null +++ b/src/include/loggers/selfdriving_logger.h @@ -0,0 +1,27 @@ +#pragma once + +#include "loggers/loggers_util.h" + +#ifdef NOISEPAGE_USE_LOGGING + +namespace noisepage::selfdriving { +extern common::SanctionedSharedPtr::Ptr selfdriving_logger; + +void InitSelfDrivingLogger(); +} // namespace noisepage::selfdriving + +#define SELFDRIVING_LOG_TRACE(...) ::noisepage::selfdriving::selfdriving_logger->trace(__VA_ARGS__); +#define SELFDRIVING_LOG_DEBUG(...) ::noisepage::selfdriving::selfdriving_logger->debug(__VA_ARGS__); +#define SELFDRIVING_LOG_INFO(...) ::noisepage::selfdriving::selfdriving_logger->info(__VA_ARGS__); +#define SELFDRIVING_LOG_WARN(...) ::noisepage::selfdriving::selfdriving_logger->warn(__VA_ARGS__); +#define SELFDRIVING_LOG_ERROR(...) ::noisepage::selfdriving::selfdriving_logger->error(__VA_ARGS__); + +#else + +#define SELFDRIVING_LOG_TRACE(...) ((void)0) +#define SELFDRIVING_LOG_DEBUG(...) ((void)0) +#define SELFDRIVING_LOG_INFO(...) ((void)0) +#define SELFDRIVING_LOG_WARN(...) ((void)0) +#define SELFDRIVING_LOG_ERROR(...) ((void)0) + +#endif diff --git a/src/include/main/db_main.h b/src/include/main/db_main.h index 8f39d9b789..16ca613fe6 100644 --- a/src/include/main/db_main.h +++ b/src/include/main/db_main.h @@ -17,6 +17,8 @@ #include "network/postgres/postgres_protocol_interpreter.h" #include "optimizer/statistics/stats_storage.h" #include "self_driving/model_server/model_server_manager.h" +#include "self_driving/pilot/pilot.h" +#include "self_driving/pilot/pilot_thread.h" #include "settings/settings_manager.h" #include "settings/settings_param.h" #include "storage/garbage_collector_thread.h" @@ -432,6 +434,19 @@ class DBMain { std::make_unique(model_server_path_, messenger_layer->GetMessenger()); } + std::unique_ptr pilot_thread = DISABLED; + std::unique_ptr pilot = DISABLED; + if (use_pilot_thread_) { + NOISEPAGE_ASSERT(model_server_enable_, "Pilot requires model server manager."); + pilot = std::make_unique( + model_save_path_, common::ManagedPointer(catalog_layer->GetCatalog()), + common::ManagedPointer(metrics_thread), common::ManagedPointer(model_server_manager), + common::ManagedPointer(settings_manager), common::ManagedPointer(stats_storage), + common::ManagedPointer(txn_layer->GetTransactionManager()), workload_forecast_interval_); + pilot_thread = std::make_unique( + common::ManagedPointer(pilot), std::chrono::microseconds{pilot_interval_}, pilot_planning_); + } + db_main->settings_manager_ = std::move(settings_manager); db_main->metrics_manager_ = std::move(metrics_manager); db_main->metrics_thread_ = std::move(metrics_thread); @@ -446,6 +461,8 @@ class DBMain { db_main->execution_layer_ = std::move(execution_layer); db_main->traffic_cop_ = std::move(traffic_cop); db_main->network_layer_ = std::move(network_layer); + db_main->pilot_thread_ = std::move(pilot_thread); + db_main->pilot_ = std::move(pilot); db_main->model_server_manager_ = std::move(model_server_manager); db_main->messenger_layer_ = std::move(messenger_layer); @@ -768,6 +785,11 @@ class DBMain { uint64_t wal_persist_threshold_ = static_cast(1 << 20); bool use_logging_ = false; bool use_gc_ = false; + bool use_pilot_thread_ = false; + bool pilot_planning_ = false; + uint64_t pilot_interval_ = 1e7; + uint64_t workload_forecast_interval_ = 1e7; + std::string model_save_path_; bool use_catalog_ = false; bool create_default_database_ = true; uint64_t block_store_size_ = 1e5; @@ -823,8 +845,13 @@ class DBMain { use_metrics_ = settings_manager->GetBool(settings::Param::metrics); use_metrics_thread_ = settings_manager->GetBool(settings::Param::use_metrics_thread); + use_pilot_thread_ = settings_manager->GetBool(settings::Param::use_pilot_thread); + pilot_planning_ = settings_manager->GetBool(settings::Param::pilot_planning); gc_interval_ = settings_manager->GetInt(settings::Param::gc_interval); + pilot_interval_ = settings_manager->GetInt64(settings::Param::pilot_interval); + workload_forecast_interval_ = settings_manager->GetInt64(settings::Param::workload_forecast_interval); + model_save_path_ = settings_manager->GetString(settings::Param::model_save_path); uds_file_directory_ = settings_manager->GetString(settings::Param::uds_file_directory); // TODO(WAN): open an issue for handling settings. @@ -939,6 +966,18 @@ class DBMain { return common::ManagedPointer(gc_thread_); } + /** + * @return ManagedPointer to the component, can be nullptr if disabled + */ + common::ManagedPointer GetPilot() const { return common::ManagedPointer(pilot_); } + + /** + * @return ManagedPointer to the component, can be nullptr if disabled + */ + common::ManagedPointer GetPilotThread() const { + return common::ManagedPointer(pilot_thread_); + } + /** * @return ManagedPointer to the component, can be nullptr if disabled */ @@ -986,6 +1025,8 @@ class DBMain { std::unique_ptr execution_layer_; std::unique_ptr traffic_cop_; std::unique_ptr network_layer_; + std::unique_ptr pilot_thread_; + std::unique_ptr pilot_; std::unique_ptr model_server_manager_; std::unique_ptr messenger_layer_; }; diff --git a/src/include/metrics/pipeline_metric.h b/src/include/metrics/pipeline_metric.h index 2ca401ca36..4baf84e3bb 100644 --- a/src/include/metrics/pipeline_metric.h +++ b/src/include/metrics/pipeline_metric.h @@ -17,6 +17,9 @@ #include "self_driving/modeling/operating_unit_util.h" #include "transaction/transaction_defs.h" +namespace noisepage::selfdriving { +class PilotUtil; +} namespace noisepage::metrics { /** @@ -83,6 +86,7 @@ class PipelineMetricRawData : public AbstractRawData { private: friend class PipelineMetric; + friend class selfdriving::PilotUtil; FRIEND_TEST(MetricsTests, PipelineCSVTest); struct PipelineData; diff --git a/src/include/self_driving/forecast/workload_forecast.h b/src/include/self_driving/forecast/workload_forecast.h new file mode 100644 index 0000000000..800369d87e --- /dev/null +++ b/src/include/self_driving/forecast/workload_forecast.h @@ -0,0 +1,50 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "parser/expression/constant_value_expression.h" +#include "self_driving/forecast/workload_forecast_segment.h" + +namespace noisepage::selfdriving { + +/** + * Breaking predicted queries passed in by the Pilot into segments by their associated timestamps + * Executing each query while extracting pipeline features + */ +class WorkloadForecast { + public: + /** + * Constructor for WorkloadForecast + * @param forecast_interval Interval used to partition the queries into segments + * + */ + explicit WorkloadForecast(uint64_t forecast_interval); + + private: + friend class PilotUtil; + + void LoadQueryTrace(); + void LoadQueryText(); + void CreateSegments(); + + std::multimap query_timestamp_to_id_; + std::unordered_map>> + query_id_to_params_; + std::unordered_map> query_id_to_param_types_; + std::unordered_map query_id_to_text_; + std::unordered_map query_text_to_id_; + std::unordered_map query_id_to_dboid_; + uint64_t num_sample_{5}; + + std::vector forecast_segments_; + uint64_t num_forecast_segment_{0}; + uint64_t forecast_interval_; + uint64_t optimizer_timeout_{10000000}; +}; + +} // namespace noisepage::selfdriving diff --git a/src/include/self_driving/forecast/workload_forecast_segment.h b/src/include/self_driving/forecast/workload_forecast_segment.h new file mode 100644 index 0000000000..dad2f775ae --- /dev/null +++ b/src/include/self_driving/forecast/workload_forecast_segment.h @@ -0,0 +1,26 @@ +#pragma once + +#include +#include +#include +#include + +#include "execution/exec_defs.h" + +namespace noisepage::selfdriving { +/** + * Contains query ids and number of executions for each query for queries predicted to be in this time interval + */ +class WorkloadForecastSegment { + public: + /** + * Constructor for WorkloadForecastSegment + * @param id_to_num_exec Map from qids to number of execution of this query in this interval + */ + explicit WorkloadForecastSegment(std::unordered_map id_to_num_exec); + + private: + std::unordered_map id_to_num_exec_; +}; + +} // namespace noisepage::selfdriving diff --git a/src/include/self_driving/modeling/operating_unit.h b/src/include/self_driving/modeling/operating_unit.h index e060fbae7d..833ef93c6a 100644 --- a/src/include/self_driving/modeling/operating_unit.h +++ b/src/include/self_driving/modeling/operating_unit.h @@ -133,6 +133,21 @@ class ExecutionOperatingUnitFeature { num_loops_(other.num_loops_), num_concurrent_(other.num_concurrent_) {} + /** + * Returns a vector of doubles consisting of 7 features starting with num_rows + */ + std::vector GetAllAttributes() const { + std::vector all_attributes; + all_attributes.push_back(num_rows_); + all_attributes.push_back(key_size_); + all_attributes.push_back(num_keys_); + all_attributes.push_back(cardinality_); + all_attributes.push_back(GetMemFactor()); + all_attributes.push_back(num_loops_); + all_attributes.push_back(num_concurrent_); + return all_attributes; + } + /** @return The ID of the translator for this ExecutionOperatingUnitFeature. */ execution::translator_id_t GetTranslatorId() const { return translator_id_; } diff --git a/src/include/self_driving/pilot/pilot.h b/src/include/self_driving/pilot/pilot.h new file mode 100644 index 0000000000..dbd400cd7e --- /dev/null +++ b/src/include/self_driving/pilot/pilot.h @@ -0,0 +1,106 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "catalog/catalog.h" +#include "common/action_context.h" +#include "common/macros.h" +#include "common/managed_pointer.h" +#include "execution/exec_defs.h" +#include "self_driving/forecast/workload_forecast.h" + +namespace noisepage { +namespace messenger { +class Messenger; +} + +namespace metrics { +class MetricsThread; +} + +namespace modelserver { +class ModelServerManager; +} + +namespace optimizer { +class StatsStorage; +} + +namespace settings { +class SettingsManager; +} + +namespace transaction { +class TransactionManager; +} + +} // namespace noisepage + +namespace noisepage::selfdriving { +class PilotUtil; + +/** + * The pilot processes the query trace predictions by executing them and extracting pipeline features + */ +class Pilot { + protected: + /** @return Name of the environment variable to be set as the absolute path of build directory */ + static constexpr const char *BUILD_ABS_PATH = "BUILD_ABS_PATH"; + + public: + /** + * Constructor for Pilot + * @param model_save_path model save path + * @param catalog catalog + * @param metrics_thread metrics thread for metrics manager + * @param model_server_manager model server manager + * @param settings_manager settings manager + * @param stats_storage stats_storage + * @param txn_manager transaction manager + * @param workload_forecast_interval Interval used in the forecastor + */ + Pilot(std::string model_save_path, common::ManagedPointer catalog, + common::ManagedPointer metrics_thread, + common::ManagedPointer model_server_manager, + common::ManagedPointer settings_manager, + common::ManagedPointer stats_storage, + common::ManagedPointer txn_manager, uint64_t workload_forecast_interval); + + /** + * Performs Pilot Logic, load and execute the predict queries while extracting pipeline features + */ + void PerformPlanning(); + + private: + /** + * WorkloadForecast object performing the query execution and feature gathering + */ + std::unique_ptr forecast_; + + /** + * Empty Setter Callback for setting bool value for flags + */ + static void EmptySetterCallback(common::ManagedPointer action_context UNUSED_ATTRIBUTE) {} + + void ExecuteForecast(); + + std::string model_save_path_; + common::ManagedPointer catalog_; + common::ManagedPointer metrics_thread_; + common::ManagedPointer model_server_manager_; + common::ManagedPointer settings_manager_; + common::ManagedPointer stats_storage_; + common::ManagedPointer txn_manager_; + uint64_t workload_forecast_interval_{10000000}; + friend class noisepage::selfdriving::PilotUtil; +}; + +} // namespace noisepage::selfdriving diff --git a/src/include/self_driving/pilot/pilot_thread.h b/src/include/self_driving/pilot/pilot_thread.h new file mode 100644 index 0000000000..c29574fbb4 --- /dev/null +++ b/src/include/self_driving/pilot/pilot_thread.h @@ -0,0 +1,82 @@ +#pragma once + +#include //NOLINT +#include //NOLINT + +#include "self_driving/pilot/pilot.h" + +namespace noisepage::selfdriving { + +/** + * Class for spinning off a thread that runs the pilot to process query predictions. + * This should be used in most cases to enable/disable Pilot in the system. + */ +class PilotThread { + public: + /** + * @param pilot Pointer to the pilot object to be run on this thread + * @param pilot_period Sleep time between Pilot invocations + * @param pilot_planning if the pilot is enabled + */ + PilotThread(common::ManagedPointer pilot, std::chrono::microseconds pilot_period, + bool pilot_planning); + + ~PilotThread() { StopPilot(); } + + /** + * Kill the Pilot thread. + */ + void StopPilot() { + NOISEPAGE_ASSERT(run_pilot_, "Pilot should already be running."); + run_pilot_ = false; + pilot_paused_ = true; + pilot_thread_.join(); + } + + /** + * Spawn the Pilot thread if it has been previously stopped. + */ + void StartPilot() { + NOISEPAGE_ASSERT(!run_pilot_, "Pilot should not already be running."); + run_pilot_ = true; + pilot_paused_ = true; + pilot_thread_ = std::thread([this] { PilotThreadLoop(); }); + } + + /** + * Pause the Pilot from running, typically for use in tests when the state of tables need to be fixed. + */ + void DisablePilot() { + NOISEPAGE_ASSERT(!pilot_paused_, "Pilot should not already be paused."); + pilot_paused_ = true; + } + + /** + * Resume Pilot after being paused. + */ + void EnablePilot() { + NOISEPAGE_ASSERT(pilot_paused_, "Pilot should already be paused."); + pilot_paused_ = false; + } + + /** + * @return the underlying Pilot object, mostly to register indexes currently. + */ + common::ManagedPointer GetPilot() { return pilot_; } + + private: + const common::ManagedPointer pilot_; + volatile bool run_pilot_; + volatile bool pilot_paused_; + std::chrono::microseconds pilot_period_; + std::thread pilot_thread_; + + void PilotThreadLoop() { + while (run_pilot_) { + std::this_thread::sleep_for(pilot_period_); + if (!pilot_paused_) pilot_->PerformPlanning(); + } + } +}; + +} // namespace noisepage::selfdriving diff --git a/src/include/self_driving/pilot_util.h b/src/include/self_driving/pilot_util.h new file mode 100644 index 0000000000..1cac0eee12 --- /dev/null +++ b/src/include/self_driving/pilot_util.h @@ -0,0 +1,72 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "metrics/metrics_store.h" +#include "parser/expression/constant_value_expression.h" + +namespace noisepage { +namespace modelserver { +class ModelServerManager; +} + +namespace transaction { +class TransactionManager; +} + +} // namespace noisepage + +namespace noisepage::selfdriving { +class WorkloadForecast; +class Pilot; + +/** + * Utility class for helper functions + */ +class PilotUtil { + public: + /** + * Executing forecasted queries and collect pipeline features for cost estimation to be used in action selection + * @param pilot pointer to the pilot to access settings, metrics, and transaction managers, and catalog + * @param forecast pointer to object storing result of workload forecast + * @returns const pointer to the collected pipeline data + */ + static const std::list &CollectPipelineFeatures( + common::ManagedPointer pilot, common::ManagedPointer forecast); + + /** + * Perform inference through model server manager with collected pipeline metrics + * To recover the result for each pipeline, also maintain a multimap pipeline_to_ou_position + * @param model_save_path model save path + * @param model_server_manager model server manager + * @param pipeline_data collected pipeline metrics after executing the forecasted queries + * @param pipeline_to_prediction list of tuples of query id, pipeline id and result of prediction + */ + static void InferenceWithFeatures( + const std::string &model_save_path, common::ManagedPointer model_server_manager, + const std::list &pipeline_data, + std::list>>> + *pipeline_to_prediction); + + private: + /** + * Group pipeline features by ou for block inference + * To recover the result for each pipeline, also maintain a multimap pipeline_to_ou_position + * @param pipeline_to_ou_position list of tuples describing the pipelines associated with each ou sample + * @param pipeline_data const reference of the collected pipeline data + * @param ou_to_features map from ExecutionOperatingUnitType to a matrix + */ + static void GroupFeaturesByOU( + std::list>>> *pipeline_to_ou_position, + const std::list &pipeline_data, + std::unordered_map>> *ou_to_features); +}; + +} // namespace noisepage::selfdriving diff --git a/src/include/settings/settings_callbacks.h b/src/include/settings/settings_callbacks.h index 367c562fb2..0947a19854 100644 --- a/src/include/settings/settings_callbacks.h +++ b/src/include/settings/settings_callbacks.h @@ -167,5 +167,15 @@ class Callbacks { */ static void MetricsQueryTrace(void *old_value, void *new_value, DBMain *db_main, common::ManagedPointer action_context); + + /** + * Enable or disable planning in Pilot thread + * @param old_value old settings value + * @param new_value new settings value + * @param db_main pointer to db_main + * @param action_context pointer to the action context for this settings change + */ + static void PilotEnablePlanning(void *old_value, void *new_value, DBMain *db_main, + common::ManagedPointer action_context); }; } // namespace noisepage::settings diff --git a/src/include/settings/settings_defs.h b/src/include/settings/settings_defs.h index 2bf76acda3..cd87138cc0 100644 --- a/src/include/settings/settings_defs.h +++ b/src/include/settings/settings_defs.h @@ -176,6 +176,26 @@ SETTING_int( noisepage::settings::Callbacks::NoOp ) +SETTING_int64( + workload_forecast_interval, + "Interval to be used to break query traces into WorkloadForecastSegment. (default : 10000000, unit: ns)", + 10000000, + 10000000, + 1000000000000, + true, + noisepage::settings::Callbacks::NoOp +) + +SETTING_int64( + pilot_interval, + "Interval of Pilot Planning Invocation when planning enabled. (default : 1000000, unit: ns)", + 1000000, + 1000000, + 10000000000, + true, + noisepage::settings::Callbacks::NoOp +) + SETTING_bool( metrics, "Metrics sub-system for various components (default: true).", @@ -192,6 +212,22 @@ SETTING_bool( noisepage::settings::Callbacks::NoOp ) +SETTING_bool( + use_pilot_thread, + "Use a thread for the pilot (default: false).", + false, + true, + noisepage::settings::Callbacks::NoOp +) + +SETTING_bool( + pilot_planning, + "Start planning in pilot (default: false).", + false, + true, + noisepage::settings::Callbacks::PilotEnablePlanning +) + SETTING_bool( logging_metrics_enable, "Metrics collection for the Logging component (default: false).", @@ -341,4 +377,13 @@ SETTING_string( false, noisepage::settings::Callbacks::NoOp ) + +// Save path of the model relative to the build path (model saved at ${BUILD_ABS_PATH} + SAVE_PATH) +SETTING_string( + model_save_path, + "Save path of the model relative to the build path (default: ../script/model/terrier_model_server_trained/mini_model_test.pickle)", + "/../script/model/terrier_model_server_trained/mini_model_test.pickle", + false, + noisepage::settings::Callbacks::NoOp +) // clang-format on diff --git a/src/loggers/loggers_util.cpp b/src/loggers/loggers_util.cpp index 894594dd8b..3fa83e251b 100644 --- a/src/loggers/loggers_util.cpp +++ b/src/loggers/loggers_util.cpp @@ -14,6 +14,7 @@ #include "loggers/network_logger.h" #include "loggers/optimizer_logger.h" #include "loggers/parser_logger.h" +#include "loggers/selfdriving_logger.h" #include "loggers/settings_logger.h" #include "loggers/storage_logger.h" #include "loggers/transaction_logger.h" @@ -41,6 +42,7 @@ void LoggersUtil::Initialize() { modelserver::InitModelServerLogger(); optimizer::InitOptimizerLogger(); parser::InitParserLogger(); + selfdriving::InitSelfDrivingLogger(); settings::InitSettingsLogger(); storage::InitIndexLogger(); storage::InitStorageLogger(); diff --git a/src/loggers/selfdriving_logger.cpp b/src/loggers/selfdriving_logger.cpp new file mode 100644 index 0000000000..6c2dac6a42 --- /dev/null +++ b/src/loggers/selfdriving_logger.cpp @@ -0,0 +1,16 @@ +#include "loggers/selfdriving_logger.h" + +#include + +namespace noisepage::selfdriving { +#ifdef NOISEPAGE_USE_LOGGING +common::SanctionedSharedPtr::Ptr selfdriving_logger = nullptr; + +void InitSelfDrivingLogger() { + if (selfdriving_logger == nullptr) { + selfdriving_logger = std::make_shared("selfdriving_logger", ::default_sink); // NOLINT + spdlog::register_logger(selfdriving_logger); + } +} +#endif +} // namespace noisepage::selfdriving diff --git a/src/self_driving/forecast/workload_forecast.cpp b/src/self_driving/forecast/workload_forecast.cpp new file mode 100644 index 0000000000..522d457bf1 --- /dev/null +++ b/src/self_driving/forecast/workload_forecast.cpp @@ -0,0 +1,194 @@ +#include "self_driving/forecast/workload_forecast.h" + +#include +#include +#include +#include + +#include "common/error/error_code.h" +#include "common/error/exception.h" +#include "execution/exec_defs.h" +#include "metrics/query_trace_metric.h" +#include "parser/expression/constant_value_expression.h" +#include "spdlog/fmt/fmt.h" + +namespace noisepage::selfdriving { + +WorkloadForecast::WorkloadForecast(uint64_t forecast_interval) : forecast_interval_(forecast_interval) { + LoadQueryText(); + LoadQueryTrace(); + CreateSegments(); +} + +/** + * Queries in query_timestamp_to_id_ are sorted by their timestamp while allowing duplicate keys, + * and then partitioned by timestamps and forecast_interval into segments. + * + * These segments will eventually store the result of workload/query arrival rate prediction. + */ +void WorkloadForecast::CreateSegments() { + std::unordered_map curr_segment; + + uint64_t curr_time = query_timestamp_to_id_.begin()->first; + + // We assume the traces are sorted by timestamp in increasing order + for (auto &it : query_timestamp_to_id_) { + if (it.first > curr_time + forecast_interval_) { + forecast_segments_.emplace_back(std::move(curr_segment)); + curr_time = it.first; + curr_segment = std::unordered_map(); + } + curr_segment[it.second] += 1; + } + + if (!curr_segment.empty()) { + forecast_segments_.emplace_back(std::move(curr_segment)); + } + num_forecast_segment_ = forecast_segments_.size(); +} + +void WorkloadForecast::LoadQueryText() { + std::string_view feat_cols = metrics::QueryTraceMetricRawData::FEATURE_COLUMNS[0]; + uint8_t num_cols = std::count(feat_cols.begin(), feat_cols.end(), ',') + 1; + std::string_view tmp = feat_cols.substr(0, feat_cols.find("query_text")); + uint8_t query_text_col = std::count(tmp.begin(), tmp.end(), ','); + // Parse qid and query text, assuming they are the first two columns, with query text wrapped in quotations marks + + // Create an input filestream + std::ifstream query_text_file(std::string(metrics::QueryTraceMetricRawData::FILES[0]).c_str()); + // Make sure the file is open + if (!query_text_file.is_open()) + throw PILOT_EXCEPTION(fmt::format("Could not open file {}", metrics::QueryTraceMetricRawData::FILES[0]), + common::ErrorCode::ERRCODE_IO_ERROR); + + // Helper vars + std::string line; + if (!query_text_file.good()) throw PILOT_EXCEPTION("File stream is not good", common::ErrorCode::ERRCODE_IO_ERROR); + + // ignore header + std::getline(query_text_file, line); + + bool parse_succ; + uint64_t db_oid; + execution::query_id_t query_id; + size_t pos, colnum; + std::string type_string; + std::vector val_vec(num_cols, ""); + + // Read data, line by line + while (std::getline(query_text_file, line)) { + std::vector param_types; + colnum = 0; + parse_succ = true; + val_vec.assign(num_cols, ""); + + while ((pos = line.find(',')) != std::string::npos && colnum < num_cols) { + // deal with the query_text col separately + if (colnum == query_text_col) { + pos = line.find("\","); + if (pos == std::string::npos || pos < 2) { + // no quotation mark found or no query_text found + parse_succ = false; + break; + } + val_vec[colnum] = line.substr(1, pos - 1); + // skip the right quotation mark + pos++; + } else if (pos > 0) { + val_vec[colnum] = line.substr(0, pos); + if (val_vec[colnum].empty()) { + // empty field + parse_succ = false; + break; + } + } + line.erase(0, pos + 2); + colnum++; + } + if (!parse_succ) continue; + + db_oid = static_cast(std::stoi(val_vec[0])); + query_id = static_cast(std::stoi(val_vec[1])); + type_string = val_vec[4]; + query_id_to_text_[query_id] = val_vec[query_text_col]; + query_text_to_id_[val_vec[query_text_col]] = query_id; + + // extract each type in the type_string + while ((pos = type_string.find(';')) != std::string::npos) { + param_types.push_back(type::TypeUtil::TypeIdFromString(type_string.substr(0, pos))); + type_string.erase(0, pos + 1); + } + + query_id_to_dboid_[query_id] = db_oid; + query_id_to_param_types_[query_id] = std::move(param_types); + } + // Close file + query_text_file.close(); +} + +void WorkloadForecast::LoadQueryTrace() { + std::string feat_cols = std::string{metrics::QueryTraceMetricRawData::FEATURE_COLUMNS[1]}; + uint8_t num_cols = std::count(feat_cols.begin(), feat_cols.end(), ',') + 1; + + // Create an input filestream + std::ifstream trace_file(std::string(metrics::QueryTraceMetricRawData::FILES[1]).c_str()); + // Make sure the file is open + if (!trace_file.is_open()) + throw PILOT_EXCEPTION(fmt::format("Could not open file {}", metrics::QueryTraceMetricRawData::FILES[1]), + common::ErrorCode::ERRCODE_IO_ERROR); + + // Helper vars + std::string line, param_string; + if (!trace_file.good()) throw PILOT_EXCEPTION("File stream is not good", common::ErrorCode::ERRCODE_IO_ERROR); + + // ignore header + std::getline(trace_file, line); + + bool parse_succ; + execution::query_id_t query_id; + size_t pos, colnum; + std::vector val_vec(num_cols, ""); + + // Read data, line by line + while (std::getline(trace_file, line)) { + colnum = 0; + parse_succ = true; + val_vec.assign(num_cols, ""); + // parse each field separated by , delimiter and store them in val_vec + while ((pos = line.find(',')) != std::string::npos && colnum < num_cols) { + if (pos > 0) { + val_vec[colnum] = line.substr(0, pos); + } + if (val_vec[colnum].empty()) { + // field not found + parse_succ = false; + break; + } + line.erase(0, pos + 2); + colnum++; + } + + if (!parse_succ) continue; + + query_id = static_cast(std::stoi(val_vec[0])); + param_string = val_vec[2]; + + // extract each parameter in the param_string + std::vector param_vec; + while ((pos = param_string.find(';')) != std::string::npos) { + auto cve = parser::ConstantValueExpression::FromString(param_string.substr(0, pos), + query_id_to_param_types_[query_id][param_vec.size()]); + param_vec.push_back(cve); + param_string.erase(0, pos + 1); + } + + if (query_id_to_params_[query_id].size() < num_sample_) { + query_id_to_params_[query_id].push_back(param_vec); + } + query_timestamp_to_id_.insert(std::make_pair(std::stoull(val_vec[1]), query_id)); + } + // Close file + trace_file.close(); +} + +} // namespace noisepage::selfdriving diff --git a/src/self_driving/forecast/workload_forecast_segment.cpp b/src/self_driving/forecast/workload_forecast_segment.cpp new file mode 100644 index 0000000000..0e8aad9128 --- /dev/null +++ b/src/self_driving/forecast/workload_forecast_segment.cpp @@ -0,0 +1,12 @@ +#include "self_driving/forecast/workload_forecast_segment.h" + +#include + +#include "execution/exec_defs.h" + +namespace noisepage::selfdriving { + +WorkloadForecastSegment::WorkloadForecastSegment(std::unordered_map id_to_num_exec) + : id_to_num_exec_(std::move(id_to_num_exec)) {} + +} // namespace noisepage::selfdriving diff --git a/src/self_driving/pilot/pilot.cpp b/src/self_driving/pilot/pilot.cpp new file mode 100644 index 0000000000..df2ee1e38d --- /dev/null +++ b/src/self_driving/pilot/pilot.cpp @@ -0,0 +1,90 @@ +#include "self_driving/pilot/pilot.h" + +#include +#include + +#include "common/action_context.h" +#include "execution/exec_defs.h" +#include "messenger/messenger.h" +#include "metrics/metrics_thread.h" +#include "optimizer/statistics/stats_storage.h" +#include "self_driving/forecast/workload_forecast.h" +#include "self_driving/model_server/model_server_manager.h" +#include "self_driving/pilot_util.h" +#include "settings/settings_manager.h" + +namespace noisepage::selfdriving { + +Pilot::Pilot(std::string model_save_path, common::ManagedPointer catalog, + common::ManagedPointer metrics_thread, + common::ManagedPointer model_server_manager, + common::ManagedPointer settings_manager, + common::ManagedPointer stats_storage, + common::ManagedPointer txn_manager, uint64_t workload_forecast_interval) + : model_save_path_(std::move(model_save_path)), + catalog_(catalog), + metrics_thread_(metrics_thread), + model_server_manager_(model_server_manager), + settings_manager_(settings_manager), + stats_storage_(stats_storage), + txn_manager_(txn_manager), + workload_forecast_interval_(workload_forecast_interval) { + forecast_ = nullptr; + while (!model_server_manager_->ModelServerStarted()) { + } +} + +void Pilot::PerformPlanning() { + forecast_ = std::make_unique(workload_forecast_interval_); + + metrics_thread_->PauseMetrics(); + ExecuteForecast(); + metrics_thread_->ResumeMetrics(); +} + +void Pilot::ExecuteForecast() { + NOISEPAGE_ASSERT(forecast_ != nullptr, "Need forecast_ initialized."); + bool oldval = settings_manager_->GetBool(settings::Param::pipeline_metrics_enable); + bool oldcounter = settings_manager_->GetBool(settings::Param::counters_enable); + uint64_t oldintv = settings_manager_->GetInt64(settings::Param::pipeline_metrics_interval); + + auto action_context = std::make_unique(common::action_id_t(1)); + if (!oldval) { + settings_manager_->SetBool(settings::Param::pipeline_metrics_enable, true, common::ManagedPointer(action_context), + EmptySetterCallback); + } + + action_context = std::make_unique(common::action_id_t(2)); + if (!oldcounter) { + settings_manager_->SetBool(settings::Param::counters_enable, true, common::ManagedPointer(action_context), + EmptySetterCallback); + } + + action_context = std::make_unique(common::action_id_t(3)); + settings_manager_->SetInt(settings::Param::pipeline_metrics_interval, 0, common::ManagedPointer(action_context), + EmptySetterCallback); + + auto pipeline_data = PilotUtil::CollectPipelineFeatures(common::ManagedPointer(this), + common::ManagedPointer(forecast_)); + std::list>>> + pipeline_to_prediction; + PilotUtil::InferenceWithFeatures(model_save_path_, model_server_manager_, pipeline_data, &pipeline_to_prediction); + + action_context = std::make_unique(common::action_id_t(4)); + if (!oldval) { + settings_manager_->SetBool(settings::Param::pipeline_metrics_enable, false, common::ManagedPointer(action_context), + EmptySetterCallback); + } + + action_context = std::make_unique(common::action_id_t(5)); + if (!oldcounter) { + settings_manager_->SetBool(settings::Param::counters_enable, false, common::ManagedPointer(action_context), + EmptySetterCallback); + } + + action_context = std::make_unique(common::action_id_t(6)); + settings_manager_->SetInt(settings::Param::pipeline_metrics_interval, oldintv, common::ManagedPointer(action_context), + EmptySetterCallback); +} + +} // namespace noisepage::selfdriving diff --git a/src/self_driving/pilot/pilot_thread.cpp b/src/self_driving/pilot/pilot_thread.cpp new file mode 100644 index 0000000000..005df2d236 --- /dev/null +++ b/src/self_driving/pilot/pilot_thread.cpp @@ -0,0 +1,12 @@ +#include "self_driving/pilot/pilot_thread.h" + +namespace noisepage::selfdriving { +PilotThread::PilotThread(common::ManagedPointer pilot, std::chrono::microseconds pilot_period, + bool pilot_planning) + : pilot_(pilot), + run_pilot_(true), + pilot_paused_(!pilot_planning), + pilot_period_(pilot_period), + pilot_thread_(std::thread([this] { PilotThreadLoop(); })) {} + +} // namespace noisepage::selfdriving diff --git a/src/self_driving/pilot_util.cpp b/src/self_driving/pilot_util.cpp new file mode 100644 index 0000000000..cd3973371d --- /dev/null +++ b/src/self_driving/pilot_util.cpp @@ -0,0 +1,143 @@ +#include "self_driving/pilot_util.h" + +#include "binder/bind_node_visitor.h" +#include "common/error/error_code.h" +#include "common/error/exception.h" +#include "common/managed_pointer.h" +#include "execution/compiler/compilation_context.h" +#include "execution/compiler/executable_query.h" +#include "execution/exec/execution_context.h" +#include "execution/exec/execution_settings.h" +#include "execution/exec_defs.h" +#include "loggers/selfdriving_logger.h" +#include "messenger/messenger.h" +#include "metrics/metrics_thread.h" +#include "optimizer/cost_model/trivial_cost_model.h" +#include "parser/expression/constant_value_expression.h" +#include "parser/postgresparser.h" +#include "self_driving/forecast/workload_forecast.h" +#include "self_driving/model_server/model_server_manager.h" +#include "self_driving/pilot/pilot.h" +#include "traffic_cop/traffic_cop_util.h" +#include "transaction/transaction_manager.h" + +namespace noisepage::selfdriving { + +const std::list &PilotUtil::CollectPipelineFeatures( + common::ManagedPointer pilot, common::ManagedPointer forecast) { + auto txn_manager = pilot->txn_manager_; + auto catalog = pilot->catalog_; + transaction::TransactionContext *txn; + auto metrics_manager = pilot->metrics_thread_->GetMetricsManager(); + + execution::exec::ExecutionSettings exec_settings{}; + exec_settings.UpdateFromSettingsManager(pilot->settings_manager_); + + execution::exec::NoOpResultConsumer consumer; + execution::exec::OutputCallback callback = consumer; + + execution::query_id_t qid; + catalog::db_oid_t db_oid; + for (auto &it : forecast->query_id_to_params_) { + qid = it.first; + for (auto ¶ms : forecast->query_id_to_params_[qid]) { + txn = txn_manager->BeginTransaction(); + auto stmt_list = parser::PostgresParser::BuildParseTree(forecast->query_id_to_text_[qid]); + db_oid = static_cast(forecast->query_id_to_dboid_[qid]); + std::unique_ptr accessor = + catalog->GetAccessor(common::ManagedPointer(txn), db_oid, DISABLED); + + auto binder = binder::BindNodeVisitor(common::ManagedPointer(accessor), db_oid); + binder.BindNameToNode(common::ManagedPointer(stmt_list), common::ManagedPointer(¶ms), + common::ManagedPointer(&(forecast->query_id_to_param_types_[qid]))); + + // Creating exec_ctx + std::unique_ptr cost_model = std::make_unique(); + + auto out_plan = + trafficcop::TrafficCopUtil::Optimize(common::ManagedPointer(txn), common::ManagedPointer(accessor), + common::ManagedPointer(stmt_list), db_oid, pilot->stats_storage_, + std::move(cost_model), forecast->optimizer_timeout_) + ->TakePlanNodeOwnership(); + + auto exec_ctx = std::make_unique( + db_oid, common::ManagedPointer(txn), callback, out_plan->GetOutputSchema().Get(), + common::ManagedPointer(accessor), exec_settings, metrics_manager); + + exec_ctx->SetParams(common::ManagedPointer>(¶ms)); + + execution::compiler::ExecutableQuery::query_identifier.store(qid); + auto exec_query = execution::compiler::CompilationContext::Compile(*out_plan, exec_settings, accessor.get(), + execution::compiler::CompilationMode::OneShot); + exec_query->Run(common::ManagedPointer(exec_ctx), execution::vm::ExecutionMode::Interpret); + std::this_thread::sleep_for(std::chrono::seconds(1)); + txn_manager->Abort(txn); + } + } + + // retrieve the features + metrics_manager->Aggregate(); + + // Commented out since currently not performing any actions on aggregated data + auto aggregated_data = reinterpret_cast( + metrics_manager->AggregatedMetrics() + .at(static_cast(metrics::MetricsComponent::EXECUTION_PIPELINE)) + .get()); + SELFDRIVING_LOG_INFO("Printing qid and pipeline id to sanity check pipeline metrics recorded"); + for (auto it = aggregated_data->pipeline_data_.begin(); it != aggregated_data->pipeline_data_.end(); it++) { + SELFDRIVING_LOG_INFO( + fmt::format("qid: {}; ppl_id: {}", static_cast(it->query_id_), static_cast(it->pipeline_id_))); + } + + return aggregated_data->pipeline_data_; +} + +void PilotUtil::InferenceWithFeatures( + const std::string &model_save_path, common::ManagedPointer model_server_manager, + const std::list &pipeline_data, + std::list>>> + *pipeline_to_prediction) { + std::unordered_map>> ou_to_features; + std::list>>> + pipeline_to_ou_position; + + PilotUtil::GroupFeaturesByOU(&pipeline_to_ou_position, pipeline_data, &ou_to_features); + NOISEPAGE_ASSERT(model_server_manager->ModelServerStarted(), "Model Server should have been started"); + std::string project_build_path = getenv(Pilot::BUILD_ABS_PATH); + std::unordered_map>> inference_result; + for (auto &ou_map_it : ou_to_features) { + auto res = model_server_manager->DoInference( + selfdriving::OperatingUnitUtil::ExecutionOperatingUnitTypeToString(ou_map_it.first), + project_build_path + model_save_path, ou_map_it.second); + if (!res.second) { + throw PILOT_EXCEPTION("Inference through model server manager has error", common::ErrorCode::ERRCODE_WARNING); + } + inference_result.emplace(ou_map_it.first, res.first); + } +} + +void PilotUtil::GroupFeaturesByOU( + std::list>>> *pipeline_to_ou_position, + const std::list &pipeline_data, + std::unordered_map>> *ou_to_features) { + for (const auto &data_it : pipeline_data) { + std::vector> ou_positions; + for (const auto &ou_it : data_it.features_) { + if (ou_to_features->find(ou_it.GetExecutionOperatingUnitType()) == ou_to_features->end()) { + ou_positions.emplace_back(ou_it.GetExecutionOperatingUnitType(), 0); + ou_to_features->emplace(ou_it.GetExecutionOperatingUnitType(), std::vector>()); + } else { + ou_positions.emplace_back(ou_it.GetExecutionOperatingUnitType(), + ou_to_features->at(ou_it.GetExecutionOperatingUnitType()).size()); + } + auto predictors = ou_it.GetAllAttributes(); + predictors.insert(predictors.begin(), data_it.execution_mode_); + ou_to_features->at(ou_it.GetExecutionOperatingUnitType()).push_back(predictors); + } + pipeline_to_ou_position->emplace_back(data_it.query_id_, data_it.pipeline_id_, std::move(ou_positions)); + } +} + +} // namespace noisepage::selfdriving diff --git a/src/settings/settings_callbacks.cpp b/src/settings/settings_callbacks.cpp index 9423e3fece..ace821a0cd 100644 --- a/src/settings/settings_callbacks.cpp +++ b/src/settings/settings_callbacks.cpp @@ -156,4 +156,15 @@ void Callbacks::MetricsQueryTrace(void *const old_value, void *const new_value, action_context->SetState(common::ActionState::SUCCESS); } +void Callbacks::PilotEnablePlanning(void *const old_value, void *const new_value, DBMain *const db_main, + common::ManagedPointer action_context) { + action_context->SetState(common::ActionState::IN_PROGRESS); + bool new_status = *static_cast(new_value); + if (new_status) + db_main->GetPilotThread()->EnablePilot(); + else + db_main->GetPilotThread()->DisablePilot(); + action_context->SetState(common::ActionState::SUCCESS); +} + } // namespace noisepage::settings diff --git a/src/settings/settings_manager.cpp b/src/settings/settings_manager.cpp index 0802e24a7a..91d4bcb44b 100644 --- a/src/settings/settings_manager.cpp +++ b/src/settings/settings_manager.cpp @@ -197,7 +197,7 @@ bool SettingsManager::ValidateValue(const parser::ConstantValueExpression &value case type::TypeId::INTEGER: return value.Peek() >= min_value.Peek() && value.Peek() <= max_value.Peek(); case type::TypeId::BIGINT: - return value.Peek() >= min_value.Peek() && value.Peek() <= max_value.Peek(); + return value.Peek() >= min_value.Peek() && value.Peek() <= max_value.Peek(); case type::TypeId ::DECIMAL: return value.Peek() >= min_value.Peek() && value.Peek() <= max_value.Peek(); default: