Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui committed Jul 30, 2024
1 parent d0bfb86 commit 3e4bf96
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,17 @@ public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest request) {
return blockingStub.getRlTaskCommitAttach(request);
}

public Cloud. ResetProgressResponse
resetProgress(Cloud. ResetProgressRequest request) {
if (!request.hasCloudUniqueId()) {
Cloud. ResetProgressRequest.Builder builder =
Cloud. ResetProgressRequest.newBuilder();
builder.mergeFrom(request);
return blockingStub.resetProgress(builder.setCloudUniqueId(Config.cloud_unique_id).build());
}
return blockingStub.resetProgress(request);
}

public Cloud.GetObjStoreInfoResponse
getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) {
if (!request.hasCloudUniqueId()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,17 @@ public Cloud.AlterObjStoreInfoResponse alterObjStoreInfo(Cloud.AlterObjStoreInfo
}
}

public Cloud.ResetProgressResponse
resetProgress(Cloud.ResetProgressRequest request)
throws RpcException {
try {
final MetaServiceClient client = getProxy();
return client.resetProgress(request);
} catch (Exception e) {
throw new RpcException("", e.getMessage(), e);
}
}

public Cloud.GetObjStoreInfoResponse
getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) throws RpcException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,11 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
// and should reset cache before modifying partition offset.
if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
this.topic = dataSourceProperties.getTopic();
this.progress = new KafkaProgress();
if (Config.isCloudMode()) {
resetCloudProgress();
} else {
this.progress = new KafkaProgress();
}
}

// modify partition offset
Expand Down Expand Up @@ -748,6 +752,31 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
this.id, jobProperties, dataSourceProperties);
}

private void resetCloudProgress() throws DdlException {
Cloud.ResetProgressRequest.Builder builder =
Cloud.ResetProgressRequest.newBuilder();
builder.setCloudUniqueId(Config.cloud_unique_id);
builder.setDbId(dbId);
builder.setJobId(id);

Cloud.ResetProgressResponse response;
try {
response = MetaServiceProxy.getInstance().resetProgress(builder.build());
if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
LOG.warn("failed to reset cloud progress, response: {}", response);
if (response.getStatus().getCode() == Cloud.MetaServiceCode.ROUTINE_LOAD_PROGRESS_NOT_FOUND) {
LOG.warn("not found routine load progress, response: {}", response);
return;
} else {
throw new DdlException(response.getStatus().getMsg());
}
}
} catch (RpcException e) {
LOG.info("failed to reset cloud progress {}", e);
throw new DdlException(e.getMessage());
}
}

@Override
public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
try {
Expand Down
11 changes: 11 additions & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,16 @@ message GetRLTaskCommitAttachResponse {
optional RLTaskTxnCommitAttachmentPB commit_attach = 2;
}

message ResetProgressRequest {
optional string cloud_unique_id = 1; // For auth
optional int64 db_id = 2;
optional int64 job_id = 3;
}

message ResetProgressResponse {
optional MetaServiceResponseStatus status = 1;
}

message CheckKeyInfos {
repeated int64 db_ids = 1;
repeated int64 table_ids = 2;
Expand Down Expand Up @@ -1513,6 +1523,7 @@ service MetaService {

// routine load progress
rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns (GetRLTaskCommitAttachResponse);
rpc reset_progress(ResetProgressRequest) returns (ResetProgressResponse);

// check KV
rpc check_kv(CheckKVRequest) returns (CheckKVResponse);
Expand Down

0 comments on commit 3e4bf96

Please sign in to comment.