diff --git a/.github/workflows/airbyte-ci-release-experiment.yml b/.github/workflows/airbyte-ci-release-experiment.yml index 416ee5569c0c..cdd572066739 100644 --- a/.github/workflows/airbyte-ci-release-experiment.yml +++ b/.github/workflows/airbyte-ci-release-experiment.yml @@ -14,19 +14,19 @@ jobs: strategy: fail-fast: false matrix: - os: ['ubuntu-latest', 'macos-latest'] + os: ["ubuntu-latest", "macos-latest"] steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v2 - with: - python-version: 3.10 + - uses: actions/checkout@v2 + - uses: actions/setup-python@v2 + with: + python-version: 3.10 - - run: curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python - + - run: curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python - - - run: cd airbyte-ci/pipelines/airbyte_ci - - run: poetry install --with dev - - run: poetry run pyinstaller --collect-all pipelines --collect-all beartype --collect-all dagger --hidden-import strawberry --name airbyte-ci-${{ matrix.os }} --onefile pipelines/cli/airbyte_ci.py - - uses: actions/upload-artifact@v2 - with: - path: dist/* + - run: cd airbyte-ci/pipelines/airbyte_ci + - run: poetry install --with dev + - run: poetry run pyinstaller --collect-all pipelines --collect-all beartype --collect-all dagger --hidden-import strawberry --name airbyte-ci-${{ matrix.os }} --onefile pipelines/cli/airbyte_ci.py + - uses: actions/upload-artifact@v2 + with: + path: dist/* diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/thread_based_concurrent_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/thread_based_concurrent_stream.py index 44aa69de491c..cd2f92dd2405 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/thread_based_concurrent_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/thread_based_concurrent_stream.py @@ -154,7 +154,7 @@ def _prune_futures(self, futures: 
List[Future[Any]]) -> None: if len(futures) < self._max_concurrent_tasks: return - for index in range(len(futures)): + for index in reversed(range(len(futures))): future = futures[index] optional_exception = future.exception() if optional_exception: diff --git a/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_thread_based_concurrent_stream.py b/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_thread_based_concurrent_stream.py index 152560ee629c..3a667d4d1a52 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_thread_based_concurrent_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_thread_based_concurrent_stream.py @@ -13,6 +13,8 @@ from airbyte_cdk.sources.streams.concurrent.partitions.record import Record from airbyte_cdk.sources.streams.concurrent.thread_based_concurrent_stream import ThreadBasedConcurrentStream +_MAX_CONCURRENT_TASKS = 2 + class ThreadBasedConcurrentStreamTest(unittest.TestCase): def setUp(self): @@ -39,7 +41,7 @@ def setUp(self): self._logger, self._message_repository, 1, - 2, + _MAX_CONCURRENT_TASKS, 0, cursor=self._cursor, ) @@ -142,15 +144,33 @@ def test_given_exception_then_fail_immediately(self): f2 = Mock() # Verify that the done() method will be called until only one future is still running - f1.done.return_value = False + f1.done.return_value = True f1.exception.return_value = None - f2.done.return_value = False - f2.exception.return_value = ValueError("An exception") + f2.done.return_value = True + f2.exception.return_value = ValueError("ERROR") futures = [f1, f2] with pytest.raises(RuntimeError): self._stream._wait_while_too_many_pending_futures(futures) + def test_given_removing_multiple_elements_when_pruning_then_fail_immediately(self): + # Verify that every completed future is pruned and only the pending future remains in the list + futures = [] + for _ in range(_MAX_CONCURRENT_TASKS + 1): + future = Mock() + future.done.return_value = True + 
future.exception.return_value = None + futures.append(future) + + pending_future = Mock() + pending_future.done.return_value = False + pending_future.exception.return_value = None + futures.append(pending_future) + + self._stream._wait_while_too_many_pending_futures(futures) + + assert futures == [pending_future] + def test_as_airbyte_stream(self): expected_airbyte_stream = AirbyteStream( name=self._name,