forked from UWQuickstep/quickstep
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathForemanDistributed.hpp
152 lines (124 loc) · 5.02 KB
/
ForemanDistributed.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
#include <cstddef>
#include <cstdio>
#include <memory>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/ForemanBase.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
#include "storage/DataExchangerAsync.hpp"
#include "storage/StorageManager.hpp"
#include "utility/Macros.hpp"
#include "tmb/id_typedefs.h"
// Forward declaration: this header only refers to tmb::MessageBus through a
// pointer (the constructor's 'bus' parameter), so the full definition is not
// required here.
namespace tmb { class MessageBus; }
namespace quickstep {
// Forward declarations: within this header these types appear only behind
// pointers or references, or as a template argument in a function parameter,
// so their full definitions are not needed here.
class BlockLocator;
class CatalogDatabaseLite;
class QueryProcessor;
namespace serialization { class WorkOrderMessage; }
/** \addtogroup QueryExecution
* @{
*/
/**
* @brief The Foreman receives queries from the main thread and messages from
* the policy enforcer, and dispatches the resulting work to
* Shiftbosses. It also receives work completion messages from
* Shiftbosses.
**/
class ForemanDistributed final : public ForemanBase {
public:
/**
* @brief Constructor.
*
* @param block_locator The block locator that manages block location info.
* @param bus A pointer to the TMB.
* @param catalog_database The catalog database where this query is executed.
* @param query_processor The QueryProcessor to save catalog upon the query
* completion.
* @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
* Defaults to -1.
*
* @note If cpu_id is not specified, the Foreman thread may be moved between
* CPUs by the OS.
**/
ForemanDistributed(
const BlockLocator &block_locator,
tmb::MessageBus *bus,
CatalogDatabaseLite *catalog_database,
QueryProcessor *query_processor,
const int cpu_id = -1);
/**
* @brief Destructor. Shuts down the data exchanger, releases the
* StorageManager owned by this Foreman, and then joins the data
* exchanger.
*
* @note The three steps run in exactly this order; keep it when editing.
* NOTE(review): presumably the StorageManager has to be destroyed
* after the exchanger is told to shut down but before it is joined --
* confirm against DataExchangerAsync and StorageManager.
**/
~ForemanDistributed() override {
data_exchanger_.shutdown();
// Destroys the owned StorageManager, if one was created.
storage_manager_.reset();
data_exchanger_.join();
}
/**
* @brief Override of the ForemanBase profiling-output hook; see the base
* class for the full contract.
*
* @param query_id The ID of the query whose work order profiling results
* are requested.
* @param out The C stdio stream passed in by the caller (presumably where
* the results are written -- confirm in the implementation).
**/
void printWorkOrderProfilingResults(const std::size_t query_id,
std::FILE *out) const override;
protected:
/**
* @brief Override of the run() hook inherited through ForemanBase. Defined
* out of line.
*
* NOTE(review): presumably the body of the Foreman thread (its message
* loop) -- confirm in the implementation file.
**/
void run() override;
private:
/**
* @brief Check whether the given WorkOrderMessage is related to an
* aggregation.
*
* NOTE(review): semantics below are inferred from the signature only; the
* definition is not in this header -- confirm in the implementation.
*
* @param proto The WorkOrderMessage to inspect.
* @param next_shiftboss_index_to_schedule The index of the Shiftboss that
* would be scheduled next.
* @param shiftboss_index_for_aggregation Output parameter; presumably set to
* the index of the Shiftboss that should execute the work order.
*
* @return Presumably true if proto is aggregation-related.
**/
bool isAggregationRelatedWorkOrder(const serialization::WorkOrderMessage &proto,
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index_for_aggregation);
/**
* @brief Check whether the given WorkOrderMessage is related to a hash join.
*
* NOTE(review): semantics below are inferred from the signature only; the
* definition is not in this header -- confirm in the implementation.
*
* @param proto The WorkOrderMessage to inspect.
* @param next_shiftboss_index_to_schedule The index of the Shiftboss that
* would be scheduled next.
* @param shiftboss_index_for_hash_join Output parameter; presumably set to
* the index of the Shiftboss that should execute the work order.
*
* @return Presumably true if proto is hash-join-related.
**/
bool isHashJoinRelatedWorkOrder(const serialization::WorkOrderMessage &proto,
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index_for_hash_join);
/**
* @brief Check whether the given WorkOrderMessage is related to LIP.
*
* NOTE(review): semantics below are inferred from the signature only; the
* definition is not in this header -- confirm in the implementation.
*
* @param proto The WorkOrderMessage to inspect.
* @param next_shiftboss_index_to_schedule The index of the Shiftboss that
* would be scheduled next.
* @param shiftboss_index_for_lip Output parameter; presumably set to the
* index of the Shiftboss that should execute the work order.
*
* @return Presumably true if proto is LIP-related.
**/
bool isLipRelatedWorkOrder(const serialization::WorkOrderMessage &proto,
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index_for_lip);
/**
* @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the
* worker threads.
*
* @param messages The messages to be dispatched.
**/
void dispatchWorkOrderMessages(
const std::vector<std::unique_ptr<serialization::WorkOrderMessage>> &messages);
/**
* @brief Send the given message to the specified worker.
*
* @param worker_index The logical index of the recipient worker in
* ShiftbossDirectory.
* @param proto The WorkOrderMessage to be sent.
**/
void sendWorkOrderMessage(const std::size_t worker_index,
const serialization::WorkOrderMessage &proto);
/**
* @brief Handle a registration message from a Shiftboss.
*
* NOTE(review): presumably records the Shiftboss in shiftboss_directory_;
* the definition is not in this header -- confirm in the implementation.
*
* @param shiftboss_client_id The TMB client ID of the registering Shiftboss.
* @param work_order_capacity The work order capacity reported for that
* Shiftboss.
**/
void processShiftbossRegistrationMessage(const tmb::client_id shiftboss_client_id,
const std::size_t work_order_capacity);
/**
* @brief Check if we can collect new messages from the PolicyEnforcer.
*
* @param message_type The type of the last received message.
**/
bool canCollectNewMessages(const tmb::message_type_id message_type);
// To get block locality info for scheduling.
const BlockLocator &block_locator_;
// Directory of Shiftbosses; workers are addressed by their logical index in
// it (see sendWorkOrderMessage()).
ShiftbossDirectory shiftboss_directory_;
// Not owned (raw pointer supplied through the constructor).
CatalogDatabaseLite *catalog_database_;
// Used for '\analyze'.
// NOTE: the destructor tears these two down in a specific interleaved order.
DataExchangerAsync data_exchanger_;
std::unique_ptr<StorageManager> storage_manager_;
// Macro from utility/Macros.hpp; by its name, it disables copy construction
// and copy assignment for this class.
DISALLOW_COPY_AND_ASSIGN(ForemanDistributed);
};
/** @} */
} // namespace quickstep
#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_