
✨ concurrent cdk: Read multiple streams concurrently #32411

Merged

merged 77 commits into master from alex/concurrent_source on Nov 28, 2023

Conversation

girarda
Contributor

@girarda girarda commented Nov 10, 2023

What

  • Implement a new type of Source which reads partitions from multiple streams concurrently
  • This is done by moving the logic to read from Partitions from the AbstractStream to the Source
  • The concurrent source submits partition generation tasks for multiple streams concurrently, and reads the records from a queue, similar to how ThreadBasedConcurrentStream used to

How

Concurrent Source

  1. Iterate over the configured catalog, verify the streams exist, and filter out the ones that are not available
  2. Submit partition generation tasks to the threadpool
  3. Pull items from the queue and forward them to the ConcurrentStreamProcessor
  4. Stop when the queue is empty, the threadpool is done, and the concurrent stream processor is done
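The steps above can be sketched roughly as follows. Every name here (`read_concurrently`, the `_DONE` sentinel, the tuple-based queue items) is illustrative only, not the actual CDK API; the real source dispatches queue items to the ConcurrentStreamProcessor instead of yielding tuples:

```python
import queue
from concurrent.futures import ThreadPoolExecutor

_DONE = object()  # sentinel put on the queue when a stream finished generating partitions

def read_concurrently(streams, max_workers=2):
    """streams: mapping of stream name -> list of partitions (each a list of records)."""
    q = queue.Queue()

    def generate_partitions(name, partitions):
        # Step 2: a partition generation task enqueues the stream's partitions
        for partition in partitions:
            q.put((name, partition))
        q.put(_DONE)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for name, partitions in streams.items():
            pool.submit(generate_partitions, name, partitions)
        finished = 0
        # Steps 3-4: pull items off the queue until every stream has signaled completion
        while finished < len(streams):
            item = q.get()
            if item is _DONE:
                finished += 1
            else:
                name, partition = item
                for record in partition:
                    yield (name, record)
```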

ThreadPoolManager

The ThreadPoolManager class is a wrapper on top of a threadpool executor. It is responsible for submitting tasks, throttling if there are too many pending tasks, and raising an exception if any of the tasks fails.

The logic is very similar to what we used to do in ThreadBasedConcurrentStream, but I extracted it into its own class so it's easier to test
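As a rough sketch of those three responsibilities, assuming a polling-based throttle (the class name, limits, and method names are mine, not the CDK's):

```python
import time
from concurrent.futures import ThreadPoolExecutor

class ThreadPoolManagerSketch:
    """Wrapper on top of a threadpool executor: submits tasks, throttles when
    too many are pending, and surfaces task failures."""

    def __init__(self, max_workers=4, max_pending_tasks=10):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._max_pending = max_pending_tasks
        self._futures = []

    def submit(self, fn, *args):
        # Throttle: wait while too many submitted tasks are still pending
        while sum(1 for f in self._futures if not f.done()) >= self._max_pending:
            time.sleep(0.01)
        self._futures.append(self._executor.submit(fn, *args))

    def check_for_errors_and_shutdown(self):
        # Re-raise the first exception any task hit, then shut the pool down
        try:
            for f in self._futures:
                exc = f.exception()  # blocks until the task completes
                if exc:
                    raise exc
        finally:
            self._executor.shutdown(wait=True)
```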

ConcurrentStreamProcessor

The ConcurrentSource polls items from the queue and sends them to the ConcurrentStreamProcessor, which acts on them. It is responsible for submitting new tasks and creating the AirbyteMessages. I'm not a huge fan of the name and am open to suggestions, but I found it important to extract the logic out of ConcurrentSource for testability reasons
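A minimal sketch of that dispatch, with hypothetical item types and method names (the real processor emits AirbyteMessages and tracks more state):

```python
from dataclasses import dataclass

@dataclass
class Record:
    stream_name: str
    data: dict

@dataclass
class PartitionGenerationCompletedSentinel:
    stream_name: str

class ConcurrentStreamProcessorSketch:
    def __init__(self):
        self.messages = []  # stand-in for emitted AirbyteMessages
        self._streams_generating = set()

    def start_generating(self, stream_name):
        self._streams_generating.add(stream_name)

    def process(self, item):
        # Route each queue item to the appropriate handler
        if isinstance(item, Record):
            self.messages.append({"stream": item.stream_name, "data": item.data})
        elif isinstance(item, PartitionGenerationCompletedSentinel):
            self._streams_generating.discard(item.stream_name)

    def is_done(self):
        return not self._streams_generating
```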

Partition and AbstractStream

The interfaces of Partition and AbstractStream changed a bit.

  • Partition is now responsible for closing itself
  • AbstractStream is not responsible for reading records. Instead, it generates Partitions that can read records
  • ThreadBasedConcurrentStream was stripped of its read method and renamed to DefaultStream since nothing about it is concurrent
  • Record holds its stream name. An alternative would be to give it a handler on its AbstractStream or Partition
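The reshaped interfaces can be sketched like this; the signatures and the toy ListPartition are assumptions based on the bullets above, not the actual CDK code:

```python
from abc import ABC, abstractmethod
from typing import Iterable

class Record:
    def __init__(self, data: dict, stream_name: str):
        self.data = data
        self.stream_name = stream_name  # records carry their stream's name

class Partition(ABC):
    @abstractmethod
    def read(self) -> Iterable[Record]:
        """Read the records contained in this partition."""

    @abstractmethod
    def close(self) -> None:
        """Partitions are responsible for closing themselves once fully read."""

class AbstractStream(ABC):
    @abstractmethod
    def generate_partitions(self) -> Iterable[Partition]:
        """Streams no longer read records directly; they generate partitions that do."""

class ListPartition(Partition):
    """Toy partition backed by an in-memory list, for illustration."""

    def __init__(self, records):
        self._records = records
        self.closed = False

    def read(self):
        return iter(self._records)

    def close(self):
        self.closed = True
```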

Recommended reading order

  1. airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_source.py
  2. airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py
  3. airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/partitions/types.py
  4. airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_stream_processor.py
  5. airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/partition_generation_completed_sentinel.py
  6. airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/partitions/partition.py
  7. airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/abstract_stream.py
  8. airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/default_stream.py
  9. airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/partitions/record.py

Note: source-stripe needs to be updated in a follow up PR


@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Nov 10, 2023
@girarda girarda changed the base branch from alex/apply_debug_logging to alex/stream_status November 10, 2023 03:19
@girarda girarda requested a review from a team as a code owner November 14, 2023 04:12
@girarda girarda requested review from clnoll and maxi297 November 14, 2023 04:13
self._raise_exception_on_missing_stream = raise_exception_on_missing_stream

@property
def message_repository(self) -> MessageRepository:
Contributor

Do we need this now? I think it might have been necessary before because it inherited from AbstractSource but it seems like it has been removed.

I feel like a snippet of code of how Stripe or Salesforce would look like using this source would help understand the usage and end-to-end setup

Contributor Author

this is still needed since it is passed to the ConcurrentStreamProcessor

Contributor

So I see that it is being passed to ConcurrentStreamProcessor here but it's done using the private field. Do we need to expose it publicly using a method?

Contributor Author

oh derp. no, the repository shouldn't be public. deleted the property method

@octavia-squidington-iv octavia-squidington-iv requested review from a team November 14, 2023 13:41
Contributor

@maxi297 maxi297 left a comment

I checked the tests this morning and added a couple of comments. Very thorough and clean code! I think I'm just missing the "how it'll be configured for a source" part to make sure everything makes sense

self._message_repository,
self._partition_reader,
)
handler._streams_currently_generating_partitions = {_STREAM_NAME}
Contributor

I feel these tests should be driven by the public interface only when possible. Could we do something like:

    def test_handle_partition_done_no_other_streams_to_generate_partitions_for(self):
        stream_instances_to_read_from = [self._stream]
        handler = ConcurrentStreamProcessor(
            stream_instances_to_read_from,
            self._partition_enqueuer,
            self._thread_pool_manager,
            self._logger,
            self._slice_logger,
            self._message_repository,
            self._partition_reader,
        )
        handler.start_next_partition_generator()
        handler.on_partition(self._stream_partition)

        sentinel = PartitionGenerationCompletedSentinel(self._stream)
        messages = list(handler.on_partition_generation_completed(sentinel))

        expected_messages = []
        assert expected_messages == messages

My concerns are:

  • When a user of this class interacts with ConcurrentStreamProcessor, it'll use the public interface. Hence, also using the public interface in the tests will mimic real usage
  • As a tester, I need to know the exact internal steps and the dependencies between private variables. This is more involved and error-prone than calling public methods
  • If the implementation changes, the tests will probably break. This is not true if we use the public interface as this is the contract we try to uphold and therefore should be more stable

Contributor Author

agreed. done!

@octavia-squidington-iv octavia-squidington-iv requested a review from a team November 14, 2023 14:20
@girarda
Contributor Author

girarda commented Nov 14, 2023

Contributor

@maxi297 maxi297 left a comment

LGTM! I'm eager to see this in action

Base automatically changed from alex/stream_status to master November 15, 2023 19:37
Contributor

@clnoll clnoll left a comment

Awesome @girarda, looks really nice! Just a few comments and suggestions.

"""
:param threadpool: The threadpool to submit tasks to
:param message_repository: The repository to emit messages to
:param max_number_of_partition_generator_in_progress: The initial number of concurrent partition generation tasks. Limiting this number limits the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
Contributor

Nit: some of the params are missing and/or have been renamed since.

Regarding the comment for max_number_of_partition_generator_in_progress / initial_number_partitions_to_generate - looks like the idea is that we'll generate one partition per stream, and so this variable basically is there to say "you can start processing records after <initial_number_partitions_to_generate> partitions have been created"; if there are tons of streams, it means that we're allowed to start processing records before all of the stream partitions have been generated. Is that right?

It's not obvious to me that we'd ever want this to be more than one - did you have a use case in mind or is this here for configurability just in case?

Contributor Author

this variable basically is there to say "you can start processing records after <initial_number_partitions_to_generate> partitions have been created"

that's not quite what it says. Records can start being processed after N partition generation tasks have been submitted to the threadpool.

If N == 1 and generating the partitions takes a long time (eg if we need to read all records from a parent stream), then the workers might be idle for some time.

If N > the number of workers, it might take some time for the first records to be polled from the queue, since the first many items will be partitions to process. I mostly want to avoid partition generation tasks using all the workers and preventing them from actually processing the partitions.

An alternative implementation might be to submit the first partition generation before polling from the queue, and submit the others when we read the first record.
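That trade-off can be illustrated with a tiny hypothetical helper that caps how many generation tasks are submitted up front; the name and shape are mine, not the CDK's:

```python
def submit_initial_generators(streams, submit, n_initial):
    """Submit up to n_initial partition generation tasks; return the streams still waiting.

    Keeping n_initial small leaves workers free to process partitions;
    the remaining streams are submitted later, as earlier ones complete.
    """
    pending = list(streams)
    for _ in range(min(n_initial, len(pending))):
        submit(pending.pop(0))
    return pending
```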

Contributor

@clnoll clnoll Nov 17, 2023

Okay thanks for the explanation. Tbh I'm not entirely sold on the extra complexity that this requires. If we were using coroutines/asyncio here we wouldn't have this issue because the executor would be able to flit between tasks while I/O is happening, whereas the standard threads are blocking. Even though your code helps alleviate some of this pressure we're always going to have the problem of threads idling during I/O as long as we're using standard threads. We could revisit using asyncio, though I recognize it might require significant changes. Alternatively, I think we could get a similar efficiency boost by replacing standard threads with green threads, as they offer context switching during I/O operations with less overhead. IMO either of those are preferable to adding complexity that doesn't entirely solve the problem.

Contributor Author

We synced offline.

The crux of the issue comes from our threadpool based approach. If using N workers, and they are all processing long partition generation tasks, no workers are available for emitting records from those partitions, which affects the overall sync performance.

@clnoll's suggestion of using asyncio is great as it should allow us to simplify the flow and might also improve the performance. We'll prioritize a spike to evaluate the effort and go from there.

streams: List[AbstractStream],
) -> Iterator[AirbyteMessage]:
self._logger.info("Starting syncing")
stream_instances_to_read_from = self._get_streams_to_read_from(streams)
Contributor

Given that availability checks can be time-consuming, is there any reason not to do this part concurrently? We can just put all the streams in the queue and let them be filtered out by the worker that's processing them.

That would make the cost of the initial enqueuing of streams negligible and would allow us to simplify this method - we wouldn't need a separate _submit_initial_partition_generators step.
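The suggestion could look roughly like this hypothetical worker-side check; all names here are illustrative:

```python
def generate_if_available(stream_name, is_available, enqueue):
    """Worker task: run the (potentially slow) availability check itself,
    so the initial enqueuing of streams stays cheap."""
    if not is_available(stream_name):
        return False  # unavailable streams are filtered out by the worker
    enqueue(stream_name)
    return True
```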

Contributor Author

Running the checks on the worker is a good idea. I'll do it in a separate PR

@katmarkham katmarkham removed the request for review from a team November 20, 2023 21:10
@girarda girarda merged commit a84902e into master Nov 28, 2023
22 checks passed
@girarda girarda deleted the alex/concurrent_source branch November 28, 2023 23:00