Skip to content

Commit

Permalink
Fixing futures pruning (#32399)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 authored Nov 9, 2023
1 parent 3159f10 commit b3f0e05
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 17 deletions.
24 changes: 12 additions & 12 deletions .github/workflows/airbyte-ci-release-experiment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ jobs:
strategy:
fail-fast: false
matrix:
os: ['ubuntu-latest', 'macos-latest']
os: ["ubuntu-latest", "macos-latest"]

steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: 3.10
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: 3.10

- run: curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python -
- run: curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python -

- run: cd airbyte-ci/pipelines/airbyte_ci
- run: poetry install --with dev
- run: poetry run pyinstaller --collect-all pipelines --collect-all beartype --collect-all dagger --hidden-import strawberry --name airbyte-ci-${{ matrix.os }} --onefile pipelines/cli/airbyte_ci.py
- uses: actions/upload-artifact@v2
with:
path: dist/*
- run: cd airbyte-ci/pipelines/airbyte_ci
- run: poetry install --with dev
- run: poetry run pyinstaller --collect-all pipelines --collect-all beartype --collect-all dagger --hidden-import strawberry --name airbyte-ci-${{ matrix.os }} --onefile pipelines/cli/airbyte_ci.py
- uses: actions/upload-artifact@v2
with:
path: dist/*
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def _prune_futures(self, futures: List[Future[Any]]) -> None:
if len(futures) < self._max_concurrent_tasks:
return

for index in range(len(futures)):
for index in reversed(range(len(futures))):
future = futures[index]
optional_exception = future.exception()
if optional_exception:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.streams.concurrent.thread_based_concurrent_stream import ThreadBasedConcurrentStream

_MAX_CONCURRENT_TASKS = 2


class ThreadBasedConcurrentStreamTest(unittest.TestCase):
def setUp(self):
Expand All @@ -39,7 +41,7 @@ def setUp(self):
self._logger,
self._message_repository,
1,
2,
_MAX_CONCURRENT_TASKS,
0,
cursor=self._cursor,
)
Expand Down Expand Up @@ -142,15 +144,33 @@ def test_given_exception_then_fail_immediately(self):
f2 = Mock()

# Verify that the done() method will be called until only one future is still running
f1.done.return_value = False
f1.done.return_value = True
f1.exception.return_value = None
f2.done.return_value = False
f2.exception.return_value = ValueError("An exception")
f2.done.return_value = True
f2.exception.return_value = ValueError("ERROR")
futures = [f1, f2]

with pytest.raises(RuntimeError):
self._stream._wait_while_too_many_pending_futures(futures)

def test_given_removing_multiple_elements_when_pruning_then_fail_immediately(self):
# Verify that the done() method will be called until only one future is still running
futures = []
for _ in range(_MAX_CONCURRENT_TASKS + 1):
future = Mock()
future.done.return_value = True
future.exception.return_value = None
futures.append(future)

pending_future = Mock()
pending_future.done.return_value = False
pending_future.exception.return_value = None
futures.append(pending_future)

self._stream._wait_while_too_many_pending_futures(futures)

assert futures == [pending_future]

def test_as_airbyte_stream(self):
expected_airbyte_stream = AirbyteStream(
name=self._name,
Expand Down

0 comments on commit b3f0e05

Please sign in to comment.