forked from UWQuickstep/quickstep
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathQueryContext.hpp
665 lines (595 loc) · 20.9 KB
/
QueryContext.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_CONTEXT_HPP_
#define QUICKSTEP_QUERY_EXECUTION_QUERY_CONTEXT_HPP_
#include <cstddef>
#include <cstdint>
#include <memory>
#include <unordered_map>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/table_generator/GeneratorFunctionHandle.hpp"
#include "storage/AggregationOperationState.hpp"
#include "storage/HashTable.hpp"
#include "storage/InsertDestination.hpp"
#include "storage/WindowAggregationOperationState.hpp"
#include "threading/SpinSharedMutex.hpp"
#include "types/containers/Tuple.hpp"
#include "utility/Macros.hpp"
#include "utility/SortConfiguration.hpp"
#include "utility/lip_filter/LIPFilter.hpp"
#include "utility/lip_filter/LIPFilterDeployment.hpp"
#include "glog/logging.h"
#include "tmb/id_typedefs.h"
namespace tmb { class MessageBus; }
namespace quickstep {
class CatalogDatabaseLite;
class StorageManager;
namespace serialization { class QueryContext; }
/** \addtogroup QueryExecution
* @{
*/
/**
* @brief The QueryContext stores stateful execution info per query.
**/
class QueryContext {
public:
/**
 * @brief Per-query identifier of an AggregationOperationState.
 **/
using aggregation_state_id = std::uint32_t;

/**
 * @brief Per-query identifier of a GeneratorFunctionHandle.
 **/
using generator_function_id = std::uint32_t;

/**
 * @brief Per-query identifier of an InsertDestination.
 *
 * @note Signed on purpose: a negative value denotes a nonexistent
 *       InsertDestination.
 **/
using insert_destination_id = std::int32_t;

/**
 * @brief Sentinel id (-1) denoting a nonexistent InsertDestination.
 **/
static constexpr insert_destination_id kInvalidInsertDestinationId = -1;

/**
 * @brief Per-query identifier of a JoinHashTable.
 **/
using join_hash_table_id = std::uint32_t;

/**
 * @brief Per-query identifier of a LIPFilterDeployment.
 **/
using lip_deployment_id = std::int32_t;

/**
 * @brief Sentinel id (-1) for an invalid LIPFilterDeployment.
 **/
static constexpr lip_deployment_id kInvalidLIPDeploymentId = -1;

/**
 * @brief Per-query identifier of a LIPFilter.
 **/
using lip_filter_id = std::uint32_t;

/**
 * @brief Per-query identifier of a Predicate.
 *
 * @note Signed on purpose: a negative value denotes a null Predicate.
 **/
using predicate_id = std::int32_t;

/**
 * @brief Sentinel id (-1) denoting a null Predicate.
 **/
static constexpr predicate_id kInvalidPredicateId = -1;

/**
 * @brief Per-query identifier of a group of Scalars.
 *
 * @note Signed on purpose: a negative value denotes a nonexistent
 *       ScalarGroup.
 **/
using scalar_group_id = std::int32_t;

/**
 * @brief Sentinel id (-1) denoting a nonexistent ScalarGroup.
 **/
static constexpr scalar_group_id kInvalidScalarGroupId = -1;

/**
 * @brief Per-query identifier of a SortConfiguration.
 **/
using sort_config_id = std::uint32_t;

/**
 * @brief Per-query identifier of a Tuple to be inserted.
 **/
using tuple_id = std::uint32_t;

/**
 * @brief Per-query identifier of a group of UpdateAssignments.
 **/
using update_group_id = std::uint32_t;

/**
 * @brief Identifier of a window aggregation state.
 **/
using window_aggregation_state_id = std::uint32_t;
/**
 * @brief Constructor.
 *
 * @note Only declared here; the definition is out of line. The accessor
 *       documentation in this class describes the per-query objects
 *       (aggregation states, hash tables, insert destinations, ...) as
 *       being created by this constructor.
 *
 * @param proto A serialized Protocol Buffer representation of a
 *        QueryContext, originally generated by the optimizer.
 * @param database The Database to resolve relation and attribute references
 *        in.
 * @param storage_manager The StorageManager to use.
 * @param scheduler_client_id The TMB client ID of the scheduler thread.
 * @param bus A pointer to the TMB.
 **/
QueryContext(const serialization::QueryContext &proto,
const CatalogDatabaseLite &database,
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus);
/**
 * @brief Destructor.
 *
 * @note The body is empty: cleanup is left to the destructors of the
 *       data members (vectors of std::unique_ptr).
 **/
~QueryContext() {}
/**
 * @brief Check whether a serialization::QueryContext is fully-formed and
 *        all parts are valid.
 *
 * @note Static: can be called without a QueryContext instance, e.g. before
 *       constructing one from the same proto. Only declared here; the
 *       individual checks live in the out-of-line definition.
 *
 * @param proto A serialized Protocol Buffer representation of a QueryContext,
 *        originally generated by the optimizer.
 * @param database The Database to resolve relation and attribute references
 *        in.
 * @return Whether proto is fully-formed and valid.
 **/
static bool ProtoIsValid(const serialization::QueryContext &proto,
const CatalogDatabaseLite &database);
/**
 * @brief Check that an (AggregationOperationState id, partition id) pair
 *        addresses an existing slot.
 *
 * @note The check runs while holding a shared lock on the aggregation
 *       states mutex.
 *
 * @param id The AggregationOperationState id.
 * @param part_id The partition id.
 *
 * @return True if both ids are in range, otherwise false.
 **/
bool isValidAggregationStateId(const aggregation_state_id id, const partition_id part_id) const {
  SpinSharedMutexSharedLock<false> lock(aggregation_states_mutex_);
  if (id >= aggregation_states_.size()) {
    return false;
  }
  // The outer id is in range, so indexing the partition vector is safe.
  return part_id < aggregation_states_[id].size();
}
/**
 * @brief Look up an AggregationOperationState.
 *
 * @note The lookup runs under a shared lock on the aggregation states
 *       mutex. Range and non-null checks are DCHECKs only.
 *
 * @param id The AggregationOperationState id in the query.
 * @param part_id The partition id.
 *
 * @return The AggregationOperationState, already created in the constructor.
 *         Ownership stays with this QueryContext.
 **/
inline AggregationOperationState* getAggregationState(const aggregation_state_id id, const partition_id part_id) {
  SpinSharedMutexSharedLock<false> lock(aggregation_states_mutex_);
  DCHECK_LT(id, aggregation_states_.size());
  const auto &partitioned_states = aggregation_states_[id];
  DCHECK_LT(part_id, partitioned_states.size());
  const auto &state = partitioned_states[part_id];
  DCHECK(state);
  return state.get();
}
/**
 * @brief Destroy one AggregationOperationState and leave its slot empty.
 *
 * @note Takes the aggregation states mutex exclusively. The slot is
 *       expected to be non-null (DCHECK).
 *
 * @param id The ID of the AggregationOperationState to destroy.
 * @param part_id The partition id.
 **/
inline void destroyAggregationState(const aggregation_state_id id, const partition_id part_id) {
  SpinSharedMutexExclusiveLock<false> lock(aggregation_states_mutex_);
  DCHECK_LT(id, aggregation_states_.size());
  auto &partitioned_states = aggregation_states_[id];
  DCHECK_LT(part_id, partitioned_states.size());
  auto &state = partitioned_states[part_id];
  DCHECK(state);
  state.reset();
}
/**
* @brief Whether the given GeneratorFunctionHandle id is valid.
*
* @param id The GeneratorFunctionHandle id.
*
* @return True if valid, otherwise false.
**/
bool isValidGeneratorFunctionId(const generator_function_id id) const {
return id < generator_functions_.size();
}
/**
 * @brief Get the GeneratorFunctionHandle.
 *
 * @note Declared const: it only reads the stored handle, so it is also
 *       usable through a const QueryContext (consistent with
 *       getLIPDeployment() and getLIPFilter()). The range check is a
 *       DCHECK only.
 *
 * @param id The GeneratorFunctionHandle id in the query.
 *
 * @return The GeneratorFunctionHandle, already created in the constructor.
 **/
inline const GeneratorFunctionHandle& getGeneratorFunctionHandle(
    const generator_function_id id) const {
  DCHECK_LT(static_cast<std::size_t>(id), generator_functions_.size());
  return *generator_functions_[id];
}
/**
 * @brief Check that an InsertDestination id refers to an existing slot.
 *
 * @note The check runs under a shared lock on the insert destinations
 *       mutex. kInvalidInsertDestinationId and every other negative id
 *       are rejected.
 *
 * @param id The InsertDestination id.
 *
 * @return True if valid, otherwise false.
 **/
bool isValidInsertDestinationId(const insert_destination_id id) const {
  SpinSharedMutexSharedLock<false> lock(insert_destinations_mutex_);
  if (id == kInvalidInsertDestinationId || id < 0) {
    return false;
  }
  return static_cast<std::size_t>(id) < insert_destinations_.size();
}
/**
 * @brief Look up an InsertDestination.
 *
 * @note The lookup runs under a shared lock on the insert destinations
 *       mutex. Range checks are DCHECKs only.
 *
 * @param id The InsertDestination id in the query.
 *
 * @return The InsertDestination, already created in the constructor.
 *         Ownership stays with this QueryContext.
 **/
inline InsertDestination* getInsertDestination(const insert_destination_id id) {
  SpinSharedMutexSharedLock<false> lock(insert_destinations_mutex_);
  DCHECK_GE(id, 0);
  const std::size_t index = static_cast<std::size_t>(id);
  DCHECK_LT(index, insert_destinations_.size());
  return insert_destinations_[index].get();
}
/**
 * @brief Destroy the given InsertDestination and leave its slot empty.
 *
 * @note Takes the insert destinations mutex exclusively.
 *
 * @param id The id of the InsertDestination to destroy.
 **/
inline void destroyInsertDestination(const insert_destination_id id) {
  SpinSharedMutexExclusiveLock<false> lock(insert_destinations_mutex_);
  DCHECK_GE(id, 0);
  const std::size_t index = static_cast<std::size_t>(id);
  DCHECK_LT(index, insert_destinations_.size());
  insert_destinations_[index].reset();
}
/**
* @brief Whether the given JoinHashTable id is valid.
*
* @note This is a thread-safe function. Check isValidJoinHashTableIdUnsafe
* for the the unsafe version.
*
* @param id The JoinHashTable id.
* @param part_id The partition id.
*
* @return True if valid, otherwise false.
**/
bool isValidJoinHashTableId(const join_hash_table_id id, const partition_id part_id) const {
SpinSharedMutexSharedLock<false> lock(hash_tables_mutex_);
return id < join_hash_tables_.size() &&
part_id < join_hash_tables_[id].size();
}
/**
 * @brief Look up a JoinHashTable.
 *
 * @note The lookup runs under a shared lock on the hash tables mutex.
 *       The range check is a DCHECK only.
 *
 * @param id The JoinHashTable id in the query.
 * @param part_id The partition id.
 *
 * @return The JoinHashTable, already created in the constructor.
 *         Ownership stays with this QueryContext.
 **/
inline JoinHashTable* getJoinHashTable(const join_hash_table_id id, const partition_id part_id) {
  SpinSharedMutexSharedLock<false> lock(hash_tables_mutex_);
  DCHECK(isValidJoinHashTableIdUnsafe(id, part_id));
  const auto &hash_table = join_hash_tables_[id][part_id];
  return hash_table.get();
}
/**
 * @brief Destroy the given JoinHashTable and leave its slot empty.
 *
 * @note Takes the hash tables mutex exclusively.
 *
 * @param id The id of the JoinHashTable to destroy.
 * @param part_id The partition id.
 **/
inline void destroyJoinHashTable(const join_hash_table_id id, const partition_id part_id) {
  SpinSharedMutexExclusiveLock<false> lock(hash_tables_mutex_);
  DCHECK(isValidJoinHashTableIdUnsafe(id, part_id));
  auto &hash_table = join_hash_tables_[id][part_id];
  hash_table.reset();
}
/**
 * @brief Check that a LIPFilterDeployment id is in range.
 *
 * @note The signed id is converted to std::size_t before the comparison,
 *       so a negative id (such as kInvalidLIPDeploymentId) wraps to a
 *       large unsigned value and is reported as invalid.
 *
 * @param id The LIPFilterDeployment id.
 *
 * @return True if valid, otherwise false.
 **/
bool isValidLIPDeploymentId(const lip_deployment_id id) const {
  const std::size_t index = static_cast<std::size_t>(id);
  return index < lip_deployments_.size();
}
/**
 * @brief Get a read-only pointer to a LIPFilterDeployment.
 *
 * @note The range check is a DCHECK only.
 *
 * @param id The LIPFilterDeployment id.
 *
 * @return The constant pointer to the LIPFilterDeployment that is
 *         already created in the constructor.
 **/
inline const LIPFilterDeployment* getLIPDeployment(
    const lip_deployment_id id) const {
  const std::size_t index = static_cast<std::size_t>(id);
  DCHECK_LT(index, lip_deployments_.size());
  return lip_deployments_[index].get();
}
/**
 * @brief Destroy the given LIPFilterDeployment and leave its slot empty.
 *
 * @param id The id of the LIPFilterDeployment to destroy.
 **/
inline void destroyLIPDeployment(const lip_deployment_id id) {
  const std::size_t index = static_cast<std::size_t>(id);
  DCHECK_LT(index, lip_deployments_.size());
  lip_deployments_[index].reset();
}
/**
* @brief Whether the given LIPFilter id is valid.
*
* @param id The LIPFilter id.
*
* @return True if valid, otherwise false.
**/
bool isValidLIPFilterId(const lip_filter_id id) const {
return id < lip_filters_.size();
}
/**
 * @brief Get a mutable pointer to the LIPFilter.
 *
 * @note The range check is a DCHECK only.
 *
 * @param id The LIPFilter id.
 *
 * @return The LIPFilter, already created in the constructor. Ownership
 *         stays with this QueryContext.
 **/
inline LIPFilter* getLIPFilterMutable(const lip_filter_id id) {
  DCHECK_LT(id, lip_filters_.size());
  const auto &filter = lip_filters_[id];
  return filter.get();
}
/**
 * @brief Get a read-only pointer to the LIPFilter.
 *
 * @note The range check is a DCHECK only.
 *
 * @param id The LIPFilter id.
 *
 * @return The constant pointer to the LIPFilter that is
 *         already created in the constructor.
 **/
inline const LIPFilter* getLIPFilter(const lip_filter_id id) const {
  DCHECK_LT(id, lip_filters_.size());
  const auto &filter = lip_filters_[id];
  return filter.get();
}
/**
 * @brief Destroy the given LIPFilter and leave its slot empty.
 *
 * @param id The id of the LIPFilter to destroy.
 **/
inline void destroyLIPFilter(const lip_filter_id id) {
  DCHECK_LT(id, lip_filters_.size());
  auto &filter = lip_filters_[id];
  filter.reset();
}
/**
 * @brief Check that a Predicate id is either "no predicate" or in range.
 *
 * @param id The Predicate id.
 *
 * @return True if id is kInvalidPredicateId (meaning no predicate) or
 *         refers to a stored predicate slot, otherwise false.
 **/
bool isValidPredicate(const predicate_id id) const {
  if (id == kInvalidPredicateId) {
    return true;  // No predicate.
  }
  if (id < 0) {
    return false;
  }
  return static_cast<std::size_t>(id) < predicates_.size();
}
/**
 * @brief Get the const Predicate.
 *
 * @note Declared const: it only reads the stored predicate, so it is
 *       also usable through a const QueryContext (consistent with
 *       getLIPDeployment() and getLIPFilter()). Range checks are DCHECKs
 *       only.
 *
 * @param id The Predicate id in the query.
 *
 * @return The const Predicate (already created in the constructor), or
 *         nullptr if id is kInvalidPredicateId.
 **/
inline const Predicate* getPredicate(const predicate_id id) const {
  if (id == kInvalidPredicateId) {
    // "No predicate" is a legitimate request, not an error.
    return nullptr;
  }
  DCHECK_GE(id, 0);
  DCHECK_LT(static_cast<std::size_t>(id), predicates_.size());
  return predicates_[id].get();
}
/**
 * @brief Check that a Scalar group id refers to an existing group.
 *
 * @note kInvalidScalarGroupId and every other negative id are rejected.
 *
 * @param id The Scalar group id.
 *
 * @return True if valid, otherwise false.
 **/
bool isValidScalarGroupId(const scalar_group_id id) const {
  if (id == kInvalidScalarGroupId || id < 0) {
    return false;
  }
  return static_cast<std::size_t>(id) < scalar_groups_.size();
}
/**
 * @brief Get the group of Scalars.
 *
 * @note Declared const: it only reads the stored group, so it is also
 *       usable through a const QueryContext (consistent with
 *       getLIPDeployment() and getLIPFilter()). Range checks are DCHECKs
 *       only.
 *
 * @param id The Scalar group id in the query.
 *
 * @return The reference to the Scalar group, already created in the
 *         constructor.
 **/
inline const std::vector<std::unique_ptr<const Scalar>>& getScalarGroup(const scalar_group_id id) const {
  DCHECK_GE(id, 0);
  DCHECK_LT(static_cast<std::size_t>(id), scalar_groups_.size());
  return scalar_groups_[id];
}
/**
* @brief Whether the given SortConfiguration id is valid.
*
* @param id The SortConfiguration id.
*
* @return True if valid, otherwise false.
**/
bool isValidSortConfigId(const sort_config_id id) const {
return id < sort_configs_.size();
}
/**
 * @brief Get the SortConfiguration.
 *
 * @note Declared const: it only reads the stored configuration, so it is
 *       also usable through a const QueryContext (consistent with
 *       getLIPDeployment() and getLIPFilter()). The range check is a
 *       DCHECK only.
 *
 * @param id The SortConfiguration id in the query.
 *
 * @return The SortConfiguration, already created in the constructor.
 **/
inline const SortConfiguration& getSortConfig(const sort_config_id id) const {
  DCHECK_LT(id, sort_configs_.size());
  return *sort_configs_[id];
}
/**
 * @brief Check that a Tuple id is in range.
 *
 * @param id The Tuple id.
 *
 * @return True if the id refers to a stored tuple slot, otherwise false.
 **/
bool isValidTupleId(const tuple_id id) const {
  const std::size_t num_tuples = tuples_.size();
  return id < num_tuples;
}
/**
* @brief Whether the given vector of Tuple ids is valid.
*
* @param ids The vector of Tuple ids.
*
* @return True if valid, otherwise false.
**/
bool areValidTupleIds(const std::vector<tuple_id> &ids) const {
for (const tuple_id id : ids) {
if (id >= tuples_.size()) {
return false;
}
}
return true;
}
/**
 * @brief Release the ownership of the Tuple referenced by the id.
 *
 * @note Each id should be used only once: after the call the slot is
 *       empty, and a second call trips the non-null DCHECK.
 *
 * @param id The Tuple id in the query.
 *
 * @return The Tuple, already created in the constructor. The caller
 *         takes over ownership of the returned pointer.
 **/
inline Tuple* releaseTuple(const tuple_id id) {
  DCHECK_LT(id, tuples_.size());
  auto &slot = tuples_[id];
  DCHECK(slot);
  return slot.release();
}
/**
 * @brief Check that an update assignments group id is in range.
 *
 * @param id The group id of the update assignments.
 *
 * @return True if valid, otherwise false.
 **/
bool isValidUpdateGroupId(const update_group_id id) const {
  const std::size_t index = static_cast<std::size_t>(id);
  return index < update_groups_.size();
}
/**
 * @brief Get the group of the update assignments for UpdateWorkOrder.
 *
 * @note Declared const: it only reads the stored group, so it is also
 *       usable through a const QueryContext (consistent with
 *       getLIPDeployment() and getLIPFilter()). The range check and the
 *       non-empty check are DCHECKs only.
 *
 * @param id The group id of the update assignments in the query.
 *
 * @return The reference to the update assignments group (attribute id to
 *         Scalar), already created in the constructor.
 **/
inline const std::unordered_map<attribute_id, std::unique_ptr<const Scalar>>& getUpdateGroup(
    const update_group_id id) const {
  DCHECK_LT(static_cast<std::size_t>(id), update_groups_.size());
  DCHECK(!update_groups_[id].empty());
  return update_groups_[id];
}
/**
 * @brief Check that a WindowAggregationOperationState id is in range.
 *
 * @param id The WindowAggregationOperationState id.
 *
 * @return True if the id refers to a stored state slot, otherwise false.
 **/
bool isValidWindowAggregationStateId(const window_aggregation_state_id id) const {
  const std::size_t num_window_states = window_aggregation_states_.size();
  return id < num_window_states;
}
/**
 * @brief Look up a WindowAggregationOperationState.
 *
 * @note Range and non-null checks are DCHECKs only.
 *
 * @param id The WindowAggregationOperationState id in the query.
 *
 * @return The WindowAggregationOperationState, already created in the
 *         constructor. Ownership stays with this QueryContext.
 **/
inline WindowAggregationOperationState* getWindowAggregationState(
    const window_aggregation_state_id id) {
  DCHECK_LT(id, window_aggregation_states_.size());
  const auto &state = window_aggregation_states_[id];
  DCHECK(state);
  return state.get();
}
/**
 * @brief Release ownership of the given WindowAggregationOperationState.
 *
 * @note After the call the slot is empty; the non-null DCHECK trips if the
 *       same id is released twice.
 *
 * @param id The id of the WindowAggregationOperationState to release.
 *
 * @return The WindowAggregationOperationState, already created in the
 *         constructor. The caller takes over ownership of the returned
 *         pointer.
 **/
inline WindowAggregationOperationState* releaseWindowAggregationState(
    const window_aggregation_state_id id) {
  DCHECK_LT(id, window_aggregation_states_.size());
  auto &state = window_aggregation_states_[id];
  DCHECK(state);
  return state.release();
}
/**
 * @brief Get the total memory footprint, in bytes, of the temporary data
 *        structures used for query execution (e.g. join hash tables,
 *        aggregation hash tables).
 *
 * @return The sum of getJoinHashTablesMemoryBytes() and
 *         getAggregationStatesMemoryBytes().
 **/
std::size_t getTempStructuresMemoryBytes() const {
  const std::size_t join_hash_table_bytes = getJoinHashTablesMemoryBytes();
  const std::size_t aggregation_state_bytes = getAggregationStatesMemoryBytes();
  return join_hash_table_bytes + aggregation_state_bytes;
}
/**
 * @brief Get the total memory footprint in bytes of the join hash tables
 *        used for query execution.
 *
 * @note Only declared here; the definition is out of line. It is one of
 *       the two terms summed by getTempStructuresMemoryBytes().
 *
 * @return The footprint in bytes.
 **/
std::size_t getJoinHashTablesMemoryBytes() const;
/**
 * @brief Get the total memory footprint in bytes of the aggregation hash
 *        tables used for query execution.
 *
 * @note Only declared here; the definition is out of line. It is one of
 *       the two terms summed by getTempStructuresMemoryBytes().
 *
 * @return The footprint in bytes.
 **/
std::size_t getAggregationStatesMemoryBytes() const;
/**
 * @brief Get the list of IDs of temporary relations in this query.
 *
 * @note Only declared here. NOTE(review): whether the output vector is
 *       cleared first or appended to is decided by the out-of-line
 *       definition; confirm before relying on either behavior.
 *
 * @param temp_relation_ids A pointer to the vector that will store the
 *        relation IDs.
 **/
void getTempRelationIDs(std::vector<relation_id> *temp_relation_ids) const;
private:
/**
 * @brief Check that a (JoinHashTable id, partition id) pair is in range,
 *        without taking any lock.
 *
 * @note This is the thread-unsafe variant, used by callers that already
 *       hold the hash tables mutex. See isValidJoinHashTableId for the
 *       thread-safe version.
 *
 * @param id The JoinHashTable id.
 * @param part_id The partition id.
 *
 * @return True if valid, otherwise false.
 **/
bool isValidJoinHashTableIdUnsafe(const join_hash_table_id id,
                                  const partition_id part_id) const {
  if (id >= join_hash_tables_.size()) {
    return false;
  }
  // The outer id is in range, so indexing the partition vector is safe.
  return part_id < join_hash_tables_[id].size();
}
// Per AggregationOperationState, the index is the partition id.
// A slot becomes null once destroyAggregationState() has been called on it.
typedef std::vector<std::unique_ptr<AggregationOperationState>> PartitionedAggregationOperationStates;
// Per hash join, the index is the partition id.
// A slot becomes null once destroyJoinHashTable() has been called on it.
typedef std::vector<std::unique_ptr<JoinHashTable>> PartitionedJoinHashTables;
// Each container below is indexed by the matching per-query id type
// declared at the top of the class (e.g. aggregation_states_[id][part_id]).
// Slots are emptied by the corresponding destroy*() / release*() methods.
std::vector<PartitionedAggregationOperationStates> aggregation_states_;
std::vector<std::unique_ptr<const GeneratorFunctionHandle>> generator_functions_;
std::vector<std::unique_ptr<InsertDestination>> insert_destinations_;
std::vector<PartitionedJoinHashTables> join_hash_tables_;
std::vector<std::unique_ptr<LIPFilterDeployment>> lip_deployments_;
std::vector<std::unique_ptr<LIPFilter>> lip_filters_;
std::vector<std::unique_ptr<const Predicate>> predicates_;
std::vector<std::vector<std::unique_ptr<const Scalar>>> scalar_groups_;
std::vector<std::unique_ptr<const SortConfiguration>> sort_configs_;
std::vector<std::unique_ptr<Tuple>> tuples_;
// Per update group: attribute id -> Scalar.
std::vector<std::unordered_map<attribute_id, std::unique_ptr<const Scalar>>> update_groups_;
std::vector<std::unique_ptr<WindowAggregationOperationState>> window_aggregation_states_;
// The mutexes are mutable so that const query methods (isValid*Id) can
// lock them. In this header each one is taken shared by the isValid/get
// methods and exclusively by the destroy methods of its container:
//   hash_tables_mutex_         -> join_hash_tables_
//   aggregation_states_mutex_  -> aggregation_states_
//   insert_destinations_mutex_ -> insert_destinations_
// The remaining containers are accessed without a lock in this header.
mutable SpinSharedMutex<false> hash_tables_mutex_;
mutable SpinSharedMutex<false> aggregation_states_mutex_;
mutable SpinSharedMutex<false> insert_destinations_mutex_;
// Presumably disables copy construction and copy assignment; the macro is
// not defined in this file (likely utility/Macros.hpp) -- confirm there.
DISALLOW_COPY_AND_ASSIGN(QueryContext);
};
/** @} */
} // namespace quickstep
#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_CONTEXT_HPP_