IncrementalSourceStreamFetcher.java
@@ -47,6 +47,8 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isEndWatermarkEvent;

/** Fetcher to fetch data from a table split; the split is the stream split {@link StreamSplit}. */
public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, SourceSplitBase> {
private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceStreamFetcher.class);
@@ -96,12 +98,6 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
currentStreamSplit),
e);
readException = e;
} finally {
try {
stopReadTask();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
}
@@ -116,10 +112,19 @@ public boolean isFinished() {
public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
checkReadException();
final List<SourceRecord> sourceRecords = new ArrayList<>();
// Only poll from the queue while the read task is still running.
if (currentTaskRunning) {
List<DataChangeEvent> batch = queue.poll();
for (DataChangeEvent event : batch) {
if (shouldEmit(event.getRecord())) {
if (isEndWatermarkEvent(event.getRecord())) {
LOG.info("Read split {} end watermark event", currentStreamSplit);
try {
stopReadTask();
} catch (Exception e) {
throw new RuntimeException(e);
}
break;
} else if (shouldEmit(event.getRecord())) {
sourceRecords.add(event.getRecord());
} else {
LOG.debug("{} data change event should not emit", event);
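The helper imported above is what lets the fetcher decide, inside the poll loop, that the split has ended: the END watermark travels in-band as a special SourceRecord. A minimal sketch of such a check, assuming (as the import path suggests) that watermark records carry a dedicated signal schema and a watermark-kind field; the constant names and the WatermarkKind enum below are illustrative, not taken from this PR:

    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;

    // Sketch: detect the END watermark record that terminates a stream split.
    // SIGNAL_EVENT_VALUE_SCHEMA_NAME, WATERMARK_KIND and WatermarkKind are
    // assumed names modeled on the signal-event layout of the CDC base connector.
    public static boolean isEndWatermarkEvent(SourceRecord record) {
        if (record.valueSchema() == null
                || !SIGNAL_EVENT_VALUE_SCHEMA_NAME.equals(record.valueSchema().name())) {
            return false;
        }
        Struct value = (Struct) record.value();
        return WatermarkKind.END.name().equals(value.getString(WATERMARK_KIND));
    }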
BinlogSplitReader.java
@@ -64,6 +64,7 @@

import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient;
import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection;
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isEndWatermarkEvent;

/**
A Debezium binlog reader implementation that also supports reading the binlog and filtering overlapping
@@ -148,8 +149,6 @@ public void submitSplit(MySqlSplit mySqlSplit) {
currentBinlogSplit),
t);
readException = t;
} finally {
stopBinlogReadTask();
}
});
}
@@ -167,6 +166,16 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
if (currentTaskRunning) {
List<DataChangeEvent> batch = queue.poll();
for (DataChangeEvent event : batch) {
if (isEndWatermarkEvent(event.getRecord())) {
LOG.info("Read split {} end watermark event", currentBinlogSplit);
try {
stopBinlogReadTask();
} catch (Exception e) {
throw new RuntimeException(e);
}
break;
}

if (isParsingOnLineSchemaChanges) {
Optional<SourceRecord> oscRecord =
parseOnLineSchemaChangeEvent(event.getRecord());
@@ -398,4 +407,9 @@ public ExecutorService getExecutorService() {
MySqlBinlogSplitReadTask getBinlogSplitReadTask() {
return binlogSplitReadTask;
}

@VisibleForTesting
public StoppableChangeEventSourceContext getChangeEventSourceContext() {
return changeEventSourceContext;
}
}
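Note the design shift in this file: stopBinlogReadTask() used to run in the reader thread's finally block, stopping the task unconditionally, including right after it had failed. Now the throwable is only recorded, and stopping is deferred to pollSplitRecords() when the END watermark arrives; the new tests below verify that an injected failure actually reaches the caller. Condensed from the hunks above, the submit path ends up shaped roughly like this (a sketch; the partition and offset arguments follow the surrounding class):

    executorService.submit(
            () -> {
                try {
                    binlogSplitReadTask.execute(
                            changeEventSourceContext,
                            statefulTaskContext.getMySqlPartition(),
                            statefulTaskContext.getOffsetContext());
                } catch (Throwable t) {
                    LOG.error(
                            "Execute binlog read task for mysql split {} fail",
                            currentBinlogSplit,
                            t);
                    readException = t;
                    // No finally/stopBinlogReadTask() anymore: the task is stopped from
                    // pollSplitRecords() on the END watermark, so a recorded readException
                    // can first surface through checkReadException().
                }
            });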
BinlogSplitReaderTest.java
@@ -90,6 +90,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader}. */
class BinlogSplitReaderTest extends MySqlSourceTestBase {
@@ -1119,6 +1120,37 @@ public void testBinlogOffsetCompareWithSnapshotAndBinlogPhase() throws Exception
Assertions.assertThat(sourceRecords).isEmpty();
}

@Test
void testReadBinlogWithException() throws Exception {
customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig =
getConfig(StartupOptions.latest(), new String[] {"customers"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);

// Create reader and submit splits
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
BinlogSplitReader reader = new BinlogSplitReader(statefulTaskContext, 0);

// Mock an exception occurring during stream split reading by setting the error handler
// and stopping the change event source to test exception handling
reader.submitSplit(split);
statefulTaskContext
.getErrorHandler()
.setProducerThrowable(new RuntimeException("Test read with exception"));
reader.getChangeEventSourceContext().stopChangeEventSource();
// wait until executor is finished.
Thread.sleep(500L);

assertThatThrownBy(() -> pollRecordsFromReader(reader, RecordUtils::isDataChangeRecord))
.rootCause()
.isExactlyInstanceOf(RuntimeException.class)
.hasMessage("Test read with exception");
reader.close();
}

private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig) {
return createBinlogReader(sourceConfig, false);
}
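The assertion chains through rootCause() rather than inspecting the thrown exception directly, which fits a reader that wraps the recorded failure when rethrowing it. The wrapping presumably happens in checkReadException(), called at the top of pollSplitRecords(); plausibly along these lines (a sketch; the exact wrapper type and message format are assumptions, not shown in this diff):

    // Rethrow a failure recorded by the reader thread, wrapped so callers see
    // which split was being read; the injected RuntimeException stays the root cause.
    private void checkReadException() {
        if (readException != null) {
            throw new FlinkRuntimeException(
                    String.format(
                            "Read split %s error due to %s.",
                            currentBinlogSplit, readException.getMessage()),
                    readException);
        }
    }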
PostgresStreamFetchTask.java
@@ -17,6 +17,7 @@

package org.apache.flink.cdc.connectors.postgres.source.fetch;

import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
@@ -54,6 +55,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
private static final Logger LOG = LoggerFactory.getLogger(PostgresStreamFetchTask.class);

private final StreamSplit split;
private final StoppableChangeEventSourceContext changeEventSourceContext;
private volatile boolean taskRunning = false;
private volatile boolean stopped = false;

@@ -63,6 +65,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {

public PostgresStreamFetchTask(StreamSplit streamSplit) {
this.split = streamSplit;
this.changeEventSourceContext = new StoppableChangeEventSourceContext();
}

@Override
@@ -92,8 +95,7 @@ public void execute(Context context) throws Exception {
sourceFetchContext.getTaskContext(),
sourceFetchContext.getReplicationConnection(),
split);
StoppableChangeEventSourceContext changeEventSourceContext =
new StoppableChangeEventSourceContext();

streamSplitReadTask.execute(
changeEventSourceContext,
sourceFetchContext.getPartition(),
@@ -162,6 +164,11 @@ public void commitCurrentOffset(@Nullable Offset offsetToCommit) {
}
}

@VisibleForTesting
StoppableChangeEventSourceContext getChangeEventSourceContext() {
return changeEventSourceContext;
}

/** A {@link ChangeEventSource} implementation for Postgres to read streaming changes. */
public static class StreamSplitReadTask extends PostgresStreamingChangeEventSource {
private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
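Promoting the StoppableChangeEventSourceContext from a local variable in execute() to a constructor-initialized field is what makes it reachable from the new @VisibleForTesting accessor, so tests can stop the change event source from outside the fetch task. The context class itself already exists in the codebase and amounts to a cooperative stop flag, roughly:

    import io.debezium.pipeline.source.spi.ChangeEventSource;

    // Rough shape of the stoppable context: Debezium's change event source loops
    // while context.isRunning() returns true, so clearing the flag asks the
    // streaming task to wind down cooperatively.
    public class StoppableChangeEventSourceContext
            implements ChangeEventSource.ChangeEventSourceContext {

        private volatile boolean isRunning = true;

        public void stopChangeEventSource() {
            isRunning = false;
        }

        @Override
        public boolean isRunning() {
            return isRunning;
        }
    }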
IncrementalSourceStreamFetcherTest.java (new file)
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.postgres.source.fetch;

import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.assigner.StreamSplitAssigner;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;

import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link IncrementalSourceStreamFetcher}. */
public class IncrementalSourceStreamFetcherTest extends PostgresTestBase {

private static final String schemaName = "customer";
private static final String tableName = "Customers";

private final UniqueDatabase customDatabase =
new UniqueDatabase(
POSTGRES_CONTAINER,
"postgres",
"customer",
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword());

@Test
void testReadStreamSplitWithException() throws Exception {
customDatabase.createAndInitialize();
PostgresSourceConfigFactory sourceConfigFactory =
getMockPostgresSourceConfigFactory(customDatabase, schemaName, tableName, 10, true);
sourceConfigFactory.startupOptions(StartupOptions.latest());
PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
PostgresDialect dialect = new PostgresDialect(sourceConfigFactory.create(0));

// Create reader and submit splits
PostgresSourceFetchTaskContext taskContext =
new PostgresSourceFetchTaskContext(sourceConfig, dialect);
IncrementalSourceStreamFetcher fetcher = new IncrementalSourceStreamFetcher(taskContext, 0);
StreamSplit split = createStreamSplit(sourceConfig, dialect);
PostgresStreamFetchTask fetchTask =
(PostgresStreamFetchTask) dialect.createFetchTask(split);
StoppableChangeEventSourceContext changeEventSourceContext =
fetchTask.getChangeEventSourceContext();

fetcher.submitTask(fetchTask);
// Mock an exception occurring during stream split reading by setting the error handler
// and stopping the change event source to test exception handling
taskContext
.getErrorHandler()
.setProducerThrowable(new RuntimeException("Test read with exception"));
changeEventSourceContext.stopChangeEventSource();

// Wait for the task to complete
Thread.sleep(500L);

assertThatThrownBy(
() -> pollRecordsFromReader(fetcher, SourceRecordUtils::isDataChangeRecord))
.rootCause()
.isExactlyInstanceOf(RuntimeException.class)
.hasMessage("Test read with exception");
fetcher.close();
}

private StreamSplit createStreamSplit(
PostgresSourceConfig sourceConfig, PostgresDialect dialect) throws Exception {
StreamSplitAssigner streamSplitAssigner =
new StreamSplitAssigner(
sourceConfig,
dialect,
new PostgresOffsetFactory(),
new MockSplitEnumeratorContext<>(1));
streamSplitAssigner.open();

Map<TableId, TableChanges.TableChange> tableSchemas =
dialect.discoverDataCollectionSchemas(sourceConfig);
return StreamSplit.fillTableSchemas(streamSplitAssigner.createStreamSplit(), tableSchemas);
}

private List<SourceRecord> pollRecordsFromReader(
IncrementalSourceStreamFetcher fetcher, Predicate<SourceRecord> filter) {
List<SourceRecord> records = new ArrayList<>();
Iterator<SourceRecords> recordIterator;
try {
recordIterator = fetcher.pollSplitRecords();
} catch (InterruptedException e) {
throw new RuntimeException("Polling action was interrupted", e);
}
if (recordIterator == null) {
return records;
}
while (recordIterator.hasNext()) {
Iterator<SourceRecord> iterator = recordIterator.next().iterator();
while (iterator.hasNext()) {
SourceRecord record = iterator.next();
if (filter.test(record)) {
records.add(record);
}
}
}
LOG.debug("Records polled: {}", records);
return records;
}
}
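One caveat in both new tests: the fixed Thread.sleep(500L) gives the reader thread time to observe the stop signal, which can be timing-sensitive on slow CI machines. A bounded polling wait is a common, less flaky alternative (a hypothetical helper, not part of this PR):

    import java.time.Duration;
    import java.util.function.BooleanSupplier;

    // Poll a condition until it holds or a deadline passes, instead of sleeping
    // for a fixed interval (hypothetical test helper; not part of this PR).
    private static void waitUntil(BooleanSupplier condition, Duration timeout)
            throws InterruptedException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() > deadlineNanos) {
                throw new AssertionError("Condition not met within " + timeout);
            }
            Thread.sleep(50L);
        }
    }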