Skip to content

Commit

Permalink
Finalize Dev Null for Release
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Sep 24, 2024
1 parent 7d0f590 commit ca2701b
Show file tree
Hide file tree
Showing 81 changed files with 2,547 additions and 1,263 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class BufferingOutputConsumer(
private val catalogs = mutableListOf<AirbyteCatalog>()
private val traces = mutableListOf<AirbyteTraceMessage>()
private val messages = mutableListOf<AirbyteMessage>()
private var messagesIndex: Int = 0

var callback: (AirbyteMessage) -> Unit = {}
set(value) {
Expand Down Expand Up @@ -79,4 +80,15 @@ class BufferingOutputConsumer(
fun traces(): List<AirbyteTraceMessage> = synchronized(this) { listOf(*traces.toTypedArray()) }

fun messages(): List<AirbyteMessage> = synchronized(this) { listOf(*messages.toTypedArray()) }

fun newMessages(): List<AirbyteMessage> =
synchronized(this) {
val newMessages = messages.subList(messagesIndex, messages.size)
messagesIndex = messages.size
newMessages
}

fun resetNewMessagesCursor() {
synchronized(this) { messagesIndex = 0 }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ import jakarta.inject.Singleton
class CheckOperation<T : ConfigurationJsonObjectBase, C : DestinationConfiguration>(
val configJsonObjectSupplier: ConfigurationJsonObjectSupplier<T>,
val configFactory: DestinationConfigurationFactory<T, C>,
private val destinationCheckOperation: DestinationCheckOperation<C>,
private val destinationChecker: DestinationChecker<C>,
private val exceptionHandler: ExceptionHandler,
private val outputConsumer: OutputConsumer,
) : Operation {
override fun execute() {
try {
val pojo: T = configJsonObjectSupplier.get()
val config: C = configFactory.make(pojo)
destinationCheckOperation.check(config)
val pojo = configJsonObjectSupplier.get()
val config = configFactory.make(pojo)
destinationChecker.check(config)
val successMessage =
AirbyteMessage()
.withType(AirbyteMessage.Type.CONNECTION_STATUS)
Expand All @@ -44,7 +44,7 @@ class CheckOperation<T : ConfigurationJsonObjectBase, C : DestinationConfigurati
outputConsumer.accept(traceMessage)
outputConsumer.accept(statusMessage)
} finally {
destinationCheckOperation.cleanup()
destinationChecker.cleanup()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import io.airbyte.cdk.command.DestinationConfiguration
* * Implementors should not throw exceptions in the constructor.
* * Implementors should not inject configuration; only use the config passed in [check].
*/
interface DestinationCheckOperation<C : DestinationConfiguration> {
interface DestinationChecker<C : DestinationConfiguration> {
fun check(config: C)
fun cleanup() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()
private val byDescriptor: Map<DestinationStream.Descriptor, DestinationStream> =
streams.associateBy { it.descriptor }

fun getStream(name: String, namespace: String): DestinationStream {
fun getStream(name: String, namespace: String?): DestinationStream {
val descriptor = DestinationStream.Descriptor(namespace = namespace, name = name)
return byDescriptor[descriptor]
?: throw IllegalArgumentException("Stream not found: namespace=$namespace, name=$name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,33 @@

package io.airbyte.cdk.command

import io.micronaut.context.annotation.ConfigurationProperties
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton
import java.nio.file.Path

abstract class DestinationConfiguration : Configuration {
open val recordBatchSizeBytes: Long = 200L * 1024L * 1024L
open val tmpFileDirectory: Path = Path.of("airbyte-cdk-load")
open val firstStageTmpFilePrefix: String = "staged-raw-records"
open val firstStageTmpFileSuffix: String = ".jsonl"

/** Memory queue settings */
open val maxMessageQueueMemoryUsageRatio: Double = 0.2 // 0 => No limit, 1.0 => 100% of JVM heap
open val estimatedRecordMemoryOverheadRatio: Double =
0.1 // 0 => No overhead, 1.0 => 100% overhead

@ConfigurationProperties("destination.config")
interface DestinationConfiguration : Configuration {
/**
* Micronaut factory which glues [ConfigurationJsonObjectSupplier] and
* [DestinationConfigurationFactory] together to produce a [DestinationConfiguration] singleton.
*/
@Factory
private class MicronautFactory {
@Singleton
fun <I : ConfigurationJsonObjectBase> sourceConfig(
fun <I : ConfigurationJsonObjectBase> destinationConfig(
pojoSupplier: ConfigurationJsonObjectSupplier<I>,
factory: DestinationConfigurationFactory<I, out DestinationConfiguration>,
): DestinationConfiguration = factory.make(pojoSupplier.get())
): DestinationConfiguration {
return factory.make(pojoSupplier.get())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ data class DestinationStream(
val minimumGenerationId: Long,
val syncId: Long,
) {
data class Descriptor(val namespace: String, val name: String) {
data class Descriptor(val namespace: String?, val name: String) {
fun asProtocolObject(): StreamDescriptor =
StreamDescriptor().withNamespace(namespace).withName(name)
StreamDescriptor().withName(name).also {
if (namespace != null) {
it.namespace = namespace
}
}
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,154 @@
package io.airbyte.cdk.data

import java.math.BigDecimal
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
import java.time.ZoneOffset

sealed interface AirbyteValue
sealed interface AirbyteValue {
companion object {
fun from(value: Any?): AirbyteValue =
when (value) {
null -> NullValue
is String -> StringValue(value)
is Boolean -> BooleanValue(value)
is Int -> IntegerValue(value.toLong())
is Long -> IntegerValue(value)
is Double -> NumberValue(BigDecimal.valueOf(value))
is BigDecimal -> NumberValue(value)
is LocalDate -> DateValue(value.toString())
is OffsetDateTime,
is LocalDateTime -> TimestampValue(value.toString())
is OffsetTime,
is LocalTime -> TimeValue(value.toString())
is Map<*, *> ->
ObjectValue.from(@Suppress("UNCHECKED_CAST") (value as Map<String, Any?>))
is List<*> -> ArrayValue.from(value)
else ->
throw IllegalArgumentException(
"Unrecognized value (${value.javaClass.name}: $value"
)
}
}
}

data object NullValue : AirbyteValue
// Comparable implementations are intended for use in tests.
// They're not particularly robust, and probably shouldn't be relied on
// for actual sync-time logic.
// (mostly the date/timestamp/time types - everything else is fine)
data object NullValue : AirbyteValue, Comparable<NullValue> {
override fun compareTo(other: NullValue): Int = 0
}

@JvmInline value class StringValue(val value: String) : AirbyteValue
@JvmInline
value class StringValue(val value: String) : AirbyteValue, Comparable<StringValue> {
override fun compareTo(other: StringValue): Int = value.compareTo(other.value)
}

@JvmInline value class BooleanValue(val value: Boolean) : AirbyteValue
@JvmInline
value class BooleanValue(val value: Boolean) : AirbyteValue, Comparable<BooleanValue> {
override fun compareTo(other: BooleanValue): Int = value.compareTo(other.value)
}

@JvmInline value class IntegerValue(val value: Long) : AirbyteValue
@JvmInline
value class IntegerValue(val value: Long) : AirbyteValue, Comparable<IntegerValue> {
override fun compareTo(other: IntegerValue): Int = value.compareTo(other.value)
}

@JvmInline value class NumberValue(val value: BigDecimal) : AirbyteValue
@JvmInline
value class NumberValue(val value: BigDecimal) : AirbyteValue, Comparable<NumberValue> {
override fun compareTo(other: NumberValue): Int = value.compareTo(other.value)
}

@JvmInline value class DateValue(val value: String) : AirbyteValue
@JvmInline
value class DateValue(val value: String) : AirbyteValue, Comparable<DateValue> {
override fun compareTo(other: DateValue): Int {
val thisDate =
try {
LocalDate.parse(value)
} catch (e: Exception) {
LocalDate.MIN
}
val otherDate =
try {
LocalDate.parse(other.value)
} catch (e: Exception) {
LocalDate.MIN
}
return thisDate.compareTo(otherDate)
}
}

@JvmInline value class TimestampValue(val value: String) : AirbyteValue
@JvmInline
value class TimestampValue(val value: String) : AirbyteValue, Comparable<TimestampValue> {
override fun compareTo(other: TimestampValue): Int {
// Do all comparisons using OffsetDateTime for convenience.
// First, try directly parsing as OffsetDateTime.
// If that fails, try parsing as LocalDateTime and assume UTC.
// We could maybe have separate value classes for these cases,
// but that comes with its own set of problems
// (mostly around sources declaring bad schemas).
val thisTimestamp =
try {
OffsetDateTime.parse(value)
} catch (e: Exception) {
LocalDateTime.parse(value).atOffset(ZoneOffset.UTC)
} catch (e: Exception) {
LocalDateTime.MIN.atOffset(ZoneOffset.UTC)
}
val otherTimestamp =
try {
OffsetDateTime.parse(other.value)
} catch (e: Exception) {
LocalDateTime.parse(other.value).atOffset(ZoneOffset.UTC)
} catch (e: Exception) {
LocalDateTime.MIN.atOffset(ZoneOffset.UTC)
}
return thisTimestamp.compareTo(otherTimestamp)
}
}

@JvmInline value class TimeValue(val value: String) : AirbyteValue
@JvmInline
value class TimeValue(val value: String) : AirbyteValue, Comparable<TimeValue> {
override fun compareTo(other: TimeValue): Int {
// Similar to TimestampValue, try parsing with/without timezone,
// and do all comparisons using OffsetTime.
val thisTime =
try {
OffsetTime.parse(value)
} catch (e: Exception) {
LocalTime.parse(value).atOffset(ZoneOffset.UTC)
} catch (e: Exception) {
LocalTime.MIN.atOffset(ZoneOffset.UTC)
}
val otherTime =
try {
OffsetTime.parse(other.value)
} catch (e: Exception) {
LocalTime.parse(other.value).atOffset(ZoneOffset.UTC)
} catch (e: Exception) {
LocalTime.MIN.atOffset(ZoneOffset.UTC)
}
return thisTime.compareTo(otherTime)
}
}

@JvmInline value class ArrayValue(val values: List<AirbyteValue>) : AirbyteValue
@JvmInline
value class ArrayValue(val values: List<AirbyteValue>) : AirbyteValue {
companion object {
fun from(list: List<Any?>): ArrayValue = ArrayValue(list.map { it as AirbyteValue })
}
}

@JvmInline value class ObjectValue(val values: LinkedHashMap<String, AirbyteValue>) : AirbyteValue
@JvmInline
value class ObjectValue(val values: LinkedHashMap<String, AirbyteValue>) : AirbyteValue {
companion object {
fun from(map: Map<String, Any?>): ObjectValue =
ObjectValue(map.mapValuesTo(linkedMapOf()) { (_, v) -> AirbyteValue.from(v) })
}
}

@JvmInline value class UnknownValue(val what: String) : AirbyteValue
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@

package io.airbyte.cdk.file

import io.micronaut.context.annotation.DefaultImplementation
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.nio.file.Files
import java.nio.file.Path

@DefaultImplementation(DefaultTempFileProvider::class)
interface TempFileProvider {
fun createTempFile(directory: Path, prefix: String, suffix: String): LocalFile
}

@Singleton
@Secondary
class DefaultTempFileProvider : TempFileProvider {
override fun createTempFile(directory: Path, prefix: String, suffix: String): LocalFile {
Files.createDirectories(directory)
return DefaultLocalFile(Files.createTempFile(directory, prefix, suffix))
}
}
Loading

0 comments on commit ca2701b

Please sign in to comment.