Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.mtmv.BaseTableInfo;
Expand Down Expand Up @@ -55,9 +54,10 @@
public class PartitionCompensator {

public static final Logger LOG = LogManager.getLogger(PartitionCompensator.class);
// if partition pair is null which means can not get partitions from table in QueryPartitionCollector,
// we think this table scan query all partitions default
// if the partition pair is null which means could not get partitions from table in QueryPartitionCollector,
// we think the table scans query all-partitions default
public static final Pair<RelationId, Set<String>> ALL_PARTITIONS = Pair.of(null, null);
// It means all partitions are used when query
public static final Collection<Pair<RelationId, Set<String>>> ALL_PARTITIONS_LIST =
ImmutableList.of(ALL_PARTITIONS);

Expand Down Expand Up @@ -180,33 +180,24 @@ public static Map<List<String>, Set<String>> getQueryUsedPartitions(StatementCon
// if value is not empty, means query some partitions
Map<List<String>, Set<String>> queryUsedRelatedTablePartitionsMap = new HashMap<>();
tableLoop:
for (Map.Entry<List<String>, TableIf> queryUsedTableEntry : statementContext.getTables().entrySet()) {
for (List<String> queryUsedTable : tableUsedPartitionNameMap.keySet()) {
Set<String> usedPartitionSet = new HashSet<>();
Collection<Pair<RelationId, Set<String>>> tableUsedPartitions =
tableUsedPartitionNameMap.get(queryUsedTableEntry.getKey());
if (!tableUsedPartitions.isEmpty()) {
if (ALL_PARTITIONS_LIST.equals(tableUsedPartitions)) {
queryUsedRelatedTablePartitionsMap.put(queryUsedTableEntry.getKey(), null);
continue;
}
for (Pair<RelationId, Set<String>> partitionPair : tableUsedPartitions) {
if (!customRelationIdSet.isEmpty()) {
if (ALL_PARTITIONS.equals(partitionPair)) {
continue;
}
if (customRelationIdSet.get(partitionPair.key().asInt())) {
usedPartitionSet.addAll(partitionPair.value());
}
} else {
if (ALL_PARTITIONS.equals(partitionPair)) {
queryUsedRelatedTablePartitionsMap.put(queryUsedTableEntry.getKey(), null);
continue tableLoop;
}
usedPartitionSet.addAll(partitionPair.value());
}
tableUsedPartitionNameMap.get(queryUsedTable);
if (ALL_PARTITIONS_LIST.equals(tableUsedPartitions)) {
// It means all partitions are used when query
queryUsedRelatedTablePartitionsMap.put(queryUsedTable, null);
continue;
}
for (Pair<RelationId, Set<String>> tableUsedPartitionPair : tableUsedPartitions) {
if (ALL_PARTITIONS.equals(tableUsedPartitionPair)) {
// It means all partitions are used when query
queryUsedRelatedTablePartitionsMap.put(queryUsedTable, null);
continue tableLoop;
}
usedPartitionSet.addAll(tableUsedPartitionPair.value());
}
queryUsedRelatedTablePartitionsMap.put(queryUsedTableEntry.getKey(), usedPartitionSet);
queryUsedRelatedTablePartitionsMap.put(queryUsedTable, usedPartitionSet);
}
return queryUsedRelatedTablePartitionsMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,4 +753,89 @@ suite("nested_mtmv") {
mv_rewrite_any_success(sql_5, [mv_3, mv_4, mv_5])
compare_res(sql_5 + " order by 1,2,3,4,5,6,7,8,9,10,11,12,13")

sql """
drop table if exists sales_partitioned
"""

sql """
CREATE TABLE sales_partitioned (
product_id INT NOT NULL,
city VARCHAR(50) NOT NULL,
sale_date DATE NOT NULL,
amount DECIMAL(18, 2) NOT NULL
)
DUPLICATE KEY(product_id, city, sale_date)
PARTITION BY RANGE(sale_date) (
PARTITION p20251001 VALUES [('2025-10-01'), ('2025-10-02')),
PARTITION p20251002 VALUES [('2025-10-02'), ('2025-10-03')),
PARTITION p20251003 VALUES [('2025-10-03'), ('2025-10-04')),
PARTITION p_other VALUES [('2025-10-04'), ('2025-11-01'))
)
DISTRIBUTED BY HASH(product_id) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
);
"""


sql """
INSERT INTO sales_partitioned (product_id, city, sale_date, amount) VALUES
(101, 'Beijing', '2025-10-01', 100.00), -- p20251001
(101, 'Shanghai', '2025-10-01', 150.00), -- p20251001
(102, 'Beijing', '2025-10-02', 200.00), -- p20251002
(102, 'Shanghai', '2025-10-02', 250.00), -- p20251002
(101, 'Beijing', '2025-10-03', 120.00), -- p20251003
(102, 'Shanghai', '2025-10-03', 300.00); -- p20251003
"""

create_async_partition_mv(db, "zz_mtmv1", """
SELECT
city,
sale_date,
SUM(amount) AS daily_city_amount
FROM
sales_partitioned
GROUP BY
city, sale_date;
""", "(sale_date)")
mv_rewrite_success("""
SELECT
city,
SUM(amount) AS total_city_amount
FROM
sales_partitioned
WHERE
sale_date >= '2025-10-01' AND sale_date <= '2025-10-03'
GROUP BY
city;
""", "zz_mtmv1", true, is_partition_statistics_ready(db, ["zz_mtmv1"]))

create_async_partition_mv(db, "zz_mtmv2", """
SELECT
city,
date_trunc(sale_date, 'MONTH') AS sale_date,
SUM(daily_city_amount) AS monthly_city_amount
FROM
zz_mtmv1
GROUP BY
city,
date_trunc(sale_date, 'MONTH')
""", "(sale_date)")

mv_rewrite_all_success_without_check_chosen("""
SELECT
date_trunc(sale_date, 'MONTH') AS sale_date,
SUM(daily_city_amount) AS monthly_city_amount
FROM
(SELECT
city,
sale_date,
SUM(amount) AS daily_city_amount
FROM
sales_partitioned
GROUP BY
city, sale_date) as t
GROUP BY
date_trunc(sale_date, 'MONTH');
""", ["zz_mtmv1", "zz_mtmv2"])
}
Loading