Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk Load CDK: Simplify Interface & Add Check #45369

Merged
merged 1 commit into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.check

import io.airbyte.cdk.Operation
import io.airbyte.cdk.output.ExceptionHandler
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton

/**
 * [Operation] implementation that runs a destination's check.
 *
 * Per the annotations below, this bean is only eligible when the [Operation.PROPERTY] property
 * has the value "check" and the "destination" environment is active.
 */
@Singleton
@Requires(property = Operation.PROPERTY, value = "check")
@Requires(env = ["destination"])
class CheckOperation(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason why we wouldn't want an abstract class here, with the check() as an abstract function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The @Requires aren't inherited. It would force each implementor to specify the conditions under which their implementation is run.

private val destination: DestinationCheck,
private val exceptionHandler: ExceptionHandler,
) : Operation {
// Calls destination.check(); any Exception it throws is passed to exceptionHandler.handle().
override fun execute() {
try {
destination.check()
} catch (e: Exception) {
exceptionHandler.handle(e)
} finally {
// cleanup() is invoked whether or not check() threw.
destination.cleanup()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.check

/**
 * Implementor interface for a destination's check.
 *
 * [check] has no default and must be implemented; [cleanup] is optional.
 */
interface DestinationCheck {
    /** Performs the check. */
    fun check()

    /** Optional hook; does nothing unless overridden. */
    fun cleanup() = Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import kotlinx.coroutines.sync.withLock
* TODO: Some degree of logging/monitoring around how accurate we're actually being?
*/
@Singleton
class MemoryManager(availableMemoryProvider: AvailableMemoryProvider) {
class MemoryManager(private val availableMemoryProvider: AvailableMemoryProvider) {
private val totalMemoryBytes: Long = availableMemoryProvider.availableMemoryBytes
private var usedMemoryBytes = AtomicLong(0L)
private val mutex = Mutex()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
package io.airbyte.cdk.task

import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.write.Destination
import io.airbyte.cdk.write.DestinationWrite
import io.airbyte.cdk.write.StreamLoader
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton

/**
* Wraps @[StreamLoader.open] and starts the spill-to-disk tasks.
* Wraps @[StreamLoader.start] and starts the spill-to-disk tasks.
*
* TODO: There's no reason to wait on initialization to start spilling to disk.
*/
Expand All @@ -20,15 +20,15 @@ class OpenStreamTask(
private val taskLauncher: DestinationTaskLauncher
) : Task {
override suspend fun execute() {
streamLoader.open()
streamLoader.start()
taskLauncher.startSpillToDiskTasks(streamLoader)
}
}

@Singleton
@Secondary
class OpenStreamTaskFactory(
private val destination: Destination,
private val destination: DestinationWrite,
) {
fun make(taskLauncher: DestinationTaskLauncher, stream: DestinationStream): OpenStreamTask {
return OpenStreamTask(destination.getStreamLoader(stream), taskLauncher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@

package io.airbyte.cdk.task

import io.airbyte.cdk.write.Destination
import io.airbyte.cdk.write.DestinationWrite
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton

/**
* Wraps @[Destination.setup] and starts the open stream tasks.
* Wraps @[DestinationWrite.setup] and starts the open stream tasks.
*
* TODO: This should call something like "TaskLauncher.setupComplete" and let it decide what to do
* next.
*/
class SetupTask(
private val destination: Destination,
private val destination: DestinationWrite,
private val taskLauncher: DestinationTaskLauncher
) : Task {
override suspend fun execute() {
Expand All @@ -27,7 +27,7 @@ class SetupTask(
@Singleton
@Secondary
class SetupTaskFactory(
private val destination: Destination,
private val destination: DestinationWrite,
) {
fun make(taskLauncher: DestinationTaskLauncher): SetupTask {
return SetupTask(destination, taskLauncher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@
package io.airbyte.cdk.task

import io.airbyte.cdk.state.StreamsManager
import io.airbyte.cdk.write.Destination
import io.airbyte.cdk.write.DestinationWrite
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.util.concurrent.atomic.AtomicBoolean

/**
* Wraps @[Destination.teardown] and stops the task launcher.
* Wraps @[DestinationWrite.teardown] and stops the task launcher.
*
* TODO: Report teardown-complete and let the task launcher decide what to do next.
*/
class TeardownTask(
private val destination: Destination,
private val destination: DestinationWrite,
private val streamsManager: StreamsManager,
private val taskLauncher: DestinationTaskLauncher
) : Task {
Expand All @@ -44,7 +44,7 @@ class TeardownTask(
@Singleton
@Secondary
class TeardownTaskFactory(
private val destination: Destination,
private val destination: DestinationWrite,
private val streamsManager: StreamsManager,
) {
fun make(taskLauncher: DestinationTaskLauncher): TeardownTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,30 @@ import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton

/**
* Implementor interface. Extend this only if you need to perform initialization and teardown
* *across all streams*, or if your per-stream operations need shared global state.
*
* If initialization can be done on a per-stream basis, implement @[StreamLoaderFactory] instead.
* Implementor interface. Every Destination must extend this and at least provide an implementation
* of [getStreamLoader].
*/
interface Destination {
interface DestinationWrite {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not in love with the name. DestinationWriteOperation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NVM, there's a WriteOperation down below. That raises the question of why we need both, though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See previous comment.

// Called once before anything else
suspend fun setup() {}

// Return a StreamLoader for the given stream
fun getStreamLoader(stream: DestinationStream): StreamLoader

// Called once at the end of the job
// Called once at the end of the job, unconditionally.
suspend fun teardown(succeeded: Boolean = true) {}
}

@Singleton
@Secondary
class DefaultDestination(private val streamLoaderFactory: StreamLoaderFactory) : Destination {
class DefaultDestinationWrite : DestinationWrite {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need a default that crashes? Might be worth explaining why

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't hurt to tell the implementor what's missing 🤷

init {
throw NotImplementedError(
"DestinationWrite not implemented. Please create a custom @Singleton implementation."
)
}

override fun getStreamLoader(stream: DestinationStream): StreamLoader {
return streamLoaderFactory.make(stream)
throw NotImplementedError()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import io.airbyte.cdk.message.DestinationMessage
import io.airbyte.cdk.message.MessageQueueWriter
import io.github.oshai.kotlinlogging.KLogger
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton
import java.io.InputStream
import java.nio.charset.StandardCharsets
Expand Down Expand Up @@ -63,12 +62,3 @@ class DefaultInputConsumer(
) : DeserializingInputStreamConsumer<DestinationMessage> {
override val log = KotlinLogging.logger {}
}

/**
 * Factory for the [InputStream] bean. Override to provide a custom input stream.
 */
@Factory
class InputStreamFactory {
// Default bean: the process's standard input.
@Singleton
fun make(): InputStream {
return System.`in`
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,61 +8,35 @@ import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.message.Batch
import io.airbyte.cdk.message.DestinationRecord
import io.airbyte.cdk.message.SimpleBatch
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton

/**
* Implementor interface. The framework calls open and close once per stream at the beginning and
* end of processing. The framework calls processRecords once per batch of records as batches of the
* configured size become available. (Specified in @
* [io.airbyte.cdk.command.WriteConfiguration.recordBatchSizeBytes]
* [io.airbyte.cdk.command.WriteConfiguration.recordBatchSizeBytes])
*
* processBatch is called once per incomplete batch returned by either processRecords or
* processBatch itself. See @[io.airbyte.cdk.message.Batch] for more details.
* [start] is called once before any records are processed.
*
* [processRecords] is called whenever a batch of records is available for processing, and only
* after [start] has returned successfully. The return value is a client-defined implementation of @
* [Batch] that the framework may pass to [processBatch] and/or [finalize]. (See @[Batch] for more
* details.)
*
* [processBatch] is called once per incomplete batch returned by either [processRecords] or
* [processBatch] itself.
*
* [finalize] is called once after all records and batches have been processed successfully.
*
* [close] is called once after all records have been processed, regardless of success or failure.
* If there are failed batches, they are passed in as an argument.
*/
interface StreamLoader {
val stream: DestinationStream

suspend fun open() {}
suspend fun start() {}
suspend fun processRecords(records: Iterator<DestinationRecord>, totalSizeBytes: Long): Batch
suspend fun processBatch(batch: Batch): Batch = SimpleBatch(state = Batch.State.COMPLETE)
suspend fun close() {}
}

/**
* Default stream loader (Not yet implemented) will process the records into a locally staged file
* of a format specified in the configuration.
*/
class DefaultStreamLoader(
override val stream: DestinationStream,
) : StreamLoader {
val log = KotlinLogging.logger {}

override suspend fun processRecords(
records: Iterator<DestinationRecord>,
totalSizeBytes: Long
): Batch {
TODO(
"Default implementation adds airbyte metadata, maybe flattens, no-op maps, and converts to destination format"
)
}
}

/**
* If you do not need to perform initialization and teardown across all streams, or if your
* per-stream operations do not need shared global state, implement this interface instead of @
* [Destination]. The framework will call it exactly once per stream to create instances that will
* be used for the life cycle of the stream.
*/
interface StreamLoaderFactory {
fun make(stream: DestinationStream): StreamLoader
}
suspend fun processBatch(batch: Batch): Batch = SimpleBatch(Batch.State.COMPLETE)
suspend fun finalize() {}

@Singleton
@Secondary
class DefaultStreamLoaderFactory() : StreamLoaderFactory {
override fun make(stream: DestinationStream): StreamLoader {
TODO("See above")
}
suspend fun close(failedBatches: List<Batch> = emptyList()) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import io.airbyte.cdk.Operation
import io.airbyte.cdk.message.DestinationMessage
import io.airbyte.cdk.task.TaskLauncher
import io.airbyte.cdk.task.TaskRunner
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Secondary
import java.io.InputStream
import javax.inject.Singleton
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
Expand All @@ -34,3 +37,14 @@ class WriteOperation(
}
}
}

/**
 * Factory for the [InputStream] bean. Override to provide a custom input stream.
 *
 * The bean is marked @Secondary and is only required-in when the [Operation.PROPERTY] property
 * has the value "write".
 */
@Factory
class InputStreamFactory {
    /** Returns the process's standard input. */
    @Singleton
    @Secondary
    @Requires(property = Operation.PROPERTY, value = "write")
    fun make(): InputStream = System.`in`
}
Loading