Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 130 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;

Expand Down Expand Up @@ -132,6 +134,14 @@ public enum BackupJobState {
@SerializedName("prop")
private Map<String, String> properties = Maps.newHashMap();

// Record table IDs that were dropped during backup
@SerializedName("dt")
private Set<Long> droppedTables = ConcurrentHashMap.newKeySet();

// Record partition IDs that were dropped during backup (tableId -> set of partitionIds)
@SerializedName("dp")
private Map<Long, Set<Long>> droppedPartitionsByTable = Maps.newConcurrentMap();

private long commitSeq = 0;

public BackupJob() {
Expand Down Expand Up @@ -235,6 +245,39 @@ private synchronized boolean tryNewTabletSnapshotTask(SnapshotTask task) {
return true;
}

/**
 * Checks whether the tablet referenced by a snapshot task is missing because its
 * table or partition no longer exists in the catalog, and records the drop if so.
 *
 * <p>A table that can no longer be looked up by ID is added to {@code droppedTables}.
 * For an {@link OlapTable} that still exists, a partition that can no longer be
 * looked up by ID is added to {@code droppedPartitionsByTable} under the table's ID.
 *
 * @param task the snapshot task whose table and partition IDs are checked
 * @return {@code true} if the table or the partition was found to be gone and was
 *         recorded as dropped; {@code false} if the table is not an OLAP table or
 *         the partition still exists
 */
private boolean handleTabletMissing(SnapshotTask task) {
    LOG.info("handleTabletMissing task: {}", task);

    Table tbl = env.getInternalCatalog().getTableByTableId(task.getTableId());
    if (tbl == null) {
        // The whole table is gone (this also covers a dropped database).
        droppedTables.add(task.getTableId());
        LOG.info("table {} marked as dropped during backup. {}", task.getTableId(), this);
        return true;
    }
    if (!(tbl instanceof OlapTable)) {
        return false;
    }

    OlapTable olapTbl = (OlapTable) tbl;
    olapTbl.readLock();
    try {
        if (olapTbl.getPartition(task.getPartitionId()) != null) {
            // The partition is still there, so the tablet is missing for another reason.
            return false;
        }

        // The partition was dropped, or truncated (truncate assigns a new partition ID).
        Set<Long> droppedOfTable = droppedPartitionsByTable.computeIfAbsent(
                task.getTableId(), id -> ConcurrentHashMap.newKeySet());
        droppedOfTable.add(task.getPartitionId());
        LOG.info("partition {} from table {} marked as dropped during backup (dropped or truncated). {}",
                task.getPartitionId(), task.getTableId(), this);
        return true;
    } finally {
        olapTbl.readUnlock();
    }
}

public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishTaskRequest request) {
Preconditions.checkState(task.getJobId() == jobId);
Expand All @@ -249,11 +292,21 @@ public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishT
cancelInternal();
}

if (request.getTaskStatus().getStatusCode() == TStatusCode.TABLET_MISSING
&& !tryNewTabletSnapshotTask(task)) {
status = new Status(ErrCode.NOT_FOUND,
"make snapshot failed, failed to ge tablet, table will be dropped or truncated");
cancelInternal();
if (request.getTaskStatus().getStatusCode() == TStatusCode.TABLET_MISSING) {
if (handleTabletMissing(task)) {
// Successfully handled drop case, remove from task queue
taskProgress.remove(task.getSignature());
taskErrMsg.remove(task.getSignature());
Long oldValue = unfinishedTaskIds.remove(task.getSignature());
return oldValue != null;
} else {
// Not caused by drop, follow original logic
if (!tryNewTabletSnapshotTask(task)) {
status = new Status(ErrCode.NOT_FOUND,
"make snapshot failed, failed to get tablet, table will be dropped or truncated");
cancelInternal();
}
}
}

if (request.getTaskStatus().getStatusCode() == TStatusCode.NOT_IMPLEMENTED_ERROR) {
Expand Down Expand Up @@ -498,13 +551,18 @@ private void prepareAndSendSnapshotTask() {
List<Table> copiedTables = Lists.newArrayList();
List<Resource> copiedResources = Lists.newArrayList();
AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
// Track if we have any valid tables for backup
boolean hasValidTables = false;
for (TableRef tableRef : tableRefs) {
String tblName = tableRef.getName().getTbl();
Table tbl = db.getTableNullable(tblName);
if (tbl == null) {
status = new Status(ErrCode.NOT_FOUND, "table " + tblName + " does not exist");
return;
// Table was dropped, skip it and continue with other tables
LOG.info("table {} does not exist, it was dropped during backup preparation, skip it. {}",
tblName, this);
continue;
}
hasValidTables = true;
tbl.readLock();
try {
switch (tbl.getType()) {
Expand Down Expand Up @@ -538,7 +596,11 @@ private void prepareAndSendSnapshotTask() {
return;
}
}

// If no valid tables found, cancel the job
if (!hasValidTables) {
status = new Status(ErrCode.NOT_FOUND, "no valid tables found for backup");
return;
}
// Limit the max num of tablets involved in a backup job, to avoid OOM.
if (unfinishedTaskIds.size() > Config.max_backup_tablets_per_job) {
String msg = String.format("the num involved tablets %d exceeds the limit %d, "
Expand Down Expand Up @@ -825,6 +887,43 @@ private void waitingAllUploadingFinished() {
}
}

/**
 * Removes from {@code backupMeta} every partition and table that was recorded as
 * dropped while the backup was running.
 *
 * <p>Partitions are handled before tables. Does nothing when {@code backupMeta}
 * is {@code null}.
 */
private void cleanupDroppedTablesAndPartitions() {
    if (backupMeta == null) {
        return;
    }

    // Pass 1: remove the recorded partitions from the OLAP tables still present in the meta.
    for (Map.Entry<Long, Set<Long>> droppedEntry : droppedPartitionsByTable.entrySet()) {
        Long tableId = droppedEntry.getKey();
        Table table = backupMeta.getTable(tableId);
        if (!(table instanceof OlapTable)) {
            continue;
        }

        OlapTable olapTbl = (OlapTable) table;
        // Look each partition up by ID rather than scanning all partitions of the table.
        for (Long partitionId : droppedEntry.getValue()) {
            Partition stalePartition = olapTbl.getPartition(partitionId);
            if (stalePartition == null) {
                continue;
            }
            LOG.info("remove dropped partition {} from table {} (id: {}) in backup meta. {}",
                    stalePartition.getName(), table.getName(), tableId, this);
            olapTbl.dropPartitionAndReserveTablet(stalePartition.getName());
        }
    }

    // Pass 2: remove the recorded tables themselves.
    for (Long tableId : droppedTables) {
        Table staleTable = backupMeta.getTable(tableId);
        if (staleTable == null) {
            continue;
        }
        LOG.info("remove dropped table {} (id: {}) from backup meta. {}",
                staleTable.getName(), tableId, this);
        backupMeta.removeTable(tableId);
    }
}

private void saveMetaInfo(boolean replay) {
String createTimeStr = TimeUtils.longToTimeString(createTime,
TimeUtils.getDatetimeFormatWithHyphenWithTimeZone());
Expand All @@ -846,7 +945,10 @@ private void saveMetaInfo(boolean replay) {
return;
}

// 2. save meta info file
// 2. Clean up dropped tables and partitions from backup metadata
cleanupDroppedTablesAndPartitions();

// 3. save meta info file
File metaInfoFile = new File(jobDir, Repository.FILE_META_INFO);
if (!metaInfoFile.createNewFile()) {
status = new Status(ErrCode.COMMON_ERROR,
Expand All @@ -856,7 +958,7 @@ private void saveMetaInfo(boolean replay) {
backupMeta.writeToFile(metaInfoFile);
localMetaInfoFilePath = metaInfoFile.getAbsolutePath();

// 3. save job info file
// 4. save job info file
Map<Long, Long> tableCommitSeqMap = Maps.newHashMap();
// iterate properties, convert key, value from string to long
// key is "${TABLE_COMMIT_SEQ_PREFIX}{tableId}", only need tableId to long
Expand All @@ -869,8 +971,21 @@ private void saveMetaInfo(boolean replay) {
tableCommitSeqMap.put(tableId, commitSeq);
}
}
// Filter out snapshot infos for dropped tables and partitions
Map<Long, SnapshotInfo> filteredSnapshotInfos = Maps.newHashMap();
for (Map.Entry<Long, SnapshotInfo> entry : snapshotInfos.entrySet()) {
SnapshotInfo info = entry.getValue();
boolean isDroppedTable = droppedTables.contains(info.getTblId());
boolean isDroppedPartition = droppedPartitionsByTable.getOrDefault(info.getTblId(),
Collections.emptySet()).contains(info.getPartitionId());

if (!isDroppedTable && !isDroppedPartition) {
filteredSnapshotInfos.put(entry.getKey(), info);
}
}

jobInfo = BackupJobInfo.fromCatalog(createTime, label, dbName, dbId,
getContent(), backupMeta, snapshotInfos, tableCommitSeqMap);
getContent(), backupMeta, filteredSnapshotInfos, tableCommitSeqMap);
if (LOG.isDebugEnabled()) {
LOG.debug("job info: {}. {}", jobInfo, this);
}
Expand Down Expand Up @@ -903,6 +1018,10 @@ private void saveMetaInfo(boolean replay) {

snapshotInfos.clear();

// Clean up temporary records to reduce editlog size
droppedPartitionsByTable.clear();
droppedTables.clear();

// log
env.getEditLog().logBackupJob(this);
LOG.info("finished to save meta the backup job info file to local.[{}], [{}] {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ public Table getTable(Long tblId) {
return tblIdMap.get(tblId);
}

/**
 * Removes the table with the given ID from both the ID index and the name index.
 *
 * @param tableId ID of the table to remove
 * @return {@code true} if a table with that ID was present and has been removed,
 *         {@code false} if no such table was registered
 */
public boolean removeTable(Long tableId) {
    Table evicted = tblIdMap.remove(tableId);
    if (evicted == null) {
        return false;
    }
    // Keep the name index in sync with the ID index.
    tblNameMap.remove(evicted.getName());
    return true;
}

public static BackupMeta fromFile(String filePath, int metaVersion) throws IOException {
return fromInputStream(new FileInputStream(filePath), metaVersion);
}
Expand Down
Loading