Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,8 @@ int64_t calculate_restore_job_expired_time(
const std::string& instance_id_, const RestoreJobCloudPB& restore_job,
int64_t* earlest_ts /* restore job earliest expiration ts */) {
if (config::force_immediate_recycle || restore_job.state() == RestoreJobCloudPB::DROPPED ||
restore_job.state() == RestoreJobCloudPB::COMPLETED) {
restore_job.state() == RestoreJobCloudPB::COMPLETED ||
restore_job.state() == RestoreJobCloudPB::RECYCLING) {
// forced recycle, or the job is already DROPPED / COMPLETED / RECYCLING: recycle immediately
return 0L;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,12 @@ public class Config extends ConfigBase {
"Whether to enable cloud restore job."}, varType = VariableAnnotation.EXPERIMENTAL)
public static boolean enable_cloud_restore_job = false;

/**
 * Upper bound on how many tablets are packed into a single create-tablets RPC
 * during a cloud restore job (default 256). Declared mutable and master-only.
 * NOTE(review): no lower bound is enforced at this declaration; a value <= 0 would
 * break a batching loop that advances its index by this size — confirm that
 * consumers validate or clamp it.
 */
@ConfField(mutable = true, masterOnly = true, description = {
"存算分离恢复过程中,一次 create tablets rpc 创建的 tablet 数量上限,默认值为256个",
"During the cloud restore job, the maximum number of tablets created by one "
+ "create tablets RPC, 256 by default."})
public static int cloud_restore_create_tablet_batch_size = 256;

/**
* Controls the default maximum number of instances for a user.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ public void setBeingSyncedProperties() {

/**
 * Resets the next version of every partition in {@code idToPartition} for a restore,
 * setting it to one past the partition's visible version.
 */
public void resetVersionForRestore() {
for (Partition partition : idToPartition.values()) {
// NOTE(review): the two calls below look like the before/after lines of a diff hunk
// (getVisibleVersion -> getCachedVisibleVersion). As written, the second call
// presumably overwrites the value set by the first — confirm which one should remain.
partition.setNextVersion(partition.getVisibleVersion() + 1);
partition.setNextVersion(partition.getCachedVisibleVersion() + 1);
}
}

Expand Down Expand Up @@ -793,7 +793,7 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore
if (Config.isCloudMode()) {
long newReplicaId = Env.getCurrentEnv().getNextId();
Replica replica = new CloudReplica(newReplicaId, null, ReplicaState.NORMAL,
visibleVersion, schemaHash, db.getId(), id, partition.getId(), idx.getId(), i);
visibleVersion, schemaHash, db.getId(), id, entry.getKey(), idx.getId(), i);
newTablet.addReplica(replica, true /* is restore */);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
Expand Down Expand Up @@ -85,7 +86,7 @@ public class CloudRestoreJob extends RestoreJob {

private String cloudClusterId = null;

// Create-tablets request builders keyed by table: one builder per OlapTable.
// NOTE(review): this appears to be the pre-change field that tabletRequests replaces — confirm.
private Map<OlapTable, Cloud.CreateTabletsRequest.Builder> tabletsPerTable = new HashMap<>();
// Create-tablets request builders keyed by (table, partition); each key maps to a list of
// builders. Filled by createReplicas, sent and then cleared by doCreateReplicas.
private Map<Pair<OlapTable, Partition>, List<Cloud.CreateTabletsRequest.Builder>> tabletRequests = new HashMap<>();

public enum MetaSeriviceOperation {
PREPARE,
Expand Down Expand Up @@ -190,14 +191,18 @@ public void doCreateReplicas() {
handleMetaObject(MetaSeriviceOperation.PREPARE);
// send create tablets requests
boolean needSetStorageVault = ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault();
for (Map.Entry<OlapTable, Cloud.CreateTabletsRequest.Builder> entry : tabletsPerTable.entrySet()) {
OlapTable table = entry.getKey();
Cloud.CreateTabletsRequest.Builder requestBuilder = entry.getValue();
Cloud.CreateTabletsResponse resp = sendCreateTabletsRequests(requestBuilder, table,
needSetStorageVault);
if (resp.hasStorageVaultId()) {
storageVaultId = resp.getStorageVaultId();
needSetStorageVault = false;
for (Map.Entry<Pair<OlapTable, Partition>, List<Cloud.CreateTabletsRequest.Builder>> entry
: tabletRequests.entrySet()) {
Pair<OlapTable, Partition> tableToPartition = entry.getKey();
OlapTable table = tableToPartition.first;
List<Cloud.CreateTabletsRequest.Builder> requestBuilders = entry.getValue();
for (Cloud.CreateTabletsRequest.Builder requestBuilder : requestBuilders) {
Cloud.CreateTabletsResponse resp = sendCreateTabletsRequests(requestBuilder, table,
needSetStorageVault);
if (resp.hasStorageVaultId()) {
storageVaultId = resp.getStorageVaultId();
needSetStorageVault = false;
}
}
}
// set storage vault for new restoring table
Expand All @@ -214,7 +219,7 @@ public void doCreateReplicas() {
} catch (Exception e) {
status = new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
} finally {
tabletsPerTable.clear();
tabletRequests.clear();
}
}

Expand Down Expand Up @@ -352,51 +357,60 @@ public void createReplicas(Database db, OlapTable localTbl, Partition restorePar
public void createReplicas(Database db, OlapTable localTbl, Partition restorePart,
Map<Long, TabletRef> tabletBases) {
List<String> rowStoreColumns = localTbl.getTableProperty().getCopiedRowStoreColumns();
Cloud.CreateTabletsRequest.Builder requestBuilder = tabletsPerTable.computeIfAbsent(localTbl,
r -> Cloud.CreateTabletsRequest.newBuilder());
List<Cloud.CreateTabletsRequest.Builder> requestBuilders = tabletRequests.computeIfAbsent(
Pair.of(localTbl, restorePart), r -> Lists.newArrayList());

for (MaterializedIndex restoredIdx : restorePart.getMaterializedIndices(MaterializedIndex.IndexExtState
.VISIBLE)) {
MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId());
List<Index> indexes = restoredIdx.getId() == localTbl.getBaseIndexId()
? localTbl.getCopiedIndexes() : null;
for (Tablet restoreTablet : restoredIdx.getTablets()) {
try {
requestBuilder.addTabletMetas(((CloudInternalCatalog) Env.getCurrentInternalCatalog())
.createTabletMetaBuilder(localTbl.getId(), restoredIdx.getId(),
restorePart.getId(), restoreTablet,
localTbl.getPartitionInfo().getTabletType(restorePart.getId()),
indexMeta.getSchemaHash(), indexMeta.getKeysType(),
indexMeta.getShortKeyColumnCount(), localTbl.getCopiedBfColumns(),
localTbl.getBfFpp(), indexes, indexMeta.getSchema(), localTbl.getDataSortInfo(),
localTbl.getCompressionType(), localTbl.getStoragePolicy(),
localTbl.isInMemory(), false, localTbl.getName(), localTbl.getTTLSeconds(),
localTbl.getEnableUniqueKeyMergeOnWrite(), localTbl.storeRowColumn(),
localTbl.getBaseSchemaVersion(), localTbl.getCompactionPolicy(),
localTbl.getTimeSeriesCompactionGoalSizeMbytes(),
localTbl.getTimeSeriesCompactionFileCountThreshold(),
localTbl.getTimeSeriesCompactionTimeThresholdSeconds(),
localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
localTbl.getTimeSeriesCompactionLevelThreshold(), localTbl.disableAutoCompaction(),
localTbl.getRowStoreColumnsUniqueIds(rowStoreColumns),
localTbl.getEnableMowLightDelete(),
localTbl.getInvertedIndexFileStorageFormat(),
localTbl.rowStorePageSize(),
localTbl.variantEnableFlattenNested(),
localTbl.storagePageSize(), localTbl.getTDEAlgorithmPB(),
localTbl.storageDictPageSize(), false));
// In cloud mode all storage medium will be saved to HDD.
TabletMeta tabletMeta = new TabletMeta(db.getId(), localTbl.getId(), restorePart.getId(),
restoredIdx.getId(), indexMeta.getSchemaHash(), TStorageMedium.HDD);
Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta);
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(),
restoreTablet.getReplicaByBackendId(-1));
} catch (Exception e) {
String errMsg = String.format("create tablet meta builder failed, errMsg:%s, local table:%d, "
+ "restore partition=%d, restore index=%d, restore tablet=%d", e.getMessage(),
localTbl.getId(), restorePart.getId(), restoredIdx.getId(), restoreTablet.getId());
status = new Status(Status.ErrCode.COMMON_ERROR, errMsg);
int maxCreateTabletBatchSize = Config.cloud_restore_create_tablet_batch_size;
List<Tablet> restoreTablets = restoredIdx.getTablets();
for (int i = 0; i < restoreTablets.size(); i += maxCreateTabletBatchSize) {
int end = Math.min(i + maxCreateTabletBatchSize, restoreTablets.size());
List<Tablet> subRestoreTablets = restoreTablets.subList(i, end);
Cloud.CreateTabletsRequest.Builder requestBuilder = Cloud.CreateTabletsRequest.newBuilder();
for (Tablet restoreTablet : subRestoreTablets) {
try {
requestBuilder.addTabletMetas(((CloudInternalCatalog) Env.getCurrentInternalCatalog())
.createTabletMetaBuilder(localTbl.getId(), restoredIdx.getId(),
restorePart.getId(), restoreTablet,
localTbl.getPartitionInfo().getTabletType(restorePart.getId()),
indexMeta.getSchemaHash(), indexMeta.getKeysType(),
indexMeta.getShortKeyColumnCount(), localTbl.getCopiedBfColumns(),
localTbl.getBfFpp(), indexes, indexMeta.getSchema(), localTbl.getDataSortInfo(),
localTbl.getCompressionType(), localTbl.getStoragePolicy(),
localTbl.isInMemory(), false, localTbl.getName(), localTbl.getTTLSeconds(),
localTbl.getEnableUniqueKeyMergeOnWrite(), localTbl.storeRowColumn(),
localTbl.getBaseSchemaVersion(), localTbl.getCompactionPolicy(),
localTbl.getTimeSeriesCompactionGoalSizeMbytes(),
localTbl.getTimeSeriesCompactionFileCountThreshold(),
localTbl.getTimeSeriesCompactionTimeThresholdSeconds(),
localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
localTbl.getTimeSeriesCompactionLevelThreshold(), localTbl.disableAutoCompaction(),
localTbl.getRowStoreColumnsUniqueIds(rowStoreColumns),
localTbl.getEnableMowLightDelete(),
localTbl.getInvertedIndexFileStorageFormat(),
localTbl.rowStorePageSize(),
localTbl.variantEnableFlattenNested(),
localTbl.storagePageSize(), localTbl.getTDEAlgorithmPB(),
localTbl.storageDictPageSize(), false));
// In cloud mode all storage medium will be saved to HDD.
TabletMeta tabletMeta = new TabletMeta(db.getId(), localTbl.getId(), restorePart.getId(),
restoredIdx.getId(), indexMeta.getSchemaHash(), TStorageMedium.HDD);
Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta);
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(),
restoreTablet.getReplicaByBackendId(-1));
} catch (Exception e) {
String errMsg = String.format("create tablet meta builder failed, errMsg:%s, local table:%d, "
+ "restore partition=%d, restore index=%d, restore tablet=%d", e.getMessage(),
localTbl.getId(), restorePart.getId(), restoredIdx.getId(), restoreTablet.getId());
status = new Status(Status.ErrCode.COMMON_ERROR, errMsg);
return;
}
}
requestBuilders.add(requestBuilder);
}
}
}
Expand Down Expand Up @@ -515,8 +529,11 @@ private Cloud.CreateTabletsResponse sendCreateTabletsRequests(Cloud.CreateTablet
+ "vault name=%s, errMsg=%s", dbId, olapTable.getName(), storageVaultName, e.getMessage());
throw new DdlException(errMsg);
}
LOG.info("cloud restore job restore tablets, dbId: {}, tableName: {}, vault name: {}", dbId,
olapTable.getName(), storageVaultName);
if (LOG.isDebugEnabled()) {
LOG.debug("cloud restore job restore tablets, dbId: {}, tableName: {}, vault name: {},"
+ "tablet created: {}", dbId, olapTable.getName(), storageVaultName,
requestBuilder.getTabletMetasCount());
}
return resp;
}

Expand Down