Skip to content

Commit

Permalink
implement basic spec test
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Sep 17, 2024
1 parent 00dee67 commit 66dc128
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.test.spec

import com.deblock.jsondiff.DiffGenerator
import com.deblock.jsondiff.diff.JsonDiff
import com.deblock.jsondiff.matcher.CompositeJsonMatcher
import com.deblock.jsondiff.matcher.JsonMatcher
import com.deblock.jsondiff.matcher.LenientJsonObjectPartialMatcher
import com.deblock.jsondiff.matcher.StrictJsonArrayPartialMatcher
import com.deblock.jsondiff.matcher.StrictPrimitivePartialMatcher
import com.deblock.jsondiff.viewer.OnlyErrorDiffViewer
import io.airbyte.cdk.test.util.DestinationProcessFactory
import io.airbyte.cdk.test.util.FakeDataDumper
import io.airbyte.cdk.test.util.IntegrationTest
import io.airbyte.cdk.test.util.NoopDestinationCleaner
import io.airbyte.cdk.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import java.nio.file.Files
import java.nio.file.Path
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll

private const val EXPECTED_SPEC_FILENAME = "expected-spec.json"
private val expectedSpecPath = Path.of(EXPECTED_SPEC_FILENAME)

/**
* This is largely copied from [io.airbyte.cdk.spec.SpecTest], but adapted to use our
* [DestinationProcessFactory].
*
* It also automatically writes the actual spec back to `expected-spec.json` for easier inspection
* of the diff. This diff is _really messy_ for the initial migration from the old CDK to the new
* one, but after that, it should be pretty readable.
*/
abstract class SpecTest :
IntegrationTest(
FakeDataDumper,
NoopDestinationCleaner,
NoopExpectedRecordMapper,
) {
@Test
fun testSpec() {
if (!Files.exists(expectedSpecPath)) {
Files.createFile(expectedSpecPath)
}
val expectedSpec = Files.readString(expectedSpecPath)
val process = destinationProcessFactory.createDestinationProcess("spec")
process.run()
val messages = process.readMessages()
val specMessages = messages.filter { it.type == AirbyteMessage.Type.SPEC }

assertEquals(
specMessages.size,
1,
"Expected to receive exactly one connection status message, but got ${specMessages.size}: $specMessages"
)

val spec = specMessages.first().spec
val actualSpecPrettyPrint: String =
Jsons.writerWithDefaultPrettyPrinter().writeValueAsString(spec)
Files.write(expectedSpecPath, actualSpecPrettyPrint.toByteArray())

val jsonMatcher: JsonMatcher =
CompositeJsonMatcher(
StrictJsonArrayPartialMatcher(),
LenientJsonObjectPartialMatcher(),
StrictPrimitivePartialMatcher(),
)
val diff: JsonDiff =
DiffGenerator.diff(expectedSpec, Jsons.writeValueAsString(spec), jsonMatcher)
assertAll(
"Spec snapshot test failed. Run this test locally and then `git diff <...>/expected_spec.json` to see what changed, and commit the diff if that change was intentional.",
{ assertEquals("", OnlyErrorDiffViewer.from(diff).toString()) },
{ assertEquals(expectedSpec, actualSpecPrettyPrint) }
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
{
"documentationUrl" : "https://docs.airbyte.com/integrations/destinations/e2e-test",
"connectionSpecification" : {
"$schema" : "http://json-schema.org/draft-07/schema#",
"title" : "E2E Test Destination Spec",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"test_destination" : {
"oneOf" : [ {
"title" : "Logging",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"test_destination_type" : {
"type" : "string",
"enum" : [ "LOGGING" ],
"default" : "LOGGING"
},
"logging_config" : {
"oneOf" : [ {
"title" : "First N Entries",
"type" : "object",
"additionalProperties" : true,
"description" : "Log first N entries per stream.",
"properties" : {
"logging_type" : {
"type" : "string",
"enum" : [ "FirstN" ],
"default" : "FirstN"
},
"max_entry_count" : {
"type" : "number",
"minimum" : 1,
"maximum" : 1000,
"default" : 100,
"description" : "Number of entries to log. This destination is for testing only. So it won't make sense to log infinitely. The maximum is 1,000 entries.",
"title" : "N",
"examples" : [ 100 ]
}
},
"required" : [ "logging_type", "max_entry_count" ]
}, {
"title" : "Every N-th Entry",
"type" : "object",
"additionalProperties" : true,
"description" : "For each stream, log every N-th entry with a maximum cap.",
"properties" : {
"logging_type" : {
"type" : "string",
"enum" : [ "EveryNth" ],
"default" : "EveryNth"
},
"nth_entry_to_log" : {
"type" : "integer",
"minimum" : 1,
"maximum" : 1000,
"description" : "The N-th entry to log for each stream. N starts from 1. For example, when N = 1, every entry is logged; when N = 2, every other entry is logged; when N = 3, one out of three entries is logged.",
"title" : "N",
"examples" : [ 3 ]
},
"max_entry_count" : {
"type" : "number",
"minimum" : 1,
"maximum" : 1000,
"default" : 100,
"description" : "Number of entries to log. This destination is for testing only. So it won't make sense to log infinitely. The maximum is 1,000 entries.",
"title" : "Max Log Entries",
"examples" : [ 100 ]
}
},
"required" : [ "logging_type", "nth_entry_to_log", "max_entry_count" ]
}, {
"title" : "Random Sampling",
"type" : "object",
"additionalProperties" : true,
"description" : "For each stream, randomly log a percentage of the entries with a maximum cap.",
"properties" : {
"logging_type" : {
"type" : "string",
"enum" : [ "RandomSampling" ],
"default" : "RandomSampling"
},
"sampling_ratio" : {
"type" : "number",
"minimum" : 0,
"maximum" : 1,
"description" : "A positive floating number smaller than 1.",
"title" : "Sampling Ratio",
"examples" : [ 0.001 ],
"default" : 0.001
},
"seed" : {
"type" : "number",
"description" : "When the seed is unspecified, the current time millis will be used as the seed.",
"title" : "Random Number Generator Seed",
"examples" : [ 1900 ]
},
"max_entry_count" : {
"type" : "number",
"minimum" : 1,
"maximum" : 1000,
"default" : 100,
"description" : "Number of entries to log. This destination is for testing only. So it won't make sense to log infinitely. The maximum is 1,000 entries.",
"title" : "Max Log Entries",
"examples" : [ 100 ]
}
},
"required" : [ "logging_type", "sampling_ratio", "max_entry_count" ]
} ],
"description" : "Configurate how the messages are logged.",
"title" : "Logging Configuration",
"type" : "object"
}
},
"required" : [ "test_destination_type", "logging_config" ]
}, {
"title" : "Silent",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"test_destination_type" : {
"type" : "string",
"enum" : [ "SILENT" ],
"default" : "SILENT"
}
},
"required" : [ "test_destination_type" ]
}, {
"title" : "Throttled",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"test_destination_type" : {
"type" : "string",
"enum" : [ "THROTTLED" ],
"default" : "THROTTLED"
},
"millis_per_record" : {
"type" : "integer",
"description" : "The number of milliseconds to wait between each record."
}
},
"required" : [ "test_destination_type", "millis_per_record" ]
}, {
"title" : "Failing",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"test_destination_type" : {
"type" : "string",
"enum" : [ "FAILING" ],
"default" : "FAILING"
},
"num_messages" : {
"type" : "integer",
"description" : "Number of messages after which to fail."
}
},
"required" : [ "test_destination_type", "num_messages" ]
} ],
"description" : "The type of destination to be used",
"title" : "Test Destination",
"type" : "object"
}
},
"required" : [ "test_destination" ]
},
"supportsIncremental" : true,
"supportsNormalization" : false,
"supportsDBT" : false,
"supported_destination_sync_modes" : [ "overwrite", "append", "append_dedup" ]
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.fasterxml.jackson.annotation.JsonPropertyDescription
import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.annotation.JsonValue
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaExamples
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import io.airbyte.cdk.command.ConfigurationJsonObjectBase
import jakarta.inject.Singleton
Expand All @@ -20,9 +20,13 @@ import javax.validation.constraints.Min
/**
* This doesn't quite conform with the old spec:
* - Some fields that make more sense as integral need to be Double to yield a "number" type
* - This causes @JsonSchemaExamples to break for some reason (neither "100" or "100.0" work)
* - Due to https://github.com/mbknor/mbknor-jackson-jsonSchema/issues/184, this causes
* ```
* (I've left these in place for now, commented out.)
* `@JsonSchemaExamples` to break. Instead, we inject a raw JSON blob to the schema.
* ```
* - Similarly, there are some cases where [JsonSchemaTitle] / [JsonClassDescription]
* ```
* don't work as expected. In these cases, we also inject raw JSON.
* ```
* - Additionally, there are extra fields:
* ```
Expand All @@ -35,7 +39,7 @@ import javax.validation.constraints.Min
class E2EDestinationConfigurationJsonObject : ConfigurationJsonObjectBase() {
@JsonProperty("test_destination")
@JsonSchemaTitle("Test Destination")
@JsonPropertyDescription("The type of destination to be used.")
@JsonPropertyDescription("The type of destination to be used")
val testDestination: TestDestination = LoggingDestination()
}

Expand All @@ -61,6 +65,7 @@ sealed class TestDestination(
}
}

@JsonSchemaTitle("Logging")
data class LoggingDestination(
@JsonProperty("test_destination_type") override val testDestinationType: Type = Type.LOGGING,
@JsonPropertyDescription("Configurate how the messages are logged.")
Expand All @@ -78,6 +83,7 @@ data class LoggingDestination(
JsonSubTypes.Type(value = EveryNthEntryConfig::class, name = "EveryNth"),
JsonSubTypes.Type(value = RandomSamplingConfig::class, name = "RandomSampling")
)
@JsonSchemaInject(json = """{"title":"Logging Configuration"}""")
sealed class LoggingConfig(
@JsonProperty("logging_type") open val loggingType: Type = Type.FIRST_N
) {
Expand All @@ -89,70 +95,73 @@ sealed class LoggingConfig(
}

@JsonSchemaTitle("First N Entries")
@JsonClassDescription("Log first N entries per stream.")
@JsonSchemaInject(json = """{"description":"Log first N entries per stream."}""")
data class FirstNEntriesConfig(
@JsonProperty("logging_type") override val loggingType: Type = Type.FIRST_N,
@JsonSchemaTitle("N")
@JsonPropertyDescription(
"Number of entries to log. This destination is for testing only. So it won't make sense to log infinitely. The maximum is 1,000 entries."
)
@JsonProperty("max_entry_count", defaultValue = "100")
// @JsonSchemaExamples("100")
@JsonSchemaInject(json = """{"examples":[100]}""")
@Max(1000)
@Min(1)
val maxEntryCount: Double = 100.0
) : LoggingConfig(loggingType)

@JsonSchemaTitle("Every N-th Entry")
@JsonClassDescription("For each stream, log every N-th entry with a maximum cap.")
@JsonSchemaInject(
json = """{"description":"For each stream, log every N-th entry with a maximum cap."}"""
)
data class EveryNthEntryConfig(
@JsonProperty("logging_type") override val loggingType: Type = Type.EVERY_NTH,
@JsonSchemaTitle("N")
@JsonPropertyDescription(
"The N-th entry to log for each stream. N starts from 1. For example, when N = 1, every entry is logged; when N = 2, every other entry is logged; when N = 3, one out of three entries is logged."
)
@JsonProperty("nth_entry_to_log")
@JsonSchemaExamples("3")
@JsonSchemaInject(json = """{"examples":[3]}""")
@Max(1000)
@Min(1)
val nthEntryToLog: Int,
@JsonSchemaTitle("N")
@JsonSchemaTitle("Max Log Entries")
@JsonPropertyDescription(
"Number of entries to log. This destination is for testing only. So it won't make sense to log infinitely. The maximum is 1,000 entries."
)
@JsonProperty("max_entry_count", defaultValue = "100")
// @JsonSchemaExamples("100")
@JsonSchemaInject(json = """{"examples":[100]}""")
@Max(1000)
@Min(1)
val maxEntryCount: Double
) : LoggingConfig(loggingType)

@JsonSchemaTitle("Random Sampling")
@JsonClassDescription(
"For each stream, randomly log a percentage of the entries with a maximum cap."
@JsonSchemaInject(
json =
"""{"description":"For each stream, randomly log a percentage of the entries with a maximum cap."}"""
)
data class RandomSamplingConfig(
@JsonProperty("logging_type") override val loggingType: Type = Type.RANDOM_SAMPLING,
@JsonSchemaTitle("Sampling Ratio")
@JsonPropertyDescription("A positive floating number smaller than 1.")
@JsonProperty("sampling_ratio")
// @JsonSchemaExamples("0.001")
@JsonSchemaInject(json = """{"examples":[0.001],"default":0.001}""")
@Max(1)
@Min(0)
val samplingRatio: Double = 0.001,
@JsonSchemaTitle("Random Number Generator Seed")
@JsonPropertyDescription(
"When the seed is unspecified, the current time millis will be used as the seed."
)
// @JsonSchemaExamples("1900")
@JsonSchemaInject(json = """{"examples":[1900]}""")
@JsonProperty("seed")
val seed: Double? = null,
@JsonSchemaTitle("N")
@JsonSchemaTitle("Max Log Entries")
@JsonPropertyDescription(
"Number of entries to log. This destination is for testing only. So it won't make sense to log infinitely. The maximum is 1,000 entries."
)
@JsonProperty("max_entry_count", defaultValue = "100")
// @JsonSchemaExamples("100")
@JsonSchemaInject(json = """{"examples":[100]}""")
@Max(1000)
@Min(1)
val maxEntryCount: Double = 100.0
Expand Down
Loading

0 comments on commit 66dc128

Please sign in to comment.