diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
index 6d49653baf6..8626eafbcbf 100755
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
@@ -55,12 +55,14 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -98,6 +100,9 @@ public class SchemaCoordinator extends SchemaRegistry {
     private transient Multimap<Tuple2<Integer, SchemaChangeEvent>, Integer>
             alreadyHandledSchemaChangeEvents;
 
+    /** Executor service to execute schema change. */
+    private final ExecutorService schemaChangeThreadPool;
+
     public SchemaCoordinator(
             String operatorName,
             OperatorCoordinator.Context context,
@@ -114,6 +119,7 @@ public SchemaCoordinator(
                 routingRules,
                 schemaChangeBehavior,
                 rpcTimeout);
+        this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
     }
 
     // -----------------
@@ -131,6 +137,14 @@ public void start() throws Exception {
                 "Started SchemaRegistry for {}. Parallelism: {}", operatorName, currentParallelism);
     }
 
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (schemaChangeThreadPool != null && !schemaChangeThreadPool.isShutdown()) {
+            schemaChangeThreadPool.shutdownNow();
+        }
+    }
+
     // --------------------------
     // Checkpoint related methods
     // --------------------------
@@ -268,7 +282,20 @@ private void handleSchemaEvolveRequest(
             LOG.info(
                     "Received the last required schema change request {}. Switching from WAITING_FOR_FLUSH to EVOLVING.",
                     request);
-            startSchemaChange();
+
+            schemaChangeThreadPool.submit(
+                    () -> {
+                        try {
+                            startSchemaChange();
+                        } catch (Throwable t) {
+                            failJob(
+                                    "Schema change applying task",
+                                    new FlinkRuntimeException(
+                                            "Failed to apply schema change event.", t));
+                            throw new FlinkRuntimeException(
+                                    "Failed to apply schema change event.", t);
+                        }
+                    });
         }
     }
 
@@ -301,34 +328,56 @@ private void startSchemaChange() throws TimeoutException {
         LOG.info("All flushed. Going to evolve schema for pending requests: {}", pendingRequests);
         flushedSinkWriters.clear();
 
-        // Deduce what schema change events should be applied to sink table
-        List<SchemaChangeEvent> deducedSchemaChangeEvents = deduceEvolvedSchemaChanges();
+        // Deduce what schema change events should be applied to sink table, and affected sink
+        // tables' schema
+        Tuple2<Set<TableId>, List<SchemaChangeEvent>> deduceSummary = deduceEvolvedSchemaChanges();
 
         // And tries to apply it to external system
         List<SchemaChangeEvent> successfullyAppliedSchemaChangeEvents = new ArrayList<>();
-        for (SchemaChangeEvent appliedSchemaChangeEvent : deducedSchemaChangeEvents) {
+        for (SchemaChangeEvent appliedSchemaChangeEvent : deduceSummary.f1) {
             if (applyAndUpdateEvolvedSchemaChange(appliedSchemaChangeEvent)) {
                 successfullyAppliedSchemaChangeEvents.add(appliedSchemaChangeEvent);
             }
         }
 
-        // Then, we broadcast affected schema changes to mapper and release upstream
-        pendingRequests.forEach(
-                (subTaskId, tuple) -> {
-                    LOG.info("Coordinator finishes pending future from {}", subTaskId);
-                    tuple.f1.complete(
-                            wrap(new SchemaChangeResponse(successfullyAppliedSchemaChangeEvents)));
-                });
+        // Fetch refreshed view for affected tables. We can't rely on operator clients to do this
+        // because it might not have a complete schema view after restoring from previous states.
+        Set<TableId> affectedTableIds = deduceSummary.f0;
+        Map<TableId, Schema> evolvedSchemaView = new HashMap<>();
+        for (TableId tableId : affectedTableIds) {
+            schemaManager
+                    .getLatestEvolvedSchema(tableId)
+                    .ifPresent(schema -> evolvedSchemaView.put(tableId, schema));
+        }
+
+        List<Tuple2<SchemaChangeRequest, CompletableFuture<CoordinationResponse>>> futures =
+                new ArrayList<>(pendingRequests.values());
+
+        // Restore coordinator internal states first...
         pendingRequests.clear();
 
         LOG.info("Finished schema evolving. Switching from EVOLVING to IDLE.");
         Preconditions.checkState(
                 evolvingStatus.compareAndSet(RequestStatus.EVOLVING, RequestStatus.IDLE),
                 "RequestStatus should be EVOLVING when schema evolving finishes.");
+
+        // ... and broadcast affected schema changes to mapper and release upstream then.
+        // Make sure we've cleaned-up internal state before this, or we may receive new requests in
+        // a dirty state.
+        futures.forEach(
+                tuple -> {
+                    LOG.info(
+                            "Coordinator finishes pending future from {}",
+                            tuple.f0.getSinkSubTaskId());
+                    tuple.f1.complete(
+                            wrap(
+                                    new SchemaChangeResponse(
+                                            evolvedSchemaView,
+                                            successfullyAppliedSchemaChangeEvents)));
+                });
     }
 
-    private List<SchemaChangeEvent> deduceEvolvedSchemaChanges() {
+    private Tuple2<Set<TableId>, List<SchemaChangeEvent>> deduceEvolvedSchemaChanges() {
         List<SchemaChangeRequest> validSchemaChangeRequests =
                 pendingRequests.values().stream()
                         .map(e -> e.f0)
@@ -408,7 +457,7 @@ private List<SchemaChangeEvent> deduceEvolvedSchemaChanges() {
             evolvedSchemaChanges.addAll(normalizedEvents);
         }
 
-        return evolvedSchemaChanges;
+        return Tuple2.of(affectedSinkTableIds, evolvedSchemaChanges);
     }
 
     private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
index 80adea3f0b3..f31cbabdcf3 100755
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
@@ -205,17 +205,11 @@ private void requestSchemaChange(
         LOG.info("{}> Evolve request response: {}", subTaskId, response);
 
         // Update local evolved schema cache
-        response.getSchemaEvolveResult()
-                .forEach(
-                        schemaChangeEvent ->
-                                evolvedSchemaMap.compute(
-                                        schemaChangeEvent.tableId(),
-                                        (tableId, schema) ->
-                                                SchemaUtils.applySchemaChangeEvent(
-                                                        schema, schemaChangeEvent)));
+        evolvedSchemaMap.putAll(response.getEvolvedSchemas());
 
         // And emit schema change events to downstream
-        response.getSchemaEvolveResult().forEach(evt -> output.collect(new StreamRecord<>(evt)));
+        response.getEvolvedSchemaChangeEvents()
+                .forEach(evt -> output.collect(new StreamRecord<>(evt)));
 
         LOG.info(
                 "{}> Successfully updated evolved schema cache. Current state: {}",
                 subTaskId,
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeRequest.java
index e287f787461..a40986974d4 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeRequest.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeRequest.java
@@ -65,4 +65,16 @@ public SchemaChangeEvent getSchemaChangeEvent() {
                 !isNoOpRequest(), "Unable to fetch source subTaskId for an align event.");
         return schemaChangeEvent;
     }
+
+    @Override
+    public String toString() {
+        return "SchemaChangeRequest{"
+                + "sourceSubTaskId="
+                + sourceSubTaskId
+                + ", sinkSubTaskId="
+                + sinkSubTaskId
+                + ", schemaChangeEvent="
+                + schemaChangeEvent
+                + '}';
+    }
 }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeResponse.java
index 1c0b7eaab21..69264edd373 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeResponse.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/event/SchemaChangeResponse.java
@@ -18,23 +18,34 @@
 package org.apache.flink.cdc.runtime.operators.schema.distributed.event;
 
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaCoordinator;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 /** Response from a {@link SchemaCoordinator} to broadcast a coordination consensus. */
 public class SchemaChangeResponse implements CoordinationResponse {
-    private final List<SchemaChangeEvent> schemaEvolveResult;
+    private final Map<TableId, Schema> evolvedSchemas;
+    private final List<SchemaChangeEvent> evolvedSchemaChangeEvents;
 
-    public SchemaChangeResponse(List<SchemaChangeEvent> schemaEvolveResult) {
-        this.schemaEvolveResult = schemaEvolveResult;
+    public SchemaChangeResponse(
+            Map<TableId, Schema> evolvedSchemas,
+            List<SchemaChangeEvent> evolvedSchemaChangeEvents) {
+        this.evolvedSchemas = evolvedSchemas;
+        this.evolvedSchemaChangeEvents = evolvedSchemaChangeEvents;
     }
 
-    public List<SchemaChangeEvent> getSchemaEvolveResult() {
-        return schemaEvolveResult;
+    public Map<TableId, Schema> getEvolvedSchemas() {
+        return evolvedSchemas;
+    }
+
+    public List<SchemaChangeEvent> getEvolvedSchemaChangeEvents() {
+        return evolvedSchemaChangeEvents;
     }
 
     @Override
@@ -43,16 +54,22 @@ public boolean equals(Object o) {
             return false;
         }
         SchemaChangeResponse that = (SchemaChangeResponse) o;
-        return Objects.equals(schemaEvolveResult, that.schemaEvolveResult);
+        return Objects.equals(evolvedSchemas, that.evolvedSchemas)
+                && Objects.equals(evolvedSchemaChangeEvents, that.evolvedSchemaChangeEvents);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(schemaEvolveResult);
+        return Objects.hash(evolvedSchemas, evolvedSchemaChangeEvents);
     }
 
     @Override
     public String toString() {
-        return "SchemaChangeResponse{" + "schemaEvolveResult=" + schemaEvolveResult + '}';
+        return "SchemaChangeResponse{"
+                + "evolvedSchemas="
+                + evolvedSchemas
+                + ", evolvedSchemaChangeEvents="
+                + evolvedSchemaChangeEvents
+                + '}';
     }
 }