feature: make aggregation parallel process to reduce execution time #422
ACK!
The parallel hash structure can be used as a reference; a minimal sketch is given below.
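A minimal sketch of one possible partitioned (parallel) hash layout, assuming a fixed partition count and that every partition is written by exactly one worker thread; the type and member names here are illustrative only, not StoneDB APIs:

#include <cstddef>
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <vector>

// Illustrative partitioned hash table for a parallel GROUP BY: keys are
// routed to a partition by hash, each partition is built by a single
// thread without locking, and the partitions are merged at the end.
struct PartitionedHashAggregator {
  using Key = uint64_t;  // encoded grouping key
  using Agg = int64_t;   // running aggregate value (e.g. a COUNT)

  explicit PartitionedHashAggregator(std::size_t partitions) : parts_(partitions) {}

  std::size_t PartitionOf(Key k) const { return std::hash<Key>{}(k) % parts_.size(); }

  // Called only by the thread that owns partition p.
  void Add(std::size_t p, Key k, Agg delta) { parts_[p][k] += delta; }

  // Single-threaded merge once all workers have finished.
  std::unordered_map<Key, Agg> Merge() const {
    std::unordered_map<Key, Agg> out;
    for (const auto &part : parts_)
      for (const auto &[k, v] : part) out[k] += v;
    return out;
  }

  std::vector<std::unordered_map<Key, Agg>> parts_;
};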
Compare the EXPLAIN plan for the aggregate query in MySQL 8:
mysql> explain select
-> p_brand,
-> p_type,
-> p_size,
-> count(distinct ps_suppkey) as supplier_cnt
-> from
-> partsupp,
-> part
-> where
-> p_partkey = ps_partkey
-> and p_brand <> 'Brand#45'
-> and p_type not like 'MEDIUM POLISHED%'
-> and p_size in (49, 14, 23, 45, 19, 3, 36, 9)
-> and ps_suppkey not in (
-> select
-> s_suppkey
-> from
-> supplier
-> where
-> s_comment like '%Customer%Complaints%'
-> )
-> group by
-> p_brand,
-> p_type,
-> p_size
-> order by
-> supplier_cnt desc,
-> p_brand,
-> p_type,
-> p_size\G
*************************** 1. row ***************************
id: 1
select_type: SIMPLE
table: partsupp
partitions: NULL
type: index
possible_keys: PRIMARY
key: PRIMARY
key_len: 8
ref: NULL
rows: 7735092
filtered: 100.00
Extra: Using index; Using temporary; Using filesort
*************************** 2. row ***************************
id: 1
select_type: SIMPLE
table: <subquery2>
partitions: NULL
type: eq_ref
possible_keys: <auto_distinct_key>
key: <auto_distinct_key>
key_len: 5
ref: tpch.partsupp.ps_suppkey
rows: 1
filtered: 100.00
Extra: Using where; Not exists
*************************** 3. row ***************************
id: 1
select_type: SIMPLE
table: part
partitions: NULL
type: eq_ref
possible_keys: PRIMARY
key: PRIMARY
key_len: 4
ref: tpch.partsupp.ps_partkey
rows: 1
filtered: 40.00
Extra: Using where
*************************** 4. row ***************************
id: 2
select_type: MATERIALIZED
table: supplier
partitions: NULL
type: ALL
possible_keys: PRIMARY
key: NULL
key_len: NULL
ref: NULL
rows: 98754
filtered: 100.00
Extra: Using where
4 rows in set, 1 warning (0.00 sec)
Compare the execution time of the aggregate query in MySQL 8:
mysql> select
-> p_brand,
-> p_type,
-> p_size,
-> count(distinct ps_suppkey) as supplier_cnt
-> from
-> partsupp,
-> part
-> where
-> p_partkey = ps_partkey
-> and p_brand <> 'Brand#45'
-> and p_type not like 'MEDIUM POLISHED%'
-> and p_size in (49, 14, 23, 45, 19, 3, 36, 9)
-> and ps_suppkey not in (
-> select
-> s_suppkey
-> from
-> supplier
-> where
-> s_comment like '%Customer%Complaints%'
-> )
-> group by
-> p_brand,
-> p_type,
-> p_size
-> order by
-> supplier_cnt desc,
-> p_brand,
-> p_type,
-> p_size
-> limit 10;
+----------+--------------------------+--------+--------------+
| p_brand | p_type | p_size | supplier_cnt |
+----------+--------------------------+--------+--------------+
| Brand#44 | STANDARD PLATED TIN | 9 | 120 |
| Brand#12 | STANDARD POLISHED COPPER | 14 | 100 |
| Brand#11 | LARGE BRUSHED STEEL | 36 | 96 |
| Brand#23 | PROMO BURNISHED STEEL | 14 | 96 |
| Brand#34 | MEDIUM BRUSHED STEEL | 23 | 96 |
| Brand#53 | PROMO BURNISHED BRASS | 36 | 96 |
| Brand#54 | STANDARD BRUSHED COPPER | 19 | 96 |
| Brand#32 | LARGE POLISHED COPPER | 14 | 95 |
| Brand#43 | LARGE PLATED COPPER | 19 | 95 |
| Brand#11 | SMALL BRUSHED STEEL | 9 | 92 |
+----------+--------------------------+--------+--------------+
10 rows in set (11.44 sec)
Compare the execution time of the aggregate query with the TIANMU engine in StoneDB:
mysql> select
-> p_brand,
-> p_type,
-> p_size,
-> count(distinct ps_suppkey) as supplier_cnt
-> from
-> partsupp,
-> part
-> where
-> p_partkey = ps_partkey
-> and p_brand <> 'Brand#45'
-> and p_type not like 'MEDIUM POLISHED%'
-> and p_size in (49, 14, 23, 45, 19, 3, 36, 9)
-> and ps_suppkey not in (
-> select
-> s_suppkey
-> from
-> supplier
-> where
-> s_comment like '%Customer%Complaints%'
-> )
-> group by
-> p_brand,
-> p_type,
-> p_size
-> order by
-> supplier_cnt desc,
-> p_brand,
-> p_type,
-> p_size
-> limit 10;
+----------+---------------------------+--------+--------------+
| p_brand | p_type | p_size | supplier_cnt |
+----------+---------------------------+--------+--------------+
| Brand#44 | STANDARD PLATED TIN | 9 | 120 |
| Brand#33 | STANDARD BURNISHED COPPER | 42 | 116 |
| Brand#24 | SMALL BURNISHED NICKEL | 11 | 104 |
| Brand#33 | SMALL BURNISHED NICKEL | 12 | 104 |
| Brand#41 | ECONOMY POLISHED BRASS | 16 | 104 |
| Brand#51 | PROMO PLATED STEEL | 28 | 104 |
| Brand#55 | ECONOMY BURNISHED NICKEL | 21 | 104 |
| Brand#11 | ECONOMY ANODIZED COPPER | 28 | 100 |
| Brand#11 | ECONOMY POLISHED COPPER | 41 | 100 |
| Brand#12 | ECONOMY ANODIZED STEEL | 31 | 100 |
+----------+---------------------------+--------+--------------+
10 rows in set (1 min 20.25 sec)
Table tuple split purpose:
Problems faced by table tuple splitting:
Stripping out the column attributes processed by the aggregate:
It is necessary to first establish a relational-algebra model of aggregate processing, and then implement that model according to its mathematical rules.
The projection operation: projection is relational algebra's counterpart of existential quantification in predicate logic. The attributes not included correspond to existentially quantified variables in the predicate whose extension the operand relation represents. The example below illustrates this point. Because of the correspondence with existential quantification, some authorities prefer to define projection in terms of the excluded attributes. In a computer language it is of course possible to provide notations for both, and that was done in ISBL and several languages that have taken their cue from ISBL. A nearly identical concept occurs in the category of monoids, called a string projection, which consists of removing all of the letters in the string that do not belong to a given alphabet. In standard SQL the "default projection" returns a multiset instead of a set, and the π projection is obtained by adding the DISTINCT keyword to eliminate duplicate rows.
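Stated formally, projection keeps only the listed attributes and existentially quantifies away the rest; for a ternary relation $R(A, B, C)$:

$$\pi_{A,B}(R) \;=\; \{\, (a, b) \mid \exists c : (a, b, c) \in R \,\}$$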
Grouping
$\gamma_{\text{grouping attributes},\ \operatorname{func}(A) \to \text{name}}(R)$
Example: $\gamma_{A,\ \min(B) \to D}(R)$ groups the relation $R(A, B, C)$ by attribute $A$ and returns, for each group, the minimum of $B$ under the new name $D$; the result has attributes $(A, D)$.
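A tiny worked instance (the rows are invented purely for illustration):

$$
R = \begin{array}{c|c|c}
A & B & C \\ \hline
a_1 & 1 & x \\
a_1 & 2 & y \\
a_2 & 7 & z
\end{array}
\qquad
\gamma_{A,\ \min(B) \to D}(R) = \begin{array}{c|c}
A & D \\ \hline
a_1 & 1 \\
a_2 & 7
\end{array}
$$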
The domain model, following domain-driven design (DDD), captures the mathematical rules for multi-threaded parallel aggregation processing:
Table properties:
[2022-08-21 10:57:51.504922] [45954] [INFO] [aggregation_algorithm.cpp:49] MSG: Aggregate numOfAttrs: 4 packpower: 16 NumOfObj: -1 NumOfTables: 2 NumOfTuples: 7422784 distinct: false
[2022-08-21 10:57:51.504980] [45954] [INFO] [aggregation_algorithm.cpp:69] MSG: Aggregate AddGroupingColumn attr_num: 0 col: 0
[2022-08-21 10:57:51.505004] [45954] [INFO] [aggregation_algorithm.cpp:69] MSG: Aggregate AddGroupingColumn attr_num: 1 col: 1
[2022-08-21 10:57:51.505011] [45954] [INFO] [aggregation_algorithm.cpp:69] MSG: Aggregate AddGroupingColumn attr_num: 2 col: 2
[2022-08-21 10:57:51.505024] [45954] [INFO] [aggregation_algorithm.cpp:127] MSG: Aggregate AddAggregatedColumn col: 3 max_no_of_distinct: 100000 min_v: 1 max_v: 100000 max_size: 11
[2022-08-21 10:57:51.540405] [45954] [INFO] [aggregation_algorithm.cpp:238] MSG: NumOfDimensions: 2 NumOfTuples: 7422784
Set the number of threads to 4:
[2022-08-21 21:41:28.927878] [67237] [INFO] [aggregation_algorithm.cpp:896] MSG: ReadyDist threads: 4 packnum: 154 loopcnt: 4 num: 38 mod: 2
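The ReadyDist numbers are a plain integer split of the packs across the threads (154 = 4 × 38 + 2). A minimal sketch of that split follows; how the two leftover packs are actually assigned is an assumption here, not necessarily StoneDB's policy:

#include <cstddef>
#include <iostream>

// Reproduces num = packnum / threads and mod = packnum % threads from the
// log line above (154 packs over 4 threads gives 38 per thread plus 2 left
// over), then spreads the remainder over the first `mod` workers.
int main() {
  const std::size_t packnum = 154, threads = 4;
  const std::size_t num = packnum / threads;  // 38
  const std::size_t mod = packnum % threads;  // 2
  std::size_t begin = 0;
  for (std::size_t t = 0; t < threads; ++t) {
    std::size_t count = num + (t < mod ? 1 : 0);
    std::cout << "worker " << t << ": packs [" << begin << ", " << begin + count << ")\n";
    begin += count;
  }
}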
Shards need to be refactored. The accessibility of iterators and the identity handling after splitting are not properly considered:
void AggregationWorkerEnt::PrepShardingCopy(MIIterator *mit, GroupByWrapper *gb_sharding,
                                            std::vector<std::unique_ptr<GroupByWrapper>> *vGBW) {
  // Prepare a per-shard copy of the GroupByWrapper so each worker can aggregate independently.
  DimensionVector dims(mind->NumOfDimensions());
  std::unique_ptr<GroupByWrapper> gbw_ptr(new GroupByWrapper(*gb_sharding));
  gbw_ptr->FillDimsUsed(dims);
  gbw_ptr->SetDistinctTuples(mit->NumOfTuples());
  if (!gbw_ptr->IsOnePass()) gbw_ptr->InitTupleLeft(mit->NumOfTuples());
  gbw_ptr->AddAllGroupingConstants(*mit);
  // Appending the copy to the shared vector is serialized by mtx.
  std::scoped_lock guard(mtx);
  vGBW->emplace_back(std::move(gbw_ptr));
}
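One way the per-shard copies can then be driven: give each worker a disjoint pack range and fold the partial results together at the end. The sketch below is a generic shape using std::thread and caller-supplied aggregate/merge callbacks; it is not the actual StoneDB task system:

#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Generic "shard, aggregate, merge" driver: worker t owns shards[t] and
// scans only its own [begin, end) pack range, so no synchronization is
// needed until the single-threaded merge at the end.
template <typename Shard>
void RunSharded(std::vector<Shard> &shards, std::size_t packnum,
                const std::function<void(Shard &, std::size_t, std::size_t)> &aggregate,
                const std::function<void(Shard &, Shard &)> &merge) {
  const std::size_t threads = shards.size();
  std::vector<std::thread> workers;
  for (std::size_t t = 0; t < threads; ++t) {
    std::size_t begin = packnum * t / threads, end = packnum * (t + 1) / threads;
    workers.emplace_back([&, t, begin, end] { aggregate(shards[t], begin, end); });
  }
  for (auto &w : workers) w.join();
  for (std::size_t t = 1; t < threads; ++t) merge(shards[0], shards[t]);  // fold into shard 0
}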
Tianmu::core::ColumnBinEncoder::Encode is very interesting: to generate the hash key during aggregation it retrieves data from random locations in RAM (defeating the CPU caches), and it also transcodes the fetched data and serializes it into a new piece of RAM, so each key costs several memory-access round trips of CPU time. A simplified sketch of this hot path follows the perf samples below.
mysqld 174443 518728.020321: 11642419 cycles:
2d4d70b Tianmu::core::MIIterator::operator+++0x11 (/data/stonedb57/install/bin/mysqld)
30681e2 Tianmu::core::MIIterator::Increment+0x18 (/data/stonedb57/install/bin/mysqld)
2feba05 Tianmu::core::AggregationAlgorithm::AggregatePackrow+0x64d (/data/stonedb57/install/bin/mysqld)
2fea0cb Tianmu::core::AggregationAlgorithm::MultiDimensionalGroupByScan+0x789 (/data/stonedb57/install/bin/mysqld)
2fe9855 Tianmu::core::AggregationAlgorithm::Aggregate+0xe0f (/data/stonedb57/install/bin/mysqld)
2d49f86 Tianmu::core::TempTable::Materialize+0x892 (/data/stonedb57/install/bin/mysqld)
2ce4488 Tianmu::core::Engine::Execute+0xa66 (/data/stonedb57/install/bin/mysqld)
2ce31ac Tianmu::core::Engine::HandleSelect+0x8ce (/data/stonedb57/install/bin/mysqld)
2df2371 Tianmu::DBHandler::ha_my_tianmu_query+0x5c (/data/stonedb57/install/bin/mysqld)
24205c6 execute_sqlcom_select+0x254 (/data/stonedb57/install/bin/mysqld)
241993c mysql_execute_command+0xe5c (/data/stonedb57/install/bin/mysqld)
242162b mysql_parse+0x6b0 (/data/stonedb57/install/bin/mysqld)
2416622 dispatch_command+0xcc7 (/data/stonedb57/install/bin/mysqld)
2415463 do_command+0x4ba (/data/stonedb57/install/bin/mysqld)
25479f9 handle_connection+0x1ee (/data/stonedb57/install/bin/mysqld)
2c17238 pfs_spawn_thread+0x173 (/data/stonedb57/install/bin/mysqld)
7faad6ef81ca start_thread+0xea (/usr/lib64/libpthread-2.28.so)
Memory copy has the worst effect on CPU cycles:
mysqld 174443 518766.939690: 25789094 cycles:
1d15640 memcpy@plt+0x0 (/data/stonedb57/install/bin/mysqld)
3003c55 Tianmu::core::ColumnBinEncoder::Encode+0x95 (/data/stonedb57/install/bin/mysqld)
2fee7d1 Tianmu::core::GroupTable::PutGroupingValue+0x5f (/data/stonedb57/install/bin/mysqld)
2feeb43 Tianmu::core::GroupByWrapper::PutGroupingValue+0x2f (/data/stonedb57/install/bin/mysqld)
2feb807 Tianmu::core::AggregationAlgorithm::AggregatePackrow+0x44f (/data/stonedb57/install/bin/mysqld)
2fea0cb Tianmu::core::AggregationAlgorithm::MultiDimensionalGroupByScan+0x789 (/data/stonedb57/install/bin/mysqld)
2fe9855 Tianmu::core::AggregationAlgorithm::Aggregate+0xe0f (/data/stonedb57/install/bin/mysqld)
2d49f86 Tianmu::core::TempTable::Materialize+0x892 (/data/stonedb57/install/bin/mysqld)
2ce4488 Tianmu::core::Engine::Execute+0xa66 (/data/stonedb57/install/bin/mysqld)
2ce31ac Tianmu::core::Engine::HandleSelect+0x8ce (/data/stonedb57/install/bin/mysqld)
2df2371 Tianmu::DBHandler::ha_my_tianmu_query+0x5c (/data/stonedb57/install/bin/mysqld)
24205c6 execute_sqlcom_select+0x254 (/data/stonedb57/install/bin/mysqld)
241993c mysql_execute_command+0xe5c (/data/stonedb57/install/bin/mysqld)
242162b mysql_parse+0x6b0 (/data/stonedb57/install/bin/mysqld)
2416622 dispatch_command+0xcc7 (/data/stonedb57/install/bin/mysqld)
2415463 do_command+0x4ba (/data/stonedb57/install/bin/mysqld)
25479f9 handle_connection+0x1ee (/data/stonedb57/install/bin/mysqld)
2c17238 pfs_spawn_thread+0x173 (/data/stonedb57/install/bin/mysqld)
7faad6ef81ca start_thread+0xea (/usr/lib64/libpthread-2.28.so)
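A simplified view of the hot path these two stacks show (PutGroupingValue → ColumnBinEncoder::Encode → memcpy): for every row, the bytes of each grouping column are copied into a key buffer before the hash lookup, so the per-row cost is one random read plus one small memcpy per column. The struct and function names below are illustrative, not the StoneDB ones:

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Per-row grouping-key encoding as implied by the perf stacks above:
// each row pays a random column read and a small memcpy per grouping
// column into the key buffer, before the hash table is even touched.
struct GroupingColumn {
  const uint8_t *data;  // fixed-width column values
  std::size_t width;    // bytes per value
};

inline void EncodeKey(const std::vector<GroupingColumn> &cols, std::size_t row,
                      uint8_t *key_buf) {
  std::size_t off = 0;
  for (const auto &c : cols) {
    std::memcpy(key_buf + off, c.data + row * c.width, c.width);  // the hot memcpy
    off += c.width;
  }
}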
the cost of the routine call (in the ballpark of 20
These joins are not required in a Volcano-like pipelined execution model. It
Execution proceeds using Volcano-like pipelining,
Some peculiarities of this
Modern CPUs can typically only perform 1 or
Columns store data in data files that are either unduplicated or sorted. Consider generating a new auxiliary column:
struct {
  std::vector<buf> v;  // backing buffers holding the raw column bytes
  size_t sum_len;      // total number of bytes stored across the buffers
  char **index;        // per-value pointers into the buffers
  union {              // per-value lengths; the active member depends on len_mode
    void *lens;
    uint32_t *lens32;
    uint16_t *lens16;
  };
  uint8_t len_mode;    // selects which lens member is in use
} data_{};
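Such a layout is convenient for appends; for vectorized reads a flatter, contiguous layout is usually preferable. A minimal sketch of one possible read-side layout (purely illustrative, not the StoneDB on-disk format): values are packed back to back with a prefix-sum offset array, so a scan walks memory sequentially and value i is located without chasing per-value pointers:

#include <cstddef>
#include <cstdint>
#include <string_view>
#include <vector>

// Illustrative read-oriented column slice: all values concatenated into one
// contiguous buffer, with offsets[i] marking where value i starts
// (offsets has one extra trailing entry equal to bytes.size()).
struct ReadColumnSlice {
  std::vector<char> bytes;        // values back to back
  std::vector<uint32_t> offsets;  // size == value count + 1

  std::string_view Value(std::size_t i) const {
    return {bytes.data() + offsets[i], offsets[i + 1] - offsets[i]};
  }
};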
Such a write-oriented column data layout is not conducive to read operations; the earlier RocksDB-based LSM-tree storage was no better and inherited the same awkward design.
Aggregation stage 3, first split: establish a column-data disk I/O file layout suitable for vectorized reading.
Horizontal slicing of column data:
| 5786 | (X_67=[100095]:bat[:oid], C_68=[4]:bat[:oid]) := group.group(X_40=[100095]:bat[:str]); |
| 5852 | (X_69=[100095]:bat[:oid], C_70=[4]:bat[:oid]) := group.group(X_48=[100095]:bat[:str]); |
| 6182 | (X_71=[100095]:bat[:oid], C_72=[4]:bat[:oid]) := group.group(X_51=[100095]:bat[:str]); |
| 5472 | (X_73=[100095]:bat[:oid], C_74=[4]:bat[:oid]) := group.group(X_57=[100095]:bat[:str]); |
| 8232 | (X_75=[100095]:bat[:oid], C_76=[4]:bat[:oid]) := group.group(X_49=[100095]:bat[:str]); |
| 5879 | (X_77=[100097]:bat[:oid], C_78=[4]:bat[:oid]) := group.group(X_63=[100097]:bat[:str]); |
| 12955 | (X_79=[100095]:bat[:oid], C_80=[4000]:bat[:oid]) := group.subgroup(X_39=[100095]:bat[:int], X_67=[100095]:bat[:oid]); # GRP_create_partial_hash_table, dense |
| 14632 | (X_81=[100095]:bat[:oid], C_82=[4000]:bat[:oid]) := group.subgroup(X_36=[100095]:bat[:int], X_73=[100095]:bat[:oid]); # GRP_create_partial_hash_table, dense |
| 12384 | (X_83=[100097]:bat[:oid], C_84=[4000]:bat[:oid]) := group.subgroup(X_34=[100097]:bat[:int], X_77=[100097]:bat[:oid]); # GRP_create_partial_hash_table, dense |
| 20043 | (X_85=[100095]:bat[:oid], C_86=[4000]:bat[:oid]) := group.subgroup(X_53=[100095]:bat[:int], X_69=[100095]:bat[:oid]); # GRP_create_partial_hash_table, dense |
| 20069 | (X_87=[100095]:bat[:oid], C_88=[4000]:bat[:oid]) := group.subgroup(X_46=[100095]:bat[:int], X_71=[100095]:bat[:oid]); # GRP_create_partial_hash_table, dense |
| 18398 | (X_89=[100095]:bat[:oid], C_90=[4000]:bat[:oid]) := group.subgroup(X_38=[100095]:bat[:int], X_75=[100095]:bat[:oid]); # GRP_create_partial_hash_table, dense |
| 26944 | (X_91=[100097]:bat[:oid], C_92=[78707]:bat[:oid]) := group.subgroupdone(X_54=[100097]:bat[:lng], X_83=[100097]:bat[:oid]); # GRP_create_partial_hash_table, dense |
| 33312 | (X_93=[100095]:bat[:oid], C_94=[78880]:bat[:oid]) := group.subgroupdone(X_44=[100095]:bat[:lng], X_79=[100095]:bat[:oid]); # GRP_create_partial_hash_table, dense
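The trace builds the grouping incrementally: group.group derives group ids from the first column, and group.subgroup refines those ids with each further column until the full composite key is covered. A rough sketch of that refinement idea (assumed semantics, not MonetDB's or StoneDB's actual code):

#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Refine existing group ids with one more column: two rows remain in the
// same group only if they already shared a group id and also agree on the
// new column. Applying this once per grouping column mirrors the
// group.group / group.subgroup chain in the trace above.
std::vector<uint32_t> RefineGroups(const std::vector<uint32_t> &group_ids,
                                   const std::vector<int32_t> &column) {
  std::unordered_map<uint64_t, uint32_t> seen;  // (old id, value) -> new id
  std::vector<uint32_t> out(group_ids.size());
  uint32_t next = 0;
  for (std::size_t i = 0; i < group_ids.size(); ++i) {
    uint64_t key = (uint64_t(group_ids[i]) << 32) | uint32_t(column[i]);
    auto [it, inserted] = seen.emplace(key, next);
    if (inserted) ++next;
    out[i] = it->second;
  }
  return out;
}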
The kernel maintains a central table of all active threads. They
Segmentation for pack processing
While the disk I/O layout follows the same vectorized structure as in RAM, there are several different design strategies for the multi-threaded task system, and the key points need to be analyzed in detail.
The problem is memory visibility. A more efficient design would be a completely lock-free approach with no visibility conflicts; a sketch of one such scheme follows.
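A minimal sketch of one lock-free dispatch scheme, assuming the packs can be aggregated independently: workers claim pack indices through an atomic counter and accumulate into thread-private partial results, so nothing shared is written until the final single-threaded merge:

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>

// Lock-free pack dispatch: one atomic counter hands out pack indices and
// each worker writes only its own partial slot, so there is no mutex and
// no visibility conflict before the join/merge.
int64_t ParallelSumPacks(const std::vector<int64_t> &pack_sums, unsigned threads) {
  std::atomic<std::size_t> next{0};
  std::vector<int64_t> partial(threads, 0);
  std::vector<std::thread> workers;
  for (unsigned t = 0; t < threads; ++t) {
    workers.emplace_back([&, t] {
      for (std::size_t i = next.fetch_add(1); i < pack_sums.size(); i = next.fetch_add(1))
        partial[t] += pack_sums[i];  // thread-private accumulation
    });
  }
  for (auto &w : workers) w.join();
  int64_t total = 0;
  for (int64_t p : partial) total += p;  // single-threaded merge
  return total;
}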
Documentation for MySQL aggregation:
https://dev.mysql.com/doc/refman/8.0/en/aggregate-functions-and-modifiers.html
Functional Requirements:
Performance Requirements:
Table data limit:
Memory usage limit:
Execution time limit:
Aggregate query SQL