Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.0][cdc-runtime] Remove waitForFlushSuccess field in request handler #2812

Merged
merged 2 commits into from
Dec 5, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.RequestStatus.RECEIVED_RELEASE_REQUEST;
import static com.ververica.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;

/** A handler to deal with all requests and events for {@link SchemaRegistry}. */
Expand All @@ -53,11 +54,6 @@ public class SchemaRegistryRequestHandler {
/** Schema manager holding schema for all tables. */
private final SchemaManager schemaManager;

/**
* Not applied SchemaChangeRequest's future before receiving all flush success events for its
* table from sink writers.
*/
private PendingSchemaChange waitFlushSuccess;
/**
* Not applied SchemaChangeRequest before receiving all flush success events for its table from
* sink writers.
Expand Down Expand Up @@ -93,7 +89,7 @@ private void applySchemaChange(TableId tableId, SchemaChangeEvent changeEvent) {
*/
public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
SchemaChangeRequest request) {
if (pendingSchemaChanges.isEmpty() && waitFlushSuccess == null) {
if (pendingSchemaChanges.isEmpty()) {
LOG.info(
"Received schema change event request from table {}. Start to buffer requests for others.",
request.getTableId().toString());
Expand All @@ -104,8 +100,8 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
CompletableFuture<CoordinationResponse> response =
CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(true)));
schemaManager.applySchemaChange(request.getSchemaChangeEvent());
this.waitFlushSuccess =
new PendingSchemaChange(request, response).startToWaitForFlushSuccess();
pendingSchemaChanges.add(new PendingSchemaChange(request, response));
pendingSchemaChanges.get(0).startToWaitForReleaseRequest();
return response;
} else {
LOG.info("There are already processing requests. Wait for processing.");
Expand All @@ -117,7 +113,14 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(

/** Handle the {@link ReleaseUpstreamRequest} and wait for all sink subtasks flushing. */
public CompletableFuture<CoordinationResponse> handleReleaseUpstreamRequest() {
return waitFlushSuccess.getResponseFuture();
CompletableFuture<CoordinationResponse> response =
pendingSchemaChanges.get(0).getResponseFuture();
if (response.isDone()) {
startNextSchemaChangeRequest();
} else {
pendingSchemaChanges.get(0).receiveReleaseRequest();
}
return response;
}

/**
Expand All @@ -142,43 +145,50 @@ public void flushSuccess(TableId tableId, int sinkSubtask) {
LOG.info(
"All sink subtask have flushed for table {}. Start to apply schema change.",
tableId.toString());
PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent());
waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse()));
startNextSchemaChangeRequest();

if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {
startNextSchemaChangeRequest();
}
}
}

private void startNextSchemaChangeRequest() {
pendingSchemaChanges.remove(0);
flushedSinkWriters.clear();
waitFlushSuccess = null;
while (!pendingSchemaChanges.isEmpty()) {
PendingSchemaChange pendingSchemaChange = pendingSchemaChanges.remove(0);
PendingSchemaChange pendingSchemaChange = pendingSchemaChanges.get(0);
SchemaChangeRequest request = pendingSchemaChange.changeRequest;
if (request.getSchemaChangeEvent() instanceof CreateTableEvent
&& schemaManager.schemaExists(request.getTableId())) {
pendingSchemaChange
.getResponseFuture()
.complete(wrap(new SchemaChangeResponse(false)));
pendingSchemaChanges.remove(0);
} else {
schemaManager.applySchemaChange(request.getSchemaChangeEvent());
pendingSchemaChange
.getResponseFuture()
.complete(wrap(new SchemaChangeResponse(true)));
this.waitFlushSuccess = pendingSchemaChange.startToWaitForFlushSuccess();
pendingSchemaChange.startToWaitForReleaseRequest();
break;
}
}
}

private static class PendingSchemaChange {
private final SchemaChangeRequest changeRequest;
private final CompletableFuture<CoordinationResponse> responseFuture;
private CompletableFuture<CoordinationResponse> responseFuture;
private RequestStatus status;

public PendingSchemaChange(
SchemaChangeRequest changeRequest,
CompletableFuture<CoordinationResponse> responseFuture) {
this.changeRequest = changeRequest;
this.responseFuture = responseFuture;
this.status = RequestStatus.PENDING;
}

public SchemaChangeRequest getChangeRequest() {
Expand All @@ -189,12 +199,27 @@ public CompletableFuture<CoordinationResponse> getResponseFuture() {
return responseFuture;
}

public PendingSchemaChange startToWaitForFlushSuccess() {
public RequestStatus getStatus() {
return status;
}

public void startToWaitForReleaseRequest() {
if (!responseFuture.isDone()) {
throw new IllegalStateException(
"Cannot start to wait for flush success before the SchemaChangeRequest is done.");
}
return new PendingSchemaChange(changeRequest, new CompletableFuture<>());
this.responseFuture = new CompletableFuture<>();
this.status = RequestStatus.WAIT_RELEASE_REQUEST;
}

public void receiveReleaseRequest() {
this.status = RECEIVED_RELEASE_REQUEST;
}
}

enum RequestStatus {
PENDING,
WAIT_RELEASE_REQUEST,
RECEIVED_RELEASE_REQUEST
}
}