diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index 788c6f1..68c0274 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -21,6 +21,12 @@ import java.util.Map; import java.util.stream.Collectors; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonParser; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; + import static java.util.stream.Collectors.toList; /** @@ -76,7 +82,19 @@ public SourceRecord toSourceRecord( )); // getUnmarshallItems from Dynamo Document - Map unMarshalledItems = ItemUtils.toSimpleMapValue(attributes); + //Map unMarshalledItems = ItemUtils.toSimpleMapValue(attributes); + + //JSON conversion + String outputJsonString = null; + try { + String jsonString = ItemUtils.toItem(attributes).toJSON(); + JsonObject jsonObject = new JsonParser().parse(jsonString).getAsJsonObject(); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + outputJsonString = gson.toJson(jsonObject); + } catch (JsonParseException e) { + e.printStackTrace(); + throw new Exception("Error Occured in JSON Parsing " + e.getMessage(), e); + } // Leveraging offsets to store shard and sequence number with each item pushed to Kafka. // This info will only be used to update `shardRegister` and won't be used to reset state after restart @@ -105,7 +123,7 @@ public SourceRecord toSourceRecord( Struct valueData = new Struct(valueSchema) .put(Envelope.FieldName.VERSION, sourceInfo.version) - .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(unMarshalledItems)) + .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(outputJsonString)) .put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo)) .put(Envelope.FieldName.OPERATION, op.code()) .put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli()); diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index 6d88497..d3ff394 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -23,6 +23,9 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -272,13 +275,21 @@ public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedEx task.start(configs); List response = task.poll(); + String expected = "\"{\\n \\\"col2\\\": \\\"val1\\\",\\n \\\"col3\\\": 1,\\n \\\"col1\\\": \\\"key1\\\"\\n}\""; + String actual = (((Struct) response.get(0).value()).getString("document")); + + // Converting both expected and actual to JSON string + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + expected = gson.toJson(expected); + actual = gson.toJson(actual); + // Assert assertEquals(Instant.parse("2001-01-01T00:00:00.00Z"), task.getSourceInfo().lastInitSyncStart); assertEquals(1, task.getSourceInfo().initSyncCount); assertEquals(1, response.size()); assertEquals("r", ((Struct) response.get(0).value()).getString("op")); - assertEquals(({"col2":"val1","col3":1,"col1":"key1"}), ((Struct) response.get(0).value()).getString("document")); + assertEquals(expected , actual); assertEquals(InitSyncStatus.RUNNING, task.getSourceInfo().initSyncStatus); assertEquals(exclusiveStartKey, task.getSourceInfo().exclusiveStartKey); } @@ -557,11 +568,23 @@ public void onSyncPollReturnsReceivedRecords() throws InterruptedException { // Act task.start(configs); List response = task.poll(); + + String expected = "\"{\\n \\\"col2\\\": \\\"val1\\\",\\n \\\"col3\\\": 1,\\n \\\"col1\\\": \\\"key1\\\"\\n}\""; + String expected_document_key = "\"{\\n \\\"col1\\\": \\\"key2\\\"\\n}\""; + String actual = (((Struct) response.get(0).value()).getString("document")); + String actual_document_key = ((Struct) response.get(1).value()).getString("document"); + + // Converting both expected and actual to JSON string + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + expected = gson.toJson(expected); + expected_document_key = gson.toJson(expected_document_key); + actual = gson.toJson(actual); + actual_document_key = gson.toJson(actual_document_key); // Assert assertEquals(3, response.size()); - assertEquals("{\"col2\":\"val1\",\"col3\":1,\"col1\":\"key1\"}", ((Struct) response.get(0).value()).getString("document")); - assertEquals("{\"col1\":\"key2\"}", ((Struct) response.get(1).value()).getString("document")); + assertEquals(expected, actual); + assertEquals(expected_document_key, actual_document_key); assertNull(response.get(2).value()); // tombstone } diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java index a87cc38..ffd2caf 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java @@ -1,6 +1,3 @@ - - - package com.trustpilot.connector.dynamodb.utils; import com.amazonaws.services.dynamodbv2.model.AttributeValue; @@ -22,6 +19,9 @@ import java.util.List; import java.util.Map; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + import static org.junit.jupiter.api.Assertions.assertEquals; @@ -198,9 +198,16 @@ public void recordAttributesAreAddedToValueData() throws Exception { "testSequenceNumberID1" ); + String expected = "\"{\\n \\\"testKV1\\\": \\\"testKV1Value\\\",\\n \\\"testKV2\\\": \\\"2\\\",\\n \\\"testV2\\\": \\\"testStringValue\\\",\\n \\\"testV1\\\": 1\\n}\""; + String actual = ((Struct) record.value()).getString("document"); + + // Converting both expected and actual to JSON string + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + expected = gson.toJson(expected); + actual = gson.toJson(actual); + // Assert - assertEquals("{\"testKV1\":\"testKV1Value\",\"testKV2\":\"2\",\"testV2\":\"testStringValue\",\"testV1\":1}", - ((Struct) record.value()).getString("document")); + assertEquals(expected, actual); } @Test @@ -271,11 +278,16 @@ public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidChar "testSequenceNumberID1" ); - String expected = "{\"test-1234\":\"testKV1Value\",\"_starts_with_underscore\":1,\"1-starts-with-number\":\"2\",\"test!@£$%^\":\"testStringValue\"}"; + String expected = "\"{\\n \\\"test-1234\\\": \\\"testKV1Value\\\",\\n \\\"_starts_with_underscore\\\": 1,\\n \\\"1-starts-with-number\\\": \\\"2\\\",\\n \\\"test!@£$%^\\\": \\\"testStringValue\\\"\\n}\""; + String actual = ((Struct) record.value()).getString("document"); + + // Converting both expected and actual to JSON string + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + expected = gson.toJson(expected); + actual = gson.toJson(actual); // Assert - assertEquals(expected, - ((Struct) record.value()).getString("document")); + assertEquals(expected, actual); } @Test