Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRY_ON_CONFLICTS;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
Expand Down Expand Up @@ -130,6 +131,10 @@ public Optional<Integer> getParallelism() {
return config.getOptional(SINK_PARALLELISM);
}

public int getRetryOnConflict() {
return config.get(RETRY_ON_CONFLICTS);
}

/**
* Parse Hosts String to list.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,11 @@ public class ElasticsearchConnectorOptions {
.enumType(DeliveryGuarantee.class)
.defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
.withDescription("Optional delivery guarantee when committing.");

public static final ConfigOption<Integer> RETRY_ON_CONFLICTS =
ConfigOptions.key("sink.retry-on-conflicts")
.intType()
.defaultValue(-1)
.withDescription(
"The number of retry when conflicts with concurrent requests.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
final ElasticsearchSinkBuilderSupplier<RowData> builderSupplier;
@Nullable final String documentType;
final boolean isDynamicIndexWithSystemTime;
final int retryOnConflict;

ElasticsearchDynamicSink(
EncodingFormat<SerializationSchema<RowData>> format,
Expand All @@ -71,6 +72,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
String summaryString,
ElasticsearchSinkBuilderSupplier<RowData> builderSupplier,
@Nullable String documentType,
int retryOnConflict,
ZoneId localTimeZoneId) {
this.format = checkNotNull(format);
this.physicalRowDataType = checkNotNull(physicalRowDataType);
Expand All @@ -79,6 +81,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
this.summaryString = checkNotNull(summaryString);
this.builderSupplier = checkNotNull(builderSupplier);
this.documentType = documentType;
this.retryOnConflict = retryOnConflict;
this.localTimeZoneId = localTimeZoneId;
this.isDynamicIndexWithSystemTime = isDynamicIndexWithSystemTime();
}
Expand Down Expand Up @@ -127,6 +130,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
format,
XContentType.JSON,
documentType,
retryOnConflict,
createKeyExtractor());

ElasticsearchSinkBuilderBase<RowData, ? extends ElasticsearchSinkBuilderBase> builder =
Expand Down Expand Up @@ -187,6 +191,7 @@ public DynamicTableSink copy() {
summaryString,
builderSupplier,
documentType,
retryOnConflict,
localTimeZoneId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
capitalize(factoryIdentifier),
sinkBuilderSupplier,
getDocumentType(config),
config.getRetryOnConflict(),
getLocalTimeZoneId(context.getConfiguration()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,20 @@ class RowElasticsearchEmitter implements ElasticsearchEmitter<RowData> {
private final XContentType contentType;
@Nullable private final String documentType;
private final Function<RowData, String> createKey;
private final int retryOnConflict;

public RowElasticsearchEmitter(
IndexGenerator indexGenerator,
SerializationSchema<RowData> serializationSchema,
XContentType contentType,
@Nullable String documentType,
int retryOnConflict,
Function<RowData, String> createKey) {
this.indexGenerator = checkNotNull(indexGenerator);
this.serializationSchema = checkNotNull(serializationSchema);
this.contentType = checkNotNull(contentType);
this.documentType = documentType;
this.retryOnConflict = retryOnConflict;
this.createKey = checkNotNull(createKey);
}

Expand Down Expand Up @@ -110,6 +113,9 @@ private void processUpsert(RowData row, RequestIndexer indexer) {
new UpdateRequest(indexGenerator.generate(row), documentType, key)
.doc(document, contentType)
.upsert(document, contentType);
if (retryOnConflict != -1) {
updateRequest.retryOnConflict(retryOnConflict);
}
indexer.add(updateRequest);
} else {
final IndexRequest indexRequest =
Expand Down