-
Notifications
You must be signed in to change notification settings - Fork 4.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
✨ concurrent cdk: Read multiple streams concurrently (#32411)
- Loading branch information
Showing
31 changed files
with
2,218 additions
and
516 deletions.
There are no files selected for viewing
3 changes: 3 additions & 0 deletions
3
airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
# |
190 changes: 190 additions & 0 deletions
190
airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
# | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
# | ||
import logging | ||
from typing import Dict, Iterable, List, Optional, Set | ||
|
||
from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus | ||
from airbyte_cdk.models import Type as MessageType | ||
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import PartitionGenerationCompletedSentinel | ||
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager | ||
from airbyte_cdk.sources.message import MessageRepository | ||
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream | ||
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer | ||
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader | ||
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition | ||
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record | ||
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel | ||
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message | ||
from airbyte_cdk.sources.utils.slice_logger import SliceLogger | ||
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message | ||
|
||
|
||
class ConcurrentReadProcessor: | ||
def __init__( | ||
self, | ||
stream_instances_to_read_from: List[AbstractStream], | ||
partition_enqueuer: PartitionEnqueuer, | ||
thread_pool_manager: ThreadPoolManager, | ||
logger: logging.Logger, | ||
slice_logger: SliceLogger, | ||
message_repository: MessageRepository, | ||
partition_reader: PartitionReader, | ||
): | ||
""" | ||
This class is responsible for handling items from a concurrent stream read process. | ||
:param stream_instances_to_read_from: List of streams to read from | ||
:param partition_enqueuer: PartitionEnqueuer instance | ||
:param thread_pool_manager: ThreadPoolManager instance | ||
:param logger: Logger instance | ||
:param slice_logger: SliceLogger instance | ||
:param message_repository: MessageRepository instance | ||
:param partition_reader: PartitionReader instance | ||
""" | ||
self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from} | ||
self._record_counter = {} | ||
self._streams_to_partitions: Dict[str, Set[Partition]] = {} | ||
for stream in stream_instances_to_read_from: | ||
self._streams_to_partitions[stream.name] = set() | ||
self._record_counter[stream.name] = 0 | ||
self._thread_pool_manager = thread_pool_manager | ||
self._partition_enqueuer = partition_enqueuer | ||
self._stream_instances_to_start_partition_generation = stream_instances_to_read_from | ||
self._streams_currently_generating_partitions: List[str] = [] | ||
self._logger = logger | ||
self._slice_logger = slice_logger | ||
self._message_repository = message_repository | ||
self._partition_reader = partition_reader | ||
|
||
def on_partition_generation_completed(self, sentinel: PartitionGenerationCompletedSentinel) -> Iterable[AirbyteMessage]: | ||
""" | ||
This method is called when a partition generation is completed. | ||
1. Remove the stream from the list of streams currently generating partitions | ||
2. If the stream is done, mark it as such and return a stream status message | ||
3. If there are more streams to read from, start the next partition generator | ||
""" | ||
stream_name = sentinel.stream.name | ||
self._streams_currently_generating_partitions.remove(sentinel.stream.name) | ||
ret = [] | ||
# It is possible for the stream to already be done if no partitions were generated | ||
if self._is_stream_done(stream_name): | ||
ret.append(self._on_stream_is_done(stream_name)) | ||
if self._stream_instances_to_start_partition_generation: | ||
ret.append(self.start_next_partition_generator()) | ||
return ret | ||
|
||
def on_partition(self, partition: Partition) -> None: | ||
""" | ||
This method is called when a partition is generated. | ||
1. Add the partition to the set of partitions for the stream | ||
2. Log the slice if necessary | ||
3. Submit the partition to the thread pool manager | ||
""" | ||
stream_name = partition.stream_name() | ||
self._streams_to_partitions[stream_name].add(partition) | ||
if self._slice_logger.should_log_slice_message(self._logger): | ||
self._message_repository.emit_message(self._slice_logger.create_slice_log_message(partition.to_slice())) | ||
self._thread_pool_manager.submit(self._partition_reader.process_partition, partition) | ||
|
||
def on_partition_complete_sentinel(self, sentinel: PartitionCompleteSentinel) -> Iterable[AirbyteMessage]: | ||
""" | ||
This method is called when a partition is completed. | ||
1. Close the partition | ||
2. If the stream is done, mark it as such and return a stream status message | ||
3. Emit messages that were added to the message repository | ||
""" | ||
partition = sentinel.partition | ||
partition.close() | ||
if self._is_stream_done(partition.stream_name()): | ||
yield self._on_stream_is_done(partition.stream_name()) | ||
yield from self._message_repository.consume_queue() | ||
|
||
def on_record(self, record: Record) -> Iterable[AirbyteMessage]: | ||
""" | ||
This method is called when a record is read from a partition. | ||
1. Convert the record to an AirbyteMessage | ||
2. If this is the first record for the stream, mark the stream as RUNNING | ||
3. Increment the record counter for the stream | ||
4. Emit the message | ||
5. Emit messages that were added to the message repository | ||
""" | ||
# Do not pass a transformer or a schema | ||
# AbstractStreams are expected to return data as they are expected. | ||
# Any transformation on the data should be done before reaching this point | ||
message = stream_data_to_airbyte_message(record.stream_name, record.data) | ||
stream = self._stream_name_to_instance[record.stream_name] | ||
|
||
if self._record_counter[stream.name] == 0: | ||
self._logger.info(f"Marking stream {stream.name} as RUNNING") | ||
yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING) | ||
|
||
if message.type == MessageType.RECORD: | ||
self._record_counter[stream.name] += 1 | ||
yield message | ||
yield from self._message_repository.consume_queue() | ||
|
||
def on_exception(self, exception: Exception) -> Iterable[AirbyteMessage]: | ||
""" | ||
This method is called when an exception is raised. | ||
1. Stop all running streams | ||
2. Raise the exception | ||
""" | ||
yield from self._stop_streams() | ||
raise exception | ||
|
||
def start_next_partition_generator(self) -> Optional[AirbyteMessage]: | ||
""" | ||
Start the next partition generator. | ||
1. Pop the next stream to read from | ||
2. Submit the partition generator to the thread pool manager | ||
3. Add the stream to the list of streams currently generating partitions | ||
4. Return a stream status message | ||
""" | ||
if self._stream_instances_to_start_partition_generation: | ||
stream = self._stream_instances_to_start_partition_generation.pop(0) | ||
self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream) | ||
self._streams_currently_generating_partitions.append(stream.name) | ||
self._logger.info(f"Marking stream {stream.name} as STARTED") | ||
self._logger.info(f"Syncing stream: {stream.name} ") | ||
return stream_status_as_airbyte_message( | ||
stream.as_airbyte_stream(), | ||
AirbyteStreamStatus.STARTED, | ||
) | ||
else: | ||
return None | ||
|
||
def is_done(self) -> bool: | ||
""" | ||
This method is called to check if the sync is done. | ||
The sync is done when: | ||
1. There are no more streams generating partitions | ||
2. There are no more streams to read from | ||
3. All partitions for all streams are closed | ||
""" | ||
return ( | ||
not self._streams_currently_generating_partitions | ||
and not self._stream_instances_to_start_partition_generation | ||
and all([all(p.is_closed() for p in partitions) for partitions in self._streams_to_partitions.values()]) | ||
) | ||
|
||
def _is_stream_done(self, stream_name: str) -> bool: | ||
return ( | ||
all([p.is_closed() for p in self._streams_to_partitions[stream_name]]) | ||
and stream_name not in self._streams_currently_generating_partitions | ||
) | ||
|
||
def _on_stream_is_done(self, stream_name: str) -> AirbyteMessage: | ||
self._logger.info(f"Read {self._record_counter[stream_name]} records from {stream_name} stream") | ||
self._logger.info(f"Marking stream {stream_name} as STOPPED") | ||
stream = self._stream_name_to_instance[stream_name] | ||
self._logger.info(f"Finished syncing {stream.name}") | ||
return stream_status_as_airbyte_message(stream.as_airbyte_stream(), AirbyteStreamStatus.COMPLETE) | ||
|
||
def _stop_streams(self) -> Iterable[AirbyteMessage]: | ||
self._thread_pool_manager.shutdown() | ||
for stream_name, partitions in self._streams_to_partitions.items(): | ||
stream = self._stream_name_to_instance[stream_name] | ||
if not all([p.is_closed() for p in partitions]): | ||
self._logger.info(f"Marking stream {stream.name} as STOPPED") | ||
self._logger.info(f"Finished syncing {stream.name}") | ||
yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), AirbyteStreamStatus.INCOMPLETE) |
161 changes: 161 additions & 0 deletions
161
airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_source.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
# | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
# | ||
import concurrent | ||
import logging | ||
from queue import Queue | ||
from typing import Iterable, Iterator, List | ||
|
||
from airbyte_cdk.models import AirbyteMessage | ||
from airbyte_cdk.sources.concurrent_source.concurrent_read_processor import ConcurrentReadProcessor | ||
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import PartitionGenerationCompletedSentinel | ||
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager | ||
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository | ||
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream | ||
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer | ||
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader | ||
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition | ||
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record | ||
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel, QueueItem | ||
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger | ||
|
||
|
||
class ConcurrentSource: | ||
""" | ||
A Source that reads data from multiple AbstractStreams concurrently. | ||
It does so by submitting partition generation, and partition read tasks to a thread pool. | ||
The tasks asynchronously add their output to a shared queue. | ||
The read is done when all partitions for all streams were generated and read. | ||
""" | ||
|
||
DEFAULT_TIMEOUT_SECONDS = 900 | ||
|
||
@staticmethod | ||
def create( | ||
num_workers: int, | ||
initial_number_of_partitions_to_generate: int, | ||
logger: logging.Logger, | ||
slice_logger: SliceLogger, | ||
message_repository: MessageRepository, | ||
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS, | ||
) -> "ConcurrentSource": | ||
threadpool = ThreadPoolManager( | ||
concurrent.futures.ThreadPoolExecutor(max_workers=num_workers, thread_name_prefix="workerpool"), logger, num_workers | ||
) | ||
return ConcurrentSource( | ||
threadpool, logger, slice_logger, message_repository, initial_number_of_partitions_to_generate, timeout_seconds | ||
) | ||
|
||
def __init__( | ||
self, | ||
threadpool: ThreadPoolManager, | ||
logger: logging.Logger, | ||
slice_logger: SliceLogger = DebugSliceLogger(), | ||
message_repository: MessageRepository = InMemoryMessageRepository(), | ||
initial_number_partitions_to_generate: int = 1, | ||
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS, | ||
) -> None: | ||
""" | ||
:param threadpool: The threadpool to submit tasks to | ||
:param logger: The logger to log to | ||
:param slice_logger: The slice logger used to create messages on new slices | ||
:param message_repository: The repository to emit messages to | ||
:param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number ensures will limit the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible. | ||
:param timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return. | ||
""" | ||
self._threadpool = threadpool | ||
self._logger = logger | ||
self._slice_logger = slice_logger | ||
self._message_repository = message_repository | ||
self._initial_number_partitions_to_generate = initial_number_partitions_to_generate | ||
self._timeout_seconds = timeout_seconds | ||
|
||
def read( | ||
self, | ||
streams: List[AbstractStream], | ||
) -> Iterator[AirbyteMessage]: | ||
self._logger.info("Starting syncing") | ||
stream_instances_to_read_from = self._get_streams_to_read_from(streams) | ||
|
||
# Return early if there are no streams to read from | ||
if not stream_instances_to_read_from: | ||
return | ||
|
||
queue: Queue[QueueItem] = Queue() | ||
concurrent_stream_processor = ConcurrentReadProcessor( | ||
stream_instances_to_read_from, | ||
PartitionEnqueuer(queue), | ||
self._threadpool, | ||
self._logger, | ||
self._slice_logger, | ||
self._message_repository, | ||
PartitionReader(queue), | ||
) | ||
|
||
# Enqueue initial partition generation tasks | ||
yield from self._submit_initial_partition_generators(concurrent_stream_processor) | ||
|
||
# Read from the queue until all partitions were generated and read | ||
yield from self._consume_from_queue( | ||
queue, | ||
concurrent_stream_processor, | ||
) | ||
self._threadpool.check_for_errors_and_shutdown() | ||
self._logger.info("Finished syncing") | ||
|
||
def _submit_initial_partition_generators(self, concurrent_stream_processor: ConcurrentReadProcessor) -> Iterable[AirbyteMessage]: | ||
for _ in range(self._initial_number_partitions_to_generate): | ||
status_message = concurrent_stream_processor.start_next_partition_generator() | ||
if status_message: | ||
yield status_message | ||
|
||
def _consume_from_queue( | ||
self, | ||
queue: Queue[QueueItem], | ||
concurrent_stream_processor: ConcurrentReadProcessor, | ||
) -> Iterable[AirbyteMessage]: | ||
while airbyte_message_or_record_or_exception := queue.get(block=True, timeout=self._timeout_seconds): | ||
yield from self._handle_item( | ||
airbyte_message_or_record_or_exception, | ||
concurrent_stream_processor, | ||
) | ||
if concurrent_stream_processor.is_done() and queue.empty(): | ||
# all partitions were generated and processed. we're done here | ||
break | ||
|
||
def _handle_item( | ||
self, | ||
queue_item: QueueItem, | ||
concurrent_stream_processor: ConcurrentReadProcessor, | ||
) -> Iterable[AirbyteMessage]: | ||
# handle queue item and call the appropriate handler depending on the type of the queue item | ||
if isinstance(queue_item, Exception): | ||
yield from concurrent_stream_processor.on_exception(queue_item) | ||
|
||
elif isinstance(queue_item, PartitionGenerationCompletedSentinel): | ||
yield from concurrent_stream_processor.on_partition_generation_completed(queue_item) | ||
|
||
elif isinstance(queue_item, Partition): | ||
concurrent_stream_processor.on_partition(queue_item) | ||
elif isinstance(queue_item, PartitionCompleteSentinel): | ||
yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item) | ||
elif isinstance(queue_item, Record): | ||
yield from concurrent_stream_processor.on_record(queue_item) | ||
else: | ||
raise ValueError(f"Unknown queue item type: {type(queue_item)}") | ||
|
||
def _get_streams_to_read_from(self, streams: List[AbstractStream]) -> List[AbstractStream]: | ||
""" | ||
Iterate over the configured streams and return a list of streams to read from. | ||
If a stream is not configured, it will be skipped. | ||
If a stream is configured but does not exist in the source and self.raise_exception_on_missing_stream is True, an exception will be raised | ||
If a stream is not available, it will be skipped | ||
""" | ||
stream_instances_to_read_from = [] | ||
for stream in streams: | ||
stream_availability = stream.check_availability() | ||
if not stream_availability.is_available(): | ||
self._logger.warning(f"Skipped syncing stream '{stream.name}' because it was unavailable. {stream_availability.message()}") | ||
continue | ||
stream_instances_to_read_from.append(stream) | ||
return stream_instances_to_read_from |
Oops, something went wrong.