Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](mtmv)fix nested mtmv not refresh #40433

Merged
merged 5 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3144,9 +3144,10 @@ public List<Column> getPartitionColumns() {
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
throws AnalysisException {
Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions();
long partitionId = getPartitionOrAnalysisException(partitionName).getId();
long visibleVersion = partitionVersions.containsKey(partitionName) ? partitionVersions.get(partitionName)
: getPartitionOrAnalysisException(partitionName).getVisibleVersion();
return new MTMVVersionSnapshot(visibleVersion);
return new MTMVVersionSnapshot(visibleVersion, partitionId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.doris.mtmv;

import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.AnalysisException;

import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
Expand Down Expand Up @@ -74,6 +77,46 @@ public String toString() {
}

public void compatible(MTMV mtmv) {
try {
// snapshot add partitionId resolve problem of insert overwrite
zddr marked this conversation as resolved.
Show resolved Hide resolved
compatiblePartitions(mtmv);
} catch (Throwable e) {
LOG.warn("MTMV compatiblePartitions failed, mtmv: {}", mtmv.getName(), e);
}
try {
// change table id to BaseTableInfo
compatibleTables(mtmv);
} catch (Throwable e) {
LOG.warn("MTMV compatibleTables failed, mtmv: {}", mtmv.getName(), e);
}
}

private void compatiblePartitions(MTMV mtmv) throws AnalysisException {
if (!checkHasDataWithoutPartitionId()) {
return;
}
OlapTable relatedTable = (OlapTable) mtmv.getMvPartitionInfo().getRelatedTable();
for (Entry<String, MTMVSnapshotIf> entry : partitions.entrySet()) {
MTMVVersionSnapshot versionSnapshot = (MTMVVersionSnapshot) entry.getValue();
if (versionSnapshot.getId() == 0) {
Partition partition = relatedTable.getPartition(entry.getKey());
if (partition != null) {
(versionSnapshot).setId(partition.getId());
}
}
}
}

private boolean checkHasDataWithoutPartitionId() {
for (MTMVSnapshotIf snapshot : partitions.values()) {
if (snapshot instanceof MTMVVersionSnapshot && ((MTMVVersionSnapshot) snapshot).getId() == 0) {
return true;
}
}
return false;
}

private void compatibleTables(MTMV mtmv) {
if (tables.size() == tablesInfo.size()) {
return;
}
Expand All @@ -87,7 +130,7 @@ public void compatible(MTMV mtmv) {
if (tableInfo.isPresent()) {
tablesInfo.put(tableInfo.get(), entry.getValue());
} else {
LOG.warn("MTMV compatible failed, tableId: {}, relationTables: {}", entry.getKey(),
LOG.warn("MTMV compatibleTables failed, tableId: {}, relationTables: {}", entry.getKey(),
relation.getBaseTablesOneLevel());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,30 @@ public class MTMVVersionSnapshot implements MTMVSnapshotIf {
@SerializedName("v")
private long version;

// The partition version after insert overwrite is 1,
// which may cause the upper level materialized view to be unaware of changes in the data at the bottom level.
// However, the partition ID after overwrite will change, so the partition ID should be added.
// only for partition, table will always 0
@SerializedName("id")
private long id;

public MTMVVersionSnapshot(long version) {
this.version = version;
}

public MTMVVersionSnapshot(long version, long id) {
this.version = version;
this.id = id;
}

public long getId() {
return id;
}

public void setId(long id) {
this.id = id;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -37,18 +57,19 @@ public boolean equals(Object o) {
return false;
}
MTMVVersionSnapshot that = (MTMVVersionSnapshot) o;
return version == that.version;
return version == that.version && id == that.id;
}

@Override
public int hashCode() {
return Objects.hashCode(version);
return Objects.hashCode(version, id);
}

@Override
public String toString() {
return "MTMVVersionSnapshot{"
+ "version=" + version
+ ", id=" + id
+ '}';
}
}
11 changes: 11 additions & 0 deletions regression-test/data/mtmv_p0/test_multi_level_mtmv.out
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@
-- !mv2_should_one_partition --
["p_2"]

-- !mv1_should_one_partition_again --
["p_2"]

-- !mv2_should_one_partition_again --
["p_2"]

-- !mv2_again --
1 1
2 2
2 3

-- !status1 --
multi_level_mtmv1 SCHEMA_CHANGE SUCCESS

Expand Down
16 changes: 16 additions & 0 deletions regression-test/suites/mtmv_p0/test_multi_level_mtmv.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,22 @@ suite("test_multi_level_mtmv") {
waitingMTMVTaskFinishedByMvName(mv2)
order_qt_mv2_should_one_partition "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv2}' order by CreateTime desc limit 1"

// insert into p2 again, check partition version if change
sql """
INSERT INTO ${tableName} VALUES(2,3);
"""
sql """
REFRESH MATERIALIZED VIEW ${mv1} AUTO
"""
waitingMTMVTaskFinishedByMvName(mv1)
order_qt_mv1_should_one_partition_again "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv1}' order by CreateTime desc limit 1"
sql """
REFRESH MATERIALIZED VIEW ${mv2} AUTO
"""
waitingMTMVTaskFinishedByMvName(mv2)
order_qt_mv2_should_one_partition_again "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv2}' order by CreateTime desc limit 1"
order_qt_mv2_again "select * from ${mv2}"

// drop table
sql """
drop table ${tableName}
Expand Down
Loading