Skip to content

Commit

Permalink
Port ES Scripted Upsert to 11.1.0 from confluentinc#759
Browse files Browse the repository at this point in the history
Signed-off-by: Nicholas Cole <nicholas.cole@walmart.com>
  • Loading branch information
NicholasDCole committed Mar 12, 2024
1 parent cc26466 commit 3e66135
Show file tree
Hide file tree
Showing 7 changed files with 337 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>io.confluent</groupId>
<artifactId>kafka-connect-elasticsearch</artifactId>
<version>11.1.0</version>
<version>11.1.0-LOCAL</version>
<packaging>jar</packaging>
<name>kafka-connect-elasticsearch</name>
<organization>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
import io.confluent.connect.elasticsearch.util.ScriptParser;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
Expand All @@ -40,6 +41,7 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.script.Script;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -178,11 +180,44 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType),
record
);
case SCRIPTED_UPSERT:
try {

if (config.getIsPayloadAsParams()) {
return buildUpdateRequestWithParams(index, payload, id);
}

Script script = ScriptParser.parseScript(config.getScript());

return new UpdateRequest(index, id)
.doc(payload, XContentType.JSON)
.upsert(payload, XContentType.JSON)
.retryOnConflict(Math.min(config.maxInFlightRequests(), 5))
.script(script)
.scriptedUpsert(true);

} catch (JsonProcessingException jsonProcessingException) {
throw new RuntimeException(jsonProcessingException);
}
default:
return null; // shouldn't happen
}
}

private UpdateRequest buildUpdateRequestWithParams(String index, String payload, String id)
throws JsonProcessingException {

Script script = ScriptParser.parseScriptWithParams(config.getScript(), payload);

UpdateRequest updateRequest =
new UpdateRequest(index, id)
.retryOnConflict(Math.min(config.maxInFlightRequests(), 5))
.script(script)
.scriptedUpsert(true);

return updateRequest;
}

private String getPayload(SinkRecord record) {
if (record.value() == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;

import io.confluent.connect.elasticsearch.validator.ScriptValidator;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
Expand Down Expand Up @@ -256,6 +258,22 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
private static final String WRITE_METHOD_DISPLAY = "Write Method";
private static final String WRITE_METHOD_DEFAULT = WriteMethod.INSERT.name();

public static final String UPSERT_SCRIPT_CONFIG = "upsert.script";

private static final String UPSERT_SCRIPT_DOC = "Script used for"
+ " upserting data to Elasticsearch. This script allows for"
+ " customizable behavior upon upserting a document. Please refer to"
+ " Elasticsearch scripted upsert documentation";

private static final String UPSERT_SCRIPT_DISPLAY = "Upsert Script";

public static final String PAYLOAD_AS_PARAMS_CONFIG = "payload.as.params";

private static final String PAYLOAD_AS_PARAMS_DOC = "Defines Payload to be injected"
+ " into upsert.script script component as params object";

private static final String PAYLOAD_AS_PARAMS_DISPLAY = "Payload as Params";

// Proxy group
public static final String PROXY_HOST_CONFIG = "proxy.host";
private static final String PROXY_HOST_DISPLAY = "Proxy Host";
Expand Down Expand Up @@ -379,7 +397,8 @@ public enum SecurityProtocol {

public enum WriteMethod {
INSERT,
UPSERT
UPSERT,
SCRIPTED_UPSERT
}

protected static ConfigDef baseConfigDef() {
Expand Down Expand Up @@ -573,8 +592,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
IGNORE_KEY_DISPLAY
).define(
IGNORE_KEY_DISPLAY)
.define(
IGNORE_SCHEMA_CONFIG,
Type.BOOLEAN,
IGNORE_SCHEMA_DEFAULT,
Expand All @@ -583,8 +602,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
IGNORE_SCHEMA_DISPLAY
).define(
IGNORE_SCHEMA_DISPLAY)
.define(
COMPACT_MAP_ENTRIES_CONFIG,
Type.BOOLEAN,
COMPACT_MAP_ENTRIES_DEFAULT,
Expand All @@ -593,8 +612,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
COMPACT_MAP_ENTRIES_DISPLAY
).define(
COMPACT_MAP_ENTRIES_DISPLAY)
.define(
IGNORE_KEY_TOPICS_CONFIG,
Type.LIST,
IGNORE_KEY_TOPICS_DEFAULT,
Expand All @@ -603,8 +622,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.LONG,
IGNORE_KEY_TOPICS_DISPLAY
).define(
IGNORE_KEY_TOPICS_DISPLAY)
.define(
IGNORE_SCHEMA_TOPICS_CONFIG,
Type.LIST,
IGNORE_SCHEMA_TOPICS_DEFAULT,
Expand All @@ -613,8 +632,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.LONG,
IGNORE_SCHEMA_TOPICS_DISPLAY
).define(
IGNORE_SCHEMA_TOPICS_DISPLAY)
.define(
DROP_INVALID_MESSAGE_CONFIG,
Type.BOOLEAN,
DROP_INVALID_MESSAGE_DEFAULT,
Expand All @@ -623,8 +642,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.LONG,
DROP_INVALID_MESSAGE_DISPLAY
).define(
DROP_INVALID_MESSAGE_DISPLAY)
.define(
BEHAVIOR_ON_NULL_VALUES_CONFIG,
Type.STRING,
BEHAVIOR_ON_NULL_VALUES_DEFAULT.name(),
Expand All @@ -635,8 +654,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
++order,
Width.SHORT,
BEHAVIOR_ON_NULL_VALUES_DISPLAY,
new EnumRecommender<>(BehaviorOnNullValues.class)
).define(
new EnumRecommender<>(BehaviorOnNullValues.class))
.define(
BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
Type.STRING,
BEHAVIOR_ON_MALFORMED_DOCS_DEFAULT.name(),
Expand All @@ -647,8 +666,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
++order,
Width.SHORT,
BEHAVIOR_ON_MALFORMED_DOCS_DISPLAY,
new EnumRecommender<>(BehaviorOnMalformedDoc.class)
).define(
new EnumRecommender<>(BehaviorOnMalformedDoc.class))
.define(
WRITE_METHOD_CONFIG,
Type.STRING,
WRITE_METHOD_DEFAULT,
Expand All @@ -659,8 +678,30 @@ private static void addConversionConfigs(ConfigDef configDef) {
++order,
Width.SHORT,
WRITE_METHOD_DISPLAY,
new EnumRecommender<>(WriteMethod.class)
);
new EnumRecommender<>(WriteMethod.class))
.define(
UPSERT_SCRIPT_CONFIG,
Type.STRING,
null,
new ScriptValidator(),
Importance.LOW,
UPSERT_SCRIPT_DOC,
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
UPSERT_SCRIPT_DISPLAY,
new ScriptValidator())
.define(
PAYLOAD_AS_PARAMS_CONFIG,
Type.BOOLEAN,
false,
Importance.LOW,
PAYLOAD_AS_PARAMS_DOC,
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
PAYLOAD_AS_PARAMS_DISPLAY);
;
}

private static void addProxyConfigs(ConfigDef configDef) {
Expand Down Expand Up @@ -989,6 +1030,14 @@ public WriteMethod writeMethod() {
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
}

public String getScript() {
return getString(UPSERT_SCRIPT_CONFIG);
}

public Boolean getIsPayloadAsParams() {
return getBoolean(PAYLOAD_AS_PARAMS_CONFIG);
}

private static class DataStreamDatasetValidator implements Validator {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.elasticsearch.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.script.Script;

import java.util.Map;

public class ScriptParser {

private static final ObjectMapper objectMapper = new ObjectMapper();

public static Script parseScript(String scriptJson) throws JsonProcessingException {

Map<String, Object> map = ScriptParser.parseSchemaStringAsJson(scriptJson);

return Script.parse(map);
}

private static Map<String, Object> parseSchemaStringAsJson(String scriptJson)
throws JsonProcessingException {

ObjectMapper objectMapper = new ObjectMapper();

Map<String, Object> scriptConverted;

scriptConverted = objectMapper.readValue(
scriptJson, new TypeReference<Map<String, Object>>(){});

return scriptConverted;
}

public static Script parseScriptWithParams(String scriptJson, String jsonPayload)
throws JsonProcessingException {

Map<String, Object> map = ScriptParser.parseSchemaStringAsJson(scriptJson);

Map<String, Object> fields = objectMapper.readValue(jsonPayload,
new TypeReference<Map<String, Object>>() {});

map.put("params", fields);

return Script.parse(map);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.elasticsearch.validator;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.confluent.connect.elasticsearch.util.ScriptParser;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.elasticsearch.script.Script;

public class ScriptValidator implements ConfigDef.Validator, ConfigDef.Recommender {

@Override
@SuppressWarnings("unchecked")
public void ensureValid(String name, Object value) {

if (value == null) {
return;
}

String script = (String) value;

try {
Script parsedScript = ScriptParser.parseScript(script);

if (parsedScript.getIdOrCode() == null) {
throw new ConfigException(name, script, "The specified script is missing code");
} else if (parsedScript.getLang() == null) {
throw new ConfigException(name, script, "The specified script is missing lang");
}

} catch (JsonProcessingException jsonProcessingException) {
throw new ConfigException(
name, script, "The specified script is not a valid Elasticsearch painless script");
}
}

@Override
public String toString() {
return "A valid script that is able to be parsed";
}

@Override
public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
if (!parsedConfig.get(WRITE_METHOD_CONFIG).equals(SCRIPTED_UPSERT)) {
return new ArrayList<>();
}
return null;
}

@Override
public boolean visible(String name, Map<String, Object> parsedConfig) {
return parsedConfig.get(WRITE_METHOD_CONFIG).equals(SCRIPTED_UPSERT.name());
}
}
Loading

0 comments on commit 3e66135

Please sign in to comment.