Skip to content

Commit

Permalink
chore: introduce AirbyteRecord and AirbyteJsonRecordAdapter (#13774)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Sep 4, 2024
1 parent d9e3dd5 commit cbfbfe8
Show file tree
Hide file tree
Showing 15 changed files with 288 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package io.airbyte.workers.general

import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import com.google.common.annotations.VisibleForTesting
import io.airbyte.api.client.AirbyteApiClient
import io.airbyte.api.client.model.generated.ActorType
Expand All @@ -28,11 +27,12 @@ import io.airbyte.config.State
import io.airbyte.config.StreamDescriptor
import io.airbyte.config.SyncStats
import io.airbyte.config.WorkerDestinationConfig
import io.airbyte.config.adapters.AirbyteJsonRecordAdapter
import io.airbyte.config.adapters.AirbyteRecord
import io.airbyte.featureflag.Connection
import io.airbyte.featureflag.EnableMappers
import io.airbyte.featureflag.FeatureFlagClient
import io.airbyte.mappers.application.RecordMapper
import io.airbyte.mappers.transformations.Record
import io.airbyte.metrics.lib.ApmTraceUtils
import io.airbyte.metrics.lib.MetricAttribute
import io.airbyte.metrics.lib.MetricClient
Expand All @@ -42,7 +42,6 @@ import io.airbyte.metrics.lib.OssMetricsRegistry
import io.airbyte.persistence.job.models.ReplicationInput
import io.airbyte.protocol.models.AirbyteMessage
import io.airbyte.protocol.models.AirbyteMessage.Type
import io.airbyte.protocol.models.AirbyteRecordMessage
import io.airbyte.protocol.models.AirbyteStateMessage
import io.airbyte.protocol.models.AirbyteStateStats
import io.airbyte.protocol.models.AirbyteTraceMessage
Expand Down Expand Up @@ -410,7 +409,7 @@ class ReplicationWorkerHelper(
}

if (sourceRawMessage.type == Type.RECORD) {
applyTransformationMappers(sourceRawMessage.record)
applyTransformationMappers(AirbyteJsonRecordAdapter(sourceRawMessage))
}

return sourceRawMessage
Expand Down Expand Up @@ -472,12 +471,12 @@ class ReplicationWorkerHelper(
return airbyteApiClient.destinationApi.getDestination(DestinationIdRequestBody(destinationId = destinationId)).destinationDefinitionId
}

fun applyTransformationMappers(message: AirbyteRecordMessage) {
fun applyTransformationMappers(message: AirbyteRecord) {
if (mapperEnabled) {
val mappersForStream: List<ConfiguredMapper> =
mappersPerStreamDescriptor[StreamDescriptor().withName(message.stream).withNamespace(message.namespace)] ?: listOf()
mappersPerStreamDescriptor[message.streamDescriptor] ?: listOf()

recordMapper.applyMappers(Record(message.data as ObjectNode), mappersForStream)
recordMapper.applyMappers(message, mappersForStream)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.ActorDefinitionVersionApi;
import io.airbyte.api.client.generated.DestinationApi;
Expand All @@ -33,12 +32,12 @@
import io.airbyte.config.State;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.adapters.AirbyteJsonRecordAdapter;
import io.airbyte.featureflag.Connection;
import io.airbyte.featureflag.EnableMappers;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.TestClient;
import io.airbyte.mappers.application.RecordMapper;
import io.airbyte.mappers.transformations.Record;
import io.airbyte.persistence.job.models.ReplicationInput;
import io.airbyte.protocol.models.AirbyteAnalyticsTraceMessage;
import io.airbyte.protocol.models.AirbyteLogMessage;
Expand Down Expand Up @@ -312,12 +311,13 @@ void testApplyTransformationFlagDisableOrNoMapper(final boolean mappersEnabled)
mock(ConfiguredAirbyteCatalog.class),
mock(State.class));

final AirbyteRecordMessage recordMessage = new AirbyteRecordMessage().withStream("stream").withData(Jsons.jsonNode(Map.of("column", "value")));
final AirbyteRecordMessage copiedRecordMessage = Jsons.clone(recordMessage);
final AirbyteMessage recordMessage = new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream("stream").withData(Jsons.jsonNode(Map.of("column", "value"))));
final AirbyteMessage copiedRecordMessage = Jsons.clone(recordMessage);

when(featureFlagClient.boolVariation(EnableMappers.INSTANCE, new Connection(replicationContext.getConnectionId()))).thenReturn(mappersEnabled);

replicationWorkerHelper.applyTransformationMappers(recordMessage);
replicationWorkerHelper.applyTransformationMappers(new AirbyteJsonRecordAdapter(recordMessage));

assertEquals(copiedRecordMessage, recordMessage);
verifyNoInteractions(recordMapper);
Expand All @@ -344,14 +344,14 @@ void testApplyTransformationMapper() throws IOException {
catalog,
mock(State.class));

final AirbyteRecordMessage recordMessage =
new AirbyteRecordMessage().withStream("stream").withData(Jsons.jsonNode(Map.of("column", "value")));
final AirbyteMessage recordMessage =
new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream("stream").withData(Jsons.jsonNode(Map.of("column", "value"))));
final AirbyteJsonRecordAdapter recordAdapter = new AirbyteJsonRecordAdapter(recordMessage);

Record record = new Record((ObjectNode) recordMessage.getData());
replicationWorkerHelper.applyTransformationMappers(recordAdapter);

replicationWorkerHelper.applyTransformationMappers(recordMessage);

verify(recordMapper).applyMappers(record, mappers);
verify(recordMapper).applyMappers(recordAdapter, mappers);
}

private void mockSupportRefreshes(final boolean supportsRefreshes) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import io.airbyte.config.SyncStats;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.config.adapters.AirbyteJsonRecordAdapter;
import io.airbyte.featureflag.EnableMappers;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.TestClient;
Expand Down Expand Up @@ -324,8 +325,8 @@ void test() throws Exception {
verify(source).start(sourceConfig, jobRoot, replicationInput.getConnectionId());
verify(destination).start(destinationConfig, jobRoot);
verify(onReplicationRunning).call();
verify(replicationWorkerHelper).applyTransformationMappers(RECORD_MESSAGE1.getRecord());
verify(replicationWorkerHelper).applyTransformationMappers(RECORD_MESSAGE2.getRecord());
verify(replicationWorkerHelper).applyTransformationMappers(new AirbyteJsonRecordAdapter(RECORD_MESSAGE1));
verify(replicationWorkerHelper).applyTransformationMappers(new AirbyteJsonRecordAdapter(RECORD_MESSAGE2));
verify(destination).accept(RECORD_MESSAGE1);
verify(destination).accept(RECORD_MESSAGE2);
verify(source, atLeastOnce()).close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.airbyte.config.adapters

import io.airbyte.config.StreamDescriptor
import io.airbyte.protocol.models.AirbyteMessage

interface AirbyteRecord {
val streamDescriptor: StreamDescriptor
val asProtocol: AirbyteMessage

fun has(fieldName: String): Boolean

fun get(fieldName: String): Value

fun remove(fieldName: String)

fun <T : Any> set(
fieldName: String,
value: T,
)
}

interface Value {
fun asBoolean(): Boolean

fun asNumber(): Number

fun asString(): String
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.airbyte.config.adapters

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.BooleanNode
import com.fasterxml.jackson.databind.node.DoubleNode
import com.fasterxml.jackson.databind.node.IntNode
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.databind.node.TextNode
import io.airbyte.config.StreamDescriptor
import io.airbyte.protocol.models.AirbyteMessage

class JsonValueAdapter(private val node: JsonNode) : Value {
override fun asBoolean(): Boolean = node.asBoolean()

override fun asNumber(): Number = node.asDouble()

override fun asString(): String = node.asText()
}

data class AirbyteJsonRecordAdapter(private val message: AirbyteMessage) : AirbyteRecord {
override val asProtocol: AirbyteMessage
get() = message
override val streamDescriptor: StreamDescriptor = StreamDescriptor().withNamespace(message.record.namespace).withName(message.record.stream)
private val data: ObjectNode = message.record.data as ObjectNode

override fun has(fieldName: String): Boolean = data.has(fieldName)

override fun get(fieldName: String): Value = JsonValueAdapter(data.get(fieldName))

override fun remove(fieldName: String) {
data.remove(fieldName)
}

override fun <T : Any> set(
fieldName: String,
value: T,
) {
data.set<JsonNode>(fieldName, createNode(value))
}

private fun <T : Any> createNode(value: T): JsonNode =
when (value) {
is Boolean -> BooleanNode.valueOf(value)
is Double -> DoubleNode.valueOf(value)
is Int -> IntNode.valueOf(value)
is String -> TextNode.valueOf(value)
else -> TODO("Unsupported type ${value::class.java.name}")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package io.airbyte.config.adapters

import io.airbyte.commons.json.Jsons
import io.airbyte.config.StreamDescriptor
import io.airbyte.protocol.models.AirbyteMessage
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

class JsonRecordAdapterTest {
companion object {
const val BOOLEAN_FIELD = "boolean-field"
const val INT_FIELD = "int-field"
const val NUMBER_FIELD = "number-field"
const val STRING_FIELD = "string-field"
}

private val jsonRecordString =
"""
{
"type": "RECORD",
"record": {
"stream": "stream-name",
"namespace": "stream-namespace",
"emitted_at": 1337,
"data": {
"$STRING_FIELD": "bar",
"$BOOLEAN_FIELD": true,
"$INT_FIELD": 42,
"$NUMBER_FIELD": 4.2
}
}
}
""".trimIndent()

@Test
fun `basic read`() {
val adapter = getAdapterFromRecord(jsonRecordString)

assertEquals(StreamDescriptor().withName("stream-name").withNamespace("stream-namespace"), adapter.streamDescriptor)

assertEquals("bar", adapter.get(STRING_FIELD).asString())
assertEquals(false, adapter.get(STRING_FIELD).asBoolean())

assertEquals(true, adapter.get(BOOLEAN_FIELD).asBoolean())
assertEquals("true", adapter.get(BOOLEAN_FIELD).asString())

assertEquals("42", adapter.get(INT_FIELD).asString())

assertEquals("4.2", adapter.get(NUMBER_FIELD).asString())
}

@Test
fun `verify modify then serialize`() {
val adapter = getAdapterFromRecord(jsonRecordString)
adapter.set(STRING_FIELD, "woohoo")

val serialized = Jsons.serialize(adapter.asProtocol)
val deserialized = Jsons.deserialize(serialized, AirbyteMessage::class.java)
assertEquals(adapter.asProtocol, deserialized)
}

@Test
fun `writing boolean`() {
val adapter = getAdapterFromRecord(jsonRecordString)

adapter.set(STRING_FIELD, true)
assertEquals(true, adapter.get(STRING_FIELD).asBoolean())

adapter.set(BOOLEAN_FIELD, false)
assertEquals(false, adapter.get(BOOLEAN_FIELD).asBoolean())
}

@Test
fun `writing double`() {
val adapter = getAdapterFromRecord(jsonRecordString)

adapter.set(NUMBER_FIELD, 1.1)
assertEquals(1.1, adapter.get(NUMBER_FIELD).asNumber())

adapter.set(STRING_FIELD, 2)
assertEquals(2.0, adapter.get(STRING_FIELD).asNumber())
}

@Test
fun `writing numbers`() {
val adapter = getAdapterFromRecord(jsonRecordString)

adapter.set(NUMBER_FIELD, 1)
assertEquals(1.0, adapter.get(NUMBER_FIELD).asNumber())

adapter.set(STRING_FIELD, 2)
assertEquals(2.0, adapter.get(STRING_FIELD).asNumber())
}

@Test
fun `writing strings`() {
val adapter = getAdapterFromRecord(jsonRecordString)

adapter.set(STRING_FIELD, "updated")
assertEquals("updated", adapter.get(STRING_FIELD).asString())

adapter.set(BOOLEAN_FIELD, "overridden")
assertEquals("overridden", adapter.get(BOOLEAN_FIELD).asString())
}

fun getAdapterFromRecord(jsonString: String) = AirbyteJsonRecordAdapter(getRecord(jsonString))

fun getRecord(jsonString: String): AirbyteMessage = Jsons.deserialize(jsonString, AirbyteMessage::class.java)
}
1 change: 1 addition & 0 deletions airbyte-mappers/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies {
implementation(libs.kotlin.logging)

testImplementation(project(":oss:airbyte-commons"))
testImplementation(libs.airbyte.protocol)
testImplementation(libs.mockito.core)
testImplementation(libs.mockk)
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.airbyte.mappers.application

import io.airbyte.config.ConfiguredMapper
import io.airbyte.config.adapters.AirbyteRecord
import io.airbyte.mappers.transformations.Mapper
import io.airbyte.mappers.transformations.Record
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton

Expand All @@ -13,7 +13,7 @@ class RecordMapper(private val mappers: List<Mapper>) {
private val mappersByName: Map<String, Mapper> = mappers.associateBy { it.name }

fun applyMappers(
record: Record,
record: AirbyteRecord,
configuredMappers: List<ConfiguredMapper>,
) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.airbyte.config.MapperOperationName
import io.airbyte.config.MapperSpecification
import io.airbyte.config.MapperSpecificationFieldEnum
import io.airbyte.config.MapperSpecificationFieldString
import io.airbyte.config.adapters.AirbyteRecord
import jakarta.inject.Named
import jakarta.inject.Singleton
import java.security.MessageDigest
Expand Down Expand Up @@ -98,16 +99,16 @@ class HashingMapper : Mapper {

override fun map(
config: ConfiguredMapper,
record: Record,
record: AirbyteRecord,
) {
val (targetField, method, fieldNameSuffix) = getConfigValues(config.config)

if (record.data.hasNonNull(targetField)) {
val data = record.data.get(targetField).asText().toByteArray()
if (record.has(targetField)) {
val data = record.get(targetField).asString().toByteArray()

val hashedAndEncodeValue: String = hashAndEncodeData(method, data)
record.data.put(targetField + fieldNameSuffix, hashedAndEncodeValue)
record.data.remove(targetField)
record.set(targetField + fieldNameSuffix, hashedAndEncodeValue)
record.remove(targetField)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.airbyte.mappers.transformations
import io.airbyte.config.ConfiguredMapper
import io.airbyte.config.Field
import io.airbyte.config.MapperSpecification
import io.airbyte.config.adapters.AirbyteRecord

interface Mapper {
val name: String
Expand All @@ -16,6 +17,6 @@ interface Mapper {

fun map(
config: ConfiguredMapper,
record: Record,
record: AirbyteRecord,
)
}
Loading

0 comments on commit cbfbfe8

Please sign in to comment.