Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/wait reopen channel state #533

Merged
merged 25 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
9.1.0
-----

The bulk of the changes are related to how the library entities are now
interconnected. In previous versions of `aio_pika.Channel` instances not
contains a link to the `aio_pika.Connection` instances for now is contains it.

While I don't want custom code to work directly with the `aiormq.Channel`
instance, this was a public API and I should warn you about the change here.
The `aio_pika.Channel.channel` property is deprecated. Use
`aio_pika.Channel.get_underlay_chanel()` instead.
Now all library entities already use this method.


9.0.7
-----

Expand All @@ -17,9 +31,9 @@
-----

* Prevent 'Task exception was never retrieved' #524
If future.exception() is not called (even on cancelled futures), it seems Python
will then log 'Task exception was never retrieved'. Rewriting this logic
slightly should hopefully achieve the same functionality while
If future.exception() is not called (even on cancelled futures), it seems Python
will then log 'Task exception was never retrieved'. Rewriting this logic
slightly should hopefully achieve the same functionality while
preventing the Python errors.
* Avoid implicitly depending on setuptools #526

Expand Down
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
all: test

RABBITMQ_CONTAINER_NAME:=aio_pika_rabbitmq
RABBITMQ_IMAGE:=mosquito/aiormq-rabbitmq

test:
find . -name "*.pyc" -type f -delete
tox

rabbitmq:
docker kill $(docker ps -f label=aio-pika.rabbitmq -q) || true
docker pull $(RABBITMQ_IMAGE)
docker kill $(RABBITMQ_CONTAINER_NAME) || true
docker run --rm -d \
--name $(RABBITMQ_CONTAINER_NAME) \
-l aio-pika.rabbitmq \
-p 5671:5671 \
-p 5672:5672 \
-p 15671:15671 \
Expand Down
21 changes: 8 additions & 13 deletions aio_pika/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Generator, Iterator, Optional, Type, TypeVar, Union, overload,
)


if sys.version_info >= (3, 8):
from typing import Literal, TypedDict
else:
Expand Down Expand Up @@ -81,11 +82,6 @@ class DeclarationResult:
class AbstractTransaction:
state: TransactionState

@property
@abstractmethod
def channel(self) -> "AbstractChannel":
raise NotImplementedError

@abstractmethod
async def select(
self, timeout: TimeoutType = None,
Expand Down Expand Up @@ -244,7 +240,7 @@ async def __aexit__(


class AbstractQueue:
channel: aiormq.abc.AbstractChannel
channel: "AbstractChannel"
name: str
durable: bool
exclusive: bool
Expand Down Expand Up @@ -307,7 +303,7 @@ async def unbind(
@abstractmethod
async def consume(
self,
callback: Callable[[AbstractIncomingMessage], Any],
callback: Callable[[AbstractIncomingMessage], Awaitable[Any]],
no_ack: bool = False,
exclusive: bool = False,
arguments: Arguments = None,
Expand Down Expand Up @@ -409,7 +405,7 @@ class AbstractExchange(ABC):
@abstractmethod
def __init__(
self,
channel: aiormq.abc.AbstractChannel,
channel: "AbstractChannel",
name: str,
type: Union[ExchangeType, str] = ExchangeType.DIRECT,
*,
Expand Down Expand Up @@ -528,9 +524,8 @@ def is_closed(self) -> bool:
def close(self, exc: Optional[ExceptionType] = None) -> Awaitable[None]:
raise NotImplementedError

@property
@abstractmethod
def channel(self) -> aiormq.abc.AbstractChannel:
async def get_underlay_channel(self) -> aiormq.abc.AbstractChannel:
raise NotImplementedError

@property
Expand Down Expand Up @@ -760,7 +755,7 @@ async def update_secret(

class AbstractRobustQueue(AbstractQueue):
@abstractmethod
def restore(self, channel: aiormq.abc.AbstractChannel) -> Awaitable[None]:
def restore(self) -> Awaitable[None]:
raise NotImplementedError

@abstractmethod
Expand Down Expand Up @@ -791,7 +786,7 @@ async def consume(

class AbstractRobustExchange(AbstractExchange):
@abstractmethod
def restore(self, channel: aiormq.abc.AbstractChannel) -> Awaitable[None]:
def restore(self) -> Awaitable[None]:
raise NotImplementedError

@abstractmethod
Expand All @@ -815,7 +810,7 @@ def reopen(self) -> Awaitable[None]:
raise NotImplementedError

@abstractmethod
async def restore(self, connection: aiormq.abc.AbstractConnection) -> None:
async def restore(self) -> None:
raise NotImplementedError

@abstractmethod
Expand Down
84 changes: 53 additions & 31 deletions aio_pika/channel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import warnings
from abc import ABC
from types import TracebackType
from typing import Any, AsyncContextManager, Generator, Optional, Type, Union
Expand All @@ -9,9 +10,10 @@
from pamqp.common import Arguments

from .abc import (
AbstractChannel, AbstractExchange, AbstractQueue, TimeoutType,
UnderlayChannel,
AbstractChannel, AbstractConnection, AbstractExchange, AbstractQueue,
TimeoutType, UnderlayChannel,
)
from .exceptions import ChannelInvalidStateError
from .exchange import Exchange, ExchangeType
from .log import get_logger
from .message import IncomingMessage
Expand Down Expand Up @@ -52,7 +54,7 @@ class Channel(ChannelContext):

def __init__(
self,
connection: aiormq.abc.AbstractConnection,
connection: AbstractConnection,
channel_number: Optional[int] = None,
publisher_confirms: bool = True,
on_return_raises: bool = False,
Expand All @@ -73,12 +75,12 @@ def __init__(
'without "publisher_confirms"',
)

self._connection: aiormq.abc.AbstractConnection = connection
self._connection: AbstractConnection = connection

# That's means user closed channel instance explicitly
self._closed: bool = False

self._channel = None
self._channel: Optional[UnderlayChannel] = None
self._channel_number = channel_number

self.close_callbacks = CallbackCollection(self)
Expand All @@ -99,9 +101,10 @@ def is_closed(self) -> bool:
side or after the close() method has been called."""
if not self.is_initialized or self._closed:
return True
if not self._channel:
channel = self._channel
if channel is None:
return True
return self._channel.channel.is_closed
return channel.channel.is_closed

async def close(
self,
Expand All @@ -119,45 +122,58 @@ async def close(
self._closed = True
await self._channel.close()

@property
def channel(self) -> aiormq.abc.AbstractChannel:
async def get_underlay_channel(self) -> aiormq.abc.AbstractChannel:

if not self.is_initialized or not self._channel:
raise aiormq.exceptions.ChannelInvalidStateError(
"Channel was not opened",
)

if self.is_closed:
raise aiormq.exceptions.ChannelInvalidStateError(
"Channel has been closed",
)
return self._channel.channel

@property
def channel(self) -> aiormq.abc.AbstractChannel:
warnings.warn(
"This property is deprecated, do not use this anymore.",
DeprecationWarning,
)
if self._channel is None:
raise aiormq.exceptions.ChannelInvalidStateError
return self._channel.channel

@property
def number(self) -> Optional[int]:
return (
self.channel.number
if self.is_initialized
else self._channel_number
)
if self._channel is None:
return self._channel_number

underlay_channel: UnderlayChannel = self._channel
return underlay_channel.channel.number

def __str__(self) -> str:
return "{}".format(self.number or "Not initialized channel")

async def _open(self) -> None:
await self._connection.ready()

transport = self._connection.transport
if transport is None:
raise ChannelInvalidStateError("No active transport in channel")

channel = await UnderlayChannel.create(
self._connection,
transport.connection,
self._on_close,
publisher_confirms=self.publisher_confirms,
on_return_raises=self.on_return_raises,
channel_number=self._channel_number,
)

await self._on_open(channel.channel)
self._channel = channel
try:
await self._on_open()
mosquito marked this conversation as resolved.
Show resolved Hide resolved
except BaseException as e:
await channel.close(e)
self._channel = None
raise
self._closed = False

async def initialize(self, timeout: TimeoutType = None) -> None:
Expand All @@ -169,9 +185,9 @@ async def initialize(self, timeout: TimeoutType = None) -> None:
await self._open()
await self._on_initialized()

async def _on_open(self, channel: aiormq.abc.AbstractChannel) -> None:
async def _on_open(self) -> None:
self.default_exchange: Exchange = self.EXCHANGE_CLASS(
channel=channel,
channel=self,
arguments=None,
auto_delete=False,
durable=False,
Expand All @@ -192,7 +208,8 @@ async def _on_close(self, closing: asyncio.Future) -> None:
self._channel.channel.on_return_callbacks.discard(self._on_return)

async def _on_initialized(self) -> None:
self.channel.on_return_callbacks.add(self._on_return)
channel = await self.get_underlay_channel()
channel.on_return_callbacks.add(self._on_return)

def _on_return(self, message: aiormq.abc.DeliveredMessage) -> None:
self.return_callbacks(IncomingMessage(message, no_ack=True))
Expand Down Expand Up @@ -241,7 +258,7 @@ async def declare_exchange(
durable = False

exchange = self.EXCHANGE_CLASS(
channel=self.channel,
channel=self,
name=name,
type=type,
durable=durable,
Expand Down Expand Up @@ -281,7 +298,7 @@ async def get_exchange(
return await self.declare_exchange(name=name, passive=True)
else:
return self.EXCHANGE_CLASS(
channel=self.channel,
channel=self,
name=name,
durable=False,
auto_delete=False,
Expand Down Expand Up @@ -321,7 +338,7 @@ async def declare_queue(
"""

queue: AbstractQueue = self.QUEUE_CLASS(
channel=self.channel,
channel=self,
name=name,
durable=durable,
exclusive=exclusive,
Expand Down Expand Up @@ -358,7 +375,7 @@ async def get_queue(
return await self.declare_queue(name=name, passive=True)
else:
return self.QUEUE_CLASS(
channel=self.channel,
channel=self,
name=name,
durable=False,
exclusive=False,
Expand All @@ -379,7 +396,9 @@ async def set_qos(
warn('Use "global_" instead of "all_channels"', DeprecationWarning)
global_ = all_channels

return await self.channel.basic_qos(
channel = await self.get_underlay_channel()

return await channel.basic_qos(
prefetch_count=prefetch_count,
prefetch_size=prefetch_size,
global_=global_,
Expand All @@ -394,7 +413,8 @@ async def queue_delete(
if_empty: bool = False,
nowait: bool = False,
) -> aiormq.spec.Queue.DeleteOk:
return await self.channel.queue_delete(
channel = await self.get_underlay_channel()
return await channel.queue_delete(
queue=queue_name,
if_unused=if_unused,
if_empty=if_empty,
Expand All @@ -409,7 +429,8 @@ async def exchange_delete(
if_unused: bool = False,
nowait: bool = False,
) -> aiormq.spec.Exchange.DeleteOk:
return await self.channel.exchange_delete(
channel = await self.get_underlay_channel()
return await channel.exchange_delete(
exchange=exchange_name,
if_unused=if_unused,
nowait=nowait,
Expand All @@ -426,7 +447,8 @@ def transaction(self) -> Transaction:
return Transaction(self)

async def flow(self, active: bool = True) -> aiormq.spec.Channel.FlowOk:
return await self.channel.flow(active=active)
channel = await self.get_underlay_channel()
return await channel.flow(active=active)


__all__ = ("Channel",)
Loading