Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
54eb6e7
1
zddr Mar 13, 2025
8b3f001
1
zddr Mar 13, 2025
9b9908d
1
zddr Mar 13, 2025
08a496e
1
zddr Mar 13, 2025
d5900b8
1
zddr Mar 13, 2025
0460cff
1
zddr Mar 13, 2025
63b8227
1
zddr Mar 13, 2025
a927035
1
zddr Mar 13, 2025
d2f11f7
1
zddr Mar 13, 2025
69cf9b7
1
zddr Mar 13, 2025
ff94d5a
use base schema
zddr Mar 21, 2025
a62c447
like instead of equals
zddr Mar 25, 2025
6b61bd1
1
zddr Mar 26, 2025
4cb9c6b
1
zddr Mar 26, 2025
a5b7a42
1
zddr Mar 26, 2025
d1bfb0f
1
zddr Mar 26, 2025
5894da2
1
zddr Mar 26, 2025
b95b89d
1
zddr Mar 26, 2025
b9a6271
1
zddr Mar 26, 2025
3e47106
1
zddr Mar 26, 2025
32ff666
1
zddr Mar 26, 2025
73c3dab
1
zddr Mar 26, 2025
b6f95ca
1
zddr Mar 26, 2025
df7608d
1
zddr Mar 26, 2025
90352a1
1
zddr Mar 26, 2025
f69f936
1
zddr Mar 26, 2025
2318957
1
zddr Mar 26, 2025
99dded1
fix replace
zddr Mar 31, 2025
276ae3d
1
zddr Mar 31, 2025
ef7cc9e
1
zddr Mar 31, 2025
ef0091a
1
zddr Mar 31, 2025
047868b
fix replace
zddr Mar 31, 2025
5bbd078
fix replace
zddr Mar 31, 2025
bb4f83e
fix replace
zddr Mar 31, 2025
a0c8ef8
1
zddr Mar 31, 2025
f7aaa54
1
zddr Mar 31, 2025
29a1679
1
zddr Mar 31, 2025
a9c345a
1
zddr Mar 31, 2025
bff2569
1
zddr Mar 31, 2025
eb68154
1
zddr Mar 31, 2025
4e5ec3a
1
zddr Mar 31, 2025
c7be88e
1
zddr Mar 31, 2025
e60ff94
1
zddr Mar 31, 2025
ae21b7c
1
zddr Mar 31, 2025
b5572df
1
zddr Apr 1, 2025
d622ff8
1
zddr Apr 1, 2025
8e02e37
1
zddr Apr 1, 2025
91951fc
1
zddr Apr 1, 2025
97b67e0
1
zddr Apr 1, 2025
e7b18b9
1
zddr Apr 1, 2025
5b80c84
1
zddr Apr 1, 2025
80684ba
1
zddr Apr 1, 2025
883fc61
1
zddr Apr 1, 2025
2db73db
1
zddr Apr 1, 2025
4e8b68b
1
zddr Apr 1, 2025
1261592
1
zddr Apr 1, 2025
520734e
1
zddr Apr 1, 2025
9a1fbf7
1
zddr Apr 1, 2025
fcc4679
Merge branch 'master' into mv_sc_core
zddr Apr 2, 2025
ff15c72
fix replace
zddr Apr 2, 2025
a65fa81
fix replace
zddr Apr 2, 2025
000ff80
fix replace
zddr Apr 2, 2025
71b0604
fix replace
zddr Apr 2, 2025
644f903
1
zddr Apr 2, 2025
25e5420
1
zddr Apr 2, 2025
3762e20
fix replace
zddr Apr 3, 2025
665a7df
1
zddr Apr 3, 2025
4b117b3
1
zddr Apr 3, 2025
2277265
1
zddr Apr 7, 2025
a55af42
1
zddr Apr 7, 2025
2555c65
1
zddr Apr 7, 2025
9d5898b
1
zddr Apr 7, 2025
145ca56
1
zddr Apr 7, 2025
31a7232
1
zddr Apr 7, 2025
18bb599
add comment
zddr Apr 7, 2025
70a8d7d
1
zddr Apr 7, 2025
e7b9d58
1
zddr Apr 7, 2025
383e88e
1
zddr Apr 8, 2025
5a582fa
init value
zddr Apr 21, 2025
b40c516
Merge branch 'master' into mv_sc_core
zddr Apr 21, 2025
07a22a7
comment
zddr Apr 24, 2025
7b3a57d
comment
zddr Apr 24, 2025
236a677
p0
zddr Apr 25, 2025
581877d
1
zddr Apr 25, 2025
82e7337
Merge branch 'master' into mv_sc_core
zddr Apr 27, 2025
7f5aa08
comment
zddr Apr 28, 2025
c7c59a5
add ut
zddr Apr 28, 2025
aee4ca5
add ut
zddr Apr 28, 2025
577d74f
Merge branch 'master' into mv_sc_core
zddr Apr 29, 2025
dcaf294
1
zddr Apr 29, 2025
d53b22b
1
zddr Apr 29, 2025
c4305be
resolve conflict
zddr Apr 30, 2025
1d68457
resolve conflict
zddr May 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.PropertyAnalyzer.RewriteProperty;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.trees.plans.commands.AlterSystemCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterTableCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateMaterializedViewCommand;
Expand Down Expand Up @@ -221,7 +222,8 @@ private boolean processAlterOlapTableInternal(List<AlterClause> alterClauses, Ol

olapTable.checkNormalStateForAlter();
boolean needProcessOutsideTableLock = false;
String oldTableName = olapTable.getName();
BaseTableInfo oldBaseTableInfo = new BaseTableInfo(olapTable);
Optional<BaseTableInfo> newBaseTableInfo = Optional.empty();
if (currentAlterOps.checkTableStoragePolicy(alterClauses)) {
String tableStoragePolicy = olapTable.getStoragePolicy();
String currentStoragePolicy = currentAlterOps.getTableStoragePolicy(alterClauses);
Expand Down Expand Up @@ -328,8 +330,14 @@ private boolean processAlterOlapTableInternal(List<AlterClause> alterClauses, Ol
}
} else if (currentAlterOps.hasRenameOp()) {
processRename(db, olapTable, alterClauses);
newBaseTableInfo = Optional.of(new BaseTableInfo(olapTable));
} else if (currentAlterOps.hasReplaceTableOp()) {
processReplaceTable(db, olapTable, alterClauses);
// after replace table, olapTable may still be old name, so need set it to new name
ReplaceTableClause clause = (ReplaceTableClause) alterClauses.get(0);
String newTblName = clause.getTblName();
newBaseTableInfo = Optional.of(new BaseTableInfo(olapTable));
newBaseTableInfo.get().setTableName(newTblName);
} else if (currentAlterOps.contains(AlterOpType.MODIFY_TABLE_PROPERTY_SYNC)) {
needProcessOutsideTableLock = true;
} else if (currentAlterOps.contains(AlterOpType.MODIFY_DISTRIBUTION)) {
Expand All @@ -347,7 +355,8 @@ private boolean processAlterOlapTableInternal(List<AlterClause> alterClauses, Ol
throw new DdlException("Invalid alter operations: " + currentAlterOps);
}
if (needChangeMTMVState(alterClauses)) {
Env.getCurrentEnv().getMtmvService().alterTable(olapTable, oldTableName);
Env.getCurrentEnv().getMtmvService()
.alterTable(oldBaseTableInfo, newBaseTableInfo, currentAlterOps.hasReplaceTableOp());
}
return needProcessOutsideTableLock;
}
Expand Down Expand Up @@ -810,11 +819,9 @@ private void replaceTableInternal(Database db, OlapTable origTable, OlapTable ne
// drop origin table and new table
db.unregisterTable(oldTblName);
db.unregisterTable(newTblName);

// rename new table name to origin table name and add it to database
newTbl.checkAndSetName(oldTblName, false);
db.registerTable(newTbl);

if (swapTable) {
// rename origin table name to new table name and add it to database
origTable.checkAndSetName(newTblName, false);
Expand All @@ -828,9 +835,13 @@ private void replaceTableInternal(Database db, OlapTable origTable, OlapTable ne
} else {
Env.getCurrentRecycleBin().recycleTable(db.getId(), origTable, isReplay, isForce, 0);
}

if (origTable.getType() == TableType.MATERIALIZED_VIEW) {
Env.getCurrentEnv().getMtmvService().deregisterMTMV((MTMV) origTable);
// Because the current dropMTMV will delete jobs related to materialized views,
// this method will maintain its own metadata for deleting jobs,
// so it cannot be called during playback
if (!isReplay) {
Env.getCurrentEnv().getMtmvService().dropMTMV((MTMV) origTable);
}
}
Env.getCurrentEnv().getAnalysisManager().removeTableStats(origTable.getId());
}
Expand Down
37 changes: 10 additions & 27 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,7 @@ public Pair<Boolean, Boolean> createTableWithLock(
result = setIfNotExist;
isTableExist = true;
} else {
idToTable.put(table.getId(), table);
lowerCaseToTableName.put(tableName.toLowerCase(), tableName);
nameToTable.put(table.getName(), table);
registerTable(table);
if (table.isTemporary()) {
Env.getCurrentEnv().registerTempTableAndSession(table);
}
Expand All @@ -426,8 +424,6 @@ public Pair<Boolean, Boolean> createTableWithLock(
}
if (table.getType() == TableType.ELASTICSEARCH) {
Env.getCurrentEnv().getEsRepository().registerTable((EsTable) table);
} else if (table.getType() == TableType.MATERIALIZED_VIEW) {
Env.getCurrentEnv().getMtmvService().registerMTMV((MTMV) table, id);
}
}
return Pair.of(result, isTableExist);
Expand All @@ -451,6 +447,9 @@ public boolean registerTable(TableIf table) {
idToTable.put(olapTable.getId(), olapTable);
nameToTable.put(olapTable.getName(), olapTable);
lowerCaseToTableName.put(tableName.toLowerCase(), tableName);
if (olapTable instanceof MTMV) {
Env.getCurrentEnv().getMtmvService().registerMTMV((MTMV) olapTable, id);
}
}
olapTable.unmarkDropped();
return result;
Expand All @@ -462,6 +461,9 @@ public void unregisterTable(String tableName) {
}
Table table = getTableNullable(tableName);
if (table != null) {
if (table instanceof MTMV) {
Env.getCurrentEnv().getMtmvService().unregisterMTMV((MTMV) table);
}
this.nameToTable.remove(tableName);
this.lowerCaseToTableName.remove(tableName.toLowerCase());
this.idToTable.remove(table.getId());
Expand Down Expand Up @@ -670,14 +672,7 @@ private void readTables(DataInput in) throws IOException {
int numTables = in.readInt();
for (int i = 0; i < numTables; ++i) {
Table table = Table.read(in);
table.setQualifiedDbName(fullQualifiedName);
if (table instanceof MTMV) {
Env.getCurrentEnv().getMtmvService().registerMTMV((MTMV) table, id);
}
String tableName = table.getName();
nameToTable.put(tableName, table);
idToTable.put(table.getId(), table);
lowerCaseToTableName.put(tableName.toLowerCase(), tableName);
registerTable(table);
}
}

Expand All @@ -686,12 +681,7 @@ public void gsonPostProcess() throws IOException {
Preconditions.checkState(nameToTable.getClass() == ConcurrentHashMap.class,
"nameToTable should be ConcurrentMap");
nameToTable.forEach((tn, tb) -> {
tb.setQualifiedDbName(fullQualifiedName);
if (tb instanceof MTMV) {
Env.getCurrentEnv().getMtmvService().registerMTMV((MTMV) tb, id);
}
idToTable.put(tb.getId(), tb);
lowerCaseToTableName.put(tn.toLowerCase(), tn);
registerTable(tb);
});

if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_105) {
Expand Down Expand Up @@ -765,14 +755,7 @@ public void readFields(DataInput in) throws IOException {
int numTables = in.readInt();
for (int i = 0; i < numTables; ++i) {
Table table = Table.read(in);
table.setQualifiedDbName(fullQualifiedName);
if (table instanceof MTMV) {
Env.getCurrentEnv().getMtmvService().registerMTMV((MTMV) table, id);
}
String tableName = table.getName();
nameToTable.put(tableName, table);
idToTable.put(table.getId(), table);
lowerCaseToTableName.put(tableName.toLowerCase(), tableName);
registerTable(table);
}

// read quota
Expand Down
10 changes: 8 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -5028,16 +5028,22 @@ public void renameTable(Database db, Table table, String newTableName) throws Dd
throw new DdlException("Table name[" + newTableName + "] is already used (in restoring)");
}

if (table.isManagedTable()) {
// If not checked first, execute db.unregisterTable first,
// and then check the name in setName, it cannot guarantee atomicity
((OlapTable) table).checkAndSetName(newTableName, true);
}

db.unregisterTable(oldTableName);

if (table.isManagedTable()) {
// olap table should also check if any rollup has same name as "newTableName"
((OlapTable) table).checkAndSetName(newTableName, false);
} else {
table.setName(newTableName);
}

db.unregisterTable(oldTableName);
db.registerTable(table);

TableInfo tableInfo = TableInfo.createForTableRename(db.getId(), table.getId(), oldTableName,
newTableName);
editLog.logTableRename(tableInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1053,9 +1053,6 @@ public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop,
if (table.getType() == TableType.ELASTICSEARCH) {
esRepository.deRegisterTable(table.getId());
}
if (table.getType() == TableType.MATERIALIZED_VIEW) {
Env.getCurrentEnv().getMtmvService().deregisterMTMV((MTMV) table);
}

Env.getCurrentEnv().getAnalysisManager().removeTableStats(table.getId());
Env.getCurrentEnv().getDictionaryManager().dropTableDictionaries(db.getName(), table.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
Expand All @@ -44,6 +45,7 @@
import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
Expand All @@ -52,6 +54,7 @@
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -82,6 +85,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class MTMVTask extends AbstractTask {
private static final Logger LOG = LogManager.getLogger(MTMVTask.class);
Expand Down Expand Up @@ -194,6 +198,11 @@ public void run() throws JobException {
// lock table order by id to avoid deadlock
MetaLockUtils.readLockTables(tableIfs);
try {
// if mtmv is schema_change, check if column type has changed
// If it's not in the schema_change state, the column type definitely won't change.
if (MTMVState.SCHEMA_CHANGE.equals(mtmv.getStatus().getState())) {
checkColumnTypeIfChange(mtmv, ctx);
}
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
if (!relatedTable.isValidRelatedTable()) {
Expand Down Expand Up @@ -247,6 +256,39 @@ public void run() throws JobException {
}
}

private void checkColumnTypeIfChange(MTMV mtmv, ConnectContext ctx) throws JobException {
List<ColumnDefinition> currentColumnsDefinition = MTMVPlanUtil.generateColumnsBySql(mtmv.getQuerySql(), ctx,
mtmv.getMvPartitionInfo().getPartitionCol(),
mtmv.getDistributionColumnNames(), null, mtmv.getTableProperty().getProperties());
List<Column> currentColumns = currentColumnsDefinition.stream()
.map(ColumnDefinition::translateToCatalogStyle)
.collect(Collectors.toList());
List<Column> originalColumns = mtmv.getBaseSchema(true);
if (currentColumns.size() != originalColumns.size()) {
throw new JobException(String.format(
"column length not equals, please check whether columns of base table have changed, "
+ "original length is: %s, current length is: %s",
originalColumns.size(), currentColumns.size()));
}
for (int i = 0; i < originalColumns.size(); i++) {
if (!isTypeLike(originalColumns.get(i).getType(), currentColumns.get(i).getType())) {
throw new JobException(String.format(
"column type not same, please check whether columns of base table have changed, "
+ "column name is: %s, original type is: %s, current type is: %s",
originalColumns.get(i).getName(), originalColumns.get(i).getType().toSql(),
currentColumns.get(i).getType().toSql()));
}
}
}

private boolean isTypeLike(Type type, Type typeOther) {
if (type.isStringType()) {
return typeOther.isStringType();
} else {
return type.equals(typeOther);
}
}

private void executeWithRetry(Set<String> execPartitionNames, Map<TableIf, String> tableWithPartKey)
throws Exception {
int retryCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
import org.apache.doris.persist.AlterMTMV;

import java.util.Optional;

/**
* Contains all operations that affect the mtmv
*/
Expand Down Expand Up @@ -63,7 +65,7 @@ public interface MTMVHookService {
*
* @param mtmv
*/
void deregisterMTMV(MTMV mtmv);
void unregisterMTMV(MTMV mtmv);

/**
* triggered when alter mtmv, only once
Expand Down Expand Up @@ -102,9 +104,11 @@ public interface MTMVHookService {
/**
* Triggered when baseTable is altered
*
* @param table
* @param oldTableInfo info before alter
* @param newTableInfo info after alter
* @param isReplace
*/
void alterTable(Table table, String oldTableName);
void alterTable(BaseTableInfo oldTableInfo, Optional<BaseTableInfo> newTableInfo, boolean isReplace);

/**
* Triggered when pause mtmv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Optional;

/**
* when do some operation, do something about job
Expand Down Expand Up @@ -144,7 +145,7 @@ public void registerMTMV(MTMV mtmv, Long dbId) {
}

@Override
public void deregisterMTMV(MTMV mtmv) {
public void unregisterMTMV(MTMV mtmv) {

}

Expand Down Expand Up @@ -189,7 +190,7 @@ public void dropTable(Table table) {
}

@Override
public void alterTable(Table table, String oldTableName) {
public void alterTable(BaseTableInfo oldTableInfo, Optional<BaseTableInfo> newTableInfo, boolean isReplace) {

}

Expand Down
Loading