Skip to content

Commit

Permalink
chore: adds fetching init container for connector operations (#13373)
Browse files Browse the repository at this point in the history
  • Loading branch information
tryangul committed Aug 12, 2024
1 parent 3444015 commit c25efc9
Show file tree
Hide file tree
Showing 16 changed files with 663 additions and 84 deletions.
2 changes: 2 additions & 0 deletions airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ object ResetStreamsStateWhenDisabled : Temporary<Boolean>(key = "reset-stream-st

object OrchestratorFetchesInputFromInit : Temporary<Boolean>(key = "orchestrator-fetches-from-init", default = false)

object ConnectorSidecarFetchesInputFromInit : Temporary<Boolean>(key = "connector-sidecar-fetches-from-init", default = false)

object RefreshConfigBeforeSecretHydration : Temporary<Boolean>(key = "platform.refresh-config-before-secret-hydration", default = false)

object LogStateMsgs : Temporary<Boolean>(key = "platform.log-state-msgs", default = false)
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import io.airbyte.config.secrets.SecretsRepositoryReader
import io.airbyte.featureflag.FeatureFlagClient
import io.airbyte.metrics.lib.MetricClient
import io.airbyte.metrics.lib.NotImplementedMetricClient
import io.airbyte.workers.CheckConnectionInputHydrator
import io.airbyte.workers.ConnectorSecretsHydrator
import io.airbyte.workers.DiscoverCatalogInputHydrator
import io.airbyte.workers.ReplicationInputHydrator
import io.airbyte.workers.helper.ResumableFullRefreshStatsHelper
import io.micronaut.context.annotation.Factory
Expand All @@ -24,4 +27,27 @@ class ApplicationBeanFactory {

@Singleton
fun metricClient(): MetricClient = NotImplementedMetricClient()

@Singleton
fun baseConnectorInputHydrator(
airbyteApiClient: AirbyteApiClient,
secretsRepositoryReader: SecretsRepositoryReader,
featureFlagClient: FeatureFlagClient,
): ConnectorSecretsHydrator {
return ConnectorSecretsHydrator(
secretsRepositoryReader = secretsRepositoryReader,
airbyteApiClient = airbyteApiClient,
featureFlagClient = featureFlagClient,
)
}

@Singleton
fun checkInputHydrator(connectorSecretsHydrator: ConnectorSecretsHydrator): CheckConnectionInputHydrator {
return CheckConnectionInputHydrator(connectorSecretsHydrator)
}

@Singleton
fun discoverCatalogInputHydrator(connectorSecretsHydrator: ConnectorSecretsHydrator): DiscoverCatalogInputHydrator {
return DiscoverCatalogInputHydrator(connectorSecretsHydrator)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.airbyte.initContainer.input

import io.airbyte.initContainer.system.FileClient
import io.airbyte.workers.CheckConnectionInputHydrator
import io.airbyte.workers.models.CheckConnectionInput
import io.airbyte.workers.models.SidecarInput
import io.airbyte.workers.serde.ObjectSerializer
import io.airbyte.workers.serde.PayloadDeserializer
import io.airbyte.workers.sync.OrchestratorConstants
import io.airbyte.workload.api.client.model.generated.Workload
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton

@Requires(property = "airbyte.init.operation", pattern = "check")
@Singleton
class CheckHydrationProcessor(
private val inputHydrator: CheckConnectionInputHydrator,
private val deserializer: PayloadDeserializer,
private val serializer: ObjectSerializer,
private val fileClient: FileClient,
) : InputHydrationProcessor {
override fun process(workload: Workload) {
val rawPayload = workload.inputPayload
val parsed: CheckConnectionInput = deserializer.toCheckConnectionInput(rawPayload)

val hydrated = inputHydrator.getHydratedStandardCheckInput(parsed.checkConnectionInput)

fileClient.writeInputFile(
OrchestratorConstants.CONNECTION_CONFIGURATION,
serializer.serialize(hydrated.connectionConfiguration),
)

fileClient.writeInputFile(
OrchestratorConstants.SIDECAR_INPUT,
serializer.serialize(
SidecarInput(
hydrated,
null,
workload.id,
parsed.launcherConfig,
SidecarInput.OperationType.CHECK,
workload.logPath,
),
),
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.airbyte.initContainer.input

import io.airbyte.initContainer.system.FileClient
import io.airbyte.workers.DiscoverCatalogInputHydrator
import io.airbyte.workers.models.DiscoverCatalogInput
import io.airbyte.workers.models.SidecarInput
import io.airbyte.workers.serde.ObjectSerializer
import io.airbyte.workers.serde.PayloadDeserializer
import io.airbyte.workers.sync.OrchestratorConstants
import io.airbyte.workload.api.client.model.generated.Workload
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton

@Requires(property = "airbyte.init.operation", pattern = "discover")
@Singleton
class DiscoverHydrationProcessor(
private val inputHydrator: DiscoverCatalogInputHydrator,
private val deserializer: PayloadDeserializer,
private val serializer: ObjectSerializer,
private val fileClient: FileClient,
) : InputHydrationProcessor {
override fun process(workload: Workload) {
val rawPayload = workload.inputPayload
val parsed: DiscoverCatalogInput = deserializer.toDiscoverCatalogInput(rawPayload)

val hydrated = inputHydrator.getHydratedStandardDiscoverInput(parsed.discoverCatalogInput)

fileClient.writeInputFile(
OrchestratorConstants.CONNECTION_CONFIGURATION,
serializer.serialize(hydrated.connectionConfiguration),
)

fileClient.writeInputFile(
OrchestratorConstants.SIDECAR_INPUT,
serializer.serialize(
SidecarInput(
null,
hydrated,
workload.id,
parsed.launcherConfig,
SidecarInput.OperationType.DISCOVER,
workload.logPath,
),
),
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.airbyte.initContainer.input

import io.airbyte.initContainer.system.FileClient
import io.airbyte.workers.models.SidecarInput
import io.airbyte.workers.models.SpecInput
import io.airbyte.workers.serde.ObjectSerializer
import io.airbyte.workers.serde.PayloadDeserializer
import io.airbyte.workers.sync.OrchestratorConstants
import io.airbyte.workload.api.client.model.generated.Workload
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton

@Requires(property = "airbyte.init.operation", pattern = "spec")
@Singleton
class SpecHydrationProcessor(
private val deserializer: PayloadDeserializer,
private val serializer: ObjectSerializer,
private val fileClient: FileClient,
) : InputHydrationProcessor {
override fun process(workload: Workload) {
val rawPayload = workload.inputPayload
val parsed: SpecInput = deserializer.toSpecInput(rawPayload)

fileClient.writeInputFile(
OrchestratorConstants.SIDECAR_INPUT,
serializer.serialize(
SidecarInput(
null,
null,
workload.id,
parsed.launcherConfig,
SidecarInput.OperationType.SPEC,
workload.logPath,
),
),
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package io.airbyte.initContainer.input

import io.airbyte.commons.json.Jsons
import io.airbyte.config.StandardCheckConnectionInput
import io.airbyte.initContainer.system.FileClient
import io.airbyte.persistence.job.models.IntegrationLauncherConfig
import io.airbyte.workers.CheckConnectionInputHydrator
import io.airbyte.workers.models.CheckConnectionInput
import io.airbyte.workers.models.SidecarInput
import io.airbyte.workers.serde.ObjectSerializer
import io.airbyte.workers.serde.PayloadDeserializer
import io.airbyte.workers.sync.OrchestratorConstants
import io.airbyte.workload.api.client.model.generated.Workload
import io.airbyte.workload.api.client.model.generated.WorkloadType
import io.mockk.every
import io.mockk.impl.annotations.MockK
import io.mockk.junit5.MockKExtension
import io.mockk.verify
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import java.util.UUID

@ExtendWith(MockKExtension::class)
class CheckHydrationProcessorTest {
@MockK
lateinit var inputHydrator: CheckConnectionInputHydrator

@MockK
lateinit var serializer: ObjectSerializer

@MockK
lateinit var deserializer: PayloadDeserializer

@MockK
lateinit var fileClient: FileClient

private lateinit var processor: CheckHydrationProcessor

@BeforeEach
fun setup() {
processor =
CheckHydrationProcessor(
inputHydrator,
deserializer,
serializer,
fileClient,
)
}

@Test
fun `parses input, hydrates and writes output to expected file`() {
val input = Fixtures.workload

val unhydrated = StandardCheckConnectionInput()
val parsed = CheckConnectionInput()
parsed.checkConnectionInput = unhydrated
parsed.launcherConfig = IntegrationLauncherConfig()

val connectionConfiguration = Jsons.jsonNode(mapOf("key-1" to "value-1"))
val hydrated = StandardCheckConnectionInput()
hydrated.connectionConfiguration = connectionConfiguration
val serializedConfig = "serialized hydrated config"
val serializedInput = "serialized hydrated blob"

val sidecarInput =
SidecarInput(
hydrated,
null,
input.id,
parsed.launcherConfig,
SidecarInput.OperationType.CHECK,
input.logPath,
)

every { deserializer.toCheckConnectionInput(input.inputPayload) } returns parsed
every { inputHydrator.getHydratedStandardCheckInput(unhydrated) } returns hydrated
every { serializer.serialize(connectionConfiguration) } returns serializedConfig
every { serializer.serialize(sidecarInput) } returns serializedInput
every { fileClient.writeInputFile(OrchestratorConstants.CONNECTION_CONFIGURATION, serializedConfig) } returns Unit
every { fileClient.writeInputFile(OrchestratorConstants.SIDECAR_INPUT, serializedInput) } returns Unit

processor.process(input)

verify { deserializer.toCheckConnectionInput(input.inputPayload) }
verify { inputHydrator.getHydratedStandardCheckInput(unhydrated) }
verify { serializer.serialize(connectionConfiguration) }
verify { serializer.serialize(sidecarInput) }
verify { fileClient.writeInputFile(OrchestratorConstants.CONNECTION_CONFIGURATION, serializedConfig) }
verify { fileClient.writeInputFile(OrchestratorConstants.SIDECAR_INPUT, serializedInput) }
}

object Fixtures {
private const val WORKLOAD_ID = "workload-id-14"

val workload =
Workload(
WORKLOAD_ID,
listOf(),
"inputPayload",
"logPath",
"geography",
WorkloadType.CHECK,
UUID.randomUUID(),
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package io.airbyte.initContainer.input

import io.airbyte.commons.json.Jsons
import io.airbyte.config.StandardDiscoverCatalogInput
import io.airbyte.initContainer.system.FileClient
import io.airbyte.persistence.job.models.IntegrationLauncherConfig
import io.airbyte.workers.DiscoverCatalogInputHydrator
import io.airbyte.workers.models.DiscoverCatalogInput
import io.airbyte.workers.models.SidecarInput
import io.airbyte.workers.serde.ObjectSerializer
import io.airbyte.workers.serde.PayloadDeserializer
import io.airbyte.workers.sync.OrchestratorConstants
import io.airbyte.workload.api.client.model.generated.Workload
import io.airbyte.workload.api.client.model.generated.WorkloadType
import io.mockk.every
import io.mockk.impl.annotations.MockK
import io.mockk.junit5.MockKExtension
import io.mockk.verify
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import java.util.UUID

@ExtendWith(MockKExtension::class)
class DiscoverHydrationProcessorTest {
@MockK
lateinit var inputHydrator: DiscoverCatalogInputHydrator

@MockK
lateinit var serializer: ObjectSerializer

@MockK
lateinit var deserializer: PayloadDeserializer

@MockK
lateinit var fileClient: FileClient

private lateinit var processor: DiscoverHydrationProcessor

@BeforeEach
fun setup() {
processor =
DiscoverHydrationProcessor(
inputHydrator,
deserializer,
serializer,
fileClient,
)
}

@Test
fun `parses input, hydrates and writes output to expected file`() {
val input = Fixtures.workload

val unhydrated = StandardDiscoverCatalogInput()
val parsed = DiscoverCatalogInput()
parsed.discoverCatalogInput = unhydrated
parsed.launcherConfig = IntegrationLauncherConfig()

val connectionConfiguration = Jsons.jsonNode(mapOf("key-1" to "value-1"))
val hydrated = StandardDiscoverCatalogInput()
hydrated.connectionConfiguration = connectionConfiguration
val serializedConfig = "serialized hydrated config"
val serializedInput = "serialized hydrated blob"

val sidecarInput =
SidecarInput(
null,
hydrated,
input.id,
parsed.launcherConfig,
SidecarInput.OperationType.DISCOVER,
input.logPath,
)

every { deserializer.toDiscoverCatalogInput(input.inputPayload) } returns parsed
every { inputHydrator.getHydratedStandardDiscoverInput(unhydrated) } returns hydrated
every { serializer.serialize(connectionConfiguration) } returns serializedConfig
every { serializer.serialize(sidecarInput) } returns serializedInput
every { fileClient.writeInputFile(OrchestratorConstants.CONNECTION_CONFIGURATION, serializedConfig) } returns Unit
every { fileClient.writeInputFile(OrchestratorConstants.SIDECAR_INPUT, serializedInput) } returns Unit

processor.process(input)

verify { deserializer.toDiscoverCatalogInput(input.inputPayload) }
verify { inputHydrator.getHydratedStandardDiscoverInput(unhydrated) }
verify { serializer.serialize(connectionConfiguration) }
verify { serializer.serialize(sidecarInput) }
verify { fileClient.writeInputFile(OrchestratorConstants.CONNECTION_CONFIGURATION, serializedConfig) }
verify { fileClient.writeInputFile(OrchestratorConstants.SIDECAR_INPUT, serializedInput) }
}

object Fixtures {
private const val WORKLOAD_ID = "workload-id-15"

val workload =
Workload(
WORKLOAD_ID,
listOf(),
"inputPayload",
"logPath",
"geography",
WorkloadType.DISCOVER,
UUID.randomUUID(),
)
}
}
Loading

0 comments on commit c25efc9

Please sign in to comment.