Porting from confluentinc#759
NicholasDCole committed Mar 11, 2024
1 parent 5d5138a commit 50f145b
Showing 6 changed files with 252 additions and 21 deletions.
DataConverter.java
@@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
import io.confluent.connect.elasticsearch.util.ScriptParser;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
@@ -44,6 +45,7 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.script.Script;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -186,6 +188,21 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType),
record
);
case SCRIPTED_UPSERT:
Script script = null;

try {
script = ScriptParser.parseScript(config.getScript());
} catch (JsonProcessingException jsonProcessingException) {
throw new RuntimeException(jsonProcessingException);
}

return new UpdateRequest(index, id)
.doc(payload, XContentType.JSON)
.upsert(payload, XContentType.JSON)
.retryOnConflict(Math.min(config.maxInFlightRequests(), 5))
.script(script)
.scriptedUpsert(true);
default:
return null; // shouldn't happen
}
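
For orientation: the branch above parses the configured upsert.script JSON into an org.elasticsearch.script.Script and attaches it to an UpdateRequest with scriptedUpsert(true), so the script also runs when the target document does not yet exist. A minimal sketch of the request this branch builds; the class name, index, id, payload, and script JSON below are illustrative, not part of the commit:

import com.fasterxml.jackson.core.JsonProcessingException;
import io.confluent.connect.elasticsearch.util.ScriptParser;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.script.Script;

public class ScriptedUpsertSketch {
  // Hypothetical helper mirroring the SCRIPTED_UPSERT branch.
  static UpdateRequest scriptedUpsert(String index, String id, String payload)
      throws JsonProcessingException {
    // Example upsert.script value: keep a counter up to date on every record.
    String scriptJson = "{\"lang\":\"painless\","
        + "\"source\":\"ctx._source.counter += params.count\","
        + "\"params\":{\"count\":1}}";
    Script script = ScriptParser.parseScript(scriptJson);
    return new UpdateRequest(index, id)
        .doc(payload, XContentType.JSON)     // update document set by the connector
        .upsert(payload, XContentType.JSON)  // initial document when the id is absent
        .retryOnConflict(5)                  // the branch caps retries at 5
        .script(script)
        .scriptedUpsert(true);               // run the script on the insert path too
  }
}
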
ElasticsearchSinkConnectorConfig.java
@@ -24,6 +24,8 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;

import io.confluent.connect.elasticsearch.validator.ScriptValidator;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
@@ -277,6 +279,16 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
);
private static final String WRITE_METHOD_DISPLAY = "Write Method";
private static final String WRITE_METHOD_DEFAULT = WriteMethod.INSERT.name();

public static final String UPSERT_SCRIPT_CONFIG = "upsert.script";

private static final String UPSERT_SCRIPT_DOC = "Script used for"
+ " upserting data to Elasticsearch. This script allows for"
+ " customizable behavior upon upserting a document. Please refer to the"
+ " Elasticsearch scripted upsert documentation.";

private static final String UPSERT_SCRIPT_DISPLAY = "Upsert Script";
private static final String UPSERT_SCRIPT_DEFAULT = "";
public static final String LOG_SENSITIVE_DATA_CONFIG = "log.sensitive.data";
private static final String LOG_SENSITIVE_DATA_DISPLAY = "Log Sensitive data";
private static final String LOG_SENSITIVE_DATA_DOC = "If true, logs sensitive data "
@@ -408,7 +420,8 @@ public enum SecurityProtocol {

public enum WriteMethod {
INSERT,
- UPSERT
+ UPSERT,
+ SCRIPTED_UPSERT
}

protected static ConfigDef baseConfigDef() {
@@ -622,8 +635,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
- IGNORE_KEY_DISPLAY
- ).define(
+ IGNORE_KEY_DISPLAY)
+ .define(
IGNORE_SCHEMA_CONFIG,
Type.BOOLEAN,
IGNORE_SCHEMA_DEFAULT,
@@ -632,8 +645,8 @@
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
- IGNORE_SCHEMA_DISPLAY
- ).define(
+ IGNORE_SCHEMA_DISPLAY)
+ .define(
COMPACT_MAP_ENTRIES_CONFIG,
Type.BOOLEAN,
COMPACT_MAP_ENTRIES_DEFAULT,
@@ -642,8 +655,8 @@
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
- COMPACT_MAP_ENTRIES_DISPLAY
- ).define(
+ COMPACT_MAP_ENTRIES_DISPLAY)
+ .define(
IGNORE_KEY_TOPICS_CONFIG,
Type.LIST,
IGNORE_KEY_TOPICS_DEFAULT,
@@ -652,8 +665,8 @@
DATA_CONVERSION_GROUP,
++order,
Width.LONG,
- IGNORE_KEY_TOPICS_DISPLAY
- ).define(
+ IGNORE_KEY_TOPICS_DISPLAY)
+ .define(
IGNORE_SCHEMA_TOPICS_CONFIG,
Type.LIST,
IGNORE_SCHEMA_TOPICS_DEFAULT,
@@ -662,8 +675,8 @@
DATA_CONVERSION_GROUP,
++order,
Width.LONG,
- IGNORE_SCHEMA_TOPICS_DISPLAY
- ).define(
+ IGNORE_SCHEMA_TOPICS_DISPLAY)
+ .define(
DROP_INVALID_MESSAGE_CONFIG,
Type.BOOLEAN,
DROP_INVALID_MESSAGE_DEFAULT,
@@ -672,8 +685,8 @@
DATA_CONVERSION_GROUP,
++order,
Width.LONG,
- DROP_INVALID_MESSAGE_DISPLAY
- ).define(
+ DROP_INVALID_MESSAGE_DISPLAY)
+ .define(
BEHAVIOR_ON_NULL_VALUES_CONFIG,
Type.STRING,
BEHAVIOR_ON_NULL_VALUES_DEFAULT.name(),
@@ -684,8 +697,8 @@
++order,
Width.SHORT,
BEHAVIOR_ON_NULL_VALUES_DISPLAY,
- new EnumRecommender<>(BehaviorOnNullValues.class)
- ).define(
+ new EnumRecommender<>(BehaviorOnNullValues.class))
+ .define(
BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
Type.STRING,
BEHAVIOR_ON_MALFORMED_DOCS_DEFAULT.name(),
@@ -696,8 +709,8 @@
++order,
Width.SHORT,
BEHAVIOR_ON_MALFORMED_DOCS_DISPLAY,
- new EnumRecommender<>(BehaviorOnMalformedDoc.class)
- ).define(
+ new EnumRecommender<>(BehaviorOnMalformedDoc.class))
+ .define(
EXTERNAL_VERSION_HEADER_CONFIG,
Type.STRING,
EXTERNAL_VERSION_HEADER_DEFAULT,
@@ -706,8 +719,8 @@
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
- EXTERNAL_VERSION_HEADER_DISPLAY
- ).define(
+ EXTERNAL_VERSION_HEADER_DISPLAY)
+ .define(
WRITE_METHOD_CONFIG,
Type.STRING,
WRITE_METHOD_DEFAULT,
@@ -718,8 +731,20 @@
++order,
Width.SHORT,
WRITE_METHOD_DISPLAY,
- new EnumRecommender<>(WriteMethod.class)
- );
+ new EnumRecommender<>(WriteMethod.class))
+ .define(
+ UPSERT_SCRIPT_CONFIG,
+ Type.STRING,
+ null,
+ new ScriptValidator(),
+ Importance.LOW,
+ UPSERT_SCRIPT_DOC,
+ DATA_CONVERSION_GROUP,
+ ++order,
+ Width.SHORT,
+ UPSERT_SCRIPT_DISPLAY,
+ new ScriptValidator());
+ ;
}

private static void addProxyConfigs(ConfigDef configDef) {
@@ -1078,6 +1103,10 @@ public WriteMethod writeMethod() {
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
}

public String getScript() {
return getString(UPSERT_SCRIPT_CONFIG);
}

private static class DataStreamDatasetValidator implements Validator {

@Override
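
Taken together, a connector enables the feature by setting write.method to SCRIPTED_UPSERT and supplying the script JSON under upsert.script; when upsert.script is unset it defaults to null and getScript() simply returns that. A hypothetical configuration sketch in the style of the tests (the script body is illustrative, and "write.method" is assumed to be the literal value of WRITE_METHOD_CONFIG):

import java.util.HashMap;
import java.util.Map;

public class ScriptedUpsertConfigSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("write.method", "SCRIPTED_UPSERT");  // WRITE_METHOD_CONFIG
    props.put("upsert.script",                     // UPSERT_SCRIPT_CONFIG
        "{\"lang\":\"painless\","
        + "\"source\":\"ctx._source.counter += params.count\","
        + "\"params\":{\"count\":1}}");
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}

Passing ScriptValidator as both the validator and the recommender in the define() call ties config-time validation to UI visibility: the option is only surfaced when write.method is SCRIPTED_UPSERT.
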
ScriptParser.java
@@ -0,0 +1,46 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.elasticsearch.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.script.Script;

import java.util.Map;

public class ScriptParser {

public static Script parseScript(String scriptJson) throws JsonProcessingException {

Map<String, Object> map = ScriptParser.parseSchemaStringAsJson(scriptJson);

return Script.parse(map);
}

private static Map<String, Object> parseSchemaStringAsJson(String scriptJson)
throws JsonProcessingException {

ObjectMapper objectMapper = new ObjectMapper();

Map<String, Object> scriptConverted;

scriptConverted = objectMapper.readValue(
scriptJson, new TypeReference<Map<String, Object>>(){});

return scriptConverted;
}
}
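
A quick usage sketch for the parser: well-formed script JSON yields a Script whose getLang() and getIdOrCode() mirror the lang and source fields, while malformed input surfaces as JsonProcessingException. The JSON literal and class name below are illustrative:

import com.fasterxml.jackson.core.JsonProcessingException;
import io.confluent.connect.elasticsearch.util.ScriptParser;
import org.elasticsearch.script.Script;

public class ScriptParserSketch {
  public static void main(String[] args) {
    try {
      Script script = ScriptParser.parseScript(
          "{\"lang\":\"painless\",\"source\":\"ctx._source.likes += 1\"}");
      System.out.println(script.getLang());       // painless
      System.out.println(script.getIdOrCode());   // ctx._source.likes += 1
      ScriptParser.parseScript("not valid json"); // malformed: throws
    } catch (JsonProcessingException e) {
      System.err.println("invalid script JSON: " + e.getMessage());
    }
  }
}
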
ScriptValidator.java
@@ -0,0 +1,74 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.elasticsearch.validator;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.confluent.connect.elasticsearch.util.ScriptParser;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.elasticsearch.script.Script;

public class ScriptValidator implements ConfigDef.Validator, ConfigDef.Recommender {

@Override
@SuppressWarnings("unchecked")
public void ensureValid(String name, Object value) {

if (value == null) {
return;
}

String script = (String) value;

try {
Script parsedScript = ScriptParser.parseScript(script);

if (parsedScript.getIdOrCode() == null) {
throw new ConfigException(name, script, "The specified script is missing code");
} else if (parsedScript.getLang() == null) {
throw new ConfigException(name, script, "The specified script is missing lang");
}

} catch (JsonProcessingException jsonProcessingException) {
throw new ConfigException(
name, script, "The specified script is not a valid Elasticsearch painless script");
}
}

@Override
public String toString() {
return "A valid script that is able to be parsed";
}

@Override
public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
if (!parsedConfig.get(WRITE_METHOD_CONFIG).equals(SCRIPTED_UPSERT.name())) {
return new ArrayList<>();
}
return null;
}

@Override
public boolean visible(String name, Map<String, Object> parsedConfig) {
return parsedConfig.get(WRITE_METHOD_CONFIG).equals(SCRIPTED_UPSERT.name());
}
}
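
Because ScriptValidator implements both ConfigDef.Validator and ConfigDef.Recommender, one class handles config-time validation (ensureValid), recommendation (validValues), and UI visibility (visible). A sketch of the accept/reject behavior, with illustrative values:

import io.confluent.connect.elasticsearch.validator.ScriptValidator;

public class ScriptValidatorSketch {
  public static void main(String[] args) {
    ScriptValidator validator = new ScriptValidator();

    // Null passes: the option is unset by default.
    validator.ensureValid("upsert.script", null);

    // Valid JSON carrying both lang and source passes.
    validator.ensureValid("upsert.script",
        "{\"lang\":\"painless\",\"source\":\"ctx._source.n += 1\"}");

    // Bare script code is not JSON, so this throws ConfigException.
    validator.ensureValid("upsert.script", "ctx._source.n += 1");
  }
}
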
DataConverterTest.java
@@ -22,6 +22,7 @@
import org.apache.kafka.connect.sink.SinkRecord;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.index.VersionType;
import org.junit.Before;
import org.junit.Test;
@@ -39,8 +40,10 @@
import static io.confluent.connect.elasticsearch.DataConverter.TIMESTAMP_FIELD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class DataConverterTest {
@@ -572,6 +575,31 @@ public void testDoNotAddExternalVersioningIfDataStream() {
assertEquals(VersionType.INTERNAL, actualRecord.versionType());
}

@Test
public void upsertScript() {
props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT.name());
props.put(
ElasticsearchSinkConnectorConfig.UPSERT_SCRIPT_CONFIG,
"{\"lang\":\"painless\",\"source\":\"if (ctx.op == 'create') {ctx._source.counter = params.count} else {ctx._source.counter += params.count}\",\"params\":{\"count\":4}}");
props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.FAIL.name());
converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
Schema preProcessedSchema = converter.preProcessSchema(schema);
Struct struct = new Struct(preProcessedSchema).put("string", "myValue");
SinkRecord sinkRecord = createSinkRecordWithValue(struct);

UpdateRequest actualRecord = (UpdateRequest) converter.convertRecord(sinkRecord, index);

assertNotNull(actualRecord.script());
assertEquals("if (ctx.op == 'create') {ctx._source.counter = params.count} else {ctx._source.counter += params.count}", actualRecord.script().getIdOrCode());
assertEquals("painless", actualRecord.script().getLang());
assertEquals(4, actualRecord.script().getParams().getOrDefault("count", 0));
assertTrue(actualRecord.scriptedUpsert());
assertNotNull(actualRecord.doc());
}

private void configureDataStream() {
props.put(ElasticsearchSinkConnectorConfig.DATA_STREAM_TYPE_CONFIG, "logs");
props.put(ElasticsearchSinkConnectorConfig.DATA_STREAM_DATASET_CONFIG, "dataset");