From cbfbfe8086447d7397c2afd1694016c7f6f6e0c2 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Wed, 4 Sep 2024 16:42:21 -0700 Subject: [PATCH] chore: introduce AirbyteRecord and AirbyteJsonRecordAdapter (#13774) --- .../general/ReplicationWorkerHelper.kt | 13 +-- .../general/ReplicationWorkerHelperTest.java | 22 ++-- .../general/ReplicationWorkerTest.java | 5 +- .../airbyte/config/adapters/AirbyteRecord.kt | 28 +++++ .../airbyte/config/adapters/JsonAdapters.kt | 49 ++++++++ .../config/adapters/JsonRecordAdapterTest.kt | 109 ++++++++++++++++++ airbyte-mappers/build.gradle.kts | 1 + .../mappers/application/RecordMapper.kt | 4 +- .../mappers/transformations/HashingMapper.kt | 11 +- .../airbyte/mappers/transformations/Mapper.kt | 3 +- .../airbyte/mappers/transformations/Record.kt | 6 - .../config/adapters/TestRecordAdapter.kt | 34 ++++++ .../mappers/application/RecordMapperTest.kt | 34 ++++-- .../io/airbyte/mappers/mocks/TestMapper.kt | 11 +- .../transformations/HashingMapperTest.kt | 30 ++--- 15 files changed, 288 insertions(+), 72 deletions(-) create mode 100644 airbyte-config/config-models/src/main/kotlin/io/airbyte/config/adapters/AirbyteRecord.kt create mode 100644 airbyte-config/config-models/src/main/kotlin/io/airbyte/config/adapters/JsonAdapters.kt create mode 100644 airbyte-config/config-models/src/test/kotlin/io/airbyte/config/adapters/JsonRecordAdapterTest.kt delete mode 100644 airbyte-mappers/src/main/kotlin/io/airbyte/mappers/transformations/Record.kt create mode 100644 airbyte-mappers/src/test/kotlin/io/airbyte/config/adapters/TestRecordAdapter.kt diff --git a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/general/ReplicationWorkerHelper.kt b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/general/ReplicationWorkerHelper.kt index 5c4e4500f88..57890770c35 100644 --- a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/general/ReplicationWorkerHelper.kt +++ b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/general/ReplicationWorkerHelper.kt @@ -6,7 +6,6 @@ package io.airbyte.workers.general import com.fasterxml.jackson.core.JsonProcessingException import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.databind.node.ObjectNode import com.google.common.annotations.VisibleForTesting import io.airbyte.api.client.AirbyteApiClient import io.airbyte.api.client.model.generated.ActorType @@ -28,11 +27,12 @@ import io.airbyte.config.State import io.airbyte.config.StreamDescriptor import io.airbyte.config.SyncStats import io.airbyte.config.WorkerDestinationConfig +import io.airbyte.config.adapters.AirbyteJsonRecordAdapter +import io.airbyte.config.adapters.AirbyteRecord import io.airbyte.featureflag.Connection import io.airbyte.featureflag.EnableMappers import io.airbyte.featureflag.FeatureFlagClient import io.airbyte.mappers.application.RecordMapper -import io.airbyte.mappers.transformations.Record import io.airbyte.metrics.lib.ApmTraceUtils import io.airbyte.metrics.lib.MetricAttribute import io.airbyte.metrics.lib.MetricClient @@ -42,7 +42,6 @@ import io.airbyte.metrics.lib.OssMetricsRegistry import io.airbyte.persistence.job.models.ReplicationInput import io.airbyte.protocol.models.AirbyteMessage import io.airbyte.protocol.models.AirbyteMessage.Type -import io.airbyte.protocol.models.AirbyteRecordMessage import io.airbyte.protocol.models.AirbyteStateMessage import io.airbyte.protocol.models.AirbyteStateStats import io.airbyte.protocol.models.AirbyteTraceMessage @@ -410,7 +409,7 @@ class ReplicationWorkerHelper( } if (sourceRawMessage.type == Type.RECORD) { - applyTransformationMappers(sourceRawMessage.record) + applyTransformationMappers(AirbyteJsonRecordAdapter(sourceRawMessage)) } return sourceRawMessage @@ -472,12 +471,12 @@ class ReplicationWorkerHelper( return airbyteApiClient.destinationApi.getDestination(DestinationIdRequestBody(destinationId = destinationId)).destinationDefinitionId } - fun applyTransformationMappers(message: AirbyteRecordMessage) { + fun applyTransformationMappers(message: AirbyteRecord) { if (mapperEnabled) { val mappersForStream: List = - mappersPerStreamDescriptor[StreamDescriptor().withName(message.stream).withNamespace(message.namespace)] ?: listOf() + mappersPerStreamDescriptor[message.streamDescriptor] ?: listOf() - recordMapper.applyMappers(Record(message.data as ObjectNode), mappersForStream) + recordMapper.applyMappers(message, mappersForStream) } } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerHelperTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerHelperTest.java index d0402ba45b8..aba6fce5df7 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerHelperTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerHelperTest.java @@ -18,7 +18,6 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.api.client.generated.ActorDefinitionVersionApi; import io.airbyte.api.client.generated.DestinationApi; @@ -33,12 +32,12 @@ import io.airbyte.config.State; import io.airbyte.config.StreamDescriptor; import io.airbyte.config.WorkerDestinationConfig; +import io.airbyte.config.adapters.AirbyteJsonRecordAdapter; import io.airbyte.featureflag.Connection; import io.airbyte.featureflag.EnableMappers; import io.airbyte.featureflag.FeatureFlagClient; import io.airbyte.featureflag.TestClient; import io.airbyte.mappers.application.RecordMapper; -import io.airbyte.mappers.transformations.Record; import io.airbyte.persistence.job.models.ReplicationInput; import io.airbyte.protocol.models.AirbyteAnalyticsTraceMessage; import io.airbyte.protocol.models.AirbyteLogMessage; @@ -312,12 +311,13 @@ void testApplyTransformationFlagDisableOrNoMapper(final boolean mappersEnabled) mock(ConfiguredAirbyteCatalog.class), mock(State.class)); - final AirbyteRecordMessage recordMessage = new AirbyteRecordMessage().withStream("stream").withData(Jsons.jsonNode(Map.of("column", "value"))); - final AirbyteRecordMessage copiedRecordMessage = Jsons.clone(recordMessage); + final AirbyteMessage recordMessage = new AirbyteMessage().withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream("stream").withData(Jsons.jsonNode(Map.of("column", "value")))); + final AirbyteMessage copiedRecordMessage = Jsons.clone(recordMessage); when(featureFlagClient.boolVariation(EnableMappers.INSTANCE, new Connection(replicationContext.getConnectionId()))).thenReturn(mappersEnabled); - replicationWorkerHelper.applyTransformationMappers(recordMessage); + replicationWorkerHelper.applyTransformationMappers(new AirbyteJsonRecordAdapter(recordMessage)); assertEquals(copiedRecordMessage, recordMessage); verifyNoInteractions(recordMapper); @@ -344,14 +344,14 @@ void testApplyTransformationMapper() throws IOException { catalog, mock(State.class)); - final AirbyteRecordMessage recordMessage = - new AirbyteRecordMessage().withStream("stream").withData(Jsons.jsonNode(Map.of("column", "value"))); + final AirbyteMessage recordMessage = + new AirbyteMessage().withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream("stream").withData(Jsons.jsonNode(Map.of("column", "value")))); + final AirbyteJsonRecordAdapter recordAdapter = new AirbyteJsonRecordAdapter(recordMessage); - Record record = new Record((ObjectNode) recordMessage.getData()); + replicationWorkerHelper.applyTransformationMappers(recordAdapter); - replicationWorkerHelper.applyTransformationMappers(recordMessage); - - verify(recordMapper).applyMappers(record, mappers); + verify(recordMapper).applyMappers(recordAdapter, mappers); } private void mockSupportRefreshes(final boolean supportsRefreshes) throws IOException { diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerTest.java index ca24e678cdf..07981bb71d7 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerTest.java @@ -60,6 +60,7 @@ import io.airbyte.config.SyncStats; import io.airbyte.config.WorkerDestinationConfig; import io.airbyte.config.WorkerSourceConfig; +import io.airbyte.config.adapters.AirbyteJsonRecordAdapter; import io.airbyte.featureflag.EnableMappers; import io.airbyte.featureflag.FeatureFlagClient; import io.airbyte.featureflag.TestClient; @@ -324,8 +325,8 @@ void test() throws Exception { verify(source).start(sourceConfig, jobRoot, replicationInput.getConnectionId()); verify(destination).start(destinationConfig, jobRoot); verify(onReplicationRunning).call(); - verify(replicationWorkerHelper).applyTransformationMappers(RECORD_MESSAGE1.getRecord()); - verify(replicationWorkerHelper).applyTransformationMappers(RECORD_MESSAGE2.getRecord()); + verify(replicationWorkerHelper).applyTransformationMappers(new AirbyteJsonRecordAdapter(RECORD_MESSAGE1)); + verify(replicationWorkerHelper).applyTransformationMappers(new AirbyteJsonRecordAdapter(RECORD_MESSAGE2)); verify(destination).accept(RECORD_MESSAGE1); verify(destination).accept(RECORD_MESSAGE2); verify(source, atLeastOnce()).close(); diff --git a/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/adapters/AirbyteRecord.kt b/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/adapters/AirbyteRecord.kt new file mode 100644 index 00000000000..d8b1f936421 --- /dev/null +++ b/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/adapters/AirbyteRecord.kt @@ -0,0 +1,28 @@ +package io.airbyte.config.adapters + +import io.airbyte.config.StreamDescriptor +import io.airbyte.protocol.models.AirbyteMessage + +interface AirbyteRecord { + val streamDescriptor: StreamDescriptor + val asProtocol: AirbyteMessage + + fun has(fieldName: String): Boolean + + fun get(fieldName: String): Value + + fun remove(fieldName: String) + + fun set( + fieldName: String, + value: T, + ) +} + +interface Value { + fun asBoolean(): Boolean + + fun asNumber(): Number + + fun asString(): String +} diff --git a/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/adapters/JsonAdapters.kt b/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/adapters/JsonAdapters.kt new file mode 100644 index 00000000000..0ee36a6091a --- /dev/null +++ b/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/adapters/JsonAdapters.kt @@ -0,0 +1,49 @@ +package io.airbyte.config.adapters + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.BooleanNode +import com.fasterxml.jackson.databind.node.DoubleNode +import com.fasterxml.jackson.databind.node.IntNode +import com.fasterxml.jackson.databind.node.ObjectNode +import com.fasterxml.jackson.databind.node.TextNode +import io.airbyte.config.StreamDescriptor +import io.airbyte.protocol.models.AirbyteMessage + +class JsonValueAdapter(private val node: JsonNode) : Value { + override fun asBoolean(): Boolean = node.asBoolean() + + override fun asNumber(): Number = node.asDouble() + + override fun asString(): String = node.asText() +} + +data class AirbyteJsonRecordAdapter(private val message: AirbyteMessage) : AirbyteRecord { + override val asProtocol: AirbyteMessage + get() = message + override val streamDescriptor: StreamDescriptor = StreamDescriptor().withNamespace(message.record.namespace).withName(message.record.stream) + private val data: ObjectNode = message.record.data as ObjectNode + + override fun has(fieldName: String): Boolean = data.has(fieldName) + + override fun get(fieldName: String): Value = JsonValueAdapter(data.get(fieldName)) + + override fun remove(fieldName: String) { + data.remove(fieldName) + } + + override fun set( + fieldName: String, + value: T, + ) { + data.set(fieldName, createNode(value)) + } + + private fun createNode(value: T): JsonNode = + when (value) { + is Boolean -> BooleanNode.valueOf(value) + is Double -> DoubleNode.valueOf(value) + is Int -> IntNode.valueOf(value) + is String -> TextNode.valueOf(value) + else -> TODO("Unsupported type ${value::class.java.name}") + } +} diff --git a/airbyte-config/config-models/src/test/kotlin/io/airbyte/config/adapters/JsonRecordAdapterTest.kt b/airbyte-config/config-models/src/test/kotlin/io/airbyte/config/adapters/JsonRecordAdapterTest.kt new file mode 100644 index 00000000000..d42fe0aa47c --- /dev/null +++ b/airbyte-config/config-models/src/test/kotlin/io/airbyte/config/adapters/JsonRecordAdapterTest.kt @@ -0,0 +1,109 @@ +package io.airbyte.config.adapters + +import io.airbyte.commons.json.Jsons +import io.airbyte.config.StreamDescriptor +import io.airbyte.protocol.models.AirbyteMessage +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +class JsonRecordAdapterTest { + companion object { + const val BOOLEAN_FIELD = "boolean-field" + const val INT_FIELD = "int-field" + const val NUMBER_FIELD = "number-field" + const val STRING_FIELD = "string-field" + } + + private val jsonRecordString = + """ + { + "type": "RECORD", + "record": { + "stream": "stream-name", + "namespace": "stream-namespace", + "emitted_at": 1337, + "data": { + "$STRING_FIELD": "bar", + "$BOOLEAN_FIELD": true, + "$INT_FIELD": 42, + "$NUMBER_FIELD": 4.2 + } + } + } + """.trimIndent() + + @Test + fun `basic read`() { + val adapter = getAdapterFromRecord(jsonRecordString) + + assertEquals(StreamDescriptor().withName("stream-name").withNamespace("stream-namespace"), adapter.streamDescriptor) + + assertEquals("bar", adapter.get(STRING_FIELD).asString()) + assertEquals(false, adapter.get(STRING_FIELD).asBoolean()) + + assertEquals(true, adapter.get(BOOLEAN_FIELD).asBoolean()) + assertEquals("true", adapter.get(BOOLEAN_FIELD).asString()) + + assertEquals("42", adapter.get(INT_FIELD).asString()) + + assertEquals("4.2", adapter.get(NUMBER_FIELD).asString()) + } + + @Test + fun `verify modify then serialize`() { + val adapter = getAdapterFromRecord(jsonRecordString) + adapter.set(STRING_FIELD, "woohoo") + + val serialized = Jsons.serialize(adapter.asProtocol) + val deserialized = Jsons.deserialize(serialized, AirbyteMessage::class.java) + assertEquals(adapter.asProtocol, deserialized) + } + + @Test + fun `writing boolean`() { + val adapter = getAdapterFromRecord(jsonRecordString) + + adapter.set(STRING_FIELD, true) + assertEquals(true, adapter.get(STRING_FIELD).asBoolean()) + + adapter.set(BOOLEAN_FIELD, false) + assertEquals(false, adapter.get(BOOLEAN_FIELD).asBoolean()) + } + + @Test + fun `writing double`() { + val adapter = getAdapterFromRecord(jsonRecordString) + + adapter.set(NUMBER_FIELD, 1.1) + assertEquals(1.1, adapter.get(NUMBER_FIELD).asNumber()) + + adapter.set(STRING_FIELD, 2) + assertEquals(2.0, adapter.get(STRING_FIELD).asNumber()) + } + + @Test + fun `writing numbers`() { + val adapter = getAdapterFromRecord(jsonRecordString) + + adapter.set(NUMBER_FIELD, 1) + assertEquals(1.0, adapter.get(NUMBER_FIELD).asNumber()) + + adapter.set(STRING_FIELD, 2) + assertEquals(2.0, adapter.get(STRING_FIELD).asNumber()) + } + + @Test + fun `writing strings`() { + val adapter = getAdapterFromRecord(jsonRecordString) + + adapter.set(STRING_FIELD, "updated") + assertEquals("updated", adapter.get(STRING_FIELD).asString()) + + adapter.set(BOOLEAN_FIELD, "overridden") + assertEquals("overridden", adapter.get(BOOLEAN_FIELD).asString()) + } + + fun getAdapterFromRecord(jsonString: String) = AirbyteJsonRecordAdapter(getRecord(jsonString)) + + fun getRecord(jsonString: String): AirbyteMessage = Jsons.deserialize(jsonString, AirbyteMessage::class.java) +} diff --git a/airbyte-mappers/build.gradle.kts b/airbyte-mappers/build.gradle.kts index 7d31f1aa2e2..d3ebb174963 100644 --- a/airbyte-mappers/build.gradle.kts +++ b/airbyte-mappers/build.gradle.kts @@ -17,6 +17,7 @@ dependencies { implementation(libs.kotlin.logging) testImplementation(project(":oss:airbyte-commons")) + testImplementation(libs.airbyte.protocol) testImplementation(libs.mockito.core) testImplementation(libs.mockk) } diff --git a/airbyte-mappers/src/main/kotlin/io/airbyte/mappers/application/RecordMapper.kt b/airbyte-mappers/src/main/kotlin/io/airbyte/mappers/application/RecordMapper.kt index bc1e28bcaf8..1dca2bb6574 100644 --- a/airbyte-mappers/src/main/kotlin/io/airbyte/mappers/application/RecordMapper.kt +++ b/airbyte-mappers/src/main/kotlin/io/airbyte/mappers/application/RecordMapper.kt @@ -1,8 +1,8 @@ package io.airbyte.mappers.application import io.airbyte.config.ConfiguredMapper +import io.airbyte.config.adapters.AirbyteRecord import io.airbyte.mappers.transformations.Mapper -import io.airbyte.mappers.transformations.Record import io.github.oshai.kotlinlogging.KotlinLogging import jakarta.inject.Singleton @@ -13,7 +13,7 @@ class RecordMapper(private val mappers: List) { private val mappersByName: Map = mappers.associateBy { it.name } fun applyMappers( - record: Record, + record: AirbyteRecord, configuredMappers: List, ) { try { diff --git a/airbyte-mappers/src/main/kotlin/io/airbyte/mappers/transformations/HashingMapper.kt b/airbyte-mappers/src/main/kotlin/io/airbyte/mappers/transformations/HashingMapper.kt index 61f377c289a..0bdb84e797e 100644 --- a/airbyte-mappers/src/main/kotlin/io/airbyte/mappers/transformations/HashingMapper.kt +++ b/airbyte-mappers/src/main/kotlin/io/airbyte/mappers/transformations/HashingMapper.kt @@ -7,6 +7,7 @@ import io.airbyte.config.MapperOperationName import io.airbyte.config.MapperSpecification import io.airbyte.config.MapperSpecificationFieldEnum import io.airbyte.config.MapperSpecificationFieldString +import io.airbyte.config.adapters.AirbyteRecord import jakarta.inject.Named import jakarta.inject.Singleton import java.security.MessageDigest @@ -98,16 +99,16 @@ class HashingMapper : Mapper { override fun map( config: ConfiguredMapper, - record: Record, + record: AirbyteRecord, ) { val (targetField, method, fieldNameSuffix) = getConfigValues(config.config) - if (record.data.hasNonNull(targetField)) { - val data = record.data.get(targetField).asText().toByteArray() + if (record.has(targetField)) { + val data = record.get(targetField).asString().toByteArray() val hashedAndEncodeValue: String = hashAndEncodeData(method, data) - record.data.put(targetField + fieldNameSuffix, hashedAndEncodeValue) - record.data.remove(targetField) + record.set(targetField + fieldNameSuffix, hashedAndEncodeValue) + record.remove(targetField) } } diff --git a/airbyte-mappers/src/main/kotlin/io/airbyte/mappers/transformations/Mapper.kt b/airbyte-mappers/src/main/kotlin/io/airbyte/mappers/transformations/Mapper.kt index 3ce12c5f4b2..9c38d982ff5 100644 --- a/airbyte-mappers/src/main/kotlin/io/airbyte/mappers/transformations/Mapper.kt +++ b/airbyte-mappers/src/main/kotlin/io/airbyte/mappers/transformations/Mapper.kt @@ -3,6 +3,7 @@ package io.airbyte.mappers.transformations import io.airbyte.config.ConfiguredMapper import io.airbyte.config.Field import io.airbyte.config.MapperSpecification +import io.airbyte.config.adapters.AirbyteRecord interface Mapper { val name: String @@ -16,6 +17,6 @@ interface Mapper { fun map( config: ConfiguredMapper, - record: Record, + record: AirbyteRecord, ) } diff --git a/airbyte-mappers/src/main/kotlin/io/airbyte/mappers/transformations/Record.kt b/airbyte-mappers/src/main/kotlin/io/airbyte/mappers/transformations/Record.kt deleted file mode 100644 index 91ab7923fd7..00000000000 --- a/airbyte-mappers/src/main/kotlin/io/airbyte/mappers/transformations/Record.kt +++ /dev/null @@ -1,6 +0,0 @@ -package io.airbyte.mappers.transformations - -import com.fasterxml.jackson.databind.node.ObjectNode - -// TODO: Remove when we have the real abstraction around the data in an airbyte record message -data class Record(val data: ObjectNode) diff --git a/airbyte-mappers/src/test/kotlin/io/airbyte/config/adapters/TestRecordAdapter.kt b/airbyte-mappers/src/test/kotlin/io/airbyte/config/adapters/TestRecordAdapter.kt new file mode 100644 index 00000000000..e9801b9222e --- /dev/null +++ b/airbyte-mappers/src/test/kotlin/io/airbyte/config/adapters/TestRecordAdapter.kt @@ -0,0 +1,34 @@ +package io.airbyte.config.adapters + +import io.airbyte.config.StreamDescriptor +import io.airbyte.protocol.models.AirbyteMessage + +class TestValueAdapter(private val value: Any) : Value { + override fun asBoolean(): Boolean = value as Boolean + + override fun asNumber(): Number = value as Number + + override fun asString(): String = value.toString() +} + +class TestRecordAdapter(override val streamDescriptor: StreamDescriptor, data: Map) : AirbyteRecord { + private val data: MutableMap = data.toMutableMap() + + override val asProtocol: AirbyteMessage + get() = TODO("Not yet implemented") + + override fun has(fieldName: String): Boolean = fieldName in data + + override fun get(fieldName: String): Value = TestValueAdapter(data[fieldName] as Any) + + override fun remove(fieldName: String) { + data.remove(fieldName) + } + + override fun set( + fieldName: String, + value: T, + ) { + data[fieldName] = value + } +} diff --git a/airbyte-mappers/src/test/kotlin/io/airbyte/mappers/application/RecordMapperTest.kt b/airbyte-mappers/src/test/kotlin/io/airbyte/mappers/application/RecordMapperTest.kt index 33135ea22ee..84bb0147f0c 100644 --- a/airbyte-mappers/src/test/kotlin/io/airbyte/mappers/application/RecordMapperTest.kt +++ b/airbyte-mappers/src/test/kotlin/io/airbyte/mappers/application/RecordMapperTest.kt @@ -1,10 +1,11 @@ package io.airbyte.mappers.application -import com.fasterxml.jackson.databind.node.ObjectNode import io.airbyte.commons.json.Jsons import io.airbyte.config.ConfiguredMapper +import io.airbyte.config.adapters.AirbyteJsonRecordAdapter import io.airbyte.mappers.mocks.TestMapper -import io.airbyte.mappers.transformations.Record +import io.airbyte.protocol.models.AirbyteMessage +import io.airbyte.protocol.models.AirbyteRecordMessage import io.mockk.spyk import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test @@ -13,30 +14,39 @@ class RecordMapperTest { private val mapper = spyk(TestMapper()) private val recordMapper = RecordMapper(listOf(mapper)) - private val inputRecord = Record(Jsons.jsonNode(mapOf("field1" to "value1")) as ObjectNode) + private val sampleRecord = createRecord(mapOf("field1" to "value1")) @Test fun testMapperNoConfig() { - val copiedRecord = Jsons.clone(inputRecord) + val testRecord = sampleRecord.deepCopy() - recordMapper.applyMappers(copiedRecord, listOf()) + recordMapper.applyMappers(testRecord, listOf()) - assertEquals(copiedRecord, copiedRecord) + assertEquals(sampleRecord, testRecord) } @Test fun testMapperWithConfig() { - val copiedRecord = Jsons.clone(inputRecord) + val testRecord = sampleRecord.deepCopy() recordMapper.applyMappers( - copiedRecord, + testRecord, listOf( - ConfiguredMapper("test", mapOf()), - ConfiguredMapper("test", mapOf()), + ConfiguredMapper("test", mapOf("target" to "field1")), + ConfiguredMapper("test", mapOf("target" to "field1_test")), ), ) - val expectedRecord = Record(Jsons.jsonNode(mapOf("field1_test_test" to "value1")) as ObjectNode) - assertEquals(expectedRecord, copiedRecord) + val expectedRecord = createRecord(mapOf("field1_test_test" to "value1")) + assertEquals(expectedRecord, testRecord) } + + fun createRecord(data: Map) = + AirbyteJsonRecordAdapter( + AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord(AirbyteRecordMessage().withStream("stream").withData(Jsons.jsonNode(data))), + ) + + fun AirbyteJsonRecordAdapter.deepCopy() = this.copy(message = Jsons.clone(this.asProtocol)) } diff --git a/airbyte-mappers/src/test/kotlin/io/airbyte/mappers/mocks/TestMapper.kt b/airbyte-mappers/src/test/kotlin/io/airbyte/mappers/mocks/TestMapper.kt index 7bf49bf4b30..4af645cfdcd 100644 --- a/airbyte-mappers/src/test/kotlin/io/airbyte/mappers/mocks/TestMapper.kt +++ b/airbyte-mappers/src/test/kotlin/io/airbyte/mappers/mocks/TestMapper.kt @@ -4,8 +4,8 @@ import io.airbyte.config.ConfiguredMapper import io.airbyte.config.Field import io.airbyte.config.FieldType import io.airbyte.config.MapperSpecification +import io.airbyte.config.adapters.AirbyteRecord import io.airbyte.mappers.transformations.Mapper -import io.airbyte.mappers.transformations.Record class TestMapper : Mapper { override val name: String = "test" @@ -22,11 +22,10 @@ class TestMapper : Mapper { override fun map( config: ConfiguredMapper, - record: Record, + record: AirbyteRecord, ) { - record.data.properties().forEach { - record.data.putIfAbsent(it.key + "_test", it.value) - record.data.remove(it.key) - } + val targetField = config.config["target"] ?: throw IllegalArgumentException("target is not defined") + record.set("${targetField}_test", record.get(targetField).asString()) + record.remove(targetField) } } diff --git a/airbyte-mappers/src/test/kotlin/io/airbyte/mappers/transformations/HashingMapperTest.kt b/airbyte-mappers/src/test/kotlin/io/airbyte/mappers/transformations/HashingMapperTest.kt index c62cbbd2984..c9e926cc3c5 100644 --- a/airbyte-mappers/src/test/kotlin/io/airbyte/mappers/transformations/HashingMapperTest.kt +++ b/airbyte-mappers/src/test/kotlin/io/airbyte/mappers/transformations/HashingMapperTest.kt @@ -1,10 +1,10 @@ package io.airbyte.mappers.transformations -import com.fasterxml.jackson.databind.node.ObjectNode -import io.airbyte.commons.json.Jsons import io.airbyte.config.ConfiguredMapper import io.airbyte.config.Field import io.airbyte.config.FieldType +import io.airbyte.config.StreamDescriptor +import io.airbyte.config.adapters.TestRecordAdapter import io.airbyte.mappers.transformations.HashingMapper.Companion.supportedMethods import io.mockk.every import io.mockk.spyk @@ -20,16 +20,6 @@ import java.security.Security class HashingMapperTest { private val hashingMapper = spyk(HashingMapper()) - private val testObject = - Jsons.deserialize( - """ - { - "field1": "value1", - "field2": "value2" - } - """, - ) as ObjectNode - @Test fun specReturnsCorrectSpecification() { val spec = hashingMapper.spec() @@ -103,14 +93,14 @@ class HashingMapperTest { every { hashingMapper.hashAndEncodeData(HashingMapper.SHA256, "value1".toByteArray()) } returns "hashed_value" - val record = Record(testObject) + val record = TestRecordAdapter(StreamDescriptor().withName("stream"), mapOf("field1" to "value1", "field2" to "value2")) hashingMapper.map(config, record) - assertTrue(record.data.has("field1_hashed")) - assertEquals("hashed_value", record.data.get("field1_hashed").asText()) - assertFalse(record.data.has("field1")) - assertTrue(record.data.has("field2")) - assertEquals("value2", record.data.get("field2").asText()) + assertTrue(record.has("field1_hashed")) + assertEquals("hashed_value", record.get("field1_hashed").asString()) + assertFalse(record.has("field1")) + assertTrue(record.has("field2")) + assertEquals("value2", record.get("field2").asString()) } @Test @@ -124,9 +114,9 @@ class HashingMapperTest { HashingMapper.FIELD_NAME_SUFFIX_CONFIG_KEY to "_hashed", ), ) - val record = Record(testObject) + assertThrows(IllegalArgumentException::class.java) { - hashingMapper.map(config, record) + hashingMapper.map(config, TestRecordAdapter(StreamDescriptor().withName("any"), mapOf("field1" to "anything"))) } }