4 changes: 2 additions & 2 deletions .github/workflows/flink_cdc.yml
@@ -71,7 +71,7 @@ jobs:
strategy:
matrix:
java-version: [ '8' ]
-        flink-version: ['1.17.2', '1.18.1', '1.19.1', '1.20.0']
+        flink-version: ['1.19.1', '1.20.0']
module: [ 'pipeline_e2e' ]
name: Pipeline E2E Tests
uses: ./.github/workflows/flink_cdc_base.yml
@@ -83,7 +83,7 @@ jobs:
strategy:
matrix:
java-version: [ '8' ]
-        flink-version: ['1.16.3', '1.17.2', '1.18.1', '1.19.1', '1.20.0']
+        flink-version: ['1.19.1', '1.20.0']
module: [ 'source_e2e' ]
name: Source E2E Tests
uses: ./.github/workflows/flink_cdc_base.yml
4 changes: 2 additions & 2 deletions .github/workflows/flink_cdc_migration_test.yml
@@ -39,7 +39,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
-        flink-version: [ '1.18.1', '1.19.1', '1.20.0' ]
+        flink-version: [ '1.19.1', '1.20.0' ]

steps:
- uses: actions/checkout@v4
@@ -78,7 +78,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
-        flink-version: [ '1.18.1', '1.19.1', '1.20.0' ]
+        flink-version: [ '1.19.1', '1.20.0' ]

steps:
- uses: actions/checkout@v4
(next file)
@@ -36,7 +36,7 @@ limitations under the License.
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<elasticsearch.version>8.12.1</elasticsearch.version>
-        <flink.version>1.19.1</flink.version>
+        <flink.version>1.19.1</flink.version>
<scala.binary.version>4.0</scala.binary.version>
<jackson.version>2.13.2</jackson.version>
<surefire.module.config>--add-opens=java.base/java.util=ALL-UNNAMED</surefire.module.config>
(next file)
@@ -35,7 +35,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
-            <version>3.0.2-${flink.major.version}</version>
+            <version>3.3.0-${flink.major.version}</version>
</dependency>

<dependency>
(next file)
@@ -72,6 +72,13 @@ limitations under the License.
<scope>test</scope>
</dependency>

+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+            <version>1.26.0</version>
+            <scope>test</scope>
+        </dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
(next file)
@@ -18,6 +18,8 @@
package org.apache.flink.cdc.connectors.paimon.sink.v2;

import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -40,6 +42,8 @@
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
@@ -572,7 +576,10 @@ private MultiTableCommittable correctCheckpointId(MultiTableCommittable committa
private static class MockCommitRequestImpl<CommT> extends CommitRequestImpl<CommT> {

protected MockCommitRequestImpl(CommT committable) {
-            super(committable);
+            super(
+                    committable,
+                    InternalSinkCommitterMetricGroup.wrap(
+                            UnregisteredMetricsGroup.createOperatorMetricGroup()));
}
}

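A note on the hunk above: on the Flink version targeted here, the CommitRequestImpl constructor takes a committer metric group in addition to the committable, so the one-argument super call no longer compiles. Wrapping an unregistered (no-op) operator metric group keeps the test independent of any metrics backend. The same pattern in isolation, as a minimal sketch with a hypothetical class name, using only the signatures visible in this diff:

import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl;

// Subclassing because the two-argument constructor is protected.
// The wrapped unregistered group satisfies the new metric-group
// parameter without requiring a running metrics registry.
class NoMetricsCommitRequest<T> extends CommitRequestImpl<T> {
    NoMetricsCommitRequest(T committable) {
        super(
                committable,
                InternalSinkCommitterMetricGroup.wrap(
                        UnregisteredMetricsGroup.createOperatorMetricGroup()));
    }
}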
@@ -633,5 +640,15 @@ public <IN> TypeSerializer<IN> createInputSerializer() {
public JobID getJobId() {
return null;
}
+
+        @Override
+        public JobInfo getJobInfo() {
+            return null;
+        }
+
+        @Override
+        public TaskInfo getTaskInfo() {
+            return null;
+        }
}
}
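Background for the new overrides in this test (and in the StarRocks test further down): Flink 1.19 added JobInfo and TaskInfo accessors to its context-style interfaces, so mock contexts written against older APIs stop compiling. A minimal sketch of the extra surface a mock now needs; the class is hypothetical, and returning null assumes the code under test never reads these values:

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobInfo;
import org.apache.flink.api.common.TaskInfo;

// Hypothetical helper, not part of this PR. Returning null (as the
// Paimon test does) is safe only while the code under test ignores
// job/task metadata; throwing (as the StarRocks test does) fails
// loudly if that assumption ever breaks.
abstract class MockContextBase {
    public JobID getJobId() {
        return null;
    }

    public JobInfo getJobInfo() {
        return null;
    }

    public TaskInfo getTaskInfo() {
        return null;
    }
}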
(next file)
@@ -28,7 +28,7 @@ limitations under the License.
<artifactId>flink-cdc-pipeline-connector-starrocks</artifactId>

<properties>
-        <starrocks.connector.version>1.2.9_flink-${flink.major.version}</starrocks.connector.version>
+        <starrocks.connector.version>1.2.10_flink-${flink.major.version}</starrocks.connector.version>
</properties>

<dependencies>
@@ -38,15 +38,6 @@ limitations under the License.
<version>${starrocks.connector.version}</version>
</dependency>
-
-        <dependency>
-            <!-- TODO connector 1.2.9 depends on this, but not package it, so add this dependency here.
-            This dependency can be removed after upgrading connector to 1.2.10 which will not use
-            commons-compress anymore. -->
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-compress</artifactId>
-            <version>1.21</version>
-        </dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-composer</artifactId>

This file was deleted.

(next file)
@@ -18,6 +18,8 @@
package org.apache.flink.cdc.connectors.starrocks.sink;

import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -353,5 +355,15 @@ public <IN> TypeSerializer<IN> createInputSerializer() {
public JobID getJobId() {
throw new UnsupportedOperationException();
}
+
+        @Override
+        public JobInfo getJobInfo() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public TaskInfo getTaskInfo() {
+            throw new UnsupportedOperationException();
+        }
}
}
(next file)
@@ -209,7 +209,7 @@ public void testConsumingAllEvents()
"spare tire,22.200"
};

-        List<String> actual = TestValuesTableFactory.getResults("sink");
+        List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));

cancelJobIfRunning(result);
@@ -302,7 +302,7 @@ public void testAllTypes() throws Exception {
Arrays.asList(
"+I(1,32767,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)",
"+U(1,0,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)");
-        List<String> actual = TestValuesTableFactory.getRawResults("sink");
+        List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
Collections.sort(expected);
Collections.sort(actual);
assertEquals(expected, actual);
@@ -373,7 +373,7 @@ public void testStartupFromLatestOffset() throws Exception {
String[] expected =
new String[] {"110,jacket,new water resistent white wind breaker,0.500"};

-        List<String> actual = TestValuesTableFactory.getResults("sink");
+        List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));

cancelJobIfRunning(result);
@@ -470,7 +470,7 @@ public void testMetadataColumns() throws Throwable {
"+U(testdb,DB2INST1,PRODUCTS,111,scooter,Big 2-wheel scooter ,5.170)",
"-D(testdb,DB2INST1,PRODUCTS,111,scooter,Big 2-wheel scooter ,5.170)");

-        List<String> actual = TestValuesTableFactory.getRawResults("sink");
+        List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
Collections.sort(expected);
Collections.sort(actual);
assertEquals(expected, actual);
@@ -493,7 +493,7 @@ private static void waitForSinkSize(String sinkName, int expectedSize)
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
-                return TestValuesTableFactory.getRawResults(sinkName).size();
+                return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
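The one-line replacements in the file above, and throughout the MongoDB tests below, are a single mechanical migration: with the Flink version bump, the string-returning accessors on TestValuesTableFactory carry the AsStrings suffix. A small sketch of the new calls; the sink name and class are illustrative only:

import java.util.List;

import org.apache.flink.table.planner.factories.TestValuesTableFactory;

// "sink" is whatever name a test registered its values sink under.
final class SinkReads {
    static List<String> upsertRows() {
        // previously: TestValuesTableFactory.getResults("sink")
        return TestValuesTableFactory.getResultsAsStrings("sink");
    }

    static List<String> rawChangelog() {
        // previously: TestValuesTableFactory.getRawResults("sink")
        return TestValuesTableFactory.getRawResultsAsStrings("sink");
    }
}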
(next file)
@@ -401,7 +401,7 @@ private void testRemoveAndAddCollectionsOneByOne(

MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
MongoDBAssertUtils.assertEqualsInAnyOrder(
-                fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
+                fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));

// first round's changelog data
makeOplogForAddressTableInRound(database, collection0, 0);
@@ -418,7 +418,7 @@ private void testRemoveAndAddCollectionsOneByOne(
collection0, 417022095255614380L, cityName0, cityName0)));
MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
MongoDBAssertUtils.assertEqualsInAnyOrder(
-                fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
+                fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
}
@@ -466,7 +466,7 @@ private void testRemoveAndAddCollectionsOneByOne(
captureTableThisRound, cityName, cityName)));
MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
MongoDBAssertUtils.assertEqualsInAnyOrder(
-                fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
+                fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));

// step 4: make changelog data for all collections before this round(also includes this
// round),
@@ -512,7 +512,7 @@ private void testRemoveAndAddCollectionsOneByOne(
MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());

MongoDBAssertUtils.assertEqualsInAnyOrder(
-                fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
+                fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// step 6: trigger savepoint
if (round != captureAddressCollections.length - 1) {
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
@@ -590,7 +590,7 @@ private void testRemoveCollectionsOneByOne(
}
MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
MongoDBAssertUtils.assertEqualsInAnyOrder(
-                fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
+                fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
}
@@ -629,7 +629,7 @@ private void testRemoveCollectionsOneByOne(

MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
MongoDBAssertUtils.assertEqualsInAnyOrder(
-                fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
+                fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));

// step 3: make oplog data for all collections
List<String> expectedOplogDataThisRound = new ArrayList<>();
@@ -665,7 +665,7 @@ private void testRemoveCollectionsOneByOne(
}

if (failoverPhase == MongoDBTestUtils.FailoverPhase.STREAM
-                    && TestValuesTableFactory.getRawResults("sink").size()
+                    && TestValuesTableFactory.getRawResultsAsStrings("sink").size()
> fetchedDataList.size()) {
MongoDBTestUtils.triggerFailover(
failoverType,
@@ -679,7 +679,7 @@
MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());

MongoDBAssertUtils.assertEqualsInAnyOrder(
-                fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
+                fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));

// step 5: trigger savepoint
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
@@ -798,7 +798,7 @@ private void testNewlyAddedCollectionOneByOne(
fetchedDataList.addAll(expectedSnapshotDataThisRound);
waitForUpsertSinkSize("sink", fetchedDataList.size());
MongoDBAssertUtils.assertEqualsInAnyOrder(
-                fetchedDataList, TestValuesTableFactory.getResults("sink"));
+                fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink"));

// step 3: make some changelog data for this round
makeFirstPartOplogForAddressCollection(
@@ -843,7 +843,7 @@ private void testNewlyAddedCollectionOneByOne(
// checkpoint to wait retract old record and send new record
Thread.sleep(1000);
MongoDBAssertUtils.assertEqualsInAnyOrder(
-                fetchedDataList, TestValuesTableFactory.getResults("sink"));
+                fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink"));

// step 6: trigger savepoint
if (round != captureAddressCollections.length - 1) {
@@ -1023,7 +1023,7 @@ protected static void waitForUpsertSinkSize(String sinkName, int expectedSize)
protected static int upsertSinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
-                return TestValuesTableFactory.getResults(sinkName).size();
+                return TestValuesTableFactory.getResultsAsStrings(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
(next file)
@@ -221,7 +221,7 @@ public void testConsumingAllEvents() throws ExecutionException, InterruptedExcep
"spare tire,22.200"
};

-        List<String> actual = TestValuesTableFactory.getResults("sink");
+        List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));

result.getJobClient().get().cancel().get();
@@ -300,7 +300,7 @@ public void testStartupFromTimestamp() throws Exception {

String[] expected = new String[] {"jacket,0.200", "scooter,5.180"};

-        List<String> actual = TestValuesTableFactory.getResults("sink");
+        List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
assertThat(actual, containsInAnyOrder(expected));

result.getJobClient().get().cancel().get();
@@ -463,7 +463,7 @@ public void testAllTypes() throws Throwable {
"+U(5d505646cf6d4fe581014ab2,hello,0bd1e27e-2829-4b47-8e21-dfef93da44e1,2078693f4c61ce3073b01be69ab76428,17:54:14,2019-08-11,1960-08-11,2019-08-11T17:54:14.692,2019-08-11T17:54:14.692Z,2019-08-11T17:47:44,2019-08-11T17:47:44Z,true,11,10.5,10,510,hello,50,{inner_map={key=234}},[hello, world],[1.0, 1.1, null],[hello0,51, hello1,53],MIN_KEY,MAX_KEY,/^H/i,null,null,[1, 2, 3],function() { x++; },ref_doc,5d505646cf6d4fe581014ab3)",
"-U(5d505646cf6d4fe581014ab2,hello,0bd1e27e-2829-4b47-8e21-dfef93da44e1,2078693f4c61ce3073b01be69ab76428,17:54:14,2019-08-11,1960-08-11,2019-08-11T17:54:14.692,2019-08-11T17:54:14.692Z,2019-08-11T17:47:44,2019-08-11T17:47:44Z,true,11,10.5,10,510,hello,50,{inner_map={key=234}},[hello, world],[1.0, 1.1, null],[hello0,51, hello1,53],MIN_KEY,MAX_KEY,/^H/i,null,null,[1, 2, 3],function() { x++; },ref_doc,5d505646cf6d4fe581014ab3)",
"+U(5d505646cf6d4fe581014ab2,hello,0bd1e27e-2829-4b47-8e21-dfef93da44e1,2078693f4c61ce3073b01be69ab76428,18:36:04,2021-09-03,1960-08-11,2021-09-03T18:36:04.123,2021-09-03T18:36:04.123Z,2021-09-03T18:36:04,2021-09-03T18:36:04Z,true,11,10.5,10,510,hello,50,{inner_map={key=234}},[hello, world],[1.0, 1.1, null],[hello0,51, hello1,53],MIN_KEY,MAX_KEY,/^H/i,null,null,[1, 2, 3],function() { x++; },ref_doc,5d505646cf6d4fe581014ab3)");
-        List<String> actual = TestValuesTableFactory.getRawResults("sink");
+        List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertEquals(expected, actual);

result.getJobClient().get().cancel().get();
@@ -582,7 +582,7 @@ public void testMetadataColumns() throws Exception {
.sorted()
.collect(Collectors.toList());

-        List<String> actual = TestValuesTableFactory.getRawResults("meta_sink");
+        List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("meta_sink");
Collections.sort(actual);
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();