Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename close() to aclose() in PubSubItem #55

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nextline/continuous.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def close(self) -> None:
await self._task
if self._tasks:
await asyncio.gather(*self._tasks)
await self._pubsub_enabled.close()
await self._pubsub_enabled.aclose()

async def __aenter__(self) -> 'Continuous':
await self.start()
Expand Down
4 changes: 2 additions & 2 deletions nextline/utils/pubsub/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def end(self, key: _KT) -> None:

"""
if q := self._queue.pop(key, None):
await q.close()
await q.aclose()

async def close(self) -> None:
"""End all subscriptions for all keys
Expand All @@ -52,7 +52,7 @@ async def close(self) -> None:
"""
while self._queue:
_, q = self._queue.popitem()
await q.close()
await q.aclose()

async def __aenter__(self) -> "PubSub[_KT, _VT]":
return self
Expand Down
6 changes: 3 additions & 3 deletions nextline/utils/pubsub/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class PubSubItem(Generic[_Item]):
>>> async def distributor_rest(obj):
... for i in items[2:]:
... await obj.publish(i)
... await obj.close()
... await obj.aclose()

The second distributor calls the method `close()` to end the subscriptions.
The method `subscribe()` will return when the method `close()` is called.
Expand Down Expand Up @@ -161,7 +161,7 @@ async def subscribe(self, last: Optional[bool] = True) -> AsyncIterator[_Item]:
finally:
self._queues.remove(q)

async def close(self) -> None:
async def aclose(self) -> None:
'''End gracefully'''
self._lock_close = self._lock_close or Condition()
async with self._lock_close:
Expand All @@ -174,7 +174,7 @@ async def __aenter__(self) -> 'PubSubItem[_Item]':
return self

async def __aexit__(self, *_: Any, **__: Any) -> None:
await self.close()
await self.aclose()

async def _enumerate(self, item: _Item | _End) -> None:
self._idx += 1
Expand Down
12 changes: 6 additions & 6 deletions tests/utils/pubsub/test_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ async def test_close() -> None:

async def test_close_multiple_times() -> None:
async with PubSubItem[str]() as obj:
await obj.close()
await obj.close()
await obj.aclose()
await obj.aclose()


@given(...)
Expand All @@ -42,7 +42,7 @@ async def receive() -> tuple[str, ...]:
async def send() -> None:
async for i in aiterable(items):
await obj.publish(i)
await obj.close()
await obj.aclose()

*results, _ = await asyncio.gather(
*(receive() for _ in range(n_subscriptions)),
Expand Down Expand Up @@ -90,7 +90,7 @@ async def receive() -> tuple[int | str, ...]:
async def send() -> None:
async for i in aiterable(items):
await obj.publish(i)
await obj.close()
await obj.aclose()

for i in pre_items:
await obj.publish(i)
Expand Down Expand Up @@ -159,7 +159,7 @@ async def receive() -> tuple[str, ...]:
async def send() -> None:
async for i in aiterable(items):
await obj.publish(i)
await obj.close()
await obj.aclose()

results, _ = await asyncio.gather(receive(), send())
assert results == expected
Expand Down Expand Up @@ -194,7 +194,7 @@ async def send() -> None:
event.set()
async for i in aiterable(post_items):
await obj.publish(i)
await obj.close()
await obj.aclose()

*results, _ = await asyncio.gather(
*(receive() for _ in range(n_subscriptions)),
Expand Down