[FLINK-36075][Connectors/MongoDB] Add pagination partitioning strategy
yuxiqian authored Dec 2, 2024
1 parent e0a04d0 commit fd177d7
Showing 14 changed files with 470 additions and 31 deletions.
@@ -65,6 +65,9 @@ public class MongoConstants {

public static final BsonDocument ID_HINT = new BsonDocument(ID_FIELD, new BsonInt32(1));

public static final BsonDocument BSON_MIN_BOUNDARY = new BsonDocument(ID_FIELD, BSON_MIN_KEY);
public static final BsonDocument BSON_MAX_BOUNDARY = new BsonDocument(ID_FIELD, BSON_MAX_KEY);

public static final JsonWriterSettings DEFAULT_JSON_WRITER_SETTINGS =
JsonWriterSettings.builder().outputMode(JsonMode.EXTENDED).build();
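
The two boundary constants added above are sentinel documents built from BSON's MinKey and MaxKey types, which sort below and above every other BSON value, so a split bounded by them is open-ended on that side. A minimal sketch of what they expand to, assuming BSON_MIN_KEY and BSON_MAX_KEY are the driver's BsonMinKey/BsonMaxKey instances:

    import org.bson.BsonDocument;
    import org.bson.BsonMaxKey;
    import org.bson.BsonMinKey;

    // Sketch (assumption): the new constants are plain {_id: MinKey} / {_id: MaxKey} documents.
    BsonDocument minBoundary = new BsonDocument("_id", new BsonMinKey()); // sorts before every _id
    BsonDocument maxBoundary = new BsonDocument("_id", new BsonMaxKey()); // sorts after every _id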

@@ -22,6 +22,8 @@
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.connector.mongodb.source.reader.split.MongoScanSourceSplitReader;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Objects;

@@ -49,17 +51,21 @@ public class MongoReadOptions implements Serializable {

private final int samplesPerPartition;

private final @Nullable Integer partitionRecordSize;

private MongoReadOptions(
int fetchSize,
boolean noCursorTimeout,
PartitionStrategy partitionStrategy,
MemorySize partitionSize,
int samplesPerPartition) {
int samplesPerPartition,
@Nullable Integer partitionRecordSize) {
this.fetchSize = fetchSize;
this.noCursorTimeout = noCursorTimeout;
this.partitionStrategy = partitionStrategy;
this.partitionSize = partitionSize;
this.samplesPerPartition = samplesPerPartition;
this.partitionRecordSize = partitionRecordSize;
}

public int getFetchSize() {
@@ -82,6 +88,10 @@ public int getSamplesPerPartition() {
return samplesPerPartition;
}

public @Nullable Integer getPartitionRecordSize() {
return partitionRecordSize;
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -114,6 +124,7 @@ public static class MongoReadOptionsBuilder {
private PartitionStrategy partitionStrategy = SCAN_PARTITION_STRATEGY.defaultValue();
private MemorySize partitionSize = SCAN_PARTITION_SIZE.defaultValue();
private int samplesPerPartition = SCAN_PARTITION_SAMPLES.defaultValue();
private @Nullable Integer partitionRecordSize = null;

private MongoReadOptionsBuilder() {}

@@ -199,6 +210,19 @@ public MongoReadOptionsBuilder setSamplesPerPartition(int samplesPerPartition) {
return this;
}

/**
* Sets the number of records in each partition. This will only take effect when the
* partition strategy is set to Pagination.
*
* @param partitionRecordSize number of records in each partition.
* @return this builder
*/
public MongoReadOptionsBuilder setPartitionRecordSize(
@Nullable Integer partitionRecordSize) {
this.partitionRecordSize = partitionRecordSize;
return this;
}

/**
* Build the {@link MongoReadOptions}.
*
@@ -210,7 +234,8 @@ public MongoReadOptions build() {
noCursorTimeout,
partitionStrategy,
partitionSize,
samplesPerPartition);
samplesPerPartition,
partitionRecordSize);
}
}
}
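
For orientation, a minimal sketch of wiring the new option through the builder; it assumes the usual static MongoReadOptions.builder() factory, and the record size value is a placeholder:

    import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
    import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;

    // Sketch: enable pagination partitioning with ~10k documents per scan split.
    MongoReadOptions readOptions =
            MongoReadOptions.builder()
                    .setPartitionStrategy(PartitionStrategy.PAGINATION)
                    .setPartitionRecordSize(10_000)
                    .build();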
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.mongodb.source.enumerator.splitter;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;

import com.mongodb.MongoNamespace;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_BOUNDARY;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_BOUNDARY;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT;

/** Mongo Splitter that splits a MongoDB collection evenly by record count. */
@Internal
public class MongoPaginationSplitter {

private static final Logger LOG = LoggerFactory.getLogger(MongoPaginationSplitter.class);

public static Collection<MongoScanSourceSplit> split(MongoSplitContext splitContext) {
MongoReadOptions readOptions = splitContext.getReadOptions();
MongoNamespace namespace = splitContext.getMongoNamespace();

// If the partition record size isn't present, we'll use the partition size option and the
// average object size to calculate the number of records in each partitioned split.
Integer partitionRecordSize = readOptions.getPartitionRecordSize();
if (partitionRecordSize == null) {
long avgObjSizeInBytes = splitContext.getAvgObjSize();
if (avgObjSizeInBytes == 0) {
LOG.info(
"{} seems to be an empty collection, Returning a single partition.",
namespace);
return MongoSingleSplitter.split(splitContext);
}

partitionRecordSize =
Math.toIntExact(readOptions.getPartitionSize().getBytes() / avgObjSizeInBytes);
}

long totalNumOfDocuments = splitContext.getCount();

if (partitionRecordSize >= totalNumOfDocuments) {
LOG.info(
"Fewer documents ({}) than the number of documents per partition ({}), Returning a single partition.",
totalNumOfDocuments,
partitionRecordSize);
return MongoSingleSplitter.split(splitContext);
}

int numberOfPartitions =
(int) (Math.ceil(totalNumOfDocuments / (double) partitionRecordSize));

BsonDocument lastUpperBound = null;
List<MongoScanSourceSplit> paginatedSplits = new ArrayList<>();

for (int splitNum = 0; splitNum < numberOfPartitions; splitNum++) {
List<Bson> pipeline = new ArrayList<>();

pipeline.add(Aggregates.project(Projections.include(ID_FIELD)));
pipeline.add(Aggregates.sort(Sorts.ascending(ID_FIELD)));

// We don't have to set the lower-bound filter if we're generating the first split.
if (lastUpperBound != null) {
BsonDocument matchFilter = new BsonDocument();
if (lastUpperBound.containsKey(ID_FIELD)) {
matchFilter.put(
ID_FIELD, new BsonDocument("$gte", lastUpperBound.get(ID_FIELD)));
}
pipeline.add(Aggregates.match(matchFilter));
}
pipeline.add(Aggregates.skip(partitionRecordSize));
pipeline.add(Aggregates.limit(1));

BsonDocument currentUpperBound =
splitContext
.getMongoCollection()
.aggregate(pipeline)
.allowDiskUse(true)
.first();

paginatedSplits.add(
new MongoScanSourceSplit(
String.format("%s_%d", namespace, splitNum),
namespace.getDatabaseName(),
namespace.getCollectionName(),
lastUpperBound != null ? lastUpperBound : BSON_MIN_BOUNDARY,
currentUpperBound != null ? currentUpperBound : BSON_MAX_BOUNDARY,
ID_HINT));

if (currentUpperBound == null) {
break;
}
lastUpperBound = currentUpperBound;
}

return paginatedSplits;
}
}
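
Each loop iteration issues one aggregation that projects and sorts _id, filters to documents at or above the previous bound, skips partitionRecordSize documents, and takes the next _id as the split's upper bound. For example, with documents _id 1..10 and a record size of 4, the probes return 5, 9, and then null, yielding splits [MinKey, 5), [5, 9), [9, MaxKey). A self-contained sketch of a single boundary probe; the connection string, database, and collection names are placeholders:

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Aggregates;
    import com.mongodb.client.model.Projections;
    import com.mongodb.client.model.Sorts;
    import org.bson.BsonDocument;

    import java.util.Arrays;

    public class PaginationBoundaryProbe {
        public static void main(String[] args) {
            try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
                MongoCollection<BsonDocument> coll =
                        client.getDatabase("test").getCollection("coll", BsonDocument.class);
                int partitionRecordSize = 4;
                // Upper bound of the first split: the _id that sits
                // partitionRecordSize positions into the ascending _id order.
                BsonDocument upperBound =
                        coll.aggregate(
                                        Arrays.asList(
                                                Aggregates.project(Projections.include("_id")),
                                                Aggregates.sort(Sorts.ascending("_id")),
                                                Aggregates.skip(partitionRecordSize),
                                                Aggregates.limit(1)))
                                .allowDiskUse(true)
                                .first();
                // A null result means fewer than partitionRecordSize documents remain,
                // so the splitter closes the final split with the {_id: MaxKey} sentinel.
                System.out.println(upperBound);
            }
        }
    }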
@@ -36,8 +36,8 @@
import java.util.List;
import java.util.function.BiFunction;

import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_KEY;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_KEY;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_BOUNDARY;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_BOUNDARY;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT;

@@ -109,10 +109,10 @@ static Collection<MongoScanSourceSplit> split(
@VisibleForTesting
static List<MongoScanSourceSplit> createSplits(
List<BsonDocument> samples, int samplesPerPartition, MongoNamespace namespace) {
samples.add(new BsonDocument(ID_FIELD, BSON_MAX_KEY));
samples.add(BSON_MAX_BOUNDARY);

List<MongoScanSourceSplit> sourceSplits = new ArrayList<>();
BsonDocument partitionStart = new BsonDocument(ID_FIELD, BSON_MIN_KEY);
BsonDocument partitionStart = BSON_MIN_BOUNDARY;
int splitNum = 0;
for (int i = samplesPerPartition - 1; i < samples.size(); i += samplesPerPartition) {
sourceSplits.add(createSplit(namespace, splitNum++, partitionStart, samples.get(i)));
@@ -20,14 +20,11 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;

import org.bson.BsonDocument;

import java.util.Collection;

import static java.util.Collections.singletonList;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_KEY;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_KEY;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_BOUNDARY;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_BOUNDARY;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT;

/** Mongo Splitter that splits MongoDB collection as a single split. */
@@ -42,8 +39,8 @@ public static Collection<MongoScanSourceSplit> split(MongoSplitContext splitCont
splitContext.getMongoNamespace().getFullName(),
splitContext.getDatabaseName(),
splitContext.getCollectionName(),
new BsonDocument(ID_FIELD, BSON_MIN_KEY),
new BsonDocument(ID_FIELD, BSON_MAX_KEY),
BSON_MIN_BOUNDARY,
BSON_MAX_BOUNDARY,
ID_HINT);

return singletonList(singleSplit);
@@ -38,7 +38,7 @@
import java.util.Collection;
import java.util.List;

import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_KEY;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_BOUNDARY;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_KEY;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT;
@@ -90,7 +90,7 @@ public static Collection<MongoScanSourceSplit> split(MongoSplitContext splitCont
}

// Complete right bound: (lastKey, maxKey)
splitKeys.add(new BsonDocument(ID_FIELD, BSON_MAX_KEY));
splitKeys.add(BSON_MAX_BOUNDARY);

List<MongoScanSourceSplit> sourceSplits = new ArrayList<>(splitKeys.size());

@@ -64,6 +64,8 @@ public static Collection<MongoScanSourceSplit> split(
return MongoSplitVectorSplitter.split(splitContext);
case SHARDED:
return MongoShardedSplitter.split(splitContext);
case PAGINATION:
return MongoPaginationSplitter.split(splitContext);
case DEFAULT:
default:
return splitContext.isSharded()
@@ -55,6 +55,11 @@ public enum PartitionStrategy implements DescribedEnum {
text(
"Read the chunk ranges from config.chunks collection and splits to multiple chunks. Only support sharded collections.")),

PAGINATION(
"pagination",
text(
"Creates chunks with a fixed number of records, so every chunk except possibly the last contains exactly the same number of records.")),

DEFAULT(
"default",
text(
@@ -75,12 +75,13 @@ private MongoConnectorOptions() {}
.enumType(PartitionStrategy.class)
.defaultValue(PartitionStrategy.DEFAULT)
.withDescription(
"Specifies the partition strategy. Available strategies are single, sample, split-vector, sharded and default."
+ "The single partition strategy treats the entire collection as a single partition."
+ "The sample partition strategy samples the collection and generate partitions which is fast but possibly uneven."
+ "The split-vector partition strategy uses the splitVector command to generate partitions for non-sharded collections which is fast and even. The splitVector permission is required."
+ "The sharded partition strategy reads config.chunks (MongoDB splits a sharded collection into chunks, and the range of the chunks are stored within the collection) as the partitions directly."
+ "The sharded partition strategy is only used for sharded collection which is fast and even. Read permission of config database is required."
"Specifies the partition strategy. Available strategies are single, sample, split-vector, sharded, pagination and default. "
+ "The single partition strategy treats the entire collection as a single partition. "
+ "The sample partition strategy samples the collection and generate partitions which is fast but possibly uneven. "
+ "The split-vector partition strategy uses the splitVector command to generate partitions for non-sharded collections which is fast and even. The splitVector permission is required. "
+ "The sharded partition strategy reads config.chunks (MongoDB splits a sharded collection into chunks, and the range of the chunks are stored within the collection) as the partitions directly. "
+ "The sharded partition strategy is only used for sharded collection which is fast and even. Read permission of config database is required. "
+ "The pagination partition strategy splits records evenly. Each chunk will have exactly the same number of records. This could be configured by `scan.partition.record-size` option. "
+ "The default partition strategy uses sharded strategy for sharded collections otherwise using split vector strategy.");

public static final ConfigOption<MemorySize> SCAN_PARTITION_SIZE =
@@ -99,6 +100,14 @@ private MongoConnectorOptions() {}
+ "Then uses every 'scan.partition.samples' as the value to use to calculate the partition boundaries."
+ "The total number of samples taken is calculated as: samples per partition * (count of documents / number of documents per partition.");

public static final ConfigOption<Integer> SCAN_PARTITION_RECORD_SIZE =
ConfigOptions.key("scan.partition.record-size")
.intType()
.noDefaultValue()
.withDescription(
"Specifies the number of records in each chunk. Only takes effect when `scan.partition.strategy` is `pagination`. "
+ "This option will be automatically inferred from `scan.partition.size` if absent.");

public static final ConfigOption<Duration> LOOKUP_RETRY_INTERVAL =
ConfigOptions.key("lookup.retry.interval")
.durationType()
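As a usage illustration, a hedged Table API sketch that enables the new strategy; the uri/database/collection keys are assumed from the connector's existing options, and the schema and values are placeholders:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.TableDescriptor;
    import org.apache.flink.table.api.TableEnvironment;

    public class PaginationSourceExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.createTemporaryTable(
                    "orders",
                    TableDescriptor.forConnector("mongodb")
                            .schema(Schema.newBuilder().column("_id", "STRING").build())
                            .option("uri", "mongodb://localhost:27017")
                            .option("database", "test")
                            .option("collection", "orders")
                            .option("scan.partition.strategy", "pagination")
                            .option("scan.partition.record-size", "10000")
                            .build());
            tEnv.executeSql("SELECT * FROM orders").print();
        }
    }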
@@ -179,6 +179,7 @@ private static MongoReadOptions getReadOptions(MongoConfiguration configuration)
.setNoCursorTimeout(configuration.isNoCursorTimeout())
.setPartitionStrategy(configuration.getPartitionStrategy())
.setPartitionSize(configuration.getPartitionSize())
.setPartitionRecordSize(configuration.getPartitionRecordSize())
.setSamplesPerPartition(configuration.getSamplesPerPartition())
.build();
}
@@ -38,6 +38,7 @@
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.LOOKUP_RETRY_INTERVAL;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_NO_TIMEOUT;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_FETCH_SIZE;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_RECORD_SIZE;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_SAMPLES;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_SIZE;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_PARTITION_STRATEGY;
@@ -90,6 +91,10 @@ public int getSamplesPerPartition() {
return config.get(SCAN_PARTITION_SAMPLES);
}

public Integer getPartitionRecordSize() {
return config.get(SCAN_PARTITION_RECORD_SIZE);
}

// -----------------------------------Lookup Config----------------------------------------
public int getLookupMaxRetries() {
return config.get(LookupOptions.MAX_RETRIES);
@@ -17,7 +17,6 @@

package org.apache.flink.connector.mongodb.source.enumerator;

import org.apache.flink.connector.mongodb.common.utils.MongoConstants;
import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;

import org.bson.BsonDocument;
@@ -29,6 +28,7 @@
import java.util.List;
import java.util.Map;

import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_BOUNDARY;
import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT;
import static org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumStateSerializer.INSTANCE;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -75,7 +75,7 @@ private static MongoScanSourceSplit createSourceSplit(int index) {
"db",
"coll",
new BsonDocument("_id", new BsonInt32(index)),
new BsonDocument("_id", MongoConstants.BSON_MAX_KEY),
BSON_MAX_BOUNDARY,
ID_HINT);
}
}