From 333b1dc6ce7b1db6b9beb30b4dea162208a93d1c Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Thu, 10 Oct 2024 23:56:07 -0700 Subject: [PATCH 1/2] Json codec changes with specific json input codec config Signed-off-by: Souvik Bose --- .../dataprepper/model/codec/JsonDecoder.java | 53 ++++++++++- .../model/codec/JsonDecoderTest.java | 95 ++++++++++++++++++- .../plugins/codec/json/JsonInputCodec.java | 10 +- .../codec/json/JsonInputCodecConfig.java | 41 ++++++++ .../plugins/codec/json/JsonCodecsIT.java | 9 +- .../codec/json/JsonInputCodecConfigTest.java | 26 +++++ .../codec/json/JsonInputCodecTest.java | 55 ++++++++++- .../source/s3/JsonRecordsGenerator.java | 3 +- 8 files changed, 280 insertions(+), 12 deletions(-) create mode 100644 data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecConfig.java create mode 100644 data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecConfigTest.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java index f0793aa65f..92de6258a4 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java @@ -17,6 +17,8 @@ import java.io.IOException; import java.io.InputStream; import java.time.Instant; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Consumer; @@ -24,6 +26,21 @@ public class JsonDecoder implements ByteDecoder { private final ObjectMapper objectMapper = new ObjectMapper(); private final JsonFactory jsonFactory = new JsonFactory(); + private String keyName; + private List includeKeys; + private List includeKeysMetadata; + + public JsonDecoder(String keyName, List includeKeys, List includeKeysMetadata) { + this.keyName = keyName; + this.includeKeys = includeKeys; + this.includeKeysMetadata = includeKeysMetadata; + } + + public JsonDecoder() { + this.keyName = null; + this.includeKeys = null; + this.includeKeysMetadata = null; + } public void parse(InputStream inputStream, Instant timeReceived, Consumer> eventConsumer) throws IOException { Objects.requireNonNull(inputStream); @@ -31,18 +48,50 @@ public void parse(InputStream inputStream, Instant timeReceived, Consumer includeKeysMap = new HashMap<>(); + Map includeMetadataKeysMap = new HashMap<>(); while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) { + final String nodeName = jsonParser.currentName(); + + if (includeKeys != null && includeKeys.contains(nodeName) || + (includeKeysMetadata != null && includeKeysMetadata.contains(nodeName))) { + jsonParser.nextToken(); + if (includeKeys.contains(nodeName)) { + includeKeysMap.put(nodeName, jsonParser.getValueAsString()); + } + if (includeKeysMetadata.contains(nodeName)) { + includeMetadataKeysMap.put(nodeName, jsonParser.getValueAsString()); + } + continue; + } + if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) { - parseRecordsArray(jsonParser, timeReceived, eventConsumer); + if (keyName != null && !nodeName.equals(keyName)) { + continue; + } + parseRecordsArray(jsonParser, timeReceived, eventConsumer, includeKeysMap, includeMetadataKeysMap); } } } - private void parseRecordsArray(final JsonParser jsonParser, final Instant timeReceived, final Consumer> eventConsumer) throws IOException { + private void parseRecordsArray(final JsonParser jsonParser, + final Instant timeReceived, + final Consumer> eventConsumer, + Map includeKeysMap, + Map includeMetadataKeysMap + ) throws IOException { while (jsonParser.nextToken() != JsonToken.END_ARRAY) { final Map innerJson = objectMapper.readValue(jsonParser, Map.class); final Record record = createRecord(innerJson, timeReceived); + for (final Map.Entry entry : includeKeysMap.entrySet()) { + record.getData().put(entry.getKey(), entry.getValue()); + } + + for (final Map.Entry entry : includeMetadataKeysMap.entrySet()) { + record.getData().getMetadata().setAttribute(entry.getKey(), entry.getValue()); + } + eventConsumer.accept(record); } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java index d2c7287313..7dc269ce4a 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java @@ -1,12 +1,20 @@ package org.opensearch.dataprepper.model.codec; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.record.Record; import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; @@ -14,6 +22,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.BeforeEach; @@ -26,7 +35,7 @@ private JsonDecoder createObjectUnderTest() { return new JsonDecoder(); } -@BeforeEach + @BeforeEach void setup() { jsonDecoder = createObjectUnderTest(); receivedRecord = null; @@ -60,10 +69,10 @@ void test_basicJsonDecoder_withTimeReceived() { try { jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), now, (record) -> { receivedRecord = record; - receivedTime = ((DefaultEventHandle)(((Event)record.getData()).getEventHandle())).getInternalOriginationTime(); + receivedTime = record.getData().getEventHandle().getInternalOriginationTime(); }); } catch (Exception e){} - + assertNotEquals(receivedRecord, null); Map map = receivedRecord.getData().toMap(); assertThat(map.get("key1"), equalTo(stringValue)); @@ -71,4 +80,82 @@ void test_basicJsonDecoder_withTimeReceived() { assertThat(receivedTime, equalTo(now)); } + @Nested + class JsonDecoderWithInputConfig { + private ObjectMapper objectMapper; + final List include_keys = new ArrayList<>(); + + @BeforeEach + void setup() { + objectMapper = new ObjectMapper(); + } + @Test + void test_basicJsonDecoder_withInputConfig() throws IOException { + Random r = new Random(); + final Instant now = Instant.now(); + List> records = new ArrayList<>(); + for (int i=0; i<10; i++) { + include_keys.add(UUID.randomUUID().toString()); + } + final String key_name = "logEvents"; + Map jsonObject = generateJsonWithSpecificKeys(include_keys, key_name, 10); + jsonDecoder = new JsonDecoder(key_name, include_keys, include_keys); + jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> { + records.add(record); + receivedTime = ((Event)record.getData()).getEventHandle().getInternalOriginationTime(); + }); + + records.forEach(record -> { + Map dataMap = record.getData().toMap(); + Map metadataMap = record.getData().getMetadata().getAttributes(); + + for (String include_key: include_keys) { + assertThat(dataMap.get(include_key), equalTo(jsonObject.get(include_key))); + assertThat(metadataMap.get(include_key), equalTo(jsonObject.get(include_key))); + } + }); + + assertThat(receivedTime, equalTo(now)); + } + + @Test + void test_basicJsonDecoder_withInputConfig_withoutEvents() throws IOException { + Random r = new Random(); + final Instant now = Instant.now(); + List> records = new ArrayList<>(); + Map jsonObject = generateJsonWithSpecificKeys(include_keys, "logEvents", 10); + jsonDecoder = new JsonDecoder("", include_keys, Collections.emptyList()); + jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> { + records.add(record); + receivedTime = ((Event)record.getData()).getEventHandle().getInternalOriginationTime(); + }); + + assertTrue(records.isEmpty()); + } + + private Map generateJsonWithSpecificKeys(final List outerKeys, final String key, final int numRecords) { + final Map jsonObject = new LinkedHashMap<>(); + final List> innerObjects = new ArrayList<>(); + + for (String outerKey: outerKeys) { + jsonObject.put(outerKey, UUID.randomUUID().toString()); + } + for (int i=0; i innerJsonMap = new LinkedHashMap<>(); + for (int j=0; j<3; j++) { + innerJsonMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + innerObjects.add(innerJsonMap); + } + jsonObject.put(key, innerObjects); + return jsonObject; + } + + private InputStream createInputStream(final Map jsonRoot) throws JsonProcessingException { + final byte[] jsonBytes = objectMapper.writeValueAsBytes(jsonRoot); + + return new ByteArrayInputStream(jsonBytes); + } + } + } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java index 6222682e2a..a7dc056105 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.codec.json; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.codec.JsonDecoder; import org.opensearch.dataprepper.model.event.Event; @@ -13,13 +14,20 @@ import java.io.IOException; import java.io.InputStream; +import java.util.Objects; import java.util.function.Consumer; /** * An implementation of {@link InputCodec} which parses JSON Objects for arrays. */ -@DataPrepperPlugin(name = "json", pluginType = InputCodec.class) +@DataPrepperPlugin(name = "json", pluginType = InputCodec.class, pluginConfigurationType = JsonInputCodecConfig.class) public class JsonInputCodec extends JsonDecoder implements InputCodec { + + @DataPrepperPluginConstructor + public JsonInputCodec(final JsonInputCodecConfig config) { + super(Objects.requireNonNull(config).getKeyName(), config.getIncludeKeys(), config.getIncludeKeysMetadata()); + } + public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { parse(inputStream, null, eventConsumer); } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecConfig.java new file mode 100644 index 0000000000..d25b884365 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecConfig.java @@ -0,0 +1,41 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.codec.json; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Size; + +import java.util.List; + +public class JsonInputCodecConfig { + + @JsonProperty("key_name") + @Size(min = 1, max = 2048) + private String keyName; + + public String getKeyName() { + return keyName; + } + + @JsonProperty("include_keys") + private List includeKeys; + + public List getIncludeKeys() { + return includeKeys; + } + + @JsonProperty("include_keys_metadata") + private List includeKeysMetadata; + + public List getIncludeKeysMetadata() { + return includeKeysMetadata; + } +} diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonCodecsIT.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonCodecsIT.java index cbdedb3f1f..2138f4cc92 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonCodecsIT.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonCodecsIT.java @@ -36,21 +36,26 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class JsonCodecsIT { private ObjectMapper objectMapper; private Consumer> eventConsumer; + private JsonInputCodecConfig jsonInputCodecConfig; @BeforeEach void setUp() { objectMapper = new ObjectMapper(); - + jsonInputCodecConfig = mock(JsonInputCodecConfig.class); + when(jsonInputCodecConfig.getIncludeKeysMetadata()).thenReturn(Collections.emptyList()); + when(jsonInputCodecConfig.getIncludeKeys()).thenReturn(Collections.emptyList()); + when(jsonInputCodecConfig.getKeyName()).thenReturn(null); eventConsumer = mock(Consumer.class); } private JsonInputCodec createJsonInputCodecObjectUnderTest() { - return new JsonInputCodec(); + return new JsonInputCodec(jsonInputCodecConfig); } private JsonOutputCodec createJsonOutputCodecObjectUnderTest() { diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecConfigTest.java new file mode 100644 index 0000000000..b97df4abf2 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecConfigTest.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.codec.json; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class JsonInputCodecConfigTest { + + private JsonInputCodecConfig createObjectUnderTest() { + return new JsonInputCodecConfig(); + } + + @Test + public void testJsonInputCodecConfig() { + JsonInputCodecConfig jsonInputCodecConfig = createObjectUnderTest(); + assertTrue(jsonInputCodecConfig.getKeyName().equals(JsonInputCodecConfig.DEFAULT_KEY_NAME)); + assertNull(jsonInputCodecConfig.getIncludeKeys()); + assertNull(jsonInputCodecConfig.getIncludeKeysMetadata()); + } +} diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecTest.java index 544f486252..05b6401145 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecTest.java @@ -51,21 +51,26 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; class JsonInputCodecTest { private ObjectMapper objectMapper; + private JsonInputCodecConfig jsonInputCodecConfig; private Consumer> eventConsumer; @BeforeEach void setUp() { objectMapper = new ObjectMapper(); - + jsonInputCodecConfig = mock(JsonInputCodecConfig.class); + when(jsonInputCodecConfig.getIncludeKeysMetadata()).thenReturn(Collections.emptyList()); + when(jsonInputCodecConfig.getIncludeKeys()).thenReturn(Collections.emptyList()); + when(jsonInputCodecConfig.getKeyName()).thenReturn(null); eventConsumer = mock(Consumer.class); } private JsonInputCodec createObjectUnderTest() { - return new JsonInputCodec(); + return new JsonInputCodec(jsonInputCodecConfig); } @Test @@ -215,6 +220,38 @@ void parse_with_InputStream_calls_Consumer_with_Event(final int numberOfObjects) } } + @ParameterizedTest + @ValueSource(ints = {1, 2, 10, 100}) + void parse_with_InputStream_calls_Consumer_with_EventConfig(final int numberOfObjects) throws IOException { + + List includeKeys = new ArrayList<>(); + for (int i=0; i jsonObjects = generateJsonWithSpecificKeys(includeKeys, objectKey, numberOfObjects); + when(jsonInputCodecConfig.getIncludeKeysMetadata()).thenReturn(includeKeys); + when(jsonInputCodecConfig.getIncludeKeys()).thenReturn(includeKeys); + when(jsonInputCodecConfig.getKeyName()).thenReturn(objectKey); + + createObjectUnderTest().parse(createInputStream(jsonObjects), eventConsumer); + + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + verify(eventConsumer, times(numberOfObjects)).accept(recordArgumentCaptor.capture()); + + final List> actualRecords = recordArgumentCaptor.getAllValues(); + + assertThat(actualRecords.size(), equalTo(numberOfObjects)); + for (int i = 0; i < actualRecords.size(); i++) { + + final Record actualRecord = actualRecords.get(i); + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getData(), notNullValue()); + assertThat(actualRecord.getData().getMetadata(), notNullValue()); + assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString())); + } + } + @ParameterizedTest @ArgumentsSource(JsonPermutations.class) void parse_with_InputStream_calls_Consumer_for_arrays_in_Json_permutations(final Function>, Map> rootJsonGenerator) throws IOException { @@ -336,4 +373,18 @@ private static Map generateJson() { return jsonObject; } + + private static Map generateJsonWithSpecificKeys(final List innerKeys, final String key, final int numRecords) { + final Map jsonObject = new LinkedHashMap<>(); + final List> innerObjects = new ArrayList<>(); + for (int i=0; i innerJsonMap = new LinkedHashMap<>(); + for (String innerKey: innerKeys) { + innerJsonMap.put(innerKey, UUID.randomUUID().toString()); + } + innerObjects.add(innerJsonMap); + } + jsonObject.put(key, innerObjects); + return jsonObject; + } } \ No newline at end of file diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/JsonRecordsGenerator.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/JsonRecordsGenerator.java index 1031e7de56..29bb75c20b 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/JsonRecordsGenerator.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/JsonRecordsGenerator.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.codec.json.JsonInputCodec; +import org.opensearch.dataprepper.plugins.codec.json.JsonInputCodecConfig; import java.io.File; import java.io.IOException; @@ -50,7 +51,7 @@ public void write(final File file, int numberOfRecords) { @Override public InputCodec getCodec() { - return new JsonInputCodec(); + return new JsonInputCodec(new JsonInputCodecConfig()); } @Override From dc7f0f0de4c091b7b3a932ef11a6cbfc2b1c7c84 Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Fri, 11 Oct 2024 18:02:11 -0700 Subject: [PATCH 2/2] Modify the tests and address comments Signed-off-by: Souvik Bose --- .../dataprepper/model/codec/JsonDecoder.java | 16 +-- .../model/codec/JsonDecoderTest.java | 99 ++++++++++---- .../codec/json/JsonInputCodecConfigTest.java | 3 +- .../codec/json/JsonInputCodecTest.java | 123 ++++++++++++++++-- 4 files changed, 198 insertions(+), 43 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java index 92de6258a4..b837d36064 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java @@ -17,8 +17,8 @@ import java.io.IOException; import java.io.InputStream; import java.time.Instant; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Consumer; @@ -27,10 +27,10 @@ public class JsonDecoder implements ByteDecoder { private final ObjectMapper objectMapper = new ObjectMapper(); private final JsonFactory jsonFactory = new JsonFactory(); private String keyName; - private List includeKeys; - private List includeKeysMetadata; + private Collection includeKeys; + private Collection includeKeysMetadata; - public JsonDecoder(String keyName, List includeKeys, List includeKeysMetadata) { + public JsonDecoder(String keyName, Collection includeKeys, Collection includeKeysMetadata) { this.keyName = keyName; this.includeKeys = includeKeys; this.includeKeysMetadata = includeKeysMetadata; @@ -56,10 +56,10 @@ public void parse(InputStream inputStream, Instant timeReceived, Consumer> eventConsumer, - Map includeKeysMap, - Map includeMetadataKeysMap + final Map includeKeysMap, + final Map includeMetadataKeysMap ) throws IOException { while (jsonParser.nextToken() != JsonToken.END_ARRAY) { final Map innerJson = objectMapper.readValue(jsonParser, Map.class); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java index 7dc269ce4a..6ff6e6b936 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java @@ -21,6 +21,8 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -83,35 +85,44 @@ void test_basicJsonDecoder_withTimeReceived() { @Nested class JsonDecoderWithInputConfig { private ObjectMapper objectMapper; - final List include_keys = new ArrayList<>(); + private final List includeKeys = new ArrayList<>(); + private final List includeMetadataKeys = new ArrayList<>(); + private static final int numKeyRecords = 10; + private static final int numKeyPerRecord = 3; + private Map jsonObject; + private final String key_name = "logEvents"; @BeforeEach void setup() { objectMapper = new ObjectMapper(); + for (int i=0; i<10; i++) { + includeKeys.add(UUID.randomUUID().toString()); + includeMetadataKeys.add(UUID.randomUUID().toString()); + } + jsonObject = generateJsonWithSpecificKeys(includeKeys, includeMetadataKeys, key_name, numKeyRecords, numKeyPerRecord); } @Test void test_basicJsonDecoder_withInputConfig() throws IOException { - Random r = new Random(); final Instant now = Instant.now(); List> records = new ArrayList<>(); - for (int i=0; i<10; i++) { - include_keys.add(UUID.randomUUID().toString()); - } - final String key_name = "logEvents"; - Map jsonObject = generateJsonWithSpecificKeys(include_keys, key_name, 10); - jsonDecoder = new JsonDecoder(key_name, include_keys, include_keys); + jsonDecoder = new JsonDecoder(key_name, includeKeys, includeMetadataKeys); jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> { records.add(record); - receivedTime = ((Event)record.getData()).getEventHandle().getInternalOriginationTime(); + receivedTime = record.getData().getEventHandle().getInternalOriginationTime(); }); + assertFalse(records.isEmpty()); + assertEquals(numKeyRecords, records.size()); + records.forEach(record -> { Map dataMap = record.getData().toMap(); Map metadataMap = record.getData().getMetadata().getAttributes(); - for (String include_key: include_keys) { - assertThat(dataMap.get(include_key), equalTo(jsonObject.get(include_key))); - assertThat(metadataMap.get(include_key), equalTo(jsonObject.get(include_key))); + for (String includeKey: includeKeys) { + assertThat(dataMap.get(includeKey), equalTo(jsonObject.get(includeKey))); + } + for (String includeMetadataKey: includeMetadataKeys) { + assertThat(metadataMap.get(includeMetadataKey), equalTo(jsonObject.get(includeMetadataKey))); } }); @@ -119,30 +130,74 @@ void test_basicJsonDecoder_withInputConfig() throws IOException { } @Test - void test_basicJsonDecoder_withInputConfig_withoutEvents() throws IOException { - Random r = new Random(); + void test_basicJsonDecoder_withInputConfig_withoutEvents_empty_metadata_keys() throws IOException { + final Instant now = Instant.now(); + List> records = new ArrayList<>(); + jsonDecoder = new JsonDecoder("", includeKeys, Collections.emptyList()); + jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> { + records.add(record); + receivedTime = record.getData().getEventHandle().getInternalOriginationTime(); + }); + assertTrue(records.isEmpty()); + } + + @Test + void test_basicJsonDecoder_withInputConfig_withoutEvents_null_include_metadata_keys() throws IOException { + final Instant now = Instant.now(); + List> records = new ArrayList<>(); + jsonDecoder = new JsonDecoder("", includeKeys, null); + jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> { + records.add(record); + receivedTime = record.getData().getEventHandle().getInternalOriginationTime(); + }); + + assertTrue(records.isEmpty()); + } + + @Test + void test_basicJsonDecoder_withInputConfig_withoutEvents_empty_include_keys() throws IOException { final Instant now = Instant.now(); List> records = new ArrayList<>(); - Map jsonObject = generateJsonWithSpecificKeys(include_keys, "logEvents", 10); - jsonDecoder = new JsonDecoder("", include_keys, Collections.emptyList()); + jsonDecoder = new JsonDecoder("", Collections.emptyList(), includeMetadataKeys); jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> { records.add(record); - receivedTime = ((Event)record.getData()).getEventHandle().getInternalOriginationTime(); + receivedTime = record.getData().getEventHandle().getInternalOriginationTime(); + }); + assertTrue(records.isEmpty()); + } + + @Test + void test_basicJsonDecoder_withInputConfig_withoutEvents_null_include_keys() throws IOException { + final Instant now = Instant.now(); + List> records = new ArrayList<>(); + jsonDecoder = new JsonDecoder("", null, includeMetadataKeys); + jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> { + records.add(record); + receivedTime = record.getData().getEventHandle().getInternalOriginationTime(); }); assertTrue(records.isEmpty()); } - private Map generateJsonWithSpecificKeys(final List outerKeys, final String key, final int numRecords) { + private Map generateJsonWithSpecificKeys(final List includeKeys, + final List includeMetadataKeys, + final String key, + final int numKeyRecords, + final int numKeyPerRecord) { final Map jsonObject = new LinkedHashMap<>(); final List> innerObjects = new ArrayList<>(); - for (String outerKey: outerKeys) { - jsonObject.put(outerKey, UUID.randomUUID().toString()); + for (String includeKey: includeKeys) { + jsonObject.put(includeKey, UUID.randomUUID().toString()); } - for (int i=0; i innerJsonMap = new LinkedHashMap<>(); - for (int j=0; j<3; j++) { + for (int j=0; j includeKeys = new ArrayList<>(); + for (int i=0; i includeKeys = new ArrayList<>(); + List includeMetadataKeys = new ArrayList<>(); for (int i=0; i jsonObjects = generateJsonWithSpecificKeys(includeKeys, objectKey, numberOfObjects); - when(jsonInputCodecConfig.getIncludeKeysMetadata()).thenReturn(includeKeys); + parse_InputStream_withEventConfig(numberOfObjects, "key", includeKeys, includeMetadataKeys); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 10, 100}) + void parse_with_InputStream_calls_Consumer_with_EventConfig_validKey_includeMetadataKeys(final int numberOfObjects) throws IOException { + List includeMetadataKeys = new ArrayList<>(); + for (int i=0; i includeMetadataKeys = null; + List includeKeys = null; + final Map jsonObjects = generateJsonWithSpecificKeys(includeKeys, includeMetadataKeys, "key", numberOfObjects, 2); + when(jsonInputCodecConfig.getIncludeKeysMetadata()).thenReturn(includeMetadataKeys); when(jsonInputCodecConfig.getIncludeKeys()).thenReturn(includeKeys); - when(jsonInputCodecConfig.getKeyName()).thenReturn(objectKey); + when(jsonInputCodecConfig.getKeyName()).thenReturn("key2"); + + createObjectUnderTest().parse(createInputStream(jsonObjects), eventConsumer); + + verify(eventConsumer, times(0)).accept(any(Record.class)); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 10, 100}) + void parse_with_InputStream_calls_Consumer_with_EventConfig_NullKey(final int numberOfObjects) throws IOException { + List includeMetadataKeys = null; + List includeKeys = null; + final Map jsonObjects = generateJsonWithSpecificKeys(includeKeys, includeMetadataKeys, "key", numberOfObjects, 2); + when(jsonInputCodecConfig.getIncludeKeysMetadata()).thenReturn(includeMetadataKeys); + when(jsonInputCodecConfig.getIncludeKeys()).thenReturn(includeKeys); + when(jsonInputCodecConfig.getKeyName()).thenReturn(null); createObjectUnderTest().parse(createInputStream(jsonObjects), eventConsumer); @@ -374,17 +425,67 @@ private static Map generateJson() { return jsonObject; } - private static Map generateJsonWithSpecificKeys(final List innerKeys, final String key, final int numRecords) { + private Map generateJsonWithSpecificKeys(final List includeKeys, + final List includeMetadataKeys, + final String key, + final int numKeyRecords, + final int numKeyPerRecord) { final Map jsonObject = new LinkedHashMap<>(); final List> innerObjects = new ArrayList<>(); - for (int i=0; i innerJsonMap = new LinkedHashMap<>(); - for (String innerKey: innerKeys) { - innerJsonMap.put(innerKey, UUID.randomUUID().toString()); + for (int j=0; j includeKeys, final List includeMetadataKeys) throws IOException { + final Map jsonObjects = generateJsonWithSpecificKeys(includeKeys, includeMetadataKeys, objectKey, numberOfObjects, 2); + when(jsonInputCodecConfig.getIncludeKeysMetadata()).thenReturn(includeMetadataKeys); + when(jsonInputCodecConfig.getIncludeKeys()).thenReturn(includeKeys); + when(jsonInputCodecConfig.getKeyName()).thenReturn(objectKey); + + createObjectUnderTest().parse(createInputStream(jsonObjects), eventConsumer); + + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + verify(eventConsumer, times(numberOfObjects)).accept(recordArgumentCaptor.capture()); + + final List> actualRecords = recordArgumentCaptor.getAllValues(); + + assertThat(actualRecords.size(), equalTo(numberOfObjects)); + for (final Record actualRecord : actualRecords) { + + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getData(), notNullValue()); + assertThat(actualRecord.getData().getMetadata(), notNullValue()); + + Map dataMap = actualRecord.getData().toMap(); + for (String includeKey : includeKeys) { + assertTrue(dataMap.containsKey(includeKey)); + } + + Map metadataMap = actualRecord.getData().getMetadata().getAttributes(); + for (String includeMetadataKey: includeMetadataKeys) { + assertTrue(metadataMap.containsKey(includeMetadataKey)); + } + assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString())); + } + } } \ No newline at end of file