Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.PropertyAnalyzer.RewriteProperty;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.persist.AlterMTMV;
import org.apache.doris.persist.AlterViewInfo;
Expand Down Expand Up @@ -176,7 +177,8 @@ private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable olapTable,

olapTable.checkNormalStateForAlter();
boolean needProcessOutsideTableLock = false;
String oldTableName = olapTable.getName();
BaseTableInfo oldBaseTableInfo = new BaseTableInfo(olapTable);
Optional<BaseTableInfo> newBaseTableInfo = Optional.empty();
if (currentAlterOps.checkTableStoragePolicy(alterClauses)) {
String tableStoragePolicy = olapTable.getStoragePolicy();
String currentStoragePolicy = currentAlterOps.getTableStoragePolicy(alterClauses);
Expand Down Expand Up @@ -283,8 +285,14 @@ private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable olapTable,
}
} else if (currentAlterOps.hasRenameOp()) {
processRename(db, olapTable, alterClauses);
newBaseTableInfo = Optional.of(new BaseTableInfo(olapTable));
} else if (currentAlterOps.hasReplaceTableOp()) {
processReplaceTable(db, olapTable, alterClauses);
// after replace table, olapTable may still be old name, so need set it to new name
ReplaceTableClause clause = (ReplaceTableClause) alterClauses.get(0);
String newTblName = clause.getTblName();
newBaseTableInfo = Optional.of(new BaseTableInfo(olapTable));
newBaseTableInfo.get().setTableName(newTblName);
} else if (currentAlterOps.contains(AlterOpType.MODIFY_TABLE_PROPERTY_SYNC)) {
needProcessOutsideTableLock = true;
} else if (currentAlterOps.contains(AlterOpType.MODIFY_DISTRIBUTION)) {
Expand All @@ -302,7 +310,8 @@ private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable olapTable,
throw new DdlException("Invalid alter operations: " + currentAlterOps);
}
if (needChangeMTMVState(alterClauses)) {
Env.getCurrentEnv().getMtmvService().alterTable(olapTable, oldTableName);
Env.getCurrentEnv().getMtmvService()
.alterTable(oldBaseTableInfo, newBaseTableInfo, currentAlterOps.hasReplaceTableOp());
}
return needProcessOutsideTableLock;
}
Expand Down Expand Up @@ -661,11 +670,9 @@ private void replaceTableInternal(Database db, OlapTable origTable, OlapTable ne
// drop origin table and new table
db.unregisterTable(oldTblName);
db.unregisterTable(newTblName);

// rename new table name to origin table name and add it to database
newTbl.checkAndSetName(oldTblName, false);
db.registerTable(newTbl);

if (swapTable) {
// rename origin table name to new table name and add it to database
origTable.checkAndSetName(newTblName, false);
Expand All @@ -674,7 +681,12 @@ private void replaceTableInternal(Database db, OlapTable origTable, OlapTable ne
// not swap, the origin table is not used anymore, need to drop all its tablets.
Env.getCurrentEnv().onEraseOlapTable(origTable, isReplay);
if (origTable.getType() == TableType.MATERIALIZED_VIEW) {
Env.getCurrentEnv().getMtmvService().deregisterMTMV((MTMV) origTable);
// Because the current dropMTMV will delete jobs related to materialized views,
// this method will maintain its own metadata for deleting jobs,
// so it cannot be called during playback
if (!isReplay) {
Env.getCurrentEnv().getMtmvService().dropMTMV((MTMV) origTable);
}
}
Env.getCurrentEnv().getAnalysisManager().removeTableStats(origTable.getId());
}
Expand Down
38 changes: 10 additions & 28 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -412,19 +412,14 @@ public Pair<Boolean, Boolean> createTableWithLock(
result = setIfNotExist;
isTableExist = true;
} else {
idToTable.put(table.getId(), table);
nameToTable.put(table.getName(), table);
lowerCaseToTableName.put(tableName.toLowerCase(), tableName);

registerTable(table);
if (!isReplay) {
// Write edit log
CreateTableInfo info = new CreateTableInfo(fullQualifiedName, table);
Env.getCurrentEnv().getEditLog().logCreateTable(info);
}
if (table.getType() == TableType.ELASTICSEARCH) {
Env.getCurrentEnv().getEsRepository().registerTable((EsTable) table);
} else if (table.getType() == TableType.MATERIALIZED_VIEW) {
Env.getCurrentEnv().getMtmvService().registerMTMV((MTMV) table, id);
}
}
return Pair.of(result, isTableExist);
Expand All @@ -448,6 +443,9 @@ public boolean registerTable(TableIf table) {
idToTable.put(olapTable.getId(), olapTable);
nameToTable.put(olapTable.getName(), olapTable);
lowerCaseToTableName.put(tableName.toLowerCase(), tableName);
if (olapTable instanceof MTMV) {
Env.getCurrentEnv().getMtmvService().registerMTMV((MTMV) olapTable, id);
}
}
olapTable.unmarkDropped();
return result;
Expand All @@ -459,6 +457,9 @@ public void unregisterTable(String tableName) {
}
Table table = getTableNullable(tableName);
if (table != null) {
if (table instanceof MTMV) {
Env.getCurrentEnv().getMtmvService().unregisterMTMV((MTMV) table);
}
this.nameToTable.remove(tableName);
this.idToTable.remove(table.getId());
this.lowerCaseToTableName.remove(tableName.toLowerCase());
Expand Down Expand Up @@ -638,14 +639,7 @@ private void readTables(DataInput in) throws IOException {
int numTables = in.readInt();
for (int i = 0; i < numTables; ++i) {
Table table = Table.read(in);
table.setQualifiedDbName(fullQualifiedName);
if (table instanceof MTMV) {
Env.getCurrentEnv().getMtmvService().registerMTMV((MTMV) table, id);
}
String tableName = table.getName();
nameToTable.put(tableName, table);
idToTable.put(table.getId(), table);
lowerCaseToTableName.put(tableName.toLowerCase(), tableName);
registerTable(table);
}
}

Expand All @@ -654,12 +648,7 @@ public void gsonPostProcess() throws IOException {
Preconditions.checkState(nameToTable.getClass() == ConcurrentHashMap.class,
"nameToTable should be ConcurrentMap");
nameToTable.forEach((tn, tb) -> {
tb.setQualifiedDbName(fullQualifiedName);
if (tb instanceof MTMV) {
Env.getCurrentEnv().getMtmvService().registerMTMV((MTMV) tb, id);
}
idToTable.put(tb.getId(), tb);
lowerCaseToTableName.put(tn.toLowerCase(), tn);
registerTable(tb);
});

if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_105) {
Expand Down Expand Up @@ -733,14 +722,7 @@ public void readFields(DataInput in) throws IOException {
int numTables = in.readInt();
for (int i = 0; i < numTables; ++i) {
Table table = Table.read(in);
table.setQualifiedDbName(fullQualifiedName);
if (table instanceof MTMV) {
Env.getCurrentEnv().getMtmvService().registerMTMV((MTMV) table, id);
}
String tableName = table.getName();
nameToTable.put(tableName, table);
idToTable.put(table.getId(), table);
lowerCaseToTableName.put(tableName.toLowerCase(), tableName);
registerTable(table);
}

// read quota
Expand Down
10 changes: 8 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -4832,16 +4832,22 @@ public void renameTable(Database db, Table table, String newTableName) throws Dd
throw new DdlException("Table name[" + newTableName + "] is already used (in restoring)");
}

if (table.isManagedTable()) {
// If not checked first, execute db.unregisterTable first,
// and then check the name in setName, it cannot guarantee atomicity
((OlapTable) table).checkAndSetName(newTableName, true);
}

db.unregisterTable(oldTableName);

if (table.isManagedTable()) {
// olap table should also check if any rollup has same name as "newTableName"
((OlapTable) table).checkAndSetName(newTableName, false);
} else {
table.setName(newTableName);
}

db.unregisterTable(oldTableName);
db.registerTable(table);

TableInfo tableInfo = TableInfo.createForTableRename(db.getId(), table.getId(), oldTableName,
newTableName);
editLog.logTableRename(tableInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1046,9 +1046,6 @@ public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop,
// which make things easier.
((OlapTable) table).dropAllTempPartitions();
}
if (table.getType() == TableType.MATERIALIZED_VIEW) {
Env.getCurrentEnv().getMtmvService().deregisterMTMV((MTMV) table);
}
Env.getCurrentEnv().getAnalysisManager().removeTableStats(table.getId());
db.unregisterTable(table.getName());
StopWatch watch = StopWatch.createStarted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
Expand All @@ -44,13 +45,15 @@
import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -81,6 +84,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

public class MTMVTask extends AbstractTask {
private static final Logger LOG = LogManager.getLogger(MTMVTask.class);
Expand Down Expand Up @@ -193,6 +197,11 @@ public void run() throws JobException {
// lock table order by id to avoid deadlock
MetaLockUtils.readLockTables(tableIfs);
try {
// if mtmv is schema_change, check if column type has changed
// If it's not in the schema_change state, the column type definitely won't change.
if (MTMVState.SCHEMA_CHANGE.equals(mtmv.getStatus().getState())) {
checkColumnTypeIfChange(mtmv, ctx);
}
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVPartitionUtil.alignMvPartition(mtmv);
}
Expand Down Expand Up @@ -239,6 +248,39 @@ public void run() throws JobException {
}
}

private void checkColumnTypeIfChange(MTMV mtmv, ConnectContext ctx) throws JobException {
List<ColumnDefinition> currentColumnsDefinition = MTMVPlanUtil.generateColumnsBySql(mtmv.getQuerySql(), ctx,
mtmv.getMvPartitionInfo().getPartitionCol(),
mtmv.getDistributionColumnNames(), null, mtmv.getTableProperty().getProperties());
List<Column> currentColumns = currentColumnsDefinition.stream()
.map(ColumnDefinition::translateToCatalogStyle)
.collect(Collectors.toList());
List<Column> originalColumns = mtmv.getBaseSchema(true);
if (currentColumns.size() != originalColumns.size()) {
throw new JobException(String.format(
"column length not equals, please check whether columns of base table have changed, "
+ "original length is: %s, current length is: %s",
originalColumns.size(), currentColumns.size()));
}
for (int i = 0; i < originalColumns.size(); i++) {
if (!isTypeLike(originalColumns.get(i).getType(), currentColumns.get(i).getType())) {
throw new JobException(String.format(
"column type not same, please check whether columns of base table have changed, "
+ "column name is: %s, original type is: %s, current type is: %s",
originalColumns.get(i).getName(), originalColumns.get(i).getType().toSql(),
currentColumns.get(i).getType().toSql()));
}
}
}

private boolean isTypeLike(Type type, Type typeOther) {
if (type.isStringType()) {
return typeOther.isStringType();
} else {
return type.equals(typeOther);
}
}

private void executeWithRetry(Set<String> execPartitionNames, Map<TableIf, String> tableWithPartKey)
throws Exception {
int retryCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
import org.apache.doris.persist.AlterMTMV;

import java.util.Optional;

/**
* Contains all operations that affect the mtmv
*/
Expand Down Expand Up @@ -63,7 +65,7 @@ public interface MTMVHookService {
*
* @param mtmv
*/
void deregisterMTMV(MTMV mtmv);
void unregisterMTMV(MTMV mtmv);

/**
* triggered when alter mtmv, only once
Expand Down Expand Up @@ -102,9 +104,11 @@ public interface MTMVHookService {
/**
* Triggered when baseTable is altered
*
* @param table
* @param oldTableInfo info before alter
* @param newTableInfo info after alter
* @param isReplace
*/
void alterTable(Table table, String oldTableName);
void alterTable(BaseTableInfo oldTableInfo, Optional<BaseTableInfo> newTableInfo, boolean isReplace);

/**
* Triggered when pause mtmv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Optional;

/**
* when do some operation, do something about job
Expand Down Expand Up @@ -144,7 +145,7 @@ public void registerMTMV(MTMV mtmv, Long dbId) {
}

@Override
public void deregisterMTMV(MTMV mtmv) {
public void unregisterMTMV(MTMV mtmv) {

}

Expand Down Expand Up @@ -189,7 +190,7 @@ public void dropTable(Table table) {
}

@Override
public void alterTable(Table table, String oldTableName) {
public void alterTable(BaseTableInfo oldTableInfo, Optional<BaseTableInfo> newTableInfo, boolean isReplace) {

}

Expand Down
Loading