@@ -73,7 +73,6 @@ statementBase

unsupportedStatement
: unsupportedUseStatement
| unsupportedDmlStatement
| unsupportedCreateStatement
| unsupportedDropStatement
| unsupportedStatsStatement
@@ -165,6 +164,7 @@ supportedDmlStatement
(stageAndPattern | (LEFT_PAREN SELECT selectColumnClause
FROM stageAndPattern whereClause? RIGHT_PAREN))
properties=propertyClause? #copyInto
| TRUNCATE TABLE multipartIdentifier specifiedPartition? FORCE? #truncateTable
;

supportedCreateStatement
@@ -907,10 +907,6 @@ unsupportedUseStatement
: USE ((catalog=identifier DOT)? database=identifier)? ATSIGN cluster=identifier #useCloudCluster
;

unsupportedDmlStatement
: TRUNCATE TABLE multipartIdentifier specifiedPartition? FORCE? #truncateTable
;

stageAndPattern
: ATSIGN (stage=identifier | TILDE)
(LEFT_PAREN pattern=STRING_LITERAL RIGHT_PAREN)?
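Taken together, the grammar hunks move TRUNCATE TABLE from unsupportedDmlStatement into supportedDmlStatement, so these statements now plan through Nereids instead of falling back to the legacy path. A minimal sketch of what should now parse, assuming the FE's NereidsParser#parseSingle entry point (as used in existing parser tests) and JUnit 5 assertions; the assertion style is illustrative, not part of this diff:

import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.TruncateTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.junit.jupiter.api.Assertions;

public class TruncateParseSketch {
    public static void main(String[] args) {
        NereidsParser parser = new NereidsParser();
        // one statement per optional element of the new rule:
        // multipartIdentifier, specifiedPartition?, FORCE?
        String[] stmts = {
                "TRUNCATE TABLE db1.t1",
                "TRUNCATE TABLE ctl1.db1.t1",
                "TRUNCATE TABLE t1 PARTITION (p1, p2)",
                "TRUNCATE TABLE t1 FORCE"
        };
        for (String sql : stmts) {
            LogicalPlan plan = parser.parseSingle(sql);
            // previously these matched unsupportedDmlStatement; now they
            // should produce the new command plan
            Assertions.assertTrue(plan instanceof TruncateTableCommand, sql);
        }
    }
}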
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java (18 additions, 0 deletions)
@@ -208,6 +208,7 @@
import org.apache.doris.nereids.trees.plans.commands.AnalyzeCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateMaterializedViewCommand;
import org.apache.doris.nereids.trees.plans.commands.DropCatalogRecycleBinCommand.IdType;
import org.apache.doris.nereids.trees.plans.commands.TruncateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVPropertyInfo;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVRefreshInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
@@ -6089,6 +6090,23 @@ public void truncateTable(TruncateTableStmt stmt) throws DdlException {
catalogIf.truncateTable(stmt);
}

/*
* Truncate the specified table or partitions.
* The main idea is:
*
* 1. create new table (partitions) with the same schema;
* 2. use the newly created table (partitions) to replace the old ones.
*
* If no partition is specified, all partitions of the table are truncated, including all temp partitions;
* otherwise, only the specified partitions are truncated.
*/
public void truncateTable(TruncateTableCommand command) throws DdlException {
CatalogIf<?> catalogIf = catalogMgr.getCatalogOrException(command.getTableNameInfo().getCtl(),
catalog -> new DdlException("Unknown catalog " + catalog));
catalogIf.truncateTable(command);
}

public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundException {
if (Strings.isNullOrEmpty(info.getCtl()) || info.getCtl().equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
// In previous versions(before 2.1.8), there is no catalog info in TruncateTableInfo,
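The new Env overload mirrors the existing TruncateTableStmt path: it resolves the catalog named in the command and delegates to it through CatalogIf. A hedged sketch of the expected call site; the wrapper class below is hypothetical, since TruncateTableCommand#run itself is not part of this diff:

import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.nereids.trees.plans.commands.TruncateTableCommand;

public final class TruncateDispatchSketch {
    // In the real code this call is expected to live in TruncateTableCommand#run,
    // after validation and privilege checks.
    public static void truncate(TruncateTableCommand command) throws DdlException {
        // Env looks up command.getTableNameInfo().getCtl() via catalogMgr and
        // dispatches to InternalCatalog or an ExternalCatalog accordingly.
        Env.getCurrentEnv().truncateTable(command);
    }
}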
@@ -31,6 +31,7 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.nereids.trees.plans.commands.TruncateTableCommand;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -205,6 +206,8 @@ void dropTable(String dbName, String tableName, boolean isView, boolean isMtmv,

void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlException;

void truncateTable(TruncateTableCommand truncateTableCommand) throws DdlException;

// Convert from remote database name to local database name, overridden by subclass if necessary
default String fromRemoteDatabaseName(String remoteDatabaseName) {
return remoteDatabaseName;
@@ -59,6 +59,7 @@
import org.apache.doris.datasource.test.TestExternalDatabase;
import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalDatabase;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.nereids.trees.plans.commands.TruncateTableCommand;
import org.apache.doris.persist.CreateDbInfo;
import org.apache.doris.persist.CreateTableInfo;
import org.apache.doris.persist.DropDbInfo;
@@ -1185,6 +1186,32 @@ public void truncateTable(TruncateTableStmt stmt) throws DdlException {
}
}

@Override
public void truncateTable(TruncateTableCommand command) throws DdlException {
makeSureInitialized();
if (metadataOps == null) {
throw new UnsupportedOperationException("Truncate table not supported in " + getName());
}
try {
String db = command.getTableNameInfo().getDb();
String tbl = command.getTableNameInfo().getTbl();

// a null partition list tells metadataOps to truncate the whole table
List<String> partitions = null;
if (command.getPartitionNamesInfo().isPresent()) {
partitions = command.getPartitionNamesInfo().get().getPartitionNames();
}

metadataOps.truncateTable(db, tbl, partitions);
TruncateTableInfo info = new TruncateTableInfo(getName(), db, tbl, partitions);
Env.getCurrentEnv().getEditLog().logTruncateTable(info);
} catch (Exception e) {
LOG.warn("Failed to truncate table {}.{} in catalog {}", command.getTableNameInfo().getDb(),
command.getTableNameInfo().getTbl(), getName(), e);
throw e;
}
}

public void replayTruncateTable(TruncateTableInfo info) {
if (metadataOps != null) {
metadataOps.afterTruncateTable(info.getDb(), info.getTable());
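One convention worth noting here: the command's Optional partition list is flattened to a nullable List, where null tells metadataOps.truncateTable to clear the whole table. The isPresent()/get() block above can be written equivalently with Optional.map; the PartitionNames interface below is a self-contained stand-in for the real PartitionNamesInfo:

import java.util.List;
import java.util.Optional;

final class PartitionArgSketch {
    // stand-in for org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo
    interface PartitionNames {
        List<String> getPartitionNames();
    }

    // equivalent to the isPresent()/get() block in the method above
    static List<String> toNullablePartitions(Optional<PartitionNames> info) {
        return info.map(PartitionNames::getPartitionNames).orElse(null);
    }
}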
@@ -146,6 +146,7 @@
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.commands.DropCatalogRecycleBinCommand.IdType;
import org.apache.doris.nereids.trees.plans.commands.TruncateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.info.DropMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.persist.AlterDatabasePropertyInfo;
@@ -3779,6 +3780,242 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlException {
LOG.info("finished to truncate table {}, partitions: {}", tblRef.getName().toSql(), tblRef.getPartitionNames());
}

/*
* Truncate the specified table or partitions.
* The main idea is:
*
* 1. create new table (partitions) with the same schema;
* 2. use the newly created table (partitions) to replace the old ones.
*
* If no partition is specified, all partitions of the table are truncated, including all temp partitions;
* otherwise, only the specified partitions are truncated.
*/
public void truncateTable(TruncateTableCommand truncateTableCommand) throws DdlException {
boolean isForceDrop = truncateTableCommand.isForceDrop();
String database = truncateTableCommand.getTableNameInfo().getDb();
String tbl = truncateTableCommand.getTableNameInfo().getTbl();

// check, and save some info that needs to be checked again later
Map<String, Long> origPartitions = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
Map<Long, DistributionInfo> partitionsDistributionInfo = Maps.newHashMap();
OlapTable copiedTbl;

boolean truncateEntireTable = !truncateTableCommand.getPartitionNamesInfo().isPresent();

Database db = (Database) getDbOrDdlException(database);
OlapTable olapTable = db.getOlapTableOrDdlException(tbl);

if (olapTable instanceof MTMV && !MTMVUtil.allowModifyMTMVData(ConnectContext.get())) {
throw new DdlException("Not allowed to perform current operation on async materialized view");
}

HashMap<Long, Long> updateRecords = new HashMap<>();

BinlogConfig binlogConfig;
olapTable.readLock();
try {
olapTable.checkNormalStateForAlter();
if (!truncateEntireTable) {
for (String partName : truncateTableCommand.getPartitionNamesInfo().get().getPartitionNames()) {
Partition partition = olapTable.getPartition(partName);
if (partition == null) {
throw new DdlException("Partition " + partName + " does not exist");
}
// To be absolutely correct, we should check running txns here,
// but for a txn in the prepare state we cannot know which partitions it has loaded data into.
if (isForceDrop && !partition.hasData()) {
// skip empty partitions only on a force drop; otherwise the partition
// must still go to the recycle bin so recover behavior stays clear
continue;
}
origPartitions.put(partName, partition.getId());
partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo());
updateRecords.put(partition.getId(), partition.getBaseIndex().getRowCount());
}
} else {
for (Partition partition : olapTable.getPartitions()) {
// To be absolutely correct, we should check running txns here,
// but for a txn in the prepare state we cannot know which partitions it has loaded data into.
if (isForceDrop && !partition.hasData()) {
// skip empty partitions only on a force drop; otherwise the partition
// must still go to the recycle bin so recover behavior stays clear
continue;
}
origPartitions.put(partition.getName(), partition.getId());
partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo());
updateRecords.put(partition.getId(), partition.getBaseIndex().getRowCount());
}
}
// if the table currently has no partitions, this statement is effectively a no-op and we should return directly,
// but when truncating the whole table, the temporary partitions still need to be dropped
if (origPartitions.isEmpty() && (!truncateEntireTable || olapTable.getAllTempPartitions().isEmpty())) {
LOG.info("finished to truncate table {}, no partition contains data, do nothing",
truncateTableCommand.getTableNameInfo().toSql());
return;
}
copiedTbl = olapTable.selectiveCopy(origPartitions.keySet(), IndexExtState.VISIBLE, false);

binlogConfig = new BinlogConfig(olapTable.getBinlogConfig());
} finally {
olapTable.readUnlock();
}

// 2. use the copied table to create partitions
List<Partition> newPartitions = Lists.newArrayList();
// tabletIdSet to save all newly created tablet ids.
Set<Long> tabletIdSet = Sets.newHashSet();
Runnable failedCleanCallback = () -> {
for (Long tabletId : tabletIdSet) {
Env.getCurrentInvertedIndex().deleteTablet(tabletId);
}
};
try {
long bufferSize = IdGeneratorUtil.getBufferSizeForTruncateTable(copiedTbl, origPartitions.values());
IdGeneratorBuffer idGeneratorBuffer =
origPartitions.isEmpty() ? null : Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);

Map<Long, Long> oldToNewPartitionId = new HashMap<Long, Long>();
List<Long> newPartitionIds = new ArrayList<Long>();
for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
long oldPartitionId = entry.getValue();
long newPartitionId = idGeneratorBuffer.getNextId();
oldToNewPartitionId.put(oldPartitionId, newPartitionId);
newPartitionIds.add(newPartitionId);
}

List<Long> indexIds = copiedTbl.getIndexIdToMeta().keySet().stream().collect(Collectors.toList());
beforeCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds, true);

for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
// the new partition must use a new id.
// If we kept the old partition id, the behavior of in-flight load jobs
// on this partition would be undefined.
// With a new id, those load jobs are aborted (just as if the partition
// had been dropped), which is the desired behavior.
long oldPartitionId = entry.getValue();
long newPartitionId = oldToNewPartitionId.get(oldPartitionId);
Partition newPartition = createPartitionWithIndices(db.getId(), copiedTbl,
newPartitionId, entry.getKey(),
copiedTbl.getIndexIdToMeta(), partitionsDistributionInfo.get(oldPartitionId),
copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId),
copiedTbl.getPartitionInfo().getReplicaAllocation(oldPartitionId), null /* version info */,
copiedTbl.getCopiedBfColumns(), tabletIdSet,
copiedTbl.isInMemory(),
copiedTbl.getPartitionInfo().getTabletType(oldPartitionId),
olapTable.getPartitionInfo().getDataProperty(oldPartitionId).getStoragePolicy(),
idGeneratorBuffer, binlogConfig,
copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified());
newPartitions.add(newPartition);
}

afterCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds, true);

} catch (DdlException e) {
// create partition failed, remove all newly created tablets
failedCleanCallback.run();
throw e;
}
Preconditions.checkState(origPartitions.size() == newPartitions.size());

// all partitions were created successfully; try to replace the old partitions.
// before replacing, we need to check again, because things may have changed
// while the table lock was not held.
List<Partition> oldPartitions = Lists.newArrayList();
boolean hasWriteLock = false;
try {
olapTable = (OlapTable) db.getTableOrDdlException(copiedTbl.getId());
olapTable.writeLockOrDdlException();
hasWriteLock = true;
olapTable.checkNormalStateForAlter();
// check partitions
for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
Partition partition = olapTable.getPartition(entry.getValue());
if (partition == null || !partition.getName().equalsIgnoreCase(entry.getKey())) {
throw new DdlException("Partition [" + entry.getKey() + "] is changed"
+ " during truncating table, please retry");
}
}

// check if meta changed:
// a rollup index may have been added or dropped, and the schema may have changed,
// while the partitions were being created.
boolean metaChanged = false;
if (olapTable.getIndexNameToId().size() != copiedTbl.getIndexNameToId().size()) {
metaChanged = true;
} else {
// compare schemaHash
Map<Long, Integer> copiedIndexIdToSchemaHash = copiedTbl.getIndexIdToSchemaHash();
for (Map.Entry<Long, Integer> entry : olapTable.getIndexIdToSchemaHash().entrySet()) {
long indexId = entry.getKey();
if (!copiedIndexIdToSchemaHash.containsKey(indexId)) {
metaChanged = true;
break;
}
if (!copiedIndexIdToSchemaHash.get(indexId).equals(entry.getValue())) {
metaChanged = true;
break;
}
}

List<Column> oldSchema = copiedTbl.getFullSchema();
List<Column> newSchema = olapTable.getFullSchema();
if (oldSchema.size() != newSchema.size()) {
LOG.warn("schema column size diff, old schema {}, new schema {}", oldSchema, newSchema);
metaChanged = true;
} else {
List<Column> oldSchemaCopy = Lists.newArrayList(oldSchema);
List<Column> newSchemaCopy = Lists.newArrayList(newSchema);
oldSchemaCopy.sort((Column a, Column b) -> a.getUniqueId() - b.getUniqueId());
newSchemaCopy.sort((Column a, Column b) -> a.getUniqueId() - b.getUniqueId());
for (int i = 0; i < oldSchemaCopy.size(); ++i) {
if (!oldSchemaCopy.get(i).equals(newSchemaCopy.get(i))) {
LOG.warn("schema diff, old schema {}, new schema {}", oldSchemaCopy.get(i),
newSchemaCopy.get(i));
metaChanged = true;
break;
}
}
}
}
if (DebugPointUtil.isEnable("InternalCatalog.truncateTable.metaChanged")) {
metaChanged = true;
LOG.warn("debug set truncate table meta changed");
}

if (metaChanged) {
throw new DdlException("Table[" + copiedTbl.getName() + "]'s meta has been changed. try again.");
}

// replace the old partitions with the new ones
Map<Long, RecyclePartitionParam> recyclePartitionParamMap = new HashMap<>();
oldPartitions = truncateTableInternal(olapTable, newPartitions,
truncateEntireTable, recyclePartitionParamMap, isForceDrop);

// write edit log
TruncateTableInfo info =
new TruncateTableInfo(db.getId(), db.getFullName(), olapTable.getId(), olapTable.getName(),
newPartitions, truncateEntireTable,
truncateTableCommand.toSqlWithoutTable(), oldPartitions, isForceDrop);
Env.getCurrentEnv().getEditLog().logTruncateTable(info);
} catch (DdlException e) {
failedCleanCallback.run();
throw e;
} finally {
if (hasWriteLock) {
olapTable.writeUnlock();
}
}

PartitionNames partitionNames = truncateEntireTable ? null
: new PartitionNames(false, truncateTableCommand.getPartitionNamesInfo().get().getPartitionNames());
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable, partitionNames);
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords, db.getId(), olapTable.getId(), 0);
LOG.info("finished to truncate table {}, partitions: {}",
truncateTableCommand.getTableNameInfo().toSql(),
!truncateTableCommand.getPartitionNamesInfo().isPresent()
? "null" : truncateTableCommand.getPartitionNamesInfo().get().getPartitionNames());
}

private List<Partition> truncateTableInternal(OlapTable olapTable, List<Partition> newPartitions,
boolean isEntireTable, Map<Long, RecyclePartitionParam> recyclePartitionParamMap, boolean isforceDrop) {
// use new partitions to replace the old ones.
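The subtlest part of the meta re-check in the internal implementation is the schema comparison: column order in the copied table need not match the live table, so both sides are sorted by column unique id before the element-wise compare. A standalone illustration of that logic, with Col as a stand-in for org.apache.doris.catalog.Column:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SchemaCheckSketch {
    // stand-in for Column: only the unique id ordering matters here
    record Col(int uniqueId, String name, String type) {}

    // mirrors the copiedTbl-vs-olapTable check: size first, then element-wise
    // equality after sorting both sides by unique id
    static boolean sameSchema(List<Col> a, List<Col> b) {
        if (a.size() != b.size()) {
            return false;                      // column added or dropped
        }
        List<Col> x = new ArrayList<>(a);
        List<Col> y = new ArrayList<>(b);
        x.sort(Comparator.comparingInt(Col::uniqueId));
        y.sort(Comparator.comparingInt(Col::uniqueId));
        return x.equals(y);                    // any difference means meta changed
    }

    public static void main(String[] args) {
        List<Col> copied = List.of(new Col(1, "k", "INT"), new Col(2, "v", "INT"));
        List<Col> live = List.of(new Col(2, "v", "INT"), new Col(1, "k", "INT"));
        System.out.println(sameSchema(copied, live));  // true: order-insensitive
    }
}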