Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/content.zh/docs/connectors/flink-sources/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ You can also read [tutorials]({{< ref "docs/connectors/flink-sources/tutorials/b

| Connector | Database | Driver |
|----------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------|
| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) | <li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.4 |
| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) | <li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0, 6.0, 6.1, 7.0 | MongoDB Driver: 4.11.2 |
| [mysql-cdc]({{< ref "docs/connectors/flink-sources/mysql-cdc" >}}) | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.28 |
| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | <li> [OceanBase CE](https://open.oceanbase.com): 3.1.x, 4.x <li> [OceanBase EE](https://www.oceanbase.com/product/oceanbase): 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x |
| [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}}) | <li> [Oracle](https://www.oracle.com/index.html): 11, 12, 19, 21 | Oracle Driver: 19.3.0.0 |
Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/flink-sources/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ You can also read [tutorials]({{< ref "docs/connectors/flink-sources/tutorials/b

| Connector | Database | Driver |
|----------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------|
| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) | <li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.4 |
| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) | <li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0, 6.0, 6.1, 7.0 | MongoDB Driver: 4.11.2 |
| [mysql-cdc]({{< ref "docs/connectors/flink-sources/mysql-cdc" >}}) | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.28 |
| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | <li> [OceanBase CE](https://open.oceanbase.com): 3.1.x, 4.x <li> [OceanBase EE](https://www.oceanbase.com/product/oceanbase): 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x |
| [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}}) | <li> [Oracle](https://www.oracle.com/index.html): 11, 12, 19, 21 | Oracle Driver: 19.3.0.0 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ limitations under the License.
<dependency>
<groupId>org.mongodb.kafka</groupId>
<artifactId>mongo-kafka-connect</artifactId>
<version>1.10.1</version>
<version>1.13.0</version>
<exclusions>
<exclusion>
<artifactId>mongodb-driver-sync</artifactId>
Expand All @@ -69,7 +69,7 @@ limitations under the License.
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.9.1</version>
<version>4.11.2</version>
</dependency>

<!-- test dependencies on Flink -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,28 +78,36 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {

@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);

private final String mongoVersion;
private final boolean parallelismSnapshot;

public MongoDBFullChangelogITCase(boolean parallelismSnapshot) {
public MongoDBFullChangelogITCase(String mongoVersion, boolean parallelismSnapshot) {
super(mongoVersion);
this.mongoVersion = mongoVersion;
this.parallelismSnapshot = parallelismSnapshot;
}

@Parameterized.Parameters(name = "parallelismSnapshot: {0}")
@Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: {1}")
public static Object[] parameters() {
return new Object[][] {new Object[] {false}, new Object[] {true}};
List<Object[]> parameterTuples = new ArrayList<>();
for (String mongoVersion : MONGO_VERSIONS) {
parameterTuples.add(new Object[] {mongoVersion, true});
parameterTuples.add(new Object[] {mongoVersion, false});
}
return parameterTuples.toArray();
}

@Test
public void testGetMongoDBVersion() {
MongoDBSourceConfig config =
new MongoDBSourceConfigFactory()
.hosts(CONTAINER.getHostAndPort())
.hosts(mongoContainer.getHostAndPort())
.splitSizeMB(1)
.samplesPerChunk(10)
.pollAwaitTimeMillis(500)
.create(0);

assertEquals(MongoUtils.getMongoVersion(config), "6.0.9");
assertEquals(MongoUtils.getMongoVersion(config), mongoVersion);
}

@Test
Expand Down Expand Up @@ -499,16 +507,16 @@ private List<String> testBackfillWhenWritingEvents(
"customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);

// A - enable system-level fulldoc pre & post image feature
CONTAINER.executeCommand(
mongoContainer.executeCommand(
"use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");

// B - enable collection-level fulldoc pre & post image for change capture collection
CONTAINER.executeCommandInDatabase(
mongoContainer.executeCommandInDatabase(
String.format(
"db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
"customers", "customers"),
customerDatabase);
CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);
mongoContainer.executeCommandFileInDatabase("customer", customerDatabase);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
Expand All @@ -526,7 +534,7 @@ private List<String> testBackfillWhenWritingEvents(
TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema);
MongoDBSource source =
new MongoDBSourceBuilder()
.hosts(CONTAINER.getHostAndPort())
.hosts(mongoContainer.getHostAndPort())
.databaseList(customerDatabase)
.username(FLINK_USER)
.password(FLINK_USER_PASSWORD)
Expand Down Expand Up @@ -613,12 +621,12 @@ private void testMongoDBParallelSource(
"customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);

// A - enable system-level fulldoc pre & post image feature
CONTAINER.executeCommand(
mongoContainer.executeCommand(
"use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");

// B - enable collection-level fulldoc pre & post image for change capture collection
for (String collectionName : captureCustomerCollections) {
CONTAINER.executeCommandInDatabase(
mongoContainer.executeCommandInDatabase(
String.format(
"db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
collectionName, collectionName),
Expand Down Expand Up @@ -654,14 +662,14 @@ private void testMongoDBParallelSource(
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ ")",
parallelismSnapshot ? "true" : "false",
CONTAINER.getHostAndPort(),
mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
customerDatabase,
getCollectionNameRegex(customerDatabase, captureCustomerCollections),
skipSnapshotBackfill);

CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);
mongoContainer.executeCommandFileInDatabase("customer", customerDatabase);

// first step: check the snapshot data
String[] snapshotForSingleTable =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,42 @@

import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH;

/** Example Tests for {@link MongoDBSource}. */
@RunWith(Parameterized.class)
public class MongoDBParallelSourceExampleTest extends MongoDBSourceTestBase {

@Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: {1}")
public static Object[] parameters() {
List<Object[]> parameterTuples = new ArrayList<>();
for (String mongoVersion : MONGO_VERSIONS) {
parameterTuples.add(new Object[] {mongoVersion, true});
parameterTuples.add(new Object[] {mongoVersion, false});
}
return parameterTuples.toArray();
}

public MongoDBParallelSourceExampleTest(String mongoVersion) {
super(mongoVersion);
}

@Test
@Ignore("Test ignored because it won't stop and is used for manual test")
public void testMongoDBExampleSource() throws Exception {
String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory");
String database = mongoContainer.executeCommandFileInSeparateDatabase("inventory");

MongoDBSource<String> mongoSource =
MongoDBSource.<String>builder()
.hosts(CONTAINER.getHostAndPort())
.hosts(mongoContainer.getHostAndPort())
.databaseList(database)
.collectionList(database + ".products")
.username(FLINK_USER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
Expand All @@ -65,11 +68,21 @@
import static org.apache.flink.util.Preconditions.checkState;

/** IT tests for {@link MongoDBSource}. */
@RunWith(Parameterized.class)
public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
private static final int USE_POST_LOWWATERMARK_HOOK = 1;
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
private static final int USE_POST_HIGHWATERMARK_HOOK = 3;

public MongoDBParallelSourceITCase(String mongoVersion) {
super(mongoVersion);
}

@Parameterized.Parameters(name = "mongoVersion: {0}")
public static Object[] parameters() {
return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
}

@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);

@Test
Expand Down Expand Up @@ -406,7 +419,7 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
private List<String> testBackfillWhenWritingEvents(
boolean skipBackFill, int fetchSize, int hookType, StartupOptions startupOptions)
throws Exception {
String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer");
String customerDatabase = mongoContainer.executeCommandFileInSeparateDatabase("customer");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.setParallelism(1);
Expand All @@ -423,7 +436,7 @@ private List<String> testBackfillWhenWritingEvents(
TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema);
MongoDBSource source =
new MongoDBSourceBuilder()
.hosts(CONTAINER.getHostAndPort())
.hosts(mongoContainer.getHostAndPort())
.databaseList(customerDatabase)
.username(FLINK_USER)
.password(FLINK_USER_PASSWORD)
Expand Down Expand Up @@ -507,7 +520,7 @@ private void testMongoDBParallelSource(
boolean skipSnapshotBackfill)
throws Exception {

String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer");
String customerDatabase = mongoContainer.executeCommandFileInSeparateDatabase("customer");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Expand Down Expand Up @@ -535,7 +548,7 @@ private void testMongoDBParallelSource(
+ " 'heartbeat.interval.ms' = '500',"
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ ")",
CONTAINER.getHostAndPort(),
mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
customerDatabase,
Expand Down
Loading