Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: ability to pause/restore a consumer #50

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b227e65
Add more logs
john-scalingo May 28, 2018
ac0e4d4
No type asumption
john-scalingo May 28, 2018
ab393a2
Implementation of pause/resume on consumer to allow a consumer to sto…
Soulou Aug 2, 2018
2bbe78d
Add second example with throttling
Soulou Aug 3, 2018
6e048d4
Add spec for Consumer pause and resume
Soulou Aug 3, 2018
10a85dc
Add specs for Connection pause and restore
Soulou Aug 3, 2018
3489202
First step, single read and write loop
Soulou Jan 17, 2019
19867db
First working PoC to handle exception when sending messages
Soulou Jan 17, 2019
308dfd7
Fix retrocompaibility, default behavior is exactly how it was before …
Soulou Jan 18, 2019
f371fea
Add specs for synchronous producer (ensure exception is thrown)
Soulou Jan 18, 2019
9df8d0b
Add default retry behavior when writing to socket, only impact synchr…
Soulou Jan 27, 2019
fe8f9e9
synchronous producer: Add message acknowledgement timeout, default to…
Soulou Jan 27, 2019
0fa9ccb
Better logging when there is an exception writing a message
Soulou Jan 27, 2019
7dbed67
Add ability to send logger attributes and message as hash field for c…
Soulou Jan 27, 2019
198ddd5
Improve implementation of producer, timeout/retry
Soulou Jan 27, 2019
05b87ab
Add specs for round robin strategy
Soulou Jan 29, 2019
9cf2d5c
Merge branch 'feature/51/acknowledgement'
Soulou Jan 29, 2019
71a3b8d
Merge branch 'logger/hash'
Soulou Jan 29, 2019
b2cb426
Add SocketError to the rescued exceptions
Soulou Jan 29, 2019
5345349
Merge branch 'feature/51/acknowledgement'
Soulou Jan 29, 2019
3ac25e2
Reduce default retry/timeouts, we don't want to wait so much when a m…
Soulou Jan 29, 2019
c3f9338
Merge branch 'feature/51/acknowledgement'
Soulou Jan 29, 2019
51ecfd9
Correctly splat messages in NsqdsProducer
Soulou Jan 30, 2019
f78c494
Merge branch 'feature/51/acknowledgement'
Soulou Jan 30, 2019
30d717b
Merge branch 'master' into feature/pause-restore
Soulou Apr 23, 2019
cf182e0
Remove warning from ruby 2.7.0
Soulou Mar 17, 2020
4deb47b
Merge branch 'master' into feature/pause-restore
Soulou Mar 17, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ruby-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.3
2.3.5
86 changes: 86 additions & 0 deletions examples/consumer-pause-throttling/main.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#!/usr/bin/env ruby

# The goal of this example is the following
# It's not because the max_in_flight is set to 10
# that 10 jobs will be consumed at the same time, a queue can still appear.
#
# Setup :
# 3 nsqd 10.0.0.1/2/3
# with max_in_flight == 10
# Each connection will receive a max_in_flight of 3 (global max /
# number of connections) If only one nsqd receives messages, only
# 3 messages will be accepted by the consumer even if we accept a
# maximum of 10 in parallel.
#
# So using the pause/unpause mecanism, the goal is to be able to have
# max_in_flight 10 per connection
#
# When 10 messages are handled, stop accepting new message by
# pausing the consumer, until one slot gets free

lib = File.expand_path("../../../lib", __FILE__)
$:.unshift(lib)

require 'nsq'
require 'logger'

logger = Logger.new STDOUT

# PRE: start 3 nsqd registering to nsqlookupd
# nsqd -lookupd-tcp-address 127.0.0.1:4160 -broadcast-address nsqd1
# nsqd -http-address 0.0.0.0:4251 -tcp-address 0.0.0.0:4250 -lookupd-tcp-address 127.0.0.1:4160 -broadcast-address nsqd2.localhost
# nsqd -http-address 0.0.0.0:4351 -tcp-address 0.0.0.0:4350 -lookupd-tcp-address 127.0.0.1:4160 -broadcast-address nsqd3.localhost

# Produce one message each nsqd to have them known from nsqlookupd
logger.info "initializing topics on all nsqd"
Nsq::Producer.new(topic: 'test-pause-throttling', nsqd: '127.0.0.1:4150').write("nsqd1")
Nsq::Producer.new(topic: 'test-pause-throttling', nsqd: '127.0.0.1:4250').write("nsqd2")
Nsq::Producer.new(topic: 'test-pause-throttling', nsqd: '127.0.0.1:4350').write("nsqd3")

logger.info "creating consumer"
consumer = Nsq::Consumer.new(
topic: 'test-pause-throttling',
channel: 'default',
max_in_flight: 30, nsqlookupds: ['http://localhost:4160'],
)

jobs_count_mutex = Mutex.new
jobs_count = 0
concurrency = 10

concurrency.times do |i|
logger.info "create consumer thread #{i}"
Thread.new {
loop do
msg = consumer.pop
jobs_count_mutex.synchronize {
jobs_count += 1
logger.info "##{i} New msg, current amount: #{jobs_count}"
if jobs_count == concurrency
logger.info "pausing consumer"
consumer.pause
logger.info "consumer paused"
end
}
puts "##{i} MSG #{msg.id}: #{msg.body}"

# Long job
sleep (Random.rand * 10).to_i

msg.finish
jobs_count_mutex.synchronize {
jobs_count -= 1
logger.info "##{i} Msg finished, current amount: #{jobs_count}"
consumer.resume
}
end
}
end

# Only producing to one nsqd instance
producer = Nsq::Producer.new( topic: 'test-pause-throttling', nsqd: '127.0.0.1:4150')

loop do
producer.write Time.now
sleep 0.2
end
50 changes: 50 additions & 0 deletions examples/consumer-pause/main.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env ruby

lib = File.expand_path("../../../lib", __FILE__)
$:.unshift(lib)

require 'nsq'
require 'logger'

logger = Logger.new STDOUT

consumer = Nsq::Consumer.new(
topic: 'test-pause',
channel: 'default',
max_in_flight: 2,
)

2.times do |i|
Thread.new {
loop do
msg = consumer.pop
puts "#{i} MSG #{msg.id}: #{msg.body}"
sleep 2
msg.finish
end
}
end

producer = Nsq::Producer.new(
topic: 'test-pause',
)

producer.write Time.now
producer.write Time.now
producer.write Time.now
producer.write Time.now

logger.info "pausing consumer"
consumer.pause
logger.info "consumer paused"

producer.write Time.now
producer.write Time.now

sleep 5

logger.info "resuming consumer"
consumer.resume
logger.info "consumer resumed"

sleep 5
1 change: 1 addition & 0 deletions lib/nsq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@

require_relative 'nsq/consumer'
require_relative 'nsq/producer'
require_relative 'nsq/nsqds_producer'
Loading