diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/common/utils/MongoConstants.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/common/utils/MongoConstants.java index e60b4532..378d6db4 100644 --- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/common/utils/MongoConstants.java +++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/common/utils/MongoConstants.java @@ -65,6 +65,9 @@ public class MongoConstants { public static final BsonDocument ID_HINT = new BsonDocument(ID_FIELD, new BsonInt32(1)); + public static final BsonDocument BSON_MIN_BOUNDARY = new BsonDocument(ID_FIELD, BSON_MIN_KEY); + public static final BsonDocument BSON_MAX_BOUNDARY = new BsonDocument(ID_FIELD, BSON_MAX_KEY); + public static final JsonWriterSettings DEFAULT_JSON_WRITER_SETTINGS = JsonWriterSettings.builder().outputMode(JsonMode.EXTENDED).build(); diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/config/MongoReadOptions.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/config/MongoReadOptions.java index fb990ed3..41590e1d 100644 --- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/config/MongoReadOptions.java +++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/config/MongoReadOptions.java @@ -22,6 +22,8 @@ import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy; import org.apache.flink.connector.mongodb.source.reader.split.MongoScanSourceSplitReader; +import javax.annotation.Nullable; + import java.io.Serializable; import java.util.Objects; @@ -49,17 +51,21 @@ public class MongoReadOptions implements Serializable { private final int samplesPerPartition; + private final @Nullable Integer partitionRecordSize; + private MongoReadOptions( int fetchSize, boolean noCursorTimeout, PartitionStrategy partitionStrategy, MemorySize partitionSize, - int samplesPerPartition) { + int samplesPerPartition, + @Nullable Integer partitionRecordSize) { this.fetchSize = fetchSize; this.noCursorTimeout = noCursorTimeout; this.partitionStrategy = partitionStrategy; this.partitionSize = partitionSize; this.samplesPerPartition = samplesPerPartition; + this.partitionRecordSize = partitionRecordSize; } public int getFetchSize() { @@ -82,6 +88,10 @@ public int getSamplesPerPartition() { return samplesPerPartition; } + public @Nullable Integer getPartitionRecordSize() { + return partitionRecordSize; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -114,6 +124,7 @@ public static class MongoReadOptionsBuilder { private PartitionStrategy partitionStrategy = SCAN_PARTITION_STRATEGY.defaultValue(); private MemorySize partitionSize = SCAN_PARTITION_SIZE.defaultValue(); private int samplesPerPartition = SCAN_PARTITION_SAMPLES.defaultValue(); + private @Nullable Integer partitionRecordSize = null; private MongoReadOptionsBuilder() {} @@ -199,6 +210,19 @@ public MongoReadOptionsBuilder setSamplesPerPartition(int samplesPerPartition) { return this; } + /** + * Sets the number of records in each partition. This will only take effect when the + * partition strategy is set to Pagination. + * + * @param partitionRecordSize number of records in each partition. + * @return this builder + */ + public MongoReadOptionsBuilder setPartitionRecordSize( + @Nullable Integer partitionRecordSize) { + this.partitionRecordSize = partitionRecordSize; + return this; + } + /** * Build the {@link MongoReadOptions}. * @@ -210,7 +234,8 @@ public MongoReadOptions build() { noCursorTimeout, partitionStrategy, partitionSize, - samplesPerPartition); + samplesPerPartition, + partitionRecordSize); } } } diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoPaginationSplitter.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoPaginationSplitter.java new file mode 100644 index 00000000..64cb5301 --- /dev/null +++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoPaginationSplitter.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.source.enumerator.splitter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.source.config.MongoReadOptions; +import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit; + +import com.mongodb.MongoNamespace; +import com.mongodb.client.model.Aggregates; +import com.mongodb.client.model.Projections; +import com.mongodb.client.model.Sorts; +import org.bson.BsonDocument; +import org.bson.conversions.Bson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_BOUNDARY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_BOUNDARY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT; + +/** Mongo Splitter that splits MongoDB collection evenly by record counts. */ +@Internal +public class MongoPaginationSplitter { + + private static final Logger LOG = LoggerFactory.getLogger(MongoPaginationSplitter.class); + + public static Collection split(MongoSplitContext splitContext) { + MongoReadOptions readOptions = splitContext.getReadOptions(); + MongoNamespace namespace = splitContext.getMongoNamespace(); + + // If partition record size isn't present, we'll use the partition size option and average + // object size to calculate number of records in each partitioned split. + Integer partitionRecordSize = readOptions.getPartitionRecordSize(); + if (partitionRecordSize == null) { + long avgObjSizeInBytes = splitContext.getAvgObjSize(); + if (avgObjSizeInBytes == 0) { + LOG.info( + "{} seems to be an empty collection, Returning a single partition.", + namespace); + return MongoSingleSplitter.split(splitContext); + } + + partitionRecordSize = + Math.toIntExact(readOptions.getPartitionSize().getBytes() / avgObjSizeInBytes); + } + + long totalNumOfDocuments = splitContext.getCount(); + + if (partitionRecordSize >= totalNumOfDocuments) { + LOG.info( + "Fewer documents ({}) than the number of documents per partition ({}), Returning a single partition.", + totalNumOfDocuments, + partitionRecordSize); + return MongoSingleSplitter.split(splitContext); + } + + int numberOfPartitions = + (int) (Math.ceil(totalNumOfDocuments / (double) partitionRecordSize)); + + BsonDocument lastUpperBound = null; + List paginatedSplits = new ArrayList<>(); + + for (int splitNum = 0; splitNum < numberOfPartitions; splitNum++) { + List pipeline = new ArrayList<>(); + + pipeline.add(Aggregates.project(Projections.include(ID_FIELD))); + pipeline.add(Aggregates.project(Sorts.ascending(ID_FIELD))); + + // We don't have to set the upper bounds limit if we're generating the first split. + if (lastUpperBound != null) { + BsonDocument matchFilter = new BsonDocument(); + if (lastUpperBound.containsKey(ID_FIELD)) { + matchFilter.put( + ID_FIELD, new BsonDocument("$gte", lastUpperBound.get(ID_FIELD))); + } + pipeline.add(Aggregates.match(matchFilter)); + } + pipeline.add(Aggregates.skip(partitionRecordSize)); + pipeline.add(Aggregates.limit(1)); + + BsonDocument currentUpperBound = + splitContext + .getMongoCollection() + .aggregate(pipeline) + .allowDiskUse(true) + .first(); + + paginatedSplits.add( + new MongoScanSourceSplit( + String.format("%s_%d", namespace, splitNum), + namespace.getDatabaseName(), + namespace.getCollectionName(), + lastUpperBound != null ? lastUpperBound : BSON_MIN_BOUNDARY, + currentUpperBound != null ? currentUpperBound : BSON_MAX_BOUNDARY, + ID_HINT)); + + if (currentUpperBound == null) { + break; + } + lastUpperBound = currentUpperBound; + } + + return paginatedSplits; + } +} diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitter.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitter.java index 22ca113c..ce094d7d 100644 --- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitter.java +++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitter.java @@ -36,8 +36,8 @@ import java.util.List; import java.util.function.BiFunction; -import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_KEY; -import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_KEY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_BOUNDARY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_BOUNDARY; import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD; import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT; @@ -109,10 +109,10 @@ static Collection split( @VisibleForTesting static List createSplits( List samples, int samplesPerPartition, MongoNamespace namespace) { - samples.add(new BsonDocument(ID_FIELD, BSON_MAX_KEY)); + samples.add(BSON_MAX_BOUNDARY); List sourceSplits = new ArrayList<>(); - BsonDocument partitionStart = new BsonDocument(ID_FIELD, BSON_MIN_KEY); + BsonDocument partitionStart = BSON_MIN_BOUNDARY; int splitNum = 0; for (int i = samplesPerPartition - 1; i < samples.size(); i += samplesPerPartition) { sourceSplits.add(createSplit(namespace, splitNum++, partitionStart, samples.get(i))); diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSingleSplitter.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSingleSplitter.java index f177a0d3..d8cb777e 100644 --- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSingleSplitter.java +++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSingleSplitter.java @@ -20,14 +20,11 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit; -import org.bson.BsonDocument; - import java.util.Collection; import static java.util.Collections.singletonList; -import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_KEY; -import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_KEY; -import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_BOUNDARY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_BOUNDARY; import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT; /** Mongo Splitter that splits MongoDB collection as a single split. */ @@ -42,8 +39,8 @@ public static Collection split(MongoSplitContext splitCont splitContext.getMongoNamespace().getFullName(), splitContext.getDatabaseName(), splitContext.getCollectionName(), - new BsonDocument(ID_FIELD, BSON_MIN_KEY), - new BsonDocument(ID_FIELD, BSON_MAX_KEY), + BSON_MIN_BOUNDARY, + BSON_MAX_BOUNDARY, ID_HINT); return singletonList(singleSplit); diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSplitVectorSplitter.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSplitVectorSplitter.java index 055c9a3f..1cd60e64 100644 --- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSplitVectorSplitter.java +++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSplitVectorSplitter.java @@ -38,7 +38,7 @@ import java.util.Collection; import java.util.List; -import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_KEY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_BOUNDARY; import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_KEY; import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD; import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT; @@ -90,7 +90,7 @@ public static Collection split(MongoSplitContext splitCont } // Complete right bound: (lastKey, maxKey) - splitKeys.add(new BsonDocument(ID_FIELD, BSON_MAX_KEY)); + splitKeys.add(BSON_MAX_BOUNDARY); List sourceSplits = new ArrayList<>(splitKeys.size()); diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSplitters.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSplitters.java index 1ebad817..d2ef3462 100644 --- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSplitters.java +++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSplitters.java @@ -64,6 +64,8 @@ public static Collection split( return MongoSplitVectorSplitter.split(splitContext); case SHARDED: return MongoShardedSplitter.split(splitContext); + case PAGINATION: + return MongoPaginationSplitter.split(splitContext); case DEFAULT: default: return splitContext.isSharded() diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/PartitionStrategy.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/PartitionStrategy.java index a73de040..c117c342 100644 --- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/PartitionStrategy.java +++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/PartitionStrategy.java @@ -55,6 +55,11 @@ public enum PartitionStrategy implements DescribedEnum { text( "Read the chunk ranges from config.chunks collection and splits to multiple chunks. Only support sharded collections.")), + PAGINATION( + "pagination", + text( + "Creating chunk records evenly by count. Each chunk will have exactly the same number of records.")), + DEFAULT( "default", text( diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java index a579eaa3..36948f29 100644 --- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java +++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java @@ -75,12 +75,13 @@ private MongoConnectorOptions() {} .enumType(PartitionStrategy.class) .defaultValue(PartitionStrategy.DEFAULT) .withDescription( - "Specifies the partition strategy. Available strategies are single, sample, split-vector, sharded and default." - + "The single partition strategy treats the entire collection as a single partition." - + "The sample partition strategy samples the collection and generate partitions which is fast but possibly uneven." - + "The split-vector partition strategy uses the splitVector command to generate partitions for non-sharded collections which is fast and even. The splitVector permission is required." - + "The sharded partition strategy reads config.chunks (MongoDB splits a sharded collection into chunks, and the range of the chunks are stored within the collection) as the partitions directly." - + "The sharded partition strategy is only used for sharded collection which is fast and even. Read permission of config database is required." + "Specifies the partition strategy. Available strategies are single, sample, split-vector, sharded, pagination and default. " + + "The single partition strategy treats the entire collection as a single partition. " + + "The sample partition strategy samples the collection and generate partitions which is fast but possibly uneven. " + + "The split-vector partition strategy uses the splitVector command to generate partitions for non-sharded collections which is fast and even. The splitVector permission is required. " + + "The sharded partition strategy reads config.chunks (MongoDB splits a sharded collection into chunks, and the range of the chunks are stored within the collection) as the partitions directly. " + + "The sharded partition strategy is only used for sharded collection which is fast and even. Read permission of config database is required. " + + "The pagination partition strategy splits records evenly. Each chunk will have exactly the same number of records. This could be configured by `scan.partition.record-size` option. " + "The default partition strategy uses sharded strategy for sharded collections otherwise using split vector strategy."); public static final ConfigOption SCAN_PARTITION_SIZE = @@ -99,6 +100,14 @@ private MongoConnectorOptions() {} + "Then uses every 'scan.partition.samples' as the value to use to calculate the partition boundaries." + "The total number of samples taken is calculated as: samples per partition * (count of documents / number of documents per partition."); + public static final ConfigOption SCAN_PARTITION_RECORD_SIZE = + ConfigOptions.key("scan.partition.record-size") + .intType() + .noDefaultValue() + .withDescription( + "Specifies the number of records in each chunk. Only takes effect when `scan.partition.strategy` is `pagination`. " + + "This option will be automatically inferred from `scan.partition.size` if absent."); + public static final ConfigOption LOOKUP_RETRY_INTERVAL = ConfigOptions.key("lookup.retry.interval") .durationType() diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactory.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactory.java index 055c6b75..6bf87d22 100644 --- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactory.java +++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactory.java @@ -179,6 +179,7 @@ private static MongoReadOptions getReadOptions(MongoConfiguration configuration) .setNoCursorTimeout(configuration.isNoCursorTimeout()) .setPartitionStrategy(configuration.getPartitionStrategy()) .setPartitionSize(configuration.getPartitionSize()) + .setPartitionRecordSize(configuration.getPartitionRecordSize()) .setSamplesPerPartition(configuration.getSamplesPerPartition()) .build(); } diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/config/MongoConfiguration.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/config/MongoConfiguration.java index 753b429b..1a6bd51c 100644 --- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/config/MongoConfiguration.java +++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/config/MongoConfiguration.java @@ -38,6 +38,7 @@ import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.LOOKUP_RETRY_INTERVAL; import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_NO_TIMEOUT; import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_FETCH_SIZE; +import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_RECORD_SIZE; import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_SAMPLES; import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_SIZE; import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_STRATEGY; @@ -90,6 +91,10 @@ public int getSamplesPerPartition() { return config.get(SCAN_PARTITION_SAMPLES); } + public Integer getPartitionRecordSize() { + return config.get(SCAN_PARTITION_RECORD_SIZE); + } + // -----------------------------------Lookup Config---------------------------------------- public int getLookupMaxRetries() { return config.get(LookupOptions.MAX_RETRIES); diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumStateSerializerTest.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumStateSerializerTest.java index 8f0a796d..54da9627 100644 --- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumStateSerializerTest.java +++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumStateSerializerTest.java @@ -17,7 +17,6 @@ package org.apache.flink.connector.mongodb.source.enumerator; -import org.apache.flink.connector.mongodb.common.utils.MongoConstants; import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit; import org.bson.BsonDocument; @@ -29,6 +28,7 @@ import java.util.List; import java.util.Map; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_BOUNDARY; import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT; import static org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumStateSerializer.INSTANCE; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -75,7 +75,7 @@ private static MongoScanSourceSplit createSourceSplit(int index) { "db", "coll", new BsonDocument("_id", new BsonInt32(index)), - new BsonDocument("_id", MongoConstants.BSON_MAX_KEY), + BSON_MAX_BOUNDARY, ID_HINT); } } diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoPaginationSplitterTest.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoPaginationSplitterTest.java new file mode 100644 index 00000000..e234422f --- /dev/null +++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoPaginationSplitterTest.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.connector.mongodb.source.enumerator.splitter; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.connector.mongodb.source.config.MongoReadOptions; +import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit; +import org.apache.flink.connector.mongodb.testutils.MongoShardedContainers; +import org.apache.flink.connector.mongodb.testutils.MongoTestUtil; + +import com.mongodb.MongoNamespace; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCollection; +import org.bson.BsonDocument; +import org.bson.BsonInt64; +import org.bson.BsonString; +import org.bson.BsonValue; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.containers.Network; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_BOUNDARY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_KEY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_BOUNDARY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_KEY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT; +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link MongoPaginationSplitter}. */ +class MongoPaginationSplitterTest { + + @RegisterExtension + private static final MongoShardedContainers MONGO_SHARDED_CONTAINER = + MongoTestUtil.createMongoDBShardedContainers(Network.newNetwork()); + + private static MongoClient mongoClient; + + private static final MongoNamespace TEST_NS = new MongoNamespace("test.test"); + + private static final int TOTAL_RECORDS_COUNT = 120; + + @BeforeAll + static void beforeAll() { + mongoClient = MongoClients.create(MONGO_SHARDED_CONTAINER.getConnectionString()); + MongoCollection coll = + mongoClient + .getDatabase(TEST_NS.getDatabaseName()) + .getCollection(TEST_NS.getCollectionName()) + .withDocumentClass(BsonDocument.class); + coll.insertMany(initializeRecords()); + } + + @AfterAll + static void afterAll() { + if (mongoClient != null) { + mongoClient.close(); + } + } + + /// Test cases that specifies number of records in each partition explicitly. + @Test + void testSingleSplitPartitions() { + MongoSplitContext splitContext = createSplitContext(TOTAL_RECORDS_COUNT); + assertThat(new ArrayList<>(MongoPaginationSplitter.split(splitContext))) + .isEqualTo(SINGLE_SPLIT); + } + + @Test + void testLargePartitionRecordSize() { + MongoSplitContext splitContext = createSplitContext(TOTAL_RECORDS_COUNT * 2); + assertThat(new ArrayList<>(MongoPaginationSplitter.split(splitContext))) + .isEqualTo(SINGLE_SPLIT); + } + + @Test + void testLargerSizedPartitions() { + MongoSplitContext splitContext = createSplitContext(15); + assertThat(new ArrayList<>(MongoPaginationSplitter.split(splitContext))) + .isEqualTo( + createReferenceSplits( + Arrays.asList( + Tuple2.of(BSON_MIN_KEY, new BsonInt64(15)), + Tuple2.of(new BsonInt64(15), new BsonInt64(30)), + Tuple2.of(new BsonInt64(30), new BsonInt64(45)), + Tuple2.of(new BsonInt64(45), new BsonInt64(60)), + Tuple2.of(new BsonInt64(60), new BsonInt64(75)), + Tuple2.of(new BsonInt64(75), new BsonInt64(90)), + Tuple2.of(new BsonInt64(90), new BsonInt64(105)), + Tuple2.of(new BsonInt64(105), BSON_MAX_KEY)))); + } + + @Test + void testOffByOnePartitions() { + { + MongoSplitContext splitContext = createSplitContext(TOTAL_RECORDS_COUNT - 1); + assertThat(new ArrayList<>(MongoPaginationSplitter.split(splitContext))) + .isEqualTo( + createReferenceSplits( + Arrays.asList( + Tuple2.of( + BSON_MIN_KEY, + new BsonInt64(TOTAL_RECORDS_COUNT - 1)), + Tuple2.of( + new BsonInt64(TOTAL_RECORDS_COUNT - 1), + BSON_MAX_KEY)))); + } + + { + MongoSplitContext splitContext = createSplitContext(TOTAL_RECORDS_COUNT); + assertThat(new ArrayList<>(MongoPaginationSplitter.split(splitContext))) + .isEqualTo(SINGLE_SPLIT); + } + } + + /// Test cases that do not specify number of records, and estimates record size with + /// `avgObjSize`. + @Test + void testEstimatedSingleSplitPartitions() { + MongoSplitContext splitContext = + createSplitContext(MemorySize.ofMebiBytes(16), MemorySize.ZERO); + assertThat(new ArrayList<>(MongoPaginationSplitter.split(splitContext))) + .isEqualTo(SINGLE_SPLIT); + } + + @Test + void testEstimatedLargerSizedPartitions() { + MongoSplitContext splitContext = + createSplitContext(MemorySize.ofMebiBytes(50), MemorySize.ofMebiBytes(3)); + + assertThat(new ArrayList<>(MongoPaginationSplitter.split(splitContext))) + .isEqualTo( + createReferenceSplits( + Arrays.asList( + Tuple2.of(BSON_MIN_KEY, new BsonInt64(16)), + Tuple2.of(new BsonInt64(16), new BsonInt64(32)), + Tuple2.of(new BsonInt64(32), new BsonInt64(48)), + Tuple2.of(new BsonInt64(48), new BsonInt64(64)), + Tuple2.of(new BsonInt64(64), new BsonInt64(80)), + Tuple2.of(new BsonInt64(80), new BsonInt64(96)), + Tuple2.of(new BsonInt64(96), new BsonInt64(112)), + Tuple2.of(new BsonInt64(112), BSON_MAX_KEY)))); + } + + @Test + void testEstimatedOffByOnePartitions() { + { + MongoSplitContext splitContext = + createSplitContext( + MemorySize.ofMebiBytes(TOTAL_RECORDS_COUNT - 1), + MemorySize.ofMebiBytes(1)); + assertThat(new ArrayList<>(MongoPaginationSplitter.split(splitContext))) + .isEqualTo( + createReferenceSplits( + Arrays.asList( + Tuple2.of( + BSON_MIN_KEY, + new BsonInt64(TOTAL_RECORDS_COUNT - 1)), + Tuple2.of( + new BsonInt64(TOTAL_RECORDS_COUNT - 1), + BSON_MAX_KEY)))); + } + + { + MongoSplitContext splitContext = + createSplitContext( + MemorySize.ofMebiBytes(TOTAL_RECORDS_COUNT), MemorySize.ofMebiBytes(1)); + assertThat(new ArrayList<>(MongoPaginationSplitter.split(splitContext))) + .isEqualTo(SINGLE_SPLIT); + } + } + + @Test + void testEstimateWithoutAvgObjSize() { + MongoSplitContext splitContext = + createSplitContext(MemorySize.ofMebiBytes(1), MemorySize.ZERO); + assertThat(new ArrayList<>(MongoPaginationSplitter.split(splitContext))) + .isEqualTo(SINGLE_SPLIT); + } + + private static List initializeRecords() { + return IntStream.range(0, MongoPaginationSplitterTest.TOTAL_RECORDS_COUNT) + .mapToObj( + idx -> + new BsonDocument("_id", new BsonInt64(idx)) + .append( + "str", + new BsonString(String.format("Record #%d", idx)))) + .collect(Collectors.toList()); + } + + private static MongoSplitContext createSplitContext( + MemorySize partitionSize, MemorySize avgObjSize) { + long avgObjSizeInBytes = avgObjSize.getBytes(); + return new MongoSplitContext( + MongoReadOptions.builder().setPartitionSize(partitionSize).build(), + mongoClient, + TEST_NS, + false, + TOTAL_RECORDS_COUNT, + (long) TOTAL_RECORDS_COUNT * avgObjSizeInBytes, + avgObjSizeInBytes); + } + + private static MongoSplitContext createSplitContext(int partitionRecordSize) { + return new MongoSplitContext( + MongoReadOptions.builder().setPartitionRecordSize(partitionRecordSize).build(), + mongoClient, + TEST_NS, + false, + MongoPaginationSplitterTest.TOTAL_RECORDS_COUNT, + 0, + 0); + } + + private static List createReferenceSplits( + List> ranges) { + + List results = new ArrayList<>(); + for (int i = 0; i < ranges.size(); i++) { + results.add( + new MongoScanSourceSplit( + TEST_NS.getFullName() + "_" + i, + TEST_NS.getDatabaseName(), + TEST_NS.getCollectionName(), + new BsonDocument(ID_FIELD, ranges.get(i).f0), + new BsonDocument(ID_FIELD, ranges.get(i).f1), + ID_HINT)); + } + return results; + } + + private static final List SINGLE_SPLIT = + Collections.singletonList( + new MongoScanSourceSplit( + TEST_NS.getFullName(), + TEST_NS.getDatabaseName(), + TEST_NS.getCollectionName(), + BSON_MIN_BOUNDARY, + BSON_MAX_BOUNDARY, + ID_HINT)); +} diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitterTest.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitterTest.java index ce4de541..55e0a3bf 100644 --- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitterTest.java +++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitterTest.java @@ -29,8 +29,8 @@ import java.util.ArrayList; import java.util.List; -import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_KEY; -import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_KEY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_BOUNDARY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_BOUNDARY; import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD; import static org.assertj.core.api.Assertions.assertThat; @@ -38,8 +38,6 @@ class MongoSampleSplitterTest { private static final MongoNamespace TEST_NS = new MongoNamespace("test.test"); - private static final BsonDocument MIN = new BsonDocument(ID_FIELD, BSON_MIN_KEY); - private static final BsonDocument MAX = new BsonDocument(ID_FIELD, BSON_MAX_KEY); @Test void testSplitEmptyCollection() { @@ -124,7 +122,7 @@ void testSampleMerging() { assertThat(splits.get(0)) .satisfies( split -> { - assertThat(split.getMin()).isEqualTo(MIN); + assertThat(split.getMin()).isEqualTo(BSON_MIN_BOUNDARY); assertThat(split.getMax()).isEqualTo(samples.get(1)); }); assertThat(splits.get(1)) @@ -137,7 +135,7 @@ void testSampleMerging() { .satisfies( split -> { assertThat(split.getMin()).isEqualTo(samples.get(3)); - assertThat(split.getMax()).isEqualTo(MAX); + assertThat(split.getMax()).isEqualTo(BSON_MAX_BOUNDARY); }); } @@ -151,7 +149,7 @@ private static List createSamples(int samplesCount) { private static void assertSingleSplit(List splits) { assertThat(splits.size()).isEqualTo(1); - assertThat(splits.get(0).getMin()).isEqualTo(MIN); - assertThat(splits.get(0).getMax()).isEqualTo(MAX); + assertThat(splits.get(0).getMin()).isEqualTo(BSON_MIN_BOUNDARY); + assertThat(splits.get(0).getMax()).isEqualTo(BSON_MAX_BOUNDARY); } }