diff --git a/pom.xml b/pom.xml
index cc0cdc2e9..cee313006 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
io.confluent
kafka-connect-elasticsearch
- 11.1.0
+ 11.1.0-LOCAL
jar
kafka-connect-elasticsearch
diff --git a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java
index c2b8728f4..ca828aba6 100644
--- a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java
+++ b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java
@@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
+import io.confluent.connect.elasticsearch.util.ScriptParser;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
@@ -40,6 +41,7 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
+import org.elasticsearch.script.Script;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -178,11 +180,44 @@ public DocWriteRequest> convertRecord(SinkRecord record, String index) {
new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType),
record
);
+ case SCRIPTED_UPSERT:
+ try {
+
+ if (config.getIsPayloadAsParams()) {
+ return buildUpdateRequestWithParams(index, payload, id);
+ }
+
+ Script script = ScriptParser.parseScript(config.getScript());
+
+ return new UpdateRequest(index, id)
+ .doc(payload, XContentType.JSON)
+ .upsert(payload, XContentType.JSON)
+ .retryOnConflict(Math.min(config.maxInFlightRequests(), 5))
+ .script(script)
+ .scriptedUpsert(true);
+
+ } catch (JsonProcessingException jsonProcessingException) {
+ throw new RuntimeException(jsonProcessingException);
+ }
default:
return null; // shouldn't happen
}
}
+ private UpdateRequest buildUpdateRequestWithParams(String index, String payload, String id)
+ throws JsonProcessingException {
+
+ Script script = ScriptParser.parseScriptWithParams(config.getScript(), payload);
+
+ UpdateRequest updateRequest =
+ new UpdateRequest(index, id)
+ .retryOnConflict(Math.min(config.maxInFlightRequests(), 5))
+ .script(script)
+ .scriptedUpsert(true);
+
+ return updateRequest;
+ }
+
private String getPayload(SinkRecord record) {
if (record.value() == null) {
return null;
diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java
index f3d6a677f..f6e510d01 100644
--- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java
+++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java
@@ -24,6 +24,8 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;
+
+import io.confluent.connect.elasticsearch.validator.ScriptValidator;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
@@ -256,6 +258,22 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
private static final String WRITE_METHOD_DISPLAY = "Write Method";
private static final String WRITE_METHOD_DEFAULT = WriteMethod.INSERT.name();
+ public static final String UPSERT_SCRIPT_CONFIG = "upsert.script";
+
+ private static final String UPSERT_SCRIPT_DOC = "Script used for"
+ + " upserting data to Elasticsearch. This script allows for"
+ + " customizable behavior upon upserting a document. Please refer to"
+ + " Elasticsearch scripted upsert documentation";
+
+ private static final String UPSERT_SCRIPT_DISPLAY = "Upsert Script";
+
+ public static final String PAYLOAD_AS_PARAMS_CONFIG = "payload.as.params";
+
+ private static final String PAYLOAD_AS_PARAMS_DOC = "Defines Payload to be injected"
+ + " into upsert.script script component as params object";
+
+ private static final String PAYLOAD_AS_PARAMS_DISPLAY = "Payload as Params";
+
// Proxy group
public static final String PROXY_HOST_CONFIG = "proxy.host";
private static final String PROXY_HOST_DISPLAY = "Proxy Host";
@@ -379,7 +397,8 @@ public enum SecurityProtocol {
public enum WriteMethod {
INSERT,
- UPSERT
+ UPSERT,
+ SCRIPTED_UPSERT
}
protected static ConfigDef baseConfigDef() {
@@ -573,8 +592,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
- IGNORE_KEY_DISPLAY
- ).define(
+ IGNORE_KEY_DISPLAY)
+ .define(
IGNORE_SCHEMA_CONFIG,
Type.BOOLEAN,
IGNORE_SCHEMA_DEFAULT,
@@ -583,8 +602,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
- IGNORE_SCHEMA_DISPLAY
- ).define(
+ IGNORE_SCHEMA_DISPLAY)
+ .define(
COMPACT_MAP_ENTRIES_CONFIG,
Type.BOOLEAN,
COMPACT_MAP_ENTRIES_DEFAULT,
@@ -593,8 +612,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
- COMPACT_MAP_ENTRIES_DISPLAY
- ).define(
+ COMPACT_MAP_ENTRIES_DISPLAY)
+ .define(
IGNORE_KEY_TOPICS_CONFIG,
Type.LIST,
IGNORE_KEY_TOPICS_DEFAULT,
@@ -603,8 +622,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.LONG,
- IGNORE_KEY_TOPICS_DISPLAY
- ).define(
+ IGNORE_KEY_TOPICS_DISPLAY)
+ .define(
IGNORE_SCHEMA_TOPICS_CONFIG,
Type.LIST,
IGNORE_SCHEMA_TOPICS_DEFAULT,
@@ -613,8 +632,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.LONG,
- IGNORE_SCHEMA_TOPICS_DISPLAY
- ).define(
+ IGNORE_SCHEMA_TOPICS_DISPLAY)
+ .define(
DROP_INVALID_MESSAGE_CONFIG,
Type.BOOLEAN,
DROP_INVALID_MESSAGE_DEFAULT,
@@ -623,8 +642,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.LONG,
- DROP_INVALID_MESSAGE_DISPLAY
- ).define(
+ DROP_INVALID_MESSAGE_DISPLAY)
+ .define(
BEHAVIOR_ON_NULL_VALUES_CONFIG,
Type.STRING,
BEHAVIOR_ON_NULL_VALUES_DEFAULT.name(),
@@ -635,8 +654,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
++order,
Width.SHORT,
BEHAVIOR_ON_NULL_VALUES_DISPLAY,
- new EnumRecommender<>(BehaviorOnNullValues.class)
- ).define(
+ new EnumRecommender<>(BehaviorOnNullValues.class))
+ .define(
BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
Type.STRING,
BEHAVIOR_ON_MALFORMED_DOCS_DEFAULT.name(),
@@ -647,8 +666,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
++order,
Width.SHORT,
BEHAVIOR_ON_MALFORMED_DOCS_DISPLAY,
- new EnumRecommender<>(BehaviorOnMalformedDoc.class)
- ).define(
+ new EnumRecommender<>(BehaviorOnMalformedDoc.class))
+ .define(
WRITE_METHOD_CONFIG,
Type.STRING,
WRITE_METHOD_DEFAULT,
@@ -659,8 +678,30 @@ private static void addConversionConfigs(ConfigDef configDef) {
++order,
Width.SHORT,
WRITE_METHOD_DISPLAY,
- new EnumRecommender<>(WriteMethod.class)
- );
+ new EnumRecommender<>(WriteMethod.class))
+ .define(
+ UPSERT_SCRIPT_CONFIG,
+ Type.STRING,
+ null,
+ new ScriptValidator(),
+ Importance.LOW,
+ UPSERT_SCRIPT_DOC,
+ DATA_CONVERSION_GROUP,
+ ++order,
+ Width.SHORT,
+ UPSERT_SCRIPT_DISPLAY,
+ new ScriptValidator())
+ .define(
+ PAYLOAD_AS_PARAMS_CONFIG,
+ Type.BOOLEAN,
+ false,
+ Importance.LOW,
+ PAYLOAD_AS_PARAMS_DOC,
+ DATA_CONVERSION_GROUP,
+ ++order,
+ Width.SHORT,
+ PAYLOAD_AS_PARAMS_DISPLAY);
+ ;
}
private static void addProxyConfigs(ConfigDef configDef) {
@@ -989,6 +1030,14 @@ public WriteMethod writeMethod() {
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
}
+ public String getScript() {
+ return getString(UPSERT_SCRIPT_CONFIG);
+ }
+
+ public Boolean getIsPayloadAsParams() {
+ return getBoolean(PAYLOAD_AS_PARAMS_CONFIG);
+ }
+
private static class DataStreamDatasetValidator implements Validator {
@Override
diff --git a/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java b/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java
new file mode 100644
index 000000000..83ee94644
--- /dev/null
+++ b/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.elasticsearch.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.elasticsearch.script.Script;
+
+import java.util.Map;
+
+public class ScriptParser {
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ public static Script parseScript(String scriptJson) throws JsonProcessingException {
+
+ Map map = ScriptParser.parseSchemaStringAsJson(scriptJson);
+
+ return Script.parse(map);
+ }
+
+ private static Map parseSchemaStringAsJson(String scriptJson)
+ throws JsonProcessingException {
+
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ Map scriptConverted;
+
+ scriptConverted = objectMapper.readValue(
+ scriptJson, new TypeReference