AmqpConsumer::receiveBasicGet, only one message per timeout consumed #159

Closed
dkarlovi opened this issue Aug 8, 2017 · 14 comments · Fixed by #165

@dkarlovi
Contributor

dkarlovi commented Aug 8, 2017

I upgraded to 0.6 a couple of days ago and I'm seeing a regression in AmqpConsumer::receiveBasicGet: it only receives one message per timeout, for some reason.

Say I have 10 messages in a queue: it will receive only the first one, stall until the timeout is reached, retry, and then receive another. I added some debug statements in the while() loop, after usleep(), and the output goes like this:

message
............ # outputting a single dot every while iteration
<timeout>
message
............ # outputting a single dot every while iteration
<timeout>

This happens no matter how many messages are ready to process in the queue. If I set $timeout = 1 at the top of the method, everything works as expected, so it shouldn't be related to my processors being slow or hanging.

I've cleared my RabbitMQ Docker volume to rule out any misconfiguration between 0.5 and 0.6, but it did not help. I'm using ext/amqp 1.9.0 (which I also used with 0.5).

@makasim
Member

makasim commented Aug 8, 2017

@dkarlovi Could you please provide more information about your setup and, if possible, code snippets? Library versions, the framework (if any), configs, and whether you use the enqueue client or the amqp transport directly. Do you ack messages?

makasim added the bug label Aug 8, 2017
@dkarlovi
Contributor Author

dkarlovi commented Aug 8, 2017

Sure, using:

  • PHP 7.1.5 from Docker Alpine 3.6, ext/amqp 1.9.0
  • RabbitMQ 3.6.10
  • Symfony 3.3.6 + Enqueue bundle (config below)
  • running bin/console enqueue:consume --setup-broker -vvv; the delay and the debug output described above show up there

Will try to downgrade to 0.5 to figure out where exactly the regression comes from. Might also disable my processors and just leave the built-in debug output.

enqueue:
    async_events:
        enabled: true
        # spool_producer: true
    transport:
        default: 'rabbitmq_amqp'
        rabbitmq_amqp:
            host: '%broker_host%'
            port: '%broker_port%'
            user: '%broker_user%'
            pass: '%broker_password%'
            vhost: '%broker_name%'
            persisted: true
            delay_plugin_installed: true
    client:
        traceable_producer: '%kernel.debug%'
    job: true
    extensions:
        doctrine_ping_connection_extension: true
        doctrine_clear_identity_map_extension: true
        signal_extension: true

@dkarlovi
Contributor Author

dkarlovi commented Aug 9, 2017

It seems it's trying to read from the wrong queue: it will try the proper queue once, and then only the other queue until the timeout.

If I echo the name of the queue being read, the output looks like this:

string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(19) "enqueue.app.default" # reads a message
string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(14) "symfony_events"
string(19) "enqueue.app.default" # reads a message

When I set $timeout = 1, it looks like what I'd expect:

string(19) "enqueue.app.default"
string(14) "symfony_events"
string(19) "enqueue.app.default"
string(14) "symfony_events"
string(19) "enqueue.app.default"
string(14) "symfony_events"
string(19) "enqueue.app.default"
string(14) "symfony_events"

@makasim
Member

makasim commented Aug 9, 2017

Well, that's expected behavior. By default the consume command subscribes to all known queues and round-robins over them all the time.

Solutions:

  • Decrease the timeout
  • Pass a queue name to the consume command explicitly so it consumes only from that queue

@dkarlovi
Contributor Author

dkarlovi commented Aug 9, 2017

@makasim But it only attempts to read from the default queue every 5 sec, while it reads from the events queue on every while loop iteration (100 msec).

@makasim
Member

makasim commented Aug 9, 2017

> @makasim but it only attempts to read from the default queue every 5sec.

No, it does not. According to what you posted previously, it consumes from two queues: "enqueue.app.default" and "symfony_events".

@dkarlovi
Contributor Author

dkarlovi commented Aug 9, 2017

Here's the output with timestamps:

string(45) "2017-08-09T08:53:16+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:16+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:16+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:17+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:17+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:17+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:17+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:17+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:17+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:17+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:17+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:17+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:17+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:18+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:18+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:18+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:18+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:18+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:18+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:18+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:18+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:18+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:18+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:19+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:19+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:19+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:19+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:19+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:19+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:19+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:19+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:19+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:19+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:20+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:20+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:20+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:20+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:20+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:20+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:20+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:20+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:20+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:21+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:21+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:21+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:21+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:21+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:21+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:21+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:21+00:00 enqueue.app.default"
string(40) "2017-08-09T08:53:21+00:00 symfony_events"
string(40) "2017-08-09T08:53:22+00:00 symfony_events"
string(40) "2017-08-09T08:53:22+00:00 symfony_events"
string(40) "2017-08-09T08:53:22+00:00 symfony_events"
string(40) "2017-08-09T08:53:22+00:00 symfony_events"
string(40) "2017-08-09T08:53:22+00:00 symfony_events"
string(40) "2017-08-09T08:53:22+00:00 symfony_events"
string(40) "2017-08-09T08:53:22+00:00 symfony_events"
string(40) "2017-08-09T08:53:22+00:00 symfony_events"
string(40) "2017-08-09T08:53:22+00:00 symfony_events"
string(40) "2017-08-09T08:53:22+00:00 symfony_events"
string(40) "2017-08-09T08:53:23+00:00 symfony_events"
string(40) "2017-08-09T08:53:23+00:00 symfony_events"
string(40) "2017-08-09T08:53:23+00:00 symfony_events"
string(40) "2017-08-09T08:53:23+00:00 symfony_events"
string(40) "2017-08-09T08:53:23+00:00 symfony_events"
string(40) "2017-08-09T08:53:23+00:00 symfony_events"
string(40) "2017-08-09T08:53:23+00:00 symfony_events"
string(40) "2017-08-09T08:53:23+00:00 symfony_events"
string(40) "2017-08-09T08:53:23+00:00 symfony_events"
string(40) "2017-08-09T08:53:23+00:00 symfony_events"
string(40) "2017-08-09T08:53:24+00:00 symfony_events"
string(40) "2017-08-09T08:53:24+00:00 symfony_events"
string(40) "2017-08-09T08:53:24+00:00 symfony_events"
string(40) "2017-08-09T08:53:24+00:00 symfony_events"
string(40) "2017-08-09T08:53:24+00:00 symfony_events"
string(40) "2017-08-09T08:53:24+00:00 symfony_events"
string(40) "2017-08-09T08:53:24+00:00 symfony_events"
string(40) "2017-08-09T08:53:24+00:00 symfony_events"
string(40) "2017-08-09T08:53:24+00:00 symfony_events"
string(40) "2017-08-09T08:53:24+00:00 symfony_events"
string(40) "2017-08-09T08:53:25+00:00 symfony_events"
string(40) "2017-08-09T08:53:25+00:00 symfony_events"
string(40) "2017-08-09T08:53:25+00:00 symfony_events"
string(40) "2017-08-09T08:53:25+00:00 symfony_events"
string(40) "2017-08-09T08:53:25+00:00 symfony_events"
string(40) "2017-08-09T08:53:25+00:00 symfony_events"
string(40) "2017-08-09T08:53:25+00:00 symfony_events"
string(40) "2017-08-09T08:53:25+00:00 symfony_events"
string(40) "2017-08-09T08:53:25+00:00 symfony_events"
string(40) "2017-08-09T08:53:25+00:00 symfony_events"
string(40) "2017-08-09T08:53:26+00:00 symfony_events"
string(40) "2017-08-09T08:53:26+00:00 symfony_events"
string(40) "2017-08-09T08:53:26+00:00 symfony_events"
string(40) "2017-08-09T08:53:26+00:00 symfony_events"
string(40) "2017-08-09T08:53:26+00:00 symfony_events"
string(40) "2017-08-09T08:53:26+00:00 symfony_events"
string(40) "2017-08-09T08:53:26+00:00 symfony_events"
string(40) "2017-08-09T08:53:26+00:00 symfony_events"
string(40) "2017-08-09T08:53:26+00:00 symfony_events"
string(45) "2017-08-09T08:53:27+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:27+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:27+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:27+00:00 enqueue.app.default"
string(45) "2017-08-09T08:53:27+00:00 enqueue.app.default"
...

@dkarlovi
Contributor Author

dkarlovi commented Aug 9, 2017

BTW, yes, this differs from the behaviour I described before: in the run with timestamps, it spends 5 sec on each queue, not only on the events queue. I don't know why, this is really weird.

@dkarlovi
Contributor Author

dkarlovi commented Aug 9, 2017

Well this actually makes sense, now that I think about it and read the code:

  1. The consume command runs the consumer.
  2. The consumer picks a queue and stores the queue instance as a private property (it never changes after this).
  3. It tries to read from that queue for $timeout or until the first message is read.
  4. After the timeout expires or a message is read, the consumer returns and the command selects the next consumer, which does the same on the next queue.

If you only have one consume command instance running, it works exactly like this: it does NOT read from whichever queue has messages ASAP, but in $timeout chunks. Setting $timeout = 1 means the queues are read in alternation instead of one after another in long chunks.

You're right, it does work as designed, but I don't think it's something the user would expect here.
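
To make the four steps above concrete, here is a minimal simulation of that loop. It is only a sketch, not the Enqueue code: the function name `consume` and the simulated clock are made up for illustration, and the 5000 ms timeout and 100 ms poll interval are the values mentioned in this thread.

```python
from collections import deque


def consume(queues, rounds, timeout_ms=5000, poll_interval_ms=100):
    """Visit each queue in turn; stay on it until a message is read
    or the timeout expires. Time is simulated, nothing sleeps."""
    now_ms = 0
    trace, received = [], []
    for _ in range(rounds):
        for name, queue in queues.items():
            deadline = now_ms + timeout_ms
            while now_ms < deadline:
                trace.append((now_ms, name))      # one poll attempt
                if queue:
                    received.append(queue.popleft())
                    break                         # first message ends the visit
                now_ms += poll_interval_ms        # stands in for usleep()
    return trace, received


# Both queues empty, as in the timestamped output above.
queues = {"enqueue.app.default": deque(), "symfony_events": deque()}
trace, received = consume(queues, rounds=1)
polls = [name for _, name in trace]

# 50 polls on the first queue, then 50 on the second: 5 sec per queue.
print(polls.count("enqueue.app.default"), polls.count("symfony_events"))
```

With both queues empty, each one gets a full timeout window before the next one is tried, which is the pattern in the timestamped output.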

@dkarlovi
Contributor Author

dkarlovi commented Aug 9, 2017

Also figured out why I got only one read from the default queue before: it was full. :)

So, if you have two queues, Q1 and Q2, and say Q1 is FULL of messages while Q2 is empty, you'll get:

Q1 # reads the message
Q2
Q2
Q2
Q2
Q2
Q2
Q2
Q2
Q1 # reads the message
Q2
Q2
Q2
Q2
Q2
Q2
...

You're spending most of your time monitoring the empty queue, not the full one.
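
A rough way to put a number on this, assuming the 5 sec timeout and 100 msec poll interval mentioned earlier in the thread, and assuming the full queue returns a message on its first poll. The helper name `empty_poll_share` is hypothetical.

```python
def empty_poll_share(timeout_ms, poll_interval_ms, empty_queues=1):
    """Share of polls that hit an empty queue during one full cycle."""
    empty_polls = empty_queues * (timeout_ms // poll_interval_ms)
    full_polls = 1  # assumption: the full queue returns on its first poll
    return empty_polls / (empty_polls + full_polls)


share = empty_poll_share(timeout_ms=5000, poll_interval_ms=100)
print(f"{share:.1%}")  # 50 of every 51 polls go to the empty queue
```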

@makasim
Member

makasim commented Aug 9, 2017

That's not good.

@dkarlovi
Contributor Author

dkarlovi commented Aug 9, 2017

You can say that again. :)

Maybe rethink the timeout logic and go for a round-robin approach? Shorten the timeout significantly?

makasim changed the title from "[Regression] AmqpConsumer::receiveBasicGet, only one message per timeout consumed" to "AmqpConsumer::receiveBasicGet, only one message per timeout consumed" on Aug 9, 2017
@makasim
Member

makasim commented Aug 9, 2017

Here is what I am going to do next:

  • Set the timeout to 100ms or 10ms.
  • Add the ability to configure it.
  • Remove idleTimeout.
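
As a back-of-the-envelope check of what a shorter timeout would buy, the sketch below estimates how long it takes to drain 10 messages from one full queue while one other queue is empty. It assumes zero processing time and one full timeout spent on the empty queue between consecutive reads; `drain_time_ms` is a hypothetical helper, not part of Enqueue, and the figures say nothing about how the actual fix behaves.

```python
def drain_time_ms(messages, timeout_ms, empty_queues=1):
    """Time spent waiting on empty queues while draining one full queue."""
    waits = max(messages - 1, 0)  # one wait between consecutive reads
    return waits * empty_queues * timeout_ms


for timeout_ms in (5000, 100, 10):
    print(timeout_ms, drain_time_ms(messages=10, timeout_ms=timeout_ms))
# 5000 -> 45000 ms, 100 -> 900 ms, 10 -> 90 ms
```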

@makasim
Member

makasim commented Aug 16, 2017

@dkarlovi There was a bug that forced async events to be enabled. They should be disabled by default. PR #169 fixes it.
