From 3e661359dbcc2ecbd84d0769b400a88434be2778 Mon Sep 17 00:00:00 2001 From: Nicholas Cole Date: Tue, 12 Mar 2024 12:44:26 -0500 Subject: [PATCH] Port ES Scripted Upsert to 11.1.0 from https://github.com/confluentinc/kafka-connect-elasticsearch/pull/759 Signed-off-by: Nicholas Cole --- pom.xml | 2 +- .../connect/elasticsearch/DataConverter.java | 35 ++++++++ .../ElasticsearchSinkConnectorConfig.java | 87 +++++++++++++++---- .../elasticsearch/util/ScriptParser.java | 61 +++++++++++++ .../validator/ScriptValidator.java | 74 ++++++++++++++++ .../elasticsearch/DataConverterTest.java | 56 ++++++++++++ .../connect/elasticsearch/ValidatorTest.java | 42 +++++++++ 7 files changed, 337 insertions(+), 20 deletions(-) create mode 100644 src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java create mode 100644 src/main/java/io/confluent/connect/elasticsearch/validator/ScriptValidator.java diff --git a/pom.xml b/pom.xml index cc0cdc2e9..cee313006 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ io.confluent kafka-connect-elasticsearch - 11.1.0 + 11.1.0-LOCAL jar kafka-connect-elasticsearch diff --git a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java index c2b8728f4..ca828aba6 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +++ b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues; +import io.confluent.connect.elasticsearch.util.ScriptParser; import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; @@ -40,6 +41,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.xcontent.XContentType; 
import org.elasticsearch.index.VersionType; +import org.elasticsearch.script.Script; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -178,11 +180,44 @@ public DocWriteRequest convertRecord(SinkRecord record, String index) { new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType), record ); + case SCRIPTED_UPSERT: + try { + + if (config.getIsPayloadAsParams()) { + return buildUpdateRequestWithParams(index, payload, id); + } + + Script script = ScriptParser.parseScript(config.getScript()); + + return new UpdateRequest(index, id) + .doc(payload, XContentType.JSON) + .upsert(payload, XContentType.JSON) + .retryOnConflict(Math.min(config.maxInFlightRequests(), 5)) + .script(script) + .scriptedUpsert(true); + + } catch (JsonProcessingException jsonProcessingException) { + throw new RuntimeException(jsonProcessingException); + } default: return null; // shouldn't happen } } + private UpdateRequest buildUpdateRequestWithParams(String index, String payload, String id) + throws JsonProcessingException { + + Script script = ScriptParser.parseScriptWithParams(config.getScript(), payload); + + UpdateRequest updateRequest = + new UpdateRequest(index, id) + .retryOnConflict(Math.min(config.maxInFlightRequests(), 5)) + .script(script) + .scriptedUpsert(true); + + return updateRequest; + } + private String getPayload(SinkRecord record) { if (record.value() == null) { return null; diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java index f3d6a677f..f6e510d01 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java @@ -24,6 +24,8 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.concurrent.TimeUnit; + +import 
io.confluent.connect.elasticsearch.validator.ScriptValidator; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; @@ -256,6 +258,22 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig { private static final String WRITE_METHOD_DISPLAY = "Write Method"; private static final String WRITE_METHOD_DEFAULT = WriteMethod.INSERT.name(); + public static final String UPSERT_SCRIPT_CONFIG = "upsert.script"; + + private static final String UPSERT_SCRIPT_DOC = "Script used for" + + " upserting data to Elasticsearch. This script allows for" + + " customizable behavior upon upserting a document. Please refer to" + + " Elasticsearch scripted upsert documentation"; + + private static final String UPSERT_SCRIPT_DISPLAY = "Upsert Script"; + + public static final String PAYLOAD_AS_PARAMS_CONFIG = "payload.as.params"; + + private static final String PAYLOAD_AS_PARAMS_DOC = "Defines Payload to be injected" + + " into upsert.script script component as params object"; + + private static final String PAYLOAD_AS_PARAMS_DISPLAY = "Payload as Params"; + // Proxy group public static final String PROXY_HOST_CONFIG = "proxy.host"; private static final String PROXY_HOST_DISPLAY = "Proxy Host"; @@ -379,7 +397,8 @@ public enum SecurityProtocol { public enum WriteMethod { INSERT, - UPSERT + UPSERT, + SCRIPTED_UPSERT } protected static ConfigDef baseConfigDef() { @@ -573,8 +592,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.SHORT, - IGNORE_KEY_DISPLAY - ).define( + IGNORE_KEY_DISPLAY) + .define( IGNORE_SCHEMA_CONFIG, Type.BOOLEAN, IGNORE_SCHEMA_DEFAULT, @@ -583,8 +602,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.SHORT, - IGNORE_SCHEMA_DISPLAY - ).define( + IGNORE_SCHEMA_DISPLAY) + .define( COMPACT_MAP_ENTRIES_CONFIG, Type.BOOLEAN, 
COMPACT_MAP_ENTRIES_DEFAULT, @@ -593,8 +612,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.SHORT, - COMPACT_MAP_ENTRIES_DISPLAY - ).define( + COMPACT_MAP_ENTRIES_DISPLAY) + .define( IGNORE_KEY_TOPICS_CONFIG, Type.LIST, IGNORE_KEY_TOPICS_DEFAULT, @@ -603,8 +622,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.LONG, - IGNORE_KEY_TOPICS_DISPLAY - ).define( + IGNORE_KEY_TOPICS_DISPLAY) + .define( IGNORE_SCHEMA_TOPICS_CONFIG, Type.LIST, IGNORE_SCHEMA_TOPICS_DEFAULT, @@ -613,8 +632,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.LONG, - IGNORE_SCHEMA_TOPICS_DISPLAY - ).define( + IGNORE_SCHEMA_TOPICS_DISPLAY) + .define( DROP_INVALID_MESSAGE_CONFIG, Type.BOOLEAN, DROP_INVALID_MESSAGE_DEFAULT, @@ -623,8 +642,8 @@ private static void addConversionConfigs(ConfigDef configDef) { DATA_CONVERSION_GROUP, ++order, Width.LONG, - DROP_INVALID_MESSAGE_DISPLAY - ).define( + DROP_INVALID_MESSAGE_DISPLAY) + .define( BEHAVIOR_ON_NULL_VALUES_CONFIG, Type.STRING, BEHAVIOR_ON_NULL_VALUES_DEFAULT.name(), @@ -635,8 +654,8 @@ private static void addConversionConfigs(ConfigDef configDef) { ++order, Width.SHORT, BEHAVIOR_ON_NULL_VALUES_DISPLAY, - new EnumRecommender<>(BehaviorOnNullValues.class) - ).define( + new EnumRecommender<>(BehaviorOnNullValues.class)) + .define( BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, Type.STRING, BEHAVIOR_ON_MALFORMED_DOCS_DEFAULT.name(), @@ -647,8 +666,8 @@ private static void addConversionConfigs(ConfigDef configDef) { ++order, Width.SHORT, BEHAVIOR_ON_MALFORMED_DOCS_DISPLAY, - new EnumRecommender<>(BehaviorOnMalformedDoc.class) - ).define( + new EnumRecommender<>(BehaviorOnMalformedDoc.class)) + .define( WRITE_METHOD_CONFIG, Type.STRING, WRITE_METHOD_DEFAULT, @@ -659,8 +678,30 @@ private static void addConversionConfigs(ConfigDef configDef) { ++order, Width.SHORT, WRITE_METHOD_DISPLAY, - new 
EnumRecommender<>(WriteMethod.class) - ); + new EnumRecommender<>(WriteMethod.class)) + .define( + UPSERT_SCRIPT_CONFIG, + Type.STRING, + null, + new ScriptValidator(), + Importance.LOW, + UPSERT_SCRIPT_DOC, + DATA_CONVERSION_GROUP, + ++order, + Width.SHORT, + UPSERT_SCRIPT_DISPLAY, + new ScriptValidator()) + .define( + PAYLOAD_AS_PARAMS_CONFIG, + Type.BOOLEAN, + false, + Importance.LOW, + PAYLOAD_AS_PARAMS_DOC, + DATA_CONVERSION_GROUP, + ++order, + Width.SHORT, + PAYLOAD_AS_PARAMS_DISPLAY); + ; } private static void addProxyConfigs(ConfigDef configDef) { @@ -989,6 +1030,14 @@ public WriteMethod writeMethod() { return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase()); } + public String getScript() { + return getString(UPSERT_SCRIPT_CONFIG); + } + + public Boolean getIsPayloadAsParams() { + return getBoolean(PAYLOAD_AS_PARAMS_CONFIG); + } + private static class DataStreamDatasetValidator implements Validator { @Override diff --git a/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java b/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java new file mode 100644 index 000000000..83ee94644 --- /dev/null +++ b/src/main/java/io/confluent/connect/elasticsearch/util/ScriptParser.java @@ -0,0 +1,61 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.connect.elasticsearch.util; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.elasticsearch.script.Script; + +import java.util.Map; + +public class ScriptParser { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + public static Script parseScript(String scriptJson) throws JsonProcessingException { + + Map<String, Object> map = ScriptParser.parseSchemaStringAsJson(scriptJson); + + return Script.parse(map); + } + + private static Map<String, Object> parseSchemaStringAsJson(String scriptJson) + throws JsonProcessingException { + + ObjectMapper objectMapper = new ObjectMapper(); + + Map<String, Object> scriptConverted; + + scriptConverted = objectMapper.readValue( + scriptJson, new TypeReference<Map<String, Object>>(){}); + + return scriptConverted; + } + + public static Script parseScriptWithParams(String scriptJson, String jsonPayload) + throws JsonProcessingException { + + Map<String, Object> map = ScriptParser.parseSchemaStringAsJson(scriptJson); + + Map<String, Object> fields = objectMapper.readValue(jsonPayload, + new TypeReference<Map<String, Object>>() {}); + + map.put("params", fields); + + return Script.parse(map); + } +} \ No newline at end of file diff --git a/src/main/java/io/confluent/connect/elasticsearch/validator/ScriptValidator.java b/src/main/java/io/confluent/connect/elasticsearch/validator/ScriptValidator.java new file mode 100644 index 000000000..2670758ef --- /dev/null +++ b/src/main/java/io/confluent/connect/elasticsearch/validator/ScriptValidator.java @@ -0,0 +1,74 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. 
You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.elasticsearch.validator; + +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.confluent.connect.elasticsearch.util.ScriptParser; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.elasticsearch.script.Script; + +public class ScriptValidator implements ConfigDef.Validator, ConfigDef.Recommender { + + @Override + @SuppressWarnings("unchecked") + public void ensureValid(String name, Object value) { + + if (value == null) { + return; + } + + String script = (String) value; + + try { + Script parsedScript = ScriptParser.parseScript(script); + + if (parsedScript.getIdOrCode() == null) { + throw new ConfigException(name, script, "The specified script is missing code"); + } else if (parsedScript.getLang() == null) { + throw new ConfigException(name, script, "The specified script is missing lang"); + } + + } catch (JsonProcessingException jsonProcessingException) { + throw new ConfigException( + name, script, "The specified script is not a valid Elasticsearch painless script"); + } + } + + @Override + public String toString() { + return "A valid script that is able to be parsed"; + } + + @Override + public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
+ if (!parsedConfig.get(WRITE_METHOD_CONFIG).equals(SCRIPTED_UPSERT.name())) { + return new ArrayList<>(); + } + return new ArrayList<>(); + } + + @Override + public boolean visible(String name, Map<String, Object> parsedConfig) { + return parsedConfig.get(WRITE_METHOD_CONFIG).equals(SCRIPTED_UPSERT.name()); + } +} \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java b/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java index e18d3e179..6e1d0de2d 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java +++ b/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.index.VersionType; import org.junit.Before; import org.junit.Test; @@ -39,7 +40,9 @@ import static io.confluent.connect.elasticsearch.DataConverter.TIMESTAMP_FIELD; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class DataConverterTest { @@ -492,6 +495,59 @@ public void testDoNotAddExternalVersioningIfDataStream() { assertEquals(VersionType.INTERNAL, actualRecord.versionType()); } + @Test + public void upsertScript() { + props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true"); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT.name()); + props.put( + ElasticsearchSinkConnectorConfig.UPSERT_SCRIPT_CONFIG, + 
"{\"lang\":\"painless\",\"source\":\"if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}\",\"params\":{\"count\":4}}"); + props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.FAIL.name()); + converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + Schema preProcessedSchema = converter.preProcessSchema(schema); + Struct struct = new Struct(preProcessedSchema).put("string", "myValue"); + SinkRecord sinkRecord = createSinkRecordWithValue(struct); + + UpdateRequest actualRecord = (UpdateRequest) converter.convertRecord(sinkRecord, index); + + assertNotNull(actualRecord.script()); + assertEquals("if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}", actualRecord.script().getIdOrCode()); + assertEquals("painless", actualRecord.script().getLang()); + assertEquals(4, actualRecord.script().getParams().getOrDefault("count", 0)); + assertTrue(actualRecord.scriptedUpsert()); + assertNotNull(actualRecord.doc()); + } + + @Test + public void upsertScriptWithParamPayload(){ + + props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true"); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT.name()); + props.put(ElasticsearchSinkConnectorConfig.PAYLOAD_AS_PARAMS_CONFIG, "true"); + props.put( + ElasticsearchSinkConnectorConfig.UPSERT_SCRIPT_CONFIG, + "{\"lang\":\"painless\",\"source\":\"def paramAnswerList = params['answers']; def paramAnswerMap = new HashMap(); for (int i = 0; i < paramAnswerList.length; i++) { def answer = paramAnswerList[i]; paramAnswerMap[answer.questionId] = answer;} if (ctx._source.answers == null) { ctx._source.answers = [];} for (int i = 0; i < 
ctx._source.answers.length; i++) { if (paramAnswerMap.get(ctx._source.answers[i].questionId) != null) { ctx._source.answers[i].putAll(paramAnswerMap.get(ctx._source.answers[i].questionId)); } }\"}"); + props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.FAIL.name()); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "true"); + converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + + Schema preProcessedSchema = converter.preProcessSchema(schema); + Struct struct = new Struct(preProcessedSchema).put("string", "myValue"); + SinkRecord sinkRecord = createSinkRecordWithValue(struct); + + + UpdateRequest actualRecord = (UpdateRequest) converter.convertRecord(sinkRecord, index); + + Map recordParams = actualRecord.script().getParams(); + + assertEquals("myValue", recordParams.get("string")); + + } + private void configureDataStream() { props.put(ElasticsearchSinkConnectorConfig.DATA_STREAM_TYPE_CONFIG, "logs"); props.put(ElasticsearchSinkConnectorConfig.DATA_STREAM_DATASET_CONFIG, "dataset"); diff --git a/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java b/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java index 3dba9ec8a..8b80e077b 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java +++ b/src/test/java/io/confluent/connect/elasticsearch/ValidatorTest.java @@ -39,6 +39,7 @@ import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.KERBEROS_PRINCIPAL_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.UPSERT_SCRIPT_CONFIG; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG; import static 
org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -53,6 +54,9 @@ import java.nio.file.Path; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; + +import io.confluent.connect.elasticsearch.validator.ScriptValidator; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.config.SslConfigs; @@ -148,6 +152,44 @@ public void testValidCredentials() { assertNoErrors(result); } + @Test + public void testValidScriptedUpsert() { + props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, "delete"); + props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true"); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT.name()); + props.put( + UPSERT_SCRIPT_CONFIG, + "{\"lang\":\"painless\",\"source\":\"def paramAnswerList = params['answers']; def paramAnswerMap = new HashMap(); for (int i = 0; i < paramAnswerList.length; i++) { def answer = paramAnswerList[i]; paramAnswerMap[answer.questionId] = answer;} if (ctx._source.answers == null) { ctx._source.answers = [];} for (int i = 0; i < ctx._source.answers.length; i++) { if (paramAnswerMap.get(ctx._source.answers[i].questionId) != null) { ctx._source.answers[i].putAll(paramAnswerMap.get(ctx._source.answers[i].questionId)); } }\"}"); + validator = new Validator(props, () -> mockClient); + Config result = validator.validate(); + + ScriptValidator scriptValidator = new ScriptValidator(); + + scriptValidator.ensureValid("script", props.get(UPSERT_SCRIPT_CONFIG)); + + Map map = result.configValues().stream().collect(Collectors.toMap(x -> x.name(), x -> x)); + + assertNoErrors(result); + assertTrue(map.get(UPSERT_SCRIPT_CONFIG).visible()); + } + + @Test + public void 
testInvalidScriptedUpsert() { + props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, "delete"); + props.put(WRITE_METHOD_CONFIG, "upsert"); + props.put( + UPSERT_SCRIPT_CONFIG, + "{\"lang\":\"painless\",\"source\":\"if ( ctx.op == 'create' ) ctx._source.counter = params.count} else {ctx._source.counter += params.count}\",\"params\":{\"count\":4}}"); + validator = new Validator(props, () -> mockClient); + Config result = validator.validate(); + Map map = result.configValues().stream().collect(Collectors.toMap(x -> x.name(), x -> x)); + + assertNoErrors(result); + assertFalse(map.get(UPSERT_SCRIPT_CONFIG).visible()); + } + @Test public void testInvalidMissingOneDataStreamConfig() { props.put(DATA_STREAM_DATASET_CONFIG, "a_valid_dataset");