✨ concurrent cdk: Read multiple streams concurrently #32411

Merged: 77 commits from alex/concurrent_source into master on Nov 28, 2023. The diff below reflects changes from 75 of the 77 commits.

Commits (77)
95230db
Apply log level at the entrpoint level instead of the abstract source
girarda Nov 10, 2023
fee9d5a
remove apply logging from declarative framework
girarda Nov 10, 2023
b49ddbd
fix mypy
girarda Nov 10, 2023
c714cd6
apply changes
girarda Nov 10, 2023
84c6b50
pass airbytestream instead of configured stream to as_airbyte_message
girarda Nov 10, 2023
6275446
Merge branch 'alex/stream_status' into alex/concurrent_source
girarda Nov 10, 2023
18d87cd
update usage
girarda Nov 10, 2023
18dc14a
oops
girarda Nov 10, 2023
149212a
update
girarda Nov 10, 2023
03f53d6
rename
girarda Nov 10, 2023
e25f320
delte comment
girarda Nov 10, 2023
551a812
delete some dead code
girarda Nov 10, 2023
72fd3de
unit tests
girarda Nov 10, 2023
c7ea97a
start extracting into queue item handler
girarda Nov 10, 2023
a7a822a
move handle partition
girarda Nov 10, 2023
1a76377
on_partition_complete_sentinel
girarda Nov 10, 2023
2f7878f
delete dead code
girarda Nov 10, 2023
798fda7
delete more code
girarda Nov 10, 2023
438a0ea
on record
girarda Nov 10, 2023
1c3cd1c
on exception
girarda Nov 10, 2023
c918082
delete dead code
girarda Nov 10, 2023
2704468
is_done
girarda Nov 10, 2023
b6596f7
remove unused params
girarda Nov 10, 2023
2056d25
update
girarda Nov 10, 2023
d8ef071
fix
girarda Nov 10, 2023
4157008
delete dead code
girarda Nov 10, 2023
9a68f78
fix mypy
girarda Nov 10, 2023
dbe9262
cleanup the tests a bit
girarda Nov 10, 2023
907755a
update
girarda Nov 10, 2023
11ab634
remove param
girarda Nov 10, 2023
0a7c3f2
set max number of streams
girarda Nov 10, 2023
acd78b8
update
girarda Nov 10, 2023
39955e7
update
girarda Nov 10, 2023
5185868
update
girarda Nov 10, 2023
5b9fa3d
delete cruft
girarda Nov 10, 2023
0600795
delete cruft
girarda Nov 10, 2023
9254d55
update comment
girarda Nov 10, 2023
29ee92f
type annotation
girarda Nov 10, 2023
7dc764c
update
girarda Nov 10, 2023
e86b701
comments
girarda Nov 10, 2023
6843bfc
rename
girarda Nov 10, 2023
8ed60df
remove superfluous param
girarda Nov 10, 2023
c5a5e99
comment
girarda Nov 10, 2023
60d1a74
docstrings
girarda Nov 10, 2023
6ddea7f
small cleanup
girarda Nov 10, 2023
d2c3f86
rename
girarda Nov 10, 2023
d758fae
small update
girarda Nov 10, 2023
22aa854
docstrign
girarda Nov 10, 2023
7c8db6a
docstrings
girarda Nov 10, 2023
6bfed39
docstring
girarda Nov 10, 2023
6b56ee8
docstring
girarda Nov 10, 2023
a385b93
rename file
girarda Nov 10, 2023
2b89ac9
mypy
girarda Nov 10, 2023
cdc0d8f
remove superfluous call to shutdown
girarda Nov 13, 2023
6b8d34e
add unit test
girarda Nov 13, 2023
55203ec
yield from []
girarda Nov 13, 2023
63199fc
remove useless comment
girarda Nov 13, 2023
e2fe651
comment
girarda Nov 13, 2023
c84d150
fix comment
girarda Nov 13, 2023
623bdd9
remove superfluous check
girarda Nov 13, 2023
e7a0493
extract a facade
girarda Nov 14, 2023
cae8795
test the adapter
girarda Nov 14, 2023
53c3c7e
rename field and comment
girarda Nov 14, 2023
700bb68
rename field
girarda Nov 14, 2023
927e1bc
mypy
girarda Nov 14, 2023
9a1c26e
rename
girarda Nov 14, 2023
df0d2ea
comments
girarda Nov 14, 2023
65cf274
fix tests
girarda Nov 14, 2023
fd68c3e
use spec
girarda Nov 14, 2023
39a5f48
use public interface
girarda Nov 14, 2023
166d004
remove message repository property
girarda Nov 14, 2023
2ebfbf8
Merge branch 'master' into alex/concurrent_source
girarda Nov 15, 2023
e914feb
rename
girarda Nov 16, 2023
c4816c8
docstrings and remove unused param
girarda Nov 16, 2023
5fa7bbe
Merge branch 'master' into alex/concurrent_source
girarda Nov 27, 2023
8615786
remove superfluous yield from
girarda Nov 27, 2023
197d9d6
Merge branch 'master' into alex/concurrent_source
girarda Nov 28, 2023
@@ -0,0 +1,3 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
@@ -0,0 +1,190 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from typing import Dict, Iterable, List, Optional, Set

from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import PartitionGenerationCompletedSentinel
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message


class ConcurrentReadProcessor:
def __init__(
self,
stream_instances_to_read_from: List[AbstractStream],
partition_enqueuer: PartitionEnqueuer,
thread_pool_manager: ThreadPoolManager,
logger: logging.Logger,
slice_logger: SliceLogger,
message_repository: MessageRepository,
partition_reader: PartitionReader,
):
"""
This class is responsible for handling items from a concurrent stream read process.
:param stream_instances_to_read_from: List of streams to read from
:param partition_enqueuer: PartitionEnqueuer instance
:param thread_pool_manager: ThreadPoolManager instance
:param logger: Logger instance
:param slice_logger: SliceLogger instance
:param message_repository: MessageRepository instance
:param partition_reader: PartitionReader instance
"""
self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
        self._record_counter: Dict[str, int] = {}
self._streams_to_partitions: Dict[str, Set[Partition]] = {}
for stream in stream_instances_to_read_from:
self._streams_to_partitions[stream.name] = set()
self._record_counter[stream.name] = 0
self._thread_pool_manager = thread_pool_manager
self._partition_enqueuer = partition_enqueuer
self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
self._streams_currently_generating_partitions: List[str] = []
self._logger = logger
self._slice_logger = slice_logger
self._message_repository = message_repository
self._partition_reader = partition_reader

def on_partition_generation_completed(self, sentinel: PartitionGenerationCompletedSentinel) -> Iterable[AirbyteMessage]:
"""
This method is called when a partition generation is completed.
1. Remove the stream from the list of streams currently generating partitions
2. If the stream is done, mark it as such and return a stream status message
3. If there are more streams to read from, start the next partition generator
"""
stream_name = sentinel.stream.name
        self._streams_currently_generating_partitions.remove(stream_name)
ret = []
# It is possible for the stream to already be done if no partitions were generated
if self._is_stream_done(stream_name):
ret.append(self._on_stream_is_done(stream_name))
if self._stream_instances_to_start_partition_generation:
ret.append(self.start_next_partition_generator())
return ret

def on_partition(self, partition: Partition) -> None:
"""
This method is called when a partition is generated.
1. Add the partition to the set of partitions for the stream
2. Log the slice if necessary
3. Submit the partition to the thread pool manager
"""
stream_name = partition.stream_name()
self._streams_to_partitions[stream_name].add(partition)
if self._slice_logger.should_log_slice_message(self._logger):
self._message_repository.emit_message(self._slice_logger.create_slice_log_message(partition.to_slice()))
self._thread_pool_manager.submit(self._partition_reader.process_partition, partition)

def on_partition_complete_sentinel(self, sentinel: PartitionCompleteSentinel) -> Iterable[AirbyteMessage]:
"""
This method is called when a partition is completed.
1. Close the partition
2. If the stream is done, mark it as such and return a stream status message
3. Emit messages that were added to the message repository
"""
partition = sentinel.partition
partition.close()
if self._is_stream_done(partition.stream_name()):
yield self._on_stream_is_done(partition.stream_name())
yield from self._message_repository.consume_queue()

def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
"""
This method is called when a record is read from a partition.
1. Convert the record to an AirbyteMessage
2. If this is the first record for the stream, mark the stream as RUNNING
3. Increment the record counter for the stream
4. Emit the message
5. Emit messages that were added to the message repository
"""
        # Do not pass a transformer or a schema.
        # AbstractStreams are expected to return data in its final form;
        # any transformation should have been applied before it reaches this point.
message = stream_data_to_airbyte_message(record.stream_name, record.data)
stream = self._stream_name_to_instance[record.stream_name]

if self._record_counter[stream.name] == 0:
self._logger.info(f"Marking stream {stream.name} as RUNNING")
yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING)

if message.type == MessageType.RECORD:
self._record_counter[stream.name] += 1
yield message
yield from self._message_repository.consume_queue()

def on_exception(self, exception: Exception) -> Iterable[AirbyteMessage]:
"""
This method is called when an exception is raised.
1. Stop all running streams
2. Raise the exception
"""
yield from self._stop_streams()
raise exception

def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
"""
Start the next partition generator.
1. Pop the next stream to read from
2. Submit the partition generator to the thread pool manager
3. Add the stream to the list of streams currently generating partitions
4. Return a stream status message
"""
if self._stream_instances_to_start_partition_generation:
stream = self._stream_instances_to_start_partition_generation.pop(0)
self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
self._streams_currently_generating_partitions.append(stream.name)
self._logger.info(f"Marking stream {stream.name} as STARTED")
self._logger.info(f"Syncing stream: {stream.name} ")
return stream_status_as_airbyte_message(
stream.as_airbyte_stream(),
AirbyteStreamStatus.STARTED,
)
else:
return None

def is_done(self) -> bool:
"""
This method is called to check if the sync is done.
The sync is done when:
1. There are no more streams generating partitions
2. There are no more streams to read from
3. All partitions for all streams are closed
"""
return (
not self._streams_currently_generating_partitions
and not self._stream_instances_to_start_partition_generation
and all([all(p.is_closed() for p in partitions) for partitions in self._streams_to_partitions.values()])
)

def _is_stream_done(self, stream_name: str) -> bool:
return (
all([p.is_closed() for p in self._streams_to_partitions[stream_name]])
and stream_name not in self._streams_currently_generating_partitions
)

def _on_stream_is_done(self, stream_name: str) -> AirbyteMessage:
self._logger.info(f"Read {self._record_counter[stream_name]} records from {stream_name} stream")
self._logger.info(f"Marking stream {stream_name} as STOPPED")
stream = self._stream_name_to_instance[stream_name]
self._logger.info(f"Finished syncing {stream.name}")
return stream_status_as_airbyte_message(stream.as_airbyte_stream(), AirbyteStreamStatus.COMPLETE)

def _stop_streams(self) -> Iterable[AirbyteMessage]:
self._thread_pool_manager.shutdown()
for stream_name, partitions in self._streams_to_partitions.items():
stream = self._stream_name_to_instance[stream_name]
if not all([p.is_closed() for p in partitions]):
self._logger.info(f"Marking stream {stream.name} as STOPPED")
self._logger.info(f"Finished syncing {stream.name}")
yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), AirbyteStreamStatus.INCOMPLETE)
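
To make the lifecycle above concrete, here is a minimal, hypothetical sketch (not part of this PR) that exercises ConcurrentReadProcessor in isolation, mocking its collaborators. All names come from the constructor and methods shown above; the AirbyteStream stub is an assumption.

import logging
from unittest.mock import Mock

from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources.concurrent_source.concurrent_read_processor import ConcurrentReadProcessor

# Stream stub: only `name` and `as_airbyte_stream()` are exercised below.
stream = Mock()
stream.name = "users"
stream.as_airbyte_stream.return_value = AirbyteStream(
    name="users", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]
)

processor = ConcurrentReadProcessor(
    stream_instances_to_read_from=[stream],
    partition_enqueuer=Mock(),
    thread_pool_manager=Mock(),
    logger=logging.getLogger("test"),
    slice_logger=Mock(),
    message_repository=Mock(),
    partition_reader=Mock(),
)

# Submits the stream's partition generator to the (mocked) thread pool and
# returns a STARTED stream status message.
status_message = processor.start_next_partition_generator()
assert status_message is not None
# Not done yet: the stream is still listed as generating partitions.
assert not processor.is_done()
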
@@ -0,0 +1,162 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import concurrent.futures
import logging
from queue import Queue
from typing import Iterable, Iterator, List

from airbyte_cdk.models import AirbyteMessage
from airbyte_cdk.sources.concurrent_source.concurrent_read_processor import ConcurrentReadProcessor
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import PartitionGenerationCompletedSentinel
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel, QueueItem
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger


class ConcurrentSource:
"""
A Source that reads data from multiple AbstractStreams concurrently.
    It does so by submitting partition generation and partition read tasks to a thread pool.
    The tasks asynchronously add their output to a shared queue.
    The read is done when all partitions for all streams have been generated and read.
"""

DEFAULT_TIMEOUT_SECONDS = 900

@staticmethod
def create(
num_workers: int,
initial_number_of_partitions_to_generate: int,
logger: logging.Logger,
slice_logger: SliceLogger,
message_repository: MessageRepository,
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
) -> "ConcurrentSource":
threadpool = ThreadPoolManager(
concurrent.futures.ThreadPoolExecutor(max_workers=num_workers, thread_name_prefix="workerpool"), logger, num_workers
)
return ConcurrentSource(
threadpool, logger, slice_logger, message_repository, initial_number_of_partitions_to_generate, timeout_seconds
)

def __init__(
self,
threadpool: ThreadPoolManager,
logger: logging.Logger,
slice_logger: SliceLogger = DebugSliceLogger(),
message_repository: MessageRepository = InMemoryMessageRepository(),
initial_number_partitions_to_generate: int = 1,
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
) -> None:
"""
:param threadpool: The threadpool to submit tasks to
:param logger: The logger to log to
:param slice_logger: The slice logger used to create messages on new slices
:param message_repository: The repository to emit messages to
        :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number limits the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
:param timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
"""
self._threadpool = threadpool
self._logger = logger
self._slice_logger = slice_logger
self._message_repository = message_repository
self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
self._timeout_seconds = timeout_seconds

def read(
self,
streams: List[AbstractStream],
) -> Iterator[AirbyteMessage]:
self._logger.info("Starting syncing")
stream_instances_to_read_from = self._get_streams_to_read_from(streams)

Review comment (Contributor): Given that availability checks can be time-consuming, is there any reason not to do this part concurrently? We can just put all the streams in the queue and let them be filtered out by the worker that's processing them. That would make the cost of the initial enqueuing of streams negligible and would allow us to simplify this method; we wouldn't need a separate _submit_initial_partition_generators step.

Reply (girarda, Author): Running the checks on the worker is a good idea. I'll do it in a separate PR.
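As a rough illustration of the reviewer's suggestion, here is one possible shape for parallelizing the availability checks (hypothetical, not part of this PR; the follow-up discussed above would instead filter unavailable streams inside the existing worker pool). It reuses only names that appear elsewhere in this file:

    def _get_streams_to_read_from_concurrently(self, streams: List[AbstractStream]) -> List[AbstractStream]:
        # Run every check_availability() call in parallel rather than sequentially.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            availabilities = list(executor.map(lambda s: (s, s.check_availability()), streams))
        stream_instances_to_read_from = []
        for stream, stream_availability in availabilities:
            if not stream_availability.is_available():
                self._logger.warning(f"Skipped syncing stream '{stream.name}' because it was unavailable. {stream_availability.message()}")
                continue
            stream_instances_to_read_from.append(stream)
        return stream_instances_to_read_from
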
# Return early if there are no streams to read from
if not stream_instances_to_read_from:
yield from []
return

queue: Queue[QueueItem] = Queue()
concurrent_stream_processor = ConcurrentReadProcessor(
stream_instances_to_read_from,
PartitionEnqueuer(queue),
self._threadpool,
self._logger,
self._slice_logger,
self._message_repository,
PartitionReader(queue),
)

# Enqueue initial partition generation tasks
yield from self._submit_initial_partition_generators(concurrent_stream_processor)

# Read from the queue until all partitions were generated and read
yield from self._consume_from_queue(
queue,
concurrent_stream_processor,
)
self._threadpool.check_for_errors_and_shutdown()
self._logger.info("Finished syncing")

def _submit_initial_partition_generators(self, concurrent_stream_processor: ConcurrentReadProcessor) -> Iterable[AirbyteMessage]:
for _ in range(self._initial_number_partitions_to_generate):
status_message = concurrent_stream_processor.start_next_partition_generator()
if status_message:
yield status_message

def _consume_from_queue(
self,
queue: Queue[QueueItem],
concurrent_stream_processor: ConcurrentReadProcessor,
) -> Iterable[AirbyteMessage]:
while airbyte_message_or_record_or_exception := queue.get(block=True, timeout=self._timeout_seconds):
yield from self._handle_item(
airbyte_message_or_record_or_exception,
concurrent_stream_processor,
)
if concurrent_stream_processor.is_done() and queue.empty():
# all partitions were generated and processed. we're done here
break

def _handle_item(
self,
queue_item: QueueItem,
concurrent_stream_processor: ConcurrentReadProcessor,
) -> Iterable[AirbyteMessage]:
# handle queue item and call the appropriate handler depending on the type of the queue item
if isinstance(queue_item, Exception):
yield from concurrent_stream_processor.on_exception(queue_item)

elif isinstance(queue_item, PartitionGenerationCompletedSentinel):
yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)

elif isinstance(queue_item, Partition):
concurrent_stream_processor.on_partition(queue_item)
elif isinstance(queue_item, PartitionCompleteSentinel):
yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
elif isinstance(queue_item, Record):
yield from concurrent_stream_processor.on_record(queue_item)
else:
raise ValueError(f"Unknown queue item type: {type(queue_item)}")

def _get_streams_to_read_from(self, streams: List[AbstractStream]) -> List[AbstractStream]:
"""
        Iterate over the given streams and return the list of streams to read from.
        If a stream's availability check fails, it is skipped with a warning.
"""
stream_instances_to_read_from = []
for stream in streams:
stream_availability = stream.check_availability()
if not stream_availability.is_available():
self._logger.warning(f"Skipped syncing stream '{stream.name}' because it was unavailable. {stream_availability.message()}")
continue
stream_instances_to_read_from.append(stream)
return stream_instances_to_read_from
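
For orientation, here is a minimal usage sketch of the new entrypoint (hypothetical, not part of this diff). It assumes the connector has already built `streams`, a List[AbstractStream]; everything else comes from the code above.

import logging

from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger

logger = logging.getLogger("airbyte")
source = ConcurrentSource.create(
    num_workers=4,  # size of the shared worker pool
    initial_number_of_partitions_to_generate=1,  # keeps first-record latency low
    logger=logger,
    slice_logger=DebugSliceLogger(),
    message_repository=InMemoryMessageRepository(),
)

# `streams` is assumed to be a List[AbstractStream] provided by the connector.
for message in source.read(streams):
    print(message)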