Skip to content

Commit

Permalink
chore: properly write state_state_ for source connector. (#13672)
Browse files Browse the repository at this point in the history
  • Loading branch information
tryangul committed Aug 26, 2024
1 parent 3ed72e6 commit d25b622
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,15 @@ class ReplicationHydrationProcessor(
SOURCE_DIR,
)

// ensure empty state serializes properly as we will pass it to the connector
val serializedState =
hydrated.state?.state?.let {
serializer.serialize(it)
} ?: "{}"

fileClient.writeInputFile(
FileConstants.INPUT_STATE_FILE,
serializer.serialize(hydrated.state),
serializedState,
SOURCE_DIR,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import io.airbyte.commons.protocol.ProtocolSerializer
import io.airbyte.config.AirbyteStream
import io.airbyte.config.ConfiguredAirbyteCatalog
import io.airbyte.config.ConfiguredAirbyteStream
import io.airbyte.config.State
import io.airbyte.config.SyncMode
import io.airbyte.initContainer.input.ReplicationHydrationProcessorTest.Fixtures.SERIALIZED_STATE
import io.airbyte.initContainer.system.FileClient
import io.airbyte.persistence.job.models.IntegrationLauncherConfig
import io.airbyte.persistence.job.models.ReplicationInput
Expand All @@ -22,9 +24,12 @@ import io.mockk.impl.annotations.MockK
import io.mockk.junit5.MockKExtension
import io.mockk.verify
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.MethodSource
import java.util.UUID
import java.util.stream.Stream

@ExtendWith(MockKExtension::class)
class ReplicationHydrationProcessorTest {
Expand Down Expand Up @@ -57,8 +62,12 @@ class ReplicationHydrationProcessorTest {
)
}

@Test
fun `parses input, hydrates, and writes output to expected files`() {
@ParameterizedTest
@MethodSource("stateMatrix")
fun `parses input, hydrates, and writes output to expected files`(
state: State?,
expectedSerializedState: String,
) {
val input = Fixtures.workload
val catalog =
ConfiguredAirbyteCatalog(
Expand All @@ -78,6 +87,7 @@ class ReplicationHydrationProcessorTest {
.withDestinationSupportsRefreshes(true)
.withCatalog(catalog)
.withPrefix("dest_test") // for validating the mapper ran
.withState(state)
val mapper =
NamespacingMapper(
null,
Expand All @@ -90,14 +100,14 @@ class ReplicationHydrationProcessorTest {
val serializedSrcConfig = "serialized src config"
val serializedDestCatalog = "serialized dest catalog"
val serializedDestConfig = "serialized dest config"
val serializedState = "serialized state config"
val serializedState = SERIALIZED_STATE

every { deserializer.toReplicationActivityInput(input.inputPayload) } returns activityInput
every { replicationInputHydrator.getHydratedReplicationInput(activityInput) } returns hydrated
every { serializer.serialize(hydrated) } returns serializedReplInput
every { serializer.serialize(hydrated.sourceConfiguration) } returns serializedSrcConfig
every { serializer.serialize(hydrated.destinationConfiguration) } returns serializedDestConfig
every { serializer.serialize(hydrated.state) } returns serializedState
every { serializer.serialize(hydrated.state?.state) } returns serializedState
every { protocolSerializer.serialize(hydrated.catalog, false) } returns serializedSrcCatalog
every { protocolSerializer.serialize(mapper.mapCatalog(hydrated.catalog), hydrated.destinationSupportsRefreshes) } returns serializedDestCatalog

Expand All @@ -109,18 +119,30 @@ class ReplicationHydrationProcessorTest {
verify { fileClient.writeInputFile(FileConstants.INIT_INPUT_FILE, serializedReplInput) }
verify { serializer.serialize(hydrated.sourceConfiguration) }
verify { serializer.serialize(hydrated.destinationConfiguration) }
verify { serializer.serialize(hydrated.state) }
verify { protocolSerializer.serialize(hydrated.catalog, false) }
verify { protocolSerializer.serialize(mapper.mapCatalog(hydrated.catalog), hydrated.destinationSupportsRefreshes) }
verify { fileClient.writeInputFile(FileConstants.CATALOG_FILE, serializedSrcCatalog, FileConstants.SOURCE_DIR) }
verify { fileClient.writeInputFile(FileConstants.CONNECTOR_CONFIG_FILE, serializedSrcConfig, FileConstants.SOURCE_DIR) }
verify { fileClient.writeInputFile(FileConstants.INPUT_STATE_FILE, serializedState, FileConstants.SOURCE_DIR) }
verify { fileClient.writeInputFile(FileConstants.INPUT_STATE_FILE, expectedSerializedState, FileConstants.SOURCE_DIR) }
verify { fileClient.writeInputFile(FileConstants.CATALOG_FILE, serializedDestCatalog, FileConstants.DEST_DIR) }
verify { fileClient.writeInputFile(FileConstants.CONNECTOR_CONFIG_FILE, serializedDestConfig, FileConstants.DEST_DIR) }
verify { fileClient.makeNamedPipes() }
}

companion object {
// Validates empty or null states serialize as "{}"
@JvmStatic
private fun stateMatrix(): Stream<Arguments> {
return Stream.of(
Arguments.of(State().withState(null), "{}"),
Arguments.of(null, "{}"),
Arguments.of(State().withState(Jsons.jsonNode("this is" to "nested for some reason")), SERIALIZED_STATE),
)
}
}

object Fixtures {
const val SERIALIZED_STATE = "serialized state config"
private const val WORKLOAD_ID = "workload-id-13"

val workload =
Expand Down

0 comments on commit d25b622

Please sign in to comment.