Skip to content

Commit

Permalink
[feature](restore) Support clean_tables/clean_partitions properties f…
Browse files Browse the repository at this point in the history
…or restore job (#39028)

The restore will keep the existing tables/partitions in which the
restore target is not contained, this PR adds a property clean_restore
to indicate that the restore job needs to recycle those
tables/partitions

The CCR part PR is: selectdb/ccr-syncer#128.
The website PR is: apache/doris-website#999
  • Loading branch information
w41ter authored and dataroaring committed Aug 17, 2024
1 parent c37a1d7 commit 4489083
Show file tree
Hide file tree
Showing 9 changed files with 406 additions and 122 deletions.
104 changes: 51 additions & 53 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@

public class RestoreStmt extends AbstractBackupStmt {
private static final String PROP_ALLOW_LOAD = "allow_load";
private static final String PROP_REPLICATION_NUM = "replication_num";
private static final String PROP_BACKUP_TIMESTAMP = "backup_timestamp";
private static final String PROP_META_VERSION = "meta_version";
private static final String PROP_RESERVE_REPLICA = "reserve_replica";
private static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable";
private static final String PROP_IS_BEING_SYNCED = PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED;

public static final String PROP_RESERVE_REPLICA = "reserve_replica";
public static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable";
public static final String PROP_CLEAN_TABLES = "clean_tables";
public static final String PROP_CLEAN_PARTITIONS = "clean_partitions";

private boolean allowLoad = false;
private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
private String backupTimestamp = null;
Expand All @@ -50,16 +52,18 @@ public class RestoreStmt extends AbstractBackupStmt {
private boolean reserveDynamicPartitionEnable = false;
private boolean isLocal = false;
private boolean isBeingSynced = false;
private boolean isCleanTables = false;
private boolean isCleanPartitions = false;
private byte[] meta = null;
private byte[] jobInfo = null;

public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause,
Map<String, String> properties) {
Map<String, String> properties) {
super(labelName, repoName, restoreTableRefClause, properties);
}

public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause,
Map<String, String> properties, byte[] meta, byte[] jobInfo) {
Map<String, String> properties, byte[] meta, byte[] jobInfo) {
super(labelName, repoName, restoreTableRefClause, properties);
this.meta = meta;
this.jobInfo = jobInfo;
Expand Down Expand Up @@ -109,6 +113,14 @@ public boolean isBeingSynced() {
return isBeingSynced;
}

public boolean isCleanTables() {
return isCleanTables;
}

public boolean isCleanPartitions() {
return isCleanPartitions;
}

@Override
public void analyze(Analyzer analyzer) throws UserException {
if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
Expand Down Expand Up @@ -142,52 +154,24 @@ public void analyzeProperties() throws AnalysisException {

Map<String, String> copiedProperties = Maps.newHashMap(properties);
// allow load
if (copiedProperties.containsKey(PROP_ALLOW_LOAD)) {
if (copiedProperties.get(PROP_ALLOW_LOAD).equalsIgnoreCase("true")) {
allowLoad = true;
} else if (copiedProperties.get(PROP_ALLOW_LOAD).equalsIgnoreCase("false")) {
allowLoad = false;
} else {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
"Invalid allow load value: " + copiedProperties.get(PROP_ALLOW_LOAD));
}
copiedProperties.remove(PROP_ALLOW_LOAD);
}
allowLoad = eatBooleanProperty(copiedProperties, PROP_ALLOW_LOAD, allowLoad);

// replication num
this.replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(copiedProperties, "");
if (this.replicaAlloc.isNotSet()) {
this.replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
}
// reserve replica
if (copiedProperties.containsKey(PROP_RESERVE_REPLICA)) {
if (copiedProperties.get(PROP_RESERVE_REPLICA).equalsIgnoreCase("true")) {
reserveReplica = true;
} else if (copiedProperties.get(PROP_RESERVE_REPLICA).equalsIgnoreCase("false")) {
reserveReplica = false;
} else {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
"Invalid reserve_replica value: " + copiedProperties.get(PROP_RESERVE_REPLICA));
}
// force set reserveReplica to false, do not keep the origin allocation
if (reserveReplica && !Config.force_olap_table_replication_allocation.isEmpty()) {
reserveReplica = false;
}
copiedProperties.remove(PROP_RESERVE_REPLICA);
reserveReplica = eatBooleanProperty(copiedProperties, PROP_RESERVE_REPLICA, reserveReplica);
// force set reserveReplica to false, do not keep the origin allocation
if (reserveReplica && !Config.force_olap_table_replication_allocation.isEmpty()) {
reserveReplica = false;
}

// reserve dynamic partition enable
if (copiedProperties.containsKey(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE)) {
if (copiedProperties.get(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE).equalsIgnoreCase("true")) {
reserveDynamicPartitionEnable = true;
} else if (copiedProperties.get(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE).equalsIgnoreCase("false")) {
reserveDynamicPartitionEnable = false;
} else {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
"Invalid reserve dynamic partition enable value: "
+ copiedProperties.get(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE));
}
copiedProperties.remove(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE);
}
reserveDynamicPartitionEnable = eatBooleanProperty(
copiedProperties, PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, reserveDynamicPartitionEnable);

// backup timestamp
if (copiedProperties.containsKey(PROP_BACKUP_TIMESTAMP)) {
backupTimestamp = copiedProperties.get(PROP_BACKUP_TIMESTAMP);
Expand All @@ -211,17 +195,13 @@ public void analyzeProperties() throws AnalysisException {
}

// is being synced
if (copiedProperties.containsKey(PROP_IS_BEING_SYNCED)) {
if (copiedProperties.get(PROP_IS_BEING_SYNCED).equalsIgnoreCase("true")) {
isBeingSynced = true;
} else if (copiedProperties.get(PROP_IS_BEING_SYNCED).equalsIgnoreCase("false")) {
isBeingSynced = false;
} else {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
"Invalid is being synced value: " + copiedProperties.get(PROP_IS_BEING_SYNCED));
}
copiedProperties.remove(PROP_IS_BEING_SYNCED);
}
isBeingSynced = eatBooleanProperty(copiedProperties, PROP_IS_BEING_SYNCED, isBeingSynced);

// is clean tables
isCleanTables = eatBooleanProperty(copiedProperties, PROP_CLEAN_TABLES, isCleanTables);

// is clean partitions
isCleanPartitions = eatBooleanProperty(copiedProperties, PROP_CLEAN_PARTITIONS, isCleanPartitions);

if (!copiedProperties.isEmpty()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
Expand Down Expand Up @@ -252,4 +232,22 @@ public String toSql() {
public StmtType stmtType() {
return StmtType.RESTORE;
}

private boolean eatBooleanProperty(Map<String, String> copiedProperties, String name, boolean defaultValue)
throws AnalysisException {
boolean retval = defaultValue;
if (copiedProperties.containsKey(name)) {
String value = copiedProperties.get(name);
if (value.equalsIgnoreCase("true")) {
retval = true;
} else if (value.equalsIgnoreCase("false")) {
retval = false;
} else {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
"Invalid boolean property " + name + " value: " + value);
}
copiedProperties.remove(name);
}
return retval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -531,12 +531,12 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(),
stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(),
env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
stmt.isCleanTables(), stmt.isCleanPartitions(), env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
} else {
restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(),
stmt.isBeingSynced(), env, repository.getId());
stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), env, repository.getId());
}

env.getEditLog().logRestoreJob(restoreJob);
Expand Down
97 changes: 89 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.backup;

import org.apache.doris.analysis.BackupStmt.BackupContent;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.backup.BackupJobInfo.BackupIndexInfo;
import org.apache.doris.backup.BackupJobInfo.BackupOlapTableInfo;
import org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo;
Expand Down Expand Up @@ -65,6 +66,7 @@
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
Expand Down Expand Up @@ -110,9 +112,12 @@
import java.util.stream.Collectors;

public class RestoreJob extends AbstractJob implements GsonPostProcessable {
private static final String PROP_RESERVE_REPLICA = "reserve_replica";
private static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable";
private static final String PROP_RESERVE_REPLICA = RestoreStmt.PROP_RESERVE_REPLICA;
private static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE =
RestoreStmt.PROP_RESERVE_DYNAMIC_PARTITION_ENABLE;
private static final String PROP_IS_BEING_SYNCED = PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED;
private static final String PROP_CLEAN_TABLES = RestoreStmt.PROP_CLEAN_TABLES;
private static final String PROP_CLEAN_PARTITIONS = RestoreStmt.PROP_CLEAN_PARTITIONS;

private static final Logger LOG = LogManager.getLogger(RestoreJob.class);

Expand Down Expand Up @@ -192,6 +197,11 @@ public enum RestoreJobState {

private boolean isBeingSynced = false;

// Whether to delete existing tables that are not involved in the restore.
private boolean isCleanTables = false;
// Whether to delete existing partitions that are not involved in the restore.
private boolean isCleanPartitions = false;

// restore properties
@SerializedName("prop")
private Map<String, String> properties = Maps.newHashMap();
Expand All @@ -202,7 +212,8 @@ public RestoreJob() {

public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, Env env, long repoId) {
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
boolean isCleanPartitions, Env env, long repoId) {
super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId);
this.backupTimestamp = backupTs;
this.jobInfo = jobInfo;
Expand All @@ -217,16 +228,21 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu
}
this.reserveDynamicPartitionEnable = reserveDynamicPartitionEnable;
this.isBeingSynced = isBeingSynced;
this.isCleanTables = isCleanTables;
this.isCleanPartitions = isCleanPartitions;
properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica));
properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable));
properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced));
properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables));
properties.put(PROP_CLEAN_PARTITIONS, String.valueOf(isCleanPartitions));
}

public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, Env env, long repoId, BackupMeta backupMeta) {
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
boolean isCleanPartitions, Env env, long repoId, BackupMeta backupMeta) {
this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica,
reserveDynamicPartitionEnable, isBeingSynced, env, repoId);
reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, env, repoId);
this.backupMeta = backupMeta;
}

Expand Down Expand Up @@ -894,7 +910,7 @@ private void checkAndPrepareMeta() {

if (ok) {
if (LOG.isDebugEnabled()) {
LOG.debug("finished to create all restored replcias. {}", this);
LOG.debug("finished to create all restored replicas. {}", this);
}
// add restored partitions.
// table should be in State RESTORE, so no other partitions can be
Expand Down Expand Up @@ -1481,7 +1497,7 @@ private void downloadRemoteSnapshots() {
return;
}

Tablet tablet = idx.getTablet(info.getTabletId());
Tablet tablet = idx.getTablet(info.getTabletId());
if (tablet == null) {
status = new Status(ErrCode.NOT_FOUND,
"tablet " + info.getTabletId() + " does not exist in restored table "
Expand Down Expand Up @@ -1629,7 +1645,7 @@ private void downloadLocalSnapshots() {
return;
}

Tablet tablet = idx.getTablet(info.getTabletId());
Tablet tablet = idx.getTablet(info.getTabletId());
if (tablet == null) {
status = new Status(ErrCode.NOT_FOUND,
"tablet " + info.getTabletId() + " does not exist in restored table "
Expand Down Expand Up @@ -1833,6 +1849,14 @@ private Status allTabletCommitted(boolean isReplay) {
}
}

// Drop the exists but non-restored table/partitions.
if (isCleanTables || isCleanPartitions) {
Status st = dropAllNonRestoredTableAndPartitions(db);
if (!st.ok()) {
return st;
}
}

if (!isReplay) {
restoredPartitions.clear();
restoredTbls.clear();
Expand All @@ -1855,6 +1879,59 @@ private Status allTabletCommitted(boolean isReplay) {
return Status.OK;
}

private Status dropAllNonRestoredTableAndPartitions(Database db) {
try {
for (Table table : db.getTables()) {
long tableId = table.getId();
String tableName = table.getName();
TableType tableType = table.getType();
BackupOlapTableInfo backupTableInfo = jobInfo.backupOlapTableObjects.get(tableName);
if (tableType != TableType.OLAP && tableType != TableType.ODBC && tableType != TableType.VIEW) {
continue;
}
if (tableType == TableType.OLAP && backupTableInfo != null) {
// drop the non restored partitions.
dropNonRestoredPartitions(db, (OlapTable) table, backupTableInfo);
} else if (isCleanTables) {
// otherwise drop the entire table.
LOG.info("drop non restored table {}({}). {}", tableName, tableId, this);
boolean isForceDrop = false; // move this table into recyclebin.
env.getInternalCatalog().dropTableWithoutCheck(db, table, isForceDrop);
}
}
return Status.OK;
} catch (Exception e) {
LOG.warn("drop all non restored table and partitions failed. {}", this, e);
return new Status(ErrCode.COMMON_ERROR, e.getMessage());
}
}

private void dropNonRestoredPartitions(
Database db, OlapTable table, BackupOlapTableInfo backupTableInfo) throws DdlException {
if (!isCleanPartitions || !table.writeLockIfExist()) {
return;
}

try {
long tableId = table.getId();
String tableName = table.getQualifiedName();
InternalCatalog catalog = env.getInternalCatalog();
for (String partitionName : table.getPartitionNames()) {
if (backupTableInfo.containsPart(partitionName)) {
continue;
}

LOG.info("drop non restored partition {} of table {}({}). {}",
partitionName, tableName, tableId, this);
boolean isTempPartition = false;
boolean isForceDrop = false; // move this partition into recyclebin.
catalog.dropPartitionWithoutCheck(db, table, partitionName, isTempPartition, isForceDrop);
}
} finally {
table.writeUnlock();
}
}

private void releaseSnapshots() {
if (snapshotInfos.isEmpty()) {
return;
Expand Down Expand Up @@ -2192,13 +2269,17 @@ public void readFields(DataInput in) throws IOException {
reserveReplica = Boolean.parseBoolean(properties.get(PROP_RESERVE_REPLICA));
reserveDynamicPartitionEnable = Boolean.parseBoolean(properties.get(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE));
isBeingSynced = Boolean.parseBoolean(properties.get(PROP_IS_BEING_SYNCED));
isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES));
isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS));
}

@Override
public void gsonPostProcess() throws IOException {
reserveReplica = Boolean.parseBoolean(properties.get(PROP_RESERVE_REPLICA));
reserveDynamicPartitionEnable = Boolean.parseBoolean(properties.get(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE));
isBeingSynced = Boolean.parseBoolean(properties.get(PROP_IS_BEING_SYNCED));
isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES));
isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ public void setState(PartitionState state) {
public void updateVersionForRestore(long visibleVersion) {
this.setVisibleVersion(visibleVersion);
this.nextVersion = this.visibleVersion + 1;
LOG.info("update partition {} version for restore: visible: {}, next: {}",
name, visibleVersion, nextVersion);
LOG.info("update partition {}({}) version for restore: visible: {}, next: {}",
name, id, visibleVersion, nextVersion);
}

public void updateVisibleVersion(long visibleVersion) {
Expand Down
Loading

0 comments on commit 4489083

Please sign in to comment.