Feature: ability to pause/restore a consumer #50

Open
wants to merge 27 commits into master
Conversation

@Soulou (Contributor) commented Aug 3, 2018

This pull request adds the ability to pause a consumer and then resume it later. Why pause a consumer? To temporarily stop it from receiving messages.

This is done by sending RDY 0 to all nsqd instances, which immediately stops them from sending new messages to the consumer; max_in_flight is restored once the consumer is resumed.

The problem I really wanted to solve is this one: https://github.com/wistia/nsq-ruby/blob/2bbe78d6b30ab85d219eafd76c9868850d7c0e57/examples/consumer-pause-throttling/main.rb

My use case precisely:

  • I have a consumer doing long-running jobs (~minutes)
  • I want it to consume exactly 6 messages in parallel
  • I have 5 nsqd instances in my infrastructure receiving messages on the topic
  • I'd like a max_in_flight of 6 on each of these nsqd connections (which means 30 as the consumer initialization option), so that if only one of these nsqd instances has 6 messages for my consumer, it can take all of them
  • But once 6 jobs are running, from any number of nsqd instances, my consumer should stop receiving new messages until a slot is free (see the sketch below)
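
To make this concrete, here is a rough sketch of the workflow I have in mind using the new methods. The nsqlookupd address, topic, channel and the handle_long_running_job helper are placeholders for illustration, not code from this PR:

require 'nsq'

MAX_PARALLEL = 6  # number of jobs that should run at any time

# nsqlookupd address, topic and channel are placeholders
consumer = Nsq::Consumer.new(
  nsqlookupd: '127.0.0.1:4161',
  topic: 'long-jobs',
  channel: 'workers',
  max_in_flight: MAX_PARALLEL * 5  # 6 per connection, 5 nsqds
)

running = Queue.new  # thread-safe counter of in-flight jobs

loop do
  # wait for a free slot before taking the next message
  sleep 0.1 while running.size >= MAX_PARALLEL

  msg = consumer.pop
  running.push(true)
  # once all slots are taken, RDY 0 is sent to every connection
  consumer.pause if running.size >= MAX_PARALLEL

  Thread.new do
    begin
      handle_long_running_job(msg)  # placeholder; touches the message periodically
      msg.finish
    ensure
      running.pop
      consumer.resume if running.size < MAX_PARALLEL
    end
  end
end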

There is no modification of existing features, only additions:

On the Consumer class

# No-op if already paused
consumer.pause

# No-op if not paused
consumer.resume

On the Connection class

conn.pause
conn.paused?
conn.resume

I've also added two examples in an examples directory to show how to use this.

What do you think of this feature? Can it be integrated into the gem, or should I keep it in our fork (which is more annoying for us)?

Regards,

@bschwartz (Member) commented

Hi @Soulou, thanks for this PR and for providing the example to show what you're trying to achieve.
If I'm understanding you correctly, it looks like you're trying to achieve maximum throughput in the case where you have a low volume of messages that are not evenly distributed across your nsqds. You're right that, the way nsq-ruby is set up now, this is not readily possible to achieve. I'm not sure other client implementations address this either.

Your solution seems minimal enough, but I'm curious what @mreiferson thinks as he has seen many more client implementations. I'd prefer not to have nsq-ruby diverge too much from other implementations.

@mreiferson, do you think it makes sense to add these methods? Or is there a simpler way to achieve what @Soulou is looking for, perhaps with a modified architecture? Or is NSQ generally not the right fit for a small number of long-running jobs?

@Soulou (Contributor, Author) commented Aug 6, 2018

Hi @bschwartz, thanks for your input here. I think you've correctly understood what we're trying to do. We've got long-running tasks (messages are touched on nsqd every 10 seconds and last multiple minutes); we want N jobs running continuously, but we have no certainty about how messages are distributed across the nsqds.

If this is considered out of scope for the client, maybe it would be interesting to open up the API so that users can design this kind of workflow themselves?

I'm not saying our architecture is perfect, but I can confirm that with this patch it works exactly as expected :-D

@mreiferson

I think it's perfectly reasonable for a client to support this, for a variety of reasons. It is possible in pynsq to call set_max_in_flight(0) to effectively "pause" incoming message flow; however, it does not have a pause() method.

However, with respect to the actual problem you're trying to solve, I don't think trying to play "timing" games with RDY is a great way to do it. You're effectively accumulating some state in a consumer, and if you're doing that, perhaps consider more durable ways to store that interim state so that you don't need to rely on timing or an exact number of messages?

@Soulou (Contributor, Author) commented Aug 7, 2018

Thanks @mreiferson. Yes, I could just accept all the messages, store them in Redis or similar, and query that database to know whether there are messages to run. But that would mean taking a message from one message queue only to add it to another message queue, and I would then have to handle the message flow of that one as well.

That seems like a lot of complexity compared to playing "timing" games. If I get one message more or less at some point for whatever reason (a race condition in nsqd, for instance), that's OK; most of the time I'll still be running at full throughput. Keeping this state in the consumer (the number of messages running) is a tradeoff that seems acceptable.

It's true that if max_in_flight were modifiable at runtime, I could inherit from Nsq::Consumer and handle this quite easily without touching the gem (a rough sketch of the idea follows).
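
Something along these lines is what I mean. This is purely hypothetical: it assumes a writable max_in_flight= setter that redistributes RDY across the connections, which the gem does not expose today, and the job_started/job_finished hooks are made up for illustration:

# Hypothetical sketch: assumes nsq-ruby exposed a runtime-writable
# max_in_flight= that redistributes RDY over its connections.
class ThrottledConsumer < Nsq::Consumer
  def initialize(opts = {})
    @slots = opts[:max_in_flight]
    @running = 0
    @lock = Mutex.new
    super(opts)
  end

  # the application calls these when a job starts/finishes
  def job_started
    @lock.synchronize do
      @running += 1
      self.max_in_flight = 0 if @running >= @slots  # assumed setter
    end
  end

  def job_finished
    @lock.synchronize do
      @running -= 1
      self.max_in_flight = @slots if @running < @slots
    end
  end
end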

Soulou added 20 commits January 17, 2019 22:50
…onous producers, asynchronous never return exception
This commit is greatly inspired by what is done in the official Go driver.

To follow the recommendations of the NSQ team, the producer cannot be initialized with an nsqlookupd URL; a producer connects to a single nsqd instance, not several.

- The producer keeps a list of transactions to wait for the return value from nsqd.
- A new class `NsqdsProducer` can be initialized with multiple nsqd addresses and applies a strategy when writing messages (usage sketch below):
  - `Nsq::NsqdsProducer::STRATEGY_FAILOVER` Always send to the same nsqd unless it is unavailable, then move on to the next one
  - `Nsq::NsqdsProducer::STRATEGY_ROUND_ROBIN` Apply round-robin over the different available nsqds
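
For reference, usage of the `NsqdsProducer` described above might look roughly like this; the option names and addresses are assumptions for illustration, not necessarily the final API:

producer = Nsq::NsqdsProducer.new(
  nsqds: ['10.0.0.1:4150', '10.0.0.2:4150'],           # assumed option name
  strategy: Nsq::NsqdsProducer::STRATEGY_ROUND_ROBIN,  # or STRATEGY_FAILOVER
  topic: 'long-jobs'
)

producer.write('payload')  # round-robin: each write goes to the next available nsqd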