Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server settings for replication #547

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions spec/replication_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,133 @@ describe LavinMQ::Replication::Client do
end
end
end

# Specs for the leader side of replication: shutdown with followers attached,
# and the `min_followers` / `max_lag` settings that gate publishing.
describe LavinMQ::Replication::Server do
  # Scratch data directory handed to every follower client in these examples.
  data_dir = "/tmp/lavinmq-follower"

  before_each do
    # Start every example from an empty follower directory.
    FileUtils.rm_rf data_dir
    Dir.mkdir_p data_dir
    # Copy the server replicator's password into the follower's secret file
    # (presumably so the client can authenticate against the server).
    File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400
    Server.vhosts["/"].declare_queue("repli", true, false)
  end

  after_each do
    FileUtils.rm_rf data_dir
  end

  # No explicit expectation here: three follow fibers are spawned and left
  # running. NOTE(review): presumably the global after_each `Server.stop`
  # is what exercises the shutdown path -- confirm.
  it "should shut down gracefully" do
    repli = LavinMQ::Replication::Client.new(data_dir)
    3.times do
      spawn do
        repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port)
      end
    end
  end

  describe "min_followers" do
    before_each do
      LavinMQ::Config.instance.min_followers = 1
    end

    after_each do
      # Restore the default so other specs are not blocked waiting for followers.
      LavinMQ::Config.instance.min_followers = 0
    end

    it "should publish when min_followers is fulfilled" do
      q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue)
      repli = LavinMQ::Replication::Client.new(data_dir)
      spawn do
        repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port)
      end
      with_channel do |ch|
        ch.basic_publish "hello world", "", "repli"
      end
      q.basic_get(true) { }.should be_true
      repli.close
    end

    it "should not publish when min_followers is not fulfilled" do
      done = Channel(Nil).new
      client : AMQP::Client::Connection? = nil
      spawn do
        with_channel do |ch, conn|
          client = conn
          q = ch.queue("repli")
          # Expected to block: no follower is connected yet.
          q.publish_confirm "hello world"
          done.send nil
        end
      end
      select
      when done.receive
        fail "Should not receive message"
      when timeout(0.1.seconds)
        # ugly hack to release replicator from waiting for lag
        repli = LavinMQ::Replication::Client.new(data_dir)
        # Fresh channel: from here on `done` signals that `follow` returned.
        done = Channel(Nil).new
        spawn do
          repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port)
          done.send nil
        end
        done.receive
        client.try &.close(no_wait: true)
        repli.try &.close
      end
    end
  end

  describe "max_lag" do
    before_each do
      LavinMQ::Config.instance.max_lag = 1
    end

    after_each do
      LavinMQ::Config.instance.max_lag = nil
    end

    it "should publish when max_lag is not reached" do
      # Override the strict limit from before_each with a generous one.
      LavinMQ::Config.instance.max_lag = 10000
      q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue)
      repli = LavinMQ::Replication::Client.new(data_dir)
      spawn do
        repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port)
      end
      with_channel do |ch|
        ch.basic_publish "hello world", "", "repli"
      end
      q.basic_get(true) { }.should be_true
      repli.close
    end

    it "should not publish when max_lag is reached" do
      Server.vhosts["/"].declare_queue("test123", true, false)
      repli = LavinMQ::Replication::Client.new(data_dir)
      done = Channel(Nil).new
      spawn(name: "repli_sync") do
        repli.sync("127.0.0.1", LavinMQ::Config.instance.replication_port, true)
        done.send nil
      end
      # Wait until the initial sync call has returned before publishing.
      done.receive

      client : AMQP::Client::Connection? = nil
      spawn(name: "with_channel") do
        with_channel do |ch, conn|
          client = conn
          ch.basic_publish_confirm "hello world", "", "test123"
          ch.basic_publish_confirm "hello world2", "", "test123"
          done.send nil
        rescue
          # Best effort: the connection is force-closed in the `ensure` below,
          # so errors raised inside this fiber are deliberately ignored.
        end
      end

      select
      when done.receive
        fail "should not receive message"
      when timeout(1.seconds)
        # Only the first publish is expected to have reached the queue.
        Server.vhosts["/"].queues["test123"].message_count.should eq 1
      end
    ensure
      client.try &.close(no_wait: true)
      repli.try &.close
    end
  end
end
7 changes: 6 additions & 1 deletion spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def with_channel(file = __FILE__, line = __LINE__, **args, &)
args = {port: LavinMQ::Config.instance.amqp_port, name: name}.merge(args)
conn = AMQP::Client.new(**args).connect
ch = conn.channel
yield ch
yield ch, conn
ensure
conn.try &.close(no_wait: false)
end
Expand Down Expand Up @@ -160,6 +160,11 @@ start_http_server
# Global teardown run after every example: stop the shared spec server and
# then remove the /tmp/lavinmq-spec directory.
Spec.after_each do
Server.stop
FileUtils.rm_rf("/tmp/lavinmq-spec")
end

# Global setup run before every example: stop the shared spec server, remove
# /tmp/lavinmq-spec, then restart the server.
# NOTE(review): the explicit stop before rm_rf presumably ensures nothing is
# still using the directory when it is deleted -- confirm.
Spec.before_each do
Server.stop
FileUtils.rm_rf("/tmp/lavinmq-spec")
Server.restart
end

Expand Down
4 changes: 4 additions & 0 deletions src/lavinmq/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ module LavinMQ
property max_deleted_definitions = 8192 # number of deleted queues, unbinds etc that compacts the definitions file
property consumer_timeout : UInt64? = nil
property consumer_timeout_loop_interval = 60 # seconds
property min_followers : Int64 = 0
property max_lag : Int64? = nil
@@instance : Config = self.new

def self.instance : LavinMQ::Config
Expand Down Expand Up @@ -141,6 +143,8 @@ module LavinMQ
when "systemd_socket_name" then @amqp_systemd_socket_name = v
when "unix_proxy_protocol" then @unix_proxy_protocol = true?(v) ? 1u8 : v.to_u8? || 0u8
when "tcp_proxy_protocol" then @tcp_proxy_protocol = true?(v) ? 1u8 : v.to_u8? || 0u8
when "min_followers" then @min_followers = v.to_i64
when "max_lag" then @max_lag = v.to_i64
else
STDERR.puts "WARNING: Unrecognized configuration 'amqp/#{config}'"
end
Expand Down
1 change: 1 addition & 0 deletions src/lavinmq/http/controller/nodes.cr
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ module LavinMQ
run_queue: 0,
sockets_used: @amqp_server.vhosts.sum { |_, v| v.connections.size },
followers: @amqp_server.followers,
max_lag: LavinMQ::Config.instance.max_lag,
}
end

Expand Down
4 changes: 2 additions & 2 deletions src/lavinmq/replication/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ module LavinMQ
@data_dir_lock : DataDirLock?
@closed = false

def initialize(@data_dir : String)
def initialize(@data_dir : String, pwd : String? = nil)
System.maximize_fd_limit
@socket = TCPSocket.new
@socket.sync = true
@socket.read_buffering = false
@lz4 = Compress::LZ4::Reader.new(@socket)
@password = password
@password = pwd || password
@files = Hash(String, File).new do |h, k|
path = File.join(@data_dir, k)
Dir.mkdir_p File.dirname(path)
Expand Down
17 changes: 16 additions & 1 deletion src/lavinmq/replication/follower.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module LavinMQ
module Replication
class Follower
Log = ::Log.for(self)

@ack = Channel(Int64).new
@acked_bytes = 0_i64
@sent_bytes = 0_i64
@actions = Channel(Action).new(4096)
Expand Down Expand Up @@ -46,6 +46,11 @@ module LavinMQ
spawn action_loop, name: "Follower#action_loop"
loop do
read_ack(socket)
if max_lag = Config.instance.max_lag
if lag < max_lag
@ack.try_send lag
end
end
end
rescue IO::Error
end
Expand All @@ -55,6 +60,15 @@ module LavinMQ
@acked_bytes += len
end

# Blocks the calling fiber while this follower's lag is at or above the
# configured `max_lag`. Returns immediately when `max_lag` is not set.
# Each value received from `@ack` replaces the lag being compared; a closed
# `@ack` channel (`receive?` returns nil) ends the wait.
def wait_for_max_lag
  limit = Config.instance.max_lag
  return if limit.nil?
  observed = lag
  while observed >= limit
    update = @ack.receive?
    break if update.nil?
    observed = update
  end
end

private def action_loop(socket = @lz4)
while action = @actions.receive?
action.send(socket)
Expand Down Expand Up @@ -157,6 +171,7 @@ module LavinMQ
Log.info { "Disconnected" }
wait_for_sync if synced_close
@actions.close
@ack.close
@lz4.close
@socket.close
rescue IO::Error
Expand Down
50 changes: 41 additions & 9 deletions src/lavinmq/replication/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ module LavinMQ
include FileIndex
include Replicator
Log = ::Log.for("replication")
getter followers_changed = Channel(Nil).new
getter? closing
@lock = Mutex.new(:unchecked)
@followers = Array(Follower).new
@password : String
@files = Hash(String, MFile?).new
@closing = false

def initialize
@password = password
def initialize(@password = password)
@tcp = TCPServer.new
@tcp.sync = false
@tcp.reuse_address = true
Expand Down Expand Up @@ -59,11 +61,19 @@ module LavinMQ

# Forwards an append of `obj` to `path` to every follower.
# Unless the server is closing, first waits for the `min_followers` and
# `max_lag` conditions.
def append(path : String, obj)
  Log.debug { "appending #{obj} to #{path}" }
  if !closing?
    wait_for_min_followers
    wait_for_max_lag
  end
  each_follower { |follower| follower.append(path, obj) }
end

# Removes `path` from the local file index and forwards the delete to every
# follower. Unless the server is closing, waits for the `min_followers` and
# `max_lag` conditions before forwarding.
def delete_file(path : String)
  @files.delete(path)
  if !closing?
    wait_for_min_followers
    wait_for_max_lag
  end
  each_follower { |follower| follower.delete(path) }
end

Expand Down Expand Up @@ -106,6 +116,21 @@ module LavinMQ
end
end

# Blocks the calling fiber until at least `Config.instance.min_followers`
# followers are registered, or until the server is closing.
#
# `min_followers` is a non-nil integer, so no nil guard is needed; with the
# default of 0 the loop condition is satisfied immediately.
def wait_for_min_followers
  min_followers = Config.instance.min_followers
  until closing? || @followers.size >= min_followers
    # Each message on the channel is a cue to re-check the condition.
    @followers_changed.receive
  end
rescue Channel::ClosedError
  # `close` closes @followers_changed while fibers may still be waiting here.
  # Treat that as "stop waiting" instead of raising into the caller.
end

# When `max_lag` is configured, asks each follower in turn to wait until its
# lag is below the limit. A `Channel::ClosedError` raised while waiting is
# swallowed.
def wait_for_max_lag
  if Config.instance.max_lag
    each_follower &.wait_for_max_lag
  end
rescue Channel::ClosedError
  # Deliberately ignored: a closed channel simply ends the wait.
end

private def password : String
path = File.join(Config.instance.data_dir, ".replication_secret")
begin
Expand Down Expand Up @@ -143,13 +168,8 @@ module LavinMQ
follower.full_sync
@followers << follower
end
begin
follower.read_acks
ensure
@lock.synchronize do
@followers.delete(follower)
end
end
followers_changed.try_send nil
follower.read_acks
rescue ex : AuthenticationError
Log.warn { "Follower negotiation error" }
rescue ex : InvalidStartHeaderError
Expand All @@ -160,9 +180,14 @@ module LavinMQ
Log.warn(exception: ex) { "Follower connection error" } unless socket.closed?
ensure
follower.try &.close
@lock.synchronize do
@followers.delete(follower)
end
followers_changed.try_send nil
end

def close
Log.debug { "closing" }
@tcp.close
@lock.synchronize do
done = Channel({Follower, Bool}).new
Expand All @@ -175,9 +200,16 @@ module LavinMQ
end
@followers.clear
end
@followers_changed.close
Log.debug { "closed" }
Fiber.yield # required for follower/listener fibers to actually finish
end

# Marks the replication server as closing (the flag exposed through
# `closing?`) and then calls `try_send?` on the followers_changed channel.
# NOTE(review): presumably the send wakes a fiber blocked in
# wait_for_min_followers so it re-checks `closing?` -- confirm the exact
# semantics of `try_send?`.
def closing
@closing = true
@followers_changed.try_send? nil
end

private def each_follower(& : Follower -> Nil) : Nil
@lock.synchronize do
@followers.each do |f|
Expand Down
3 changes: 3 additions & 0 deletions src/lavinmq/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ module LavinMQ
def restart
stop
Dir.mkdir_p @data_dir
# @replicator = Replication::Server.new
Schema.migrate(@data_dir, @replicator)
@users = UserStore.new(@data_dir, @replicator)
@vhosts = VHostStore.new(@data_dir, @users, @replicator)
Expand Down Expand Up @@ -171,10 +172,12 @@ module LavinMQ

# Shuts the server down in a fixed order: mark it closed, tell the replicator
# that closing has begun, close the listeners, close the vhosts, and finally
# close the replicator itself.
def close
@closed = true
# Signalled before listeners/vhosts are closed.
# NOTE(review): presumably so replication calls made during vhost shutdown
# do not block waiting on followers -- confirm.
@replicator.closing
@log.debug { "Closing listeners" }
@listeners.each_key &.close
@log.debug { "Closing vhosts" }
@vhosts.close
@log.debug { "Closing replicator" }
@replicator.close
end

Expand Down
4 changes: 4 additions & 0 deletions src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ module LavinMQ
headers = msg.properties.headers
find_all_queues(ex, msg.routing_key, headers, visited, found_queues)
headers.delete("BCC") if headers

if found_queues.empty?
ex.unroutable_count += 1
return false
Expand Down Expand Up @@ -456,8 +457,11 @@ module LavinMQ
sleep 0.1
end
# then force close the remaining (close tcp socket)
@log.debug { "force closing connection" } unless connections.empty?

@connections.each &.force_close
Fiber.yield # yield so that Client read_loops can shutdown
@log.debug { "Closing queues" }
@queues.each_value &.close
Fiber.yield
compact!
Expand Down
Loading
Loading