When not to use async? #427

Open
kafkiansky opened this issue Dec 28, 2023 · 1 comment

Comments

@kafkiansky

Hello! I'm refactoring a RabbitMQ client, migrating it from amphp v2 to amphp v3, and I'm a bit confused about when not to use the async function. For example, we have a connection to RabbitMQ over amphp/socket.

final class Connection
{
    public function open(): void
    {
        ....
        async(function (): void {
            while (null !== $chunk = $this->socket->read()) {
                $frame = $this->parser->parse($chunk);

                $future = $this->futures[$frame::class];
                $future->complete($frame);
            }
        });
    }
}

Its main task is to write frames to the socket and read frames back from it. Once a complete frame has been parsed, we find the Future waiting for that frame type and complete it.
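
To make the dispatch step concrete, here is a minimal, self-contained sketch of the read loop described above. It assumes amphp/amp ^3 is installed via Composer. The `QueueDeclareOkFrame` class, the `$pending` registry, and the `delay()` call standing in for the socket read are all hypothetical. Note that in amphp v3 completion goes through `DeferredFuture::complete()`, while `Future` is only the read side.

```php
<?php

declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php'; // assumption: amphp/amp ^3 via Composer

use Amp\DeferredFuture;
use function Amp\async;
use function Amp\delay;

// Hypothetical stand-in for a parsed protocol frame.
final class QueueDeclareOkFrame
{
    public function __construct(public readonly string $queue)
    {
    }
}

/** @var array<class-string, DeferredFuture> $pending Waiters keyed by frame class. */
$pending = [];

// Register interest in a frame type before the frame can arrive.
$deferred = new DeferredFuture();
$pending[QueueDeclareOkFrame::class] = $deferred;

// Stand-in for the read loop: "receive" one frame after a short delay
// and hand it to whoever is waiting for that frame class.
async(function () use (&$pending): void {
    delay(0.01); // simulates waiting on $this->socket->read()

    $frame = new QueueDeclareOkFrame('jobs');
    $waiter = $pending[$frame::class] ?? null;

    if ($waiter !== null) {
        unset($pending[$frame::class]);
        $waiter->complete($frame);
    }
});

// Suspends here until the loop above completes the deferred.
$frame = $deferred->getFuture()->await();

echo $frame->queue, PHP_EOL;
```

The waiter is removed from the registry before it is completed, so a second frame of the same class cannot complete an already-completed `DeferredFuture`.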

Here's how a Future that subscribes to a frame type is created:

/**
 * @template T of Protocol\AbstractFrame
 * @psalm-param class-string<T> $frame
 * @psalm-return Future<T>
 */
private function future(string $frame): Future
{
    /** @psalm-var DeferredFuture<T> $deferred */
    $deferred = new DeferredFuture();

    $this->connection->subscribe(
        $this->id,
        $frame,
        static function (Protocol\AbstractFrame $frame) use ($deferred) {
            /** @psalm-var T $frame */
            $deferred->complete($frame);

            return true;
        }
    );

    return $deferred->getFuture();
}

When we declare a queue, we write a method frame to the socket (class 50, method 10, i.e. queue.declare) and wait for the matching QueueDeclareOkFrame in response.

    public function queueDeclare(
        string $queue = '',
        bool $passive = false,
        bool $durable = false,
        bool $exclusive = false,
        bool $autoDelete = false,
        bool $noWait = false,
        array $arguments = []
    ): ?Queue {
        $flags = [$passive, $durable, $exclusive, $autoDelete, $noWait];

        $this->connection->method($this->id, (new Buffer)
            ->appendUint16(50)
            ->appendUint16(10)
            ->appendInt16(0)
            ->appendString($queue)
            ->appendBits($flags)
            ->appendTable($arguments)
        );

        if ($noWait) {
            return null;
        }

        /** @var Protocol\QueueDeclareOkFrame $frame */
        $frame = $this->future(Protocol\QueueDeclareOkFrame::class)->await();

        return new Queue($frame->queue, $frame->messageCount, $frame->consumerCount);
    }

So the question is: should we wrap these two actions in async(), given that Connection::method writes through amphp/socket and our future() method is also non-blocking? Or does that not make sense?

In other words, is it necessary to write the code this way?

    public function queueDeclare(
        string $queue = '',
        bool $passive = false,
        bool $durable = false,
        bool $exclusive = false,
        bool $autoDelete = false,
        bool $noWait = false,
        array $arguments = []
    ): Future {
        return async(function () use (...): ?Queue {
            $flags = [$passive, $durable, $exclusive, $autoDelete, $noWait];

            $this->connection->method($this->id, (new Buffer)
                ->appendUint16(50)
                ->appendUint16(10)
                ->appendInt16(0)
                ->appendString($queue)
                ->appendBits($flags)
                ->appendTable($arguments)
            );

            if ($noWait) {
                return null;
            }

            /** @var Protocol\QueueDeclareOkFrame $frame */
            $frame = $this->future(Protocol\QueueDeclareOkFrame::class)->await();

            return new Queue($frame->queue, $frame->messageCount, $frame->consumerCount);
        });
    }
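
One way to frame the trade-off between the two variants, offered as a sketch and not as an answer from the maintainers: in amphp v3, `await()` and socket I/O suspend only the current fiber, so a method with a plain return type does not block the event loop, and a caller who wants concurrency can wrap the call in `async()` itself. The example below assumes amphp/amp ^3 installed via Composer. `declareQueue()` is a hypothetical stand-in for the first, synchronous-style `queueDeclare()`, with `delay()` simulating the round trip to the broker.

```php
<?php

declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php'; // assumption: amphp/amp ^3 via Composer

use Amp\Future;
use function Amp\async;
use function Amp\delay;

// Hypothetical stand-in for a synchronous-style queueDeclare(): it suspends
// the current fiber while "waiting for the broker", then returns a plain value.
function declareQueue(string $name): string
{
    delay(0.05); // simulates the socket write plus awaiting the reply frame

    return $name;
}

$start = microtime(true);

// The caller opts into concurrency by wrapping each call in async().
[$first, $second] = Future\await([
    async(declareQueue(...), 'orders'),
    async(declareQueue(...), 'invoices'),
]);

$elapsed = microtime(true) - $start;

echo $first, ' ', $second, PHP_EOL;
// The two calls overlap, so $elapsed is close to one delay, not the sum of both.
```

With this shape the library method keeps a simple `?Queue` return type, and the decision to run calls concurrently stays with the caller.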
@marwan38

marwan38 commented Nov 6, 2024

Bumping this out of curiosity about the answer, as I'm also currently migrating from amphp v2 to v3.
