diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java index ac0c2913..6c742002 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java @@ -46,6 +46,7 @@ import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRY_ON_CONFLICTS; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; @@ -130,6 +131,10 @@ public Optional getParallelism() { return config.getOptional(SINK_PARALLELISM); } + public int getRetryOnConflict() { + return config.get(RETRY_ON_CONFLICTS); + } + /** * Parse Hosts String to list. * diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java index 10ea0ae2..f34cb879 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java @@ -145,4 +145,11 @@ public class ElasticsearchConnectorOptions { .enumType(DeliveryGuarantee.class) .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE) .withDescription("Optional delivery guarantee when committing."); + + public static final ConfigOption RETRY_ON_CONFLICTS = + ConfigOptions.key("sink.retry-on-conflicts") + .intType() + .defaultValue(-1) + .withDescription( + "The number of retry when conflicts with concurrent requests."); } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java index 0fd389bd..d6d04a6d 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java @@ -62,6 +62,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink { final ElasticsearchSinkBuilderSupplier builderSupplier; @Nullable final String documentType; final boolean isDynamicIndexWithSystemTime; + final int retryOnConflict; ElasticsearchDynamicSink( EncodingFormat> format, @@ -71,6 +72,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink { String summaryString, ElasticsearchSinkBuilderSupplier builderSupplier, @Nullable String documentType, + int retryOnConflict, ZoneId localTimeZoneId) { this.format = checkNotNull(format); this.physicalRowDataType = checkNotNull(physicalRowDataType); @@ -79,6 +81,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink { this.summaryString = checkNotNull(summaryString); this.builderSupplier = checkNotNull(builderSupplier); this.documentType = documentType; + this.retryOnConflict = retryOnConflict; this.localTimeZoneId = localTimeZoneId; this.isDynamicIndexWithSystemTime = isDynamicIndexWithSystemTime(); } @@ -127,6 +130,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { format, XContentType.JSON, documentType, + retryOnConflict, createKeyExtractor()); ElasticsearchSinkBuilderBase builder = @@ -187,6 +191,7 @@ public DynamicTableSink copy() { summaryString, builderSupplier, documentType, + retryOnConflict, localTimeZoneId); } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java index 0395eafd..2686e2e6 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java @@ -128,6 +128,7 @@ public DynamicTableSink createDynamicTableSink(Context context) { capitalize(factoryIdentifier), sinkBuilderSupplier, getDocumentType(config), + config.getRetryOnConflict(), getLocalTimeZoneId(context.getConfiguration())); } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java index bddc6cb1..f91aa9bb 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java @@ -50,17 +50,20 @@ class RowElasticsearchEmitter implements ElasticsearchEmitter { private final XContentType contentType; @Nullable private final String documentType; private final Function createKey; + private final int retryOnConflict; public RowElasticsearchEmitter( IndexGenerator indexGenerator, SerializationSchema serializationSchema, XContentType contentType, @Nullable String documentType, + int retryOnConflict, Function createKey) { this.indexGenerator = checkNotNull(indexGenerator); this.serializationSchema = checkNotNull(serializationSchema); this.contentType = checkNotNull(contentType); this.documentType = documentType; + this.retryOnConflict = retryOnConflict; this.createKey = checkNotNull(createKey); } @@ -110,6 +113,9 @@ private void processUpsert(RowData row, RequestIndexer indexer) { new UpdateRequest(indexGenerator.generate(row), documentType, key) .doc(document, contentType) .upsert(document, contentType); + if (retryOnConflict != -1) { + updateRequest.retryOnConflict(retryOnConflict); + } indexer.add(updateRequest); } else { final IndexRequest indexRequest =