SchemaCoordinator.java
@@ -55,12 +55,14 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -98,6 +100,9 @@ public class SchemaCoordinator extends SchemaRegistry {
private transient Multimap<Tuple2<Integer, SchemaChangeEvent>, Integer>
alreadyHandledSchemaChangeEvents;

/** Dedicated executor that applies schema changes without blocking the coordinator's main thread. */
private final ExecutorService schemaChangeThreadPool;

public SchemaCoordinator(
String operatorName,
OperatorCoordinator.Context context,
@@ -114,6 +119,7 @@ public SchemaCoordinator(
routingRules,
schemaChangeBehavior,
rpcTimeout);
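// newSingleThreadExecutor() runs submitted tasks sequentially, so schema changes are
// applied one at a time, in submission order.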
this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
}

// -----------------
@@ -131,6 +137,14 @@ public void start() throws Exception {
"Started SchemaRegistry for {}. Parallelism: {}", operatorName, currentParallelism);
}

@Override
public void close() throws Exception {
super.close();
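// shutdownNow() interrupts the in-flight schema change task (if any) instead of
// waiting for it to complete.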
if (schemaChangeThreadPool != null && !schemaChangeThreadPool.isShutdown()) {
schemaChangeThreadPool.shutdownNow();
}
}

// --------------------------
// Checkpoint related methods
// --------------------------
@@ -268,7 +282,20 @@ private void handleSchemaEvolveRequest(
LOG.info(
"Received the last required schema change request {}. Switching from WAITING_FOR_FLUSH to EVOLVING.",
request);
startSchemaChange();

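// Apply the schema change on the dedicated executor so this coordinator thread is
// released immediately instead of blocking until all sink writers have flushed.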
schemaChangeThreadPool.submit(
() -> {
try {
startSchemaChange();
} catch (Throwable t) {
failJob(
"Schema change applying task",
new FlinkRuntimeException(
"Failed to apply schema change event.", t));
throw new FlinkRuntimeException(
"Failed to apply schema change event.", t);
}
});
}
}

@@ -301,34 +328,56 @@ private void startSchemaChange() throws TimeoutException {
LOG.info("All flushed. Going to evolve schema for pending requests: {}", pendingRequests);
flushedSinkWriters.clear();

// Deduce what schema change events should be applied to sink table
List<SchemaChangeEvent> deducedSchemaChangeEvents = deduceEvolvedSchemaChanges();
// Deduce which schema change events should be applied to the sink tables, and which
// sink tables' schemas are affected.
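// (f0: IDs of the affected sink tables; f1: the schema change events to apply)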
Tuple2<Set<TableId>, List<SchemaChangeEvent>> deduceSummary = deduceEvolvedSchemaChanges();

// Then try to apply them to the external system
List<SchemaChangeEvent> successfullyAppliedSchemaChangeEvents = new ArrayList<>();
for (SchemaChangeEvent appliedSchemaChangeEvent : deducedSchemaChangeEvents) {
for (SchemaChangeEvent appliedSchemaChangeEvent : deduceSummary.f1) {
if (applyAndUpdateEvolvedSchemaChange(appliedSchemaChangeEvent)) {
successfullyAppliedSchemaChangeEvents.add(appliedSchemaChangeEvent);
}
}

// Then, we broadcast affected schema changes to mapper and release upstream
pendingRequests.forEach(
(subTaskId, tuple) -> {
LOG.info("Coordinator finishes pending future from {}", subTaskId);
tuple.f1.complete(
wrap(new SchemaChangeResponse(successfullyAppliedSchemaChangeEvents)));
});
// Fetch a refreshed view for affected tables. We can't rely on operator clients to do this
// because they might not have a complete schema view after restoring from previous states.
Set<TableId> affectedTableIds = deduceSummary.f0;
Map<TableId, Schema> evolvedSchemaView = new HashMap<>();
for (TableId tableId : affectedTableIds) {
schemaManager
.getLatestEvolvedSchema(tableId)
.ifPresent(schema -> evolvedSchemaView.put(tableId, schema));
}

List<Tuple2<SchemaChangeRequest, CompletableFuture<CoordinationResponse>>> futures =
new ArrayList<>(pendingRequests.values());

// Reset the coordinator's internal state first...
pendingRequests.clear();

LOG.info("Finished schema evolving. Switching from EVOLVING to IDLE.");
Preconditions.checkState(
evolvingStatus.compareAndSet(RequestStatus.EVOLVING, RequestStatus.IDLE),
"RequestStatus should be EVOLVING when schema evolving finishes.");

// ... and then broadcast affected schema changes to the mappers and release upstream.
// Internal state must be cleaned up before this point, or we may receive new requests
// while still in a dirty state.
futures.forEach(
tuple -> {
LOG.info(
"Coordinator finishes pending future from {}",
tuple.f0.getSinkSubTaskId());
tuple.f1.complete(
wrap(
new SchemaChangeResponse(
evolvedSchemaView,
successfullyAppliedSchemaChangeEvents)));
});
}

private List<SchemaChangeEvent> deduceEvolvedSchemaChanges() {
private Tuple2<Set<TableId>, List<SchemaChangeEvent>> deduceEvolvedSchemaChanges() {
List<SchemaChangeRequest> validSchemaChangeRequests =
pendingRequests.values().stream()
.map(e -> e.f0)
@@ -408,7 +457,7 @@ private List<SchemaChangeEvent> deduceEvolvedSchemaChanges() {
evolvedSchemaChanges.addAll(normalizedEvents);
}

return evolvedSchemaChanges;
return Tuple2.of(affectedSinkTableIds, evolvedSchemaChanges);
}

private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) {
SchemaOperator.java
@@ -205,17 +205,11 @@ private void requestSchemaChange(
LOG.info("{}> Evolve request response: {}", subTaskId, response);

// Update local evolved schema cache
response.getSchemaEvolveResult()
.forEach(
schemaChangeEvent ->
evolvedSchemaMap.compute(
schemaChangeEvent.tableId(),
(tableId, schema) ->
SchemaUtils.applySchemaChangeEvent(
schema, schemaChangeEvent)));
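// The response now carries complete evolved schemas, so the cache can simply be
// overwritten instead of replaying individual change events.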
evolvedSchemaMap.putAll(response.getEvolvedSchemas());

// And emit schema change events to downstream
response.getSchemaEvolveResult().forEach(evt -> output.collect(new StreamRecord<>(evt)));
response.getEvolvedSchemaChangeEvents()
.forEach(evt -> output.collect(new StreamRecord<>(evt)));
LOG.info(
"{}> Successfully updated evolved schema cache. Current state: {}",
subTaskId,
SchemaChangeRequest.java
@@ -65,4 +65,16 @@ public SchemaChangeEvent getSchemaChangeEvent() {
!isNoOpRequest(), "Unable to fetch source subTaskId for an align event.");
return schemaChangeEvent;
}

@Override
public String toString() {
return "SchemaChangeRequest{"
+ "sourceSubTaskId="
+ sourceSubTaskId
+ ", sinkSubTaskId="
+ sinkSubTaskId
+ ", schemaChangeEvent="
+ schemaChangeEvent
+ '}';
}
}
SchemaChangeResponse.java
@@ -18,23 +18,34 @@
package org.apache.flink.cdc.runtime.operators.schema.distributed.event;

import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaCoordinator;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;

import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Response from a {@link SchemaCoordinator} to broadcast a coordination consensus. */
public class SchemaChangeResponse implements CoordinationResponse {

private final List<SchemaChangeEvent> schemaEvolveResult;
private final Map<TableId, Schema> evolvedSchemas;
private final List<SchemaChangeEvent> evolvedSchemaChangeEvents;

public SchemaChangeResponse(List<SchemaChangeEvent> schemaEvolveResult) {
this.schemaEvolveResult = schemaEvolveResult;
public SchemaChangeResponse(
Map<TableId, Schema> evolvedSchemas,
List<SchemaChangeEvent> evolvedSchemaChangeEvents) {
this.evolvedSchemas = evolvedSchemas;
this.evolvedSchemaChangeEvents = evolvedSchemaChangeEvents;
}

public List<SchemaChangeEvent> getSchemaEvolveResult() {
return schemaEvolveResult;
public Map<TableId, Schema> getEvolvedSchemas() {
return evolvedSchemas;
}

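/** Returns the schema change events that were successfully applied to the external system. */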
public List<SchemaChangeEvent> getEvolvedSchemaChangeEvents() {
return evolvedSchemaChangeEvents;
}

@Override
@@ -43,16 +54,22 @@ public boolean equals(Object o) {
return false;
}
SchemaChangeResponse that = (SchemaChangeResponse) o;
return Objects.equals(schemaEvolveResult, that.schemaEvolveResult);
return Objects.equals(evolvedSchemas, that.evolvedSchemas)
&& Objects.equals(evolvedSchemaChangeEvents, that.evolvedSchemaChangeEvents);
}

@Override
public int hashCode() {
return Objects.hash(schemaEvolveResult);
return Objects.hash(evolvedSchemas, evolvedSchemaChangeEvents);
}

@Override
public String toString() {
return "SchemaChangeResponse{" + "schemaEvolveResult=" + schemaEvolveResult + '}';
return "SchemaChangeResponse{"
+ "evolvedSchemas="
+ evolvedSchemas
+ ", evolvedSchemaChangeEvents="
+ evolvedSchemaChangeEvents
+ '}';
}
}