Skip to content

Commit

Permalink
blegh
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Sep 5, 2024
1 parent 1300c34 commit a67195f
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.airbyte.cdk.test.util

import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import java.util.concurrent.atomic.AtomicInteger
import javax.inject.Singleton
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.parallel.Execution
import org.junit.jupiter.api.parallel.ExecutionMode

@MicronautTest
@Execution(ExecutionMode.CONCURRENT)
@TestInstance(TestInstance.Lifecycle.PER_METHOD)
class DestE2eIntegrationTest {
@Test
fun test1(o: StatefulObject) {
println("counter was " + o.counter.incrementAndGet())
}

@Test
fun test2(o: StatefulObject) {
println("counter was " + o.counter.incrementAndGet())
}
}

@Singleton
class StatefulObject {
val counter = AtomicInteger(0)
}
4 changes: 4 additions & 0 deletions airbyte-cdk/bulk/core/load/src/test/resources/metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
data:
dockerRepository: "nonexistent"
documentationUrl: "none"
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package io.airbyte.cdk.test.util

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.Operation
import io.airbyte.cdk.command.CliRunner
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog

// This whole file is very wishy-washy, until we figure out the exact
// micronaut stuff. But it's directionally correct, and I'd rather nail down
// the actual test stuff first.
interface DestinationProcess {
fun sendMessage(message: AirbyteMessage)

Expand All @@ -19,17 +23,24 @@ interface DestinationProcess {
fun waitUntilDone()
}

fun interface DestinationProcessFactory<Config> {
fun createDestinationProcess(
command: String,
config: Config,
catalog: ConfiguredAirbyteCatalog,
): DestinationProcess
}

class NonDockerizedDestination(
command: String,
config: JsonNode,
catalog: ConfiguredAirbyteCatalog,
// some other param to get whatever code we're actually running,
// i.e. equivalent to io.airbyte.integrations.base.destination.Destination
operation: Operation,
): DestinationProcess {
init {
// invoke whatever CDK stuff exists to run a destination connector
// but use some reasonable interface instead of stdin/stdout
// maybe we don't use literal actual CliRunner, but it's something like this
CliRunner.runDestination(command, config = TODO(), catalog = catalog)
}

override fun sendMessage(message: AirbyteMessage) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,81 @@
package io.airbyte.cdk.test.util

import io.airbyte.cdk.Operation
import io.micronaut.context.annotation.Property
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import org.junit.jupiter.api.Test
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import javax.inject.Inject
import kotlin.test.assertNull
import kotlin.test.fail
import org.apache.commons.lang3.RandomStringUtils
import org.junit.jupiter.api.parallel.Execution
import org.junit.jupiter.api.parallel.ExecutionMode

private val logger = KotlinLogging.logger {}

@MicronautTest
abstract class IntegrationTest {
@Execution(ExecutionMode.CONCURRENT)
abstract class IntegrationTest<Config: Any> {
// Intentionally don't inject the actual destination process - we need a full factory
// because some tests want to run multiple syncs, so we need to run the destination
// multiple times.
@Inject lateinit var destinationProcessFactory: DestinationProcessFactory<Config>
// Maybe inject the config? Different test classes need to inject different
// configs (e.g. bigquery: gcs staging vs direct load, raw namespace override, etc.)
// TODO figure out the whole default namespace thing
@Inject lateinit var config: Config
@Inject lateinit var dataDumper: DestinationDataDumper
@Inject lateinit var recordMangler: DestinationRecordMangler

private val randomSuffix = RandomStringUtils.randomAlphabetic(4)
private val timestampString =
LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC)
.format(DateTimeFormatter.ofPattern("YYYYMMDD"))
// stream name doesn't need to be randomized, only the namespace.
// we need braces because otherwise kotlin tries to find a val `timestampString_`
val randomizedNamespace = "test${timestampString}$randomSuffix"

fun dumpAndDiffRecords(
canonicalExpectedRecords: List<OutputRecord>,
streamName: String,
streamNamespace: String?,
) {
val actualRecords: List<OutputRecord> = dataDumper.dumpRecords(streamName, streamNamespace)
val expectedRecords: List<OutputRecord> = canonicalExpectedRecords.map { recordMangler.mangleRecord(it) }

RecordDiffer(
// TODO accept these from the actual test
// in particular, these need to be destinationified names
// (e.g. snowflake uppercase ID)
{ record -> listOf(record.data["id"]) },
{ record -> record.data["updated_at"] },
).diffRecords(expectedRecords, actualRecords)
?.let(::fail)
}

fun runSync(
catalog: ConfiguredAirbyteCatalog,
messages: List<AirbyteMessage>,
) {
val destination = destinationProcessFactory.createDestinationProcess(
"write",
config,
catalog,
)
TODO()
}
}

fun interface DestinationDataDumper {
// TODO we probably should compact this pair into a useful class
// (but not StreamDescriptor/AirbyteStreamNameNamespacePair :P )
fun dumpRecords(streamName: String, streamNamespace: String?): List<OutputRecord>
}

fun interface DestinationRecordMangler {
fun mangleRecord(expectedRecord: OutputRecord): OutputRecord
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class RecordDiffer(
*/
private val everythingComparator = identityComparator.thenComparing(sortComparator)

/**
* Returns a pretty-printed diff of the two lists, or null if they were identical
*/
fun diffRecords(
expectedRecords: List<OutputRecord>,
actualRecords: List<OutputRecord>
Expand Down

0 comments on commit a67195f

Please sign in to comment.