feat: support file cache mode
dingxin-tech committed May 15, 2024
1 parent f165281 commit 1080a59
Showing 8 changed files with 318 additions and 53 deletions.
MaxComputeDataSinkFactory.java
@@ -23,6 +23,7 @@
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeDataSinkOptions;
import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeExecutionOptions;
import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions;
@@ -56,11 +57,14 @@ private MaxComputeOptions extractMaxComputeOptions(Configuration factoryConfigur
String quotaName = factoryConfiguration.get(MaxComputeDataSinkOptions.QUOTA_NAME);
String stsToken = factoryConfiguration.get(MaxComputeDataSinkOptions.STS_TOKEN);
int bucketSize = factoryConfiguration.get(MaxComputeDataSinkOptions.BUCKETS_NUM);
int maxSessionParallelism =
factoryConfiguration.get(MaxComputeDataSinkOptions.MAX_SESSION_PARALLELISM);
return MaxComputeOptions.builder(accessId, accessKey, endpoint, project)
.withTunnelEndpoint(tunnelEndpoint)
.withQuotaName(quotaName)
.withStsToken(stsToken)
.withBucketSize(bucketSize)
.withMaxSessionParallelism(maxSessionParallelism)
.build();
}

@@ -128,6 +132,7 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(MaxComputeDataSinkOptions.QUOTA_NAME);
optionalOptions.add(MaxComputeDataSinkOptions.STS_TOKEN);
optionalOptions.add(MaxComputeDataSinkOptions.BUCKETS_NUM);
optionalOptions.add(MaxComputeDataSinkOptions.MAX_SESSION_PARALLELISM);
// write options
optionalOptions.add(MaxComputeDataSinkOptions.NUM_COMMIT_THREADS);
optionalOptions.add(MaxComputeDataSinkOptions.COMPRESS_ALGORITHM);
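The definition of the new option lives in MaxComputeDataSinkOptions, one of the changed files not expanded in this view. A minimal sketch of what the declaration plausibly looks like — the key string, default value, and description here are assumptions, not the committed code:

import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.ConfigOptions;

// Hypothetical declaration; the actual key and default in this commit may differ.
public static final ConfigOption<Integer> MAX_SESSION_PARALLELISM =
        ConfigOptions.key("max.session.parallelism")
                .intType()
                .defaultValue(-1) // assumed: non-positive means unlimited, file cache disabled
                .withDescription(
                        "Maximum number of tunnel sessions the coordinator keeps open at once. "
                                + "When the limit is reached, data for new sessions is cached "
                                + "to local files and replayed on the next flush.");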
Constant.java
@@ -28,4 +28,5 @@ public class Constant {
"$$_session_manage_operator_$$";

public static final String END_OF_SESSION = "end_of_session";
public static final String SESSION_LIMITING = "session_limiting";
}
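SESSION_LIMITING is an in-band sentinel: when its session cache is full, the coordinator returns this marker in place of a real tunnel session id. A condensed sketch of how the operator side reacts, abridged from handleDataChangeEvent further down:

// Abridged from SessionManageOperator#handleDataChangeEvent in this commit.
CreateSessionResponse response =
        (CreateSessionResponse) sendRequestToOperator(new CreateSessionRequest(identifier));
if (response.getSessionId().equals(Constant.SESSION_LIMITING)) {
    // no session granted: spill the event to the local file cache, replay at flush time
    cache(identifier, dataChangeEvent);
    return;
}
sessionCache.put(identifier, response.getSessionId());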
SessionIdentifier.java
@@ -81,6 +81,10 @@ public String getSessionId() {
return sessionId;
}

public String getSimpleName() {
return project + "." + schema + "." + table + "." + partitionName;
}

@Override
public String toString() {
return "SessionIdentifier{"
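getSimpleName gives each session a stable, human-readable key; uncache in SessionManageOperator sorts cached sessions by it so that replay order is deterministic. An illustrative value, with made-up project, schema, table, and partition names:

// Illustrative only; arguments follow the of(project, schema, table, partition) call sites below.
SessionIdentifier id = SessionIdentifier.of("my_project", "default", "orders", "pt=20240515");
id.getSimpleName(); // -> "my_project.default.orders.pt=20240515"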
SessionManageCoordinator.java
@@ -194,6 +194,13 @@ public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
} else if (request instanceof CreateSessionRequest) {
SessionIdentifier sessionIdentifier = ((CreateSessionRequest) request).getIdentifier();
if (!sessionCache.containsKey(sessionIdentifier)) {
// if the sessionCache size has reached the configured maximum, return SESSION_LIMITING
// to let the TaskManager cache the data of the new session in local files
if (options.getMaxSessionParallelism() > 0
&& sessionCache.size() >= options.getMaxSessionParallelism()) {
return CompletableFuture.completedFuture(
new CreateSessionResponse(Constant.SESSION_LIMITING));
}
MaxComputeWriter writer = createWriter(sessionIdentifier);
sessionCache.put(sessionIdentifier, writer);
sessionIdMap.put(writer.getId(), sessionIdentifier);
SessionManageOperator.java
@@ -29,13 +29,15 @@
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.maxcompute.common.Constant;
import org.apache.flink.cdc.connectors.maxcompute.common.FlinkOdpsException;
import org.apache.flink.cdc.connectors.maxcompute.common.SessionIdentifier;
import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CreateSessionRequest;
import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CreateSessionResponse;
import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.WaitForFlushSuccessRequest;
import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
import org.apache.flink.cdc.connectors.maxcompute.utils.MaxComputeUtils;
import org.apache.flink.cdc.connectors.maxcompute.utils.TypeConvertUtils;
import org.apache.flink.cdc.connectors.maxcompute.utils.cache.UnifiedFileWriter;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
@@ -46,6 +48,7 @@
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
@@ -58,6 +61,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -73,7 +77,7 @@
* SessionId into the metadata of the {@link DataChangeEvent} for downstream processing.
*/
public class SessionManageOperator extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event>, OperatorEventHandler {
implements OneInputStreamOperator<Event, Event>, OperatorEventHandler, BoundedOneInput {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(SessionManageOperator.class);

@@ -90,7 +94,13 @@ public class SessionManageOperator extends AbstractStreamOperator<Event>
private transient SchemaEvolutionClient schemaEvolutionClient;

private transient Future<CoordinationResponse> snapshotFlushSuccess;
private transient Map<SessionIdentifier, UnifiedFileWriter<DataChangeEvent>> fileCacheWriters;
private transient int indexOfThisSubtask;
/**
* endInput is triggered before prepareSnapshotPreBarrier, so we need this flag: once the
* input has ended, the WaitForFlushSuccessRequest has already been sent in advance and the
* pre-barrier hook must not send it again.
*/
private transient boolean endOfInput;

public SessionManageOperator(MaxComputeOptions options, OperatorID schemaOperatorUid) {
this.chainingStrategy = ChainingStrategy.ALWAYS;
@@ -100,10 +110,15 @@ public SessionManageOperator(MaxComputeOptions options, OperatorID schemaOperato

@Override
public void open() throws Exception {
this.endOfInput = false;
this.sessionCache = new HashMap<>();
this.schemaMaps = new HashMap<>();
this.fieldGetterMaps = new HashMap<>();
SessionManageOperator.instance = this;

if (options.getMaxSessionParallelism() > 0) {
this.fileCacheWriters = new HashMap<>();
}
}

@Override
@@ -123,48 +138,15 @@ public void setup(
public void processElement(StreamRecord<Event> element) throws Exception {
if (element.getValue() instanceof DataChangeEvent) {
DataChangeEvent dataChangeEvent = (DataChangeEvent) element.getValue();
TableId tableId = dataChangeEvent.tableId();
// because this operator sits between SchemaOperator and DataSinkWriterOperator, no
// schema is filled in when the CreateTableEvent has been lost.
if (!schemaMaps.containsKey(tableId)) {
emitLatestSchema(tableId);
}
String partitionName =
extractPartition(
dataChangeEvent.op() == OperationType.DELETE
? dataChangeEvent.before()
: dataChangeEvent.after(),
tableId);
SessionIdentifier sessionIdentifier =
SessionIdentifier.of(
options.getProject(),
MaxComputeUtils.getSchema(options, tableId),
tableId.getTableName(),
partitionName);
if (!sessionCache.containsKey(sessionIdentifier)) {
CreateSessionResponse response =
(CreateSessionResponse)
sendRequestToOperator(new CreateSessionRequest(sessionIdentifier));
sessionCache.put(sessionIdentifier, response.getSessionId());
}
dataChangeEvent
.meta()
.put(Constant.TUNNEL_SESSION_ID, sessionCache.get(sessionIdentifier));
dataChangeEvent.meta().put(Constant.MAXCOMPUTE_PARTITION_NAME, partitionName);
output.collect(new StreamRecord<>(dataChangeEvent));
handleDataChangeEvent(dataChangeEvent);
} else if (element.getValue() instanceof FlushEvent) {
LOG.info(
"operator {} handle FlushEvent begin, wait for sink writers flush success",
indexOfThisSubtask);
sessionCache.clear();
Future<CoordinationResponse> waitForSuccess =
submitRequestToOperator(new WaitForFlushSuccessRequest(indexOfThisSubtask));
output.collect(element);
// wait for sink writers flush success
waitForSuccess.get();
LOG.info(
"operator {} handle FlushEvent end, all sink writers flush success",
indexOfThisSubtask);
handleFlushEvent(element);
if (options.getMaxSessionParallelism() > 0) {
// when maxSessionParallelism is set, some data may have been cached in
// files, so we need to un-cache that data and flush again.
uncache(element);
handleFlushEvent(element);
}
} else if (element.getValue() instanceof CreateTableEvent) {
TableId tableId = ((CreateTableEvent) element.getValue()).tableId();
Schema schema = ((CreateTableEvent) element.getValue()).getSchema();
@@ -185,6 +167,56 @@ public void processElement(StreamRecord<Event> element) throws Exception {
}
}

private void handleDataChangeEvent(DataChangeEvent dataChangeEvent) throws Exception {
TableId tableId = dataChangeEvent.tableId();
// because this operator sits between SchemaOperator and DataSinkWriterOperator, no
// schema is filled in when the CreateTableEvent has been lost.
if (!schemaMaps.containsKey(tableId)) {
emitLatestSchema(tableId);
}
String partitionName =
extractPartition(
dataChangeEvent.op() == OperationType.DELETE
? dataChangeEvent.before()
: dataChangeEvent.after(),
tableId);
SessionIdentifier sessionIdentifier =
SessionIdentifier.of(
options.getProject(),
MaxComputeUtils.getSchema(options, tableId),
tableId.getTableName(),
partitionName);
if (!sessionCache.containsKey(sessionIdentifier)) {
CreateSessionResponse response =
(CreateSessionResponse)
sendRequestToOperator(new CreateSessionRequest(sessionIdentifier));
if (response.getSessionId().equals(Constant.SESSION_LIMITING)) {
cache(sessionIdentifier, dataChangeEvent);
return;
}
sessionCache.put(sessionIdentifier, response.getSessionId());
}
dataChangeEvent.meta().put(Constant.TUNNEL_SESSION_ID, sessionCache.get(sessionIdentifier));
dataChangeEvent.meta().put(Constant.MAXCOMPUTE_PARTITION_NAME, partitionName);
output.collect(new StreamRecord<>(dataChangeEvent));
}

private void handleFlushEvent(StreamRecord<Event> element)
throws IOException, ExecutionException, InterruptedException {
LOG.info(
"operator {} handle FlushEvent begin, wait for sink writers flush success",
indexOfThisSubtask);
sessionCache.clear();
Future<CoordinationResponse> waitForSuccess =
submitRequestToOperator(new WaitForFlushSuccessRequest(indexOfThisSubtask));
output.collect(element);
// wait for sink writers flush success
waitForSuccess.get();
LOG.info(
"operator {} handle FlushEvent end, all sink writers flush success",
indexOfThisSubtask);
}

private void emitLatestSchema(TableId tableId) throws Exception {
Optional<Schema> schema = schemaEvolutionClient.getLatestSchema(tableId);
if (schema.isPresent()) {
@@ -201,7 +233,12 @@ private void emitLatestSchema(TableId tableId) throws Exception {
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
super.prepareSnapshotPreBarrier(checkpointId);
LOG.info("prepare snapshot, wait for sink writers flush success");
if (endOfInput) {
return;
}
LOG.info(
"operator {} prepare snapshot, wait for sink writers flush success",
indexOfThisSubtask);
// wait for sink writers flush success
snapshotFlushSuccess =
submitRequestToOperator(
@@ -215,7 +252,19 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
sessionCache.clear();
// wait for sink writers flush success
snapshotFlushSuccess.get();
LOG.info("snapshot end, all sink writers flush success");
LOG.info("operator {} snapshot end, all sink writers flush success", indexOfThisSubtask);
}

@Override
public void endInput() throws Exception {
this.endOfInput = true;
LOG.info(
"operator {} end of input, wait for sink writers flush success",
indexOfThisSubtask);
snapshotFlushSuccess =
submitRequestToOperator(
new WaitForFlushSuccessRequest(
getRuntimeContext().getIndexOfThisSubtask()));
}

/** Partition columns always come after data columns. */
@@ -238,6 +287,72 @@ private String extractPartition(RecordData recordData, TableId tableId) {
return partitionSpec.toString(true, true);
}

private void cache(SessionIdentifier sessionIdentifier, DataChangeEvent dataChangeEvent)
throws IOException {
fileCacheWriters.putIfAbsent(sessionIdentifier, new UnifiedFileWriter<>("/tmp/"));
fileCacheWriters.get(sessionIdentifier).write(dataChangeEvent);
}

private void uncache(StreamRecord<Event> flushElement) {
fileCacheWriters.entrySet().stream()
// sort by simple name (project.schema.table.partition) for a deterministic replay order
.sorted(
Comparator.comparing(
e -> e.getKey().getSimpleName(), Comparator.naturalOrder()))
.forEach(
entry -> {
try {
SessionIdentifier sessionIdentifier = entry.getKey();
LOG.info(
"operator {} un-cache data of {}",
indexOfThisSubtask,
sessionIdentifier);
if (!sessionCache.containsKey(sessionIdentifier)) {
CreateSessionResponse response =
(CreateSessionResponse)
sendRequestToOperator(
new CreateSessionRequest(
sessionIdentifier));
// when session limiting is hit again, flush to release the
// coordinator's sessions, then retry the request and continue
if (response.getSessionId().equals(Constant.SESSION_LIMITING)) {
handleFlushEvent(flushElement);
response =
(CreateSessionResponse)
sendRequestToOperator(
new CreateSessionRequest(
sessionIdentifier));
}
sessionCache.put(sessionIdentifier, response.getSessionId());
}
entry.getValue().close();
entry.getValue()
.read(
dataChangeEvent -> {
try {
dataChangeEvent
.meta()
.put(
Constant.TUNNEL_SESSION_ID,
sessionCache.get(
sessionIdentifier));
dataChangeEvent
.meta()
.put(
Constant
.MAXCOMPUTE_PARTITION_NAME,
sessionIdentifier
.getPartitionName());
output.collect(
new StreamRecord<>(
dataChangeEvent));
} catch (Exception ex) {
throw new FlinkOdpsException(ex);
}
return null;
});
} catch (Exception e) {
throw new FlinkOdpsException(e);
}
});
fileCacheWriters.clear();
}

@Override
public void handleOperatorEvent(OperatorEvent evt) {
// handle event
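The cache/uncache pair in SessionManageOperator leans on UnifiedFileWriter from the new utils.cache package, another changed file not expanded in this view. Judging only from its call sites — write per event, close to seal the file, read with a Function<T, Void> callback — a minimal sketch could look like the following; the real implementation may serialize differently, batch writes, or clean up elsewhere, and it assumes the cached events are Serializable:

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

/** Minimal sketch of the file cache writer assumed by the call sites above. */
public class UnifiedFileWriter<T extends Serializable> {
    private final File cacheFile;
    private final ObjectOutputStream out;

    public UnifiedFileWriter(String cacheDir) throws IOException {
        this.cacheFile = File.createTempFile("session-cache-", ".bin", new File(cacheDir));
        this.out =
                new ObjectOutputStream(
                        new BufferedOutputStream(new FileOutputStream(cacheFile)));
    }

    /** Append one event to the cache file. */
    public void write(T event) throws IOException {
        out.writeObject(event);
    }

    /** Seal the file so it can be read back. */
    public void close() throws IOException {
        out.close();
    }

    /** Replay every cached event through the callback, then delete the file. */
    public void read(Function<T, Void> consumer) throws IOException {
        try (ObjectInputStream in =
                new ObjectInputStream(
                        new BufferedInputStream(new FileInputStream(cacheFile)))) {
            while (true) {
                try {
                    @SuppressWarnings("unchecked")
                    T event = (T) in.readObject();
                    consumer.apply(event);
                } catch (EOFException endOfFile) {
                    break;
                }
            }
        } catch (ClassNotFoundException e) {
            throw new IOException(e);
        } finally {
            if (!cacheFile.delete()) {
                cacheFile.deleteOnExit();
            }
        }
    }
}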
