# Patch overview: Crystal sources of LavinMQ plus its JS/ECR admin UI, as a unified diff whose line breaks were collapsed.
# - src/lavinmq/config.cr: adds `min_followers` (Int64, default 0) and `max_lag` (Int64?, default nil) and parses both keys with `to_i64`.
# - src/lavinmq/replication/server.cr: `append` and `delete_file` call `wait_for_min_followers` and `wait_for_max_lag` first, unless `closing?` is set; `closing` sets that flag and signals `followers_changed`.
# - src/lavinmq/replication/follower.cr: the ack-reading loop sends the current lag on `@ack` while it is below `max_lag`; `wait_for_max_lag` receives from `@ack` until the lag is below `max_lag` or the channel is closed.
# - src/lavinmq/http/controller/nodes.cr: the stats hash exposes `max_lag` and `min_followers`, the two fields that `updateFollowerSettings` in static/js/nodes.js reads.
# - spec/: new specs for LavinMQ::Replication::Server; `with_channel` now yields the connection as a second block argument.
# NOTE(review): Server#wait_for_max_lag blocks inside `each_follower`, which holds `@lock`; the follower-removal code in the connection handler's `ensure` also takes `@lock` - confirm this cannot stall.
# NOTE(review): `min_followers` is a non-nilable Int64, so the `return unless min_followers = ...` guard presumably never returns early - confirm.
# NOTE(review): Server#restart gains a commented-out `@replicator = Replication::Server.new` - confirm whether it should stay.
diff --git a/spec/replication_spec.cr b/spec/replication_spec.cr index e3851c52d0..13ca66f07b 100644 --- a/spec/replication_spec.cr +++ b/spec/replication_spec.cr @@ -65,3 +65,133 @@ describe LavinMQ::Replication::Client do end end end + +describe LavinMQ::Replication::Server do + data_dir = "/tmp/lavinmq-follower" + + before_each do + FileUtils.rm_rf data_dir + Dir.mkdir_p data_dir + File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400 + Server.vhosts["/"].declare_queue("repli", true, false) + end + + after_each do + FileUtils.rm_rf data_dir + end + + it "should shut down gracefully" do + repli = LavinMQ::Replication::Client.new(data_dir) + 3.times do + spawn do + repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) + end + end + end + + describe "min_followers" do + before_each do + LavinMQ::Config.instance.min_followers = 1 + end + + after_each do + LavinMQ::Config.instance.min_followers = 0 + end + it "should publish when min_followers is fulfilled" do + q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue) + repli = LavinMQ::Replication::Client.new(data_dir) + spawn do + repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) + end + with_channel do |ch| + ch.basic_publish "hello world", "", "repli" + end + q.basic_get(true) { }.should be_true + repli.close + end + + it "should not publish when min_followers is not fulfilled" do + done = Channel(Nil).new + client : AMQP::Client::Connection? 
= nil + spawn do + with_channel do |ch, conn| + client = conn + q = ch.queue("repli") + q.publish_confirm "hello world" + done.send nil + end + end + select + when done.receive + fail "Should not receive message" + when timeout(0.1.seconds) + # ugly hack to release replicator from waiting for lag + repli = LavinMQ::Replication::Client.new(data_dir) + done = Channel(Nil).new + spawn do + repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) + done.send nil + end + done.receive + client.try &.close(no_wait: true) + repli.try &.close + end + end + end + + describe "max_lag" do + before_each do + LavinMQ::Config.instance.max_lag = 1 + end + + after_each do + LavinMQ::Config.instance.max_lag = nil + end + + it "should publish when max_lag is not reached" do + LavinMQ::Config.instance.max_lag = 10000 + q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue) + repli = LavinMQ::Replication::Client.new(data_dir) + spawn do + repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) + end + with_channel do |ch| + ch.basic_publish "hello world", "", "repli" + end + q.basic_get(true) { }.should be_true + repli.close + end + + it "should not publish when max_lag is reached" do + Server.vhosts["/"].declare_queue("test123", true, false) + repli = LavinMQ::Replication::Client.new(data_dir) + done = Channel(Nil).new + spawn(name: "repli_sync") do + repli.sync("127.0.0.1", LavinMQ::Config.instance.replication_port, true) + done.send nil + end + done.receive + + client : AMQP::Client::Connection? 
= nil + spawn(name: "with_channel") do + with_channel do |ch, conn| + client = conn + ch.basic_publish_confirm "hello world", "", "test123" + ch.basic_publish_confirm "hello world2", "", "test123" + done.send nil + rescue e + end + end + + select + when done.receive + fail "should not receive mssage" + when timeout(1.seconds) + Server.vhosts["/"].queues["test123"].message_count.should eq 1 + end + ensure + client.try &.close(no_wait: true) + repli.try &.close + end + end +end diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index c395a9dbf1..b766a76df4 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -43,7 +43,7 @@ def with_channel(file = __FILE__, line = __LINE__, **args, &) args = {port: LavinMQ::Config.instance.amqp_port, name: name}.merge(args) conn = AMQP::Client.new(**args).connect ch = conn.channel - yield ch + yield ch, conn ensure conn.try &.close(no_wait: false) end @@ -160,6 +160,11 @@ start_http_server Spec.after_each do Server.stop FileUtils.rm_rf("/tmp/lavinmq-spec") +end + +Spec.before_each do + Server.stop + FileUtils.rm_rf("/tmp/lavinmq-spec") Server.restart end diff --git a/src/lavinmq/config.cr b/src/lavinmq/config.cr index de1cdc8d4b..1ac4999f09 100644 --- a/src/lavinmq/config.cr +++ b/src/lavinmq/config.cr @@ -50,6 +50,8 @@ module LavinMQ property max_deleted_definitions = 8192 # number of deleted queues, unbinds etc that compacts the definitions file property consumer_timeout : UInt64? = nil property consumer_timeout_loop_interval = 60 # seconds + property min_followers : Int64 = 0 + property max_lag : Int64? = nil @@instance : Config = self.new def self.instance : LavinMQ::Config @@ -141,6 +143,8 @@ module LavinMQ when "systemd_socket_name" then @amqp_systemd_socket_name = v when "unix_proxy_protocol" then @unix_proxy_protocol = true?(v) ? 1u8 : v.to_u8? || 0u8 when "tcp_proxy_protocol" then @tcp_proxy_protocol = true?(v) ? 1u8 : v.to_u8? 
|| 0u8 + when "min_followers" then @min_followers = v.to_i64 + when "max_lag" then @max_lag = v.to_i64 else STDERR.puts "WARNING: Unrecognized configuration 'amqp/#{config}'" end diff --git a/src/lavinmq/http/controller/nodes.cr b/src/lavinmq/http/controller/nodes.cr index 01d42ae489..648e7a08e4 100644 --- a/src/lavinmq/http/controller/nodes.cr +++ b/src/lavinmq/http/controller/nodes.cr @@ -77,6 +77,8 @@ module LavinMQ run_queue: 0, sockets_used: @amqp_server.vhosts.sum { |_, v| v.connections.size }, followers: @amqp_server.followers, + max_lag: LavinMQ::Config.instance.max_lag, + min_followers: LavinMQ::Config.instance.min_followers, } end diff --git a/src/lavinmq/replication/client.cr b/src/lavinmq/replication/client.cr index 67881cba70..d46f9ba9d9 100644 --- a/src/lavinmq/replication/client.cr +++ b/src/lavinmq/replication/client.cr @@ -10,13 +10,13 @@ module LavinMQ @data_dir_lock : DataDirLock? @closed = false - def initialize(@data_dir : String) + def initialize(@data_dir : String, pwd : String? = nil) System.maximize_fd_limit @socket = TCPSocket.new @socket.sync = true @socket.read_buffering = false @lz4 = Compress::LZ4::Reader.new(@socket) - @password = password + @password = pwd || password @files = Hash(String, File).new do |h, k| path = File.join(@data_dir, k) Dir.mkdir_p File.dirname(path) diff --git a/src/lavinmq/replication/follower.cr b/src/lavinmq/replication/follower.cr index 0c35da4fd6..8c085a23dc 100644 --- a/src/lavinmq/replication/follower.cr +++ b/src/lavinmq/replication/follower.cr @@ -5,7 +5,7 @@ module LavinMQ module Replication class Follower Log = ::Log.for(self) - + @ack = Channel(Int64).new @acked_bytes = 0_i64 @sent_bytes = 0_i64 @actions = Channel(Action).new(4096) @@ -46,6 +46,11 @@ module LavinMQ spawn action_loop, name: "Follower#action_loop" loop do read_ack(socket) + if max_lag = Config.instance.max_lag + if lag < max_lag + @ack.try_send lag + end + end end rescue IO::Error end @@ -55,6 +60,15 @@ module LavinMQ @acked_bytes += len end + def wait_for_max_lag + if max_lag = 
Config.instance.max_lag + current_lag = lag + until current_lag < max_lag + break unless current_lag = @ack.receive? + end + end + end + private def action_loop(socket = @lz4) while action = @actions.receive? action.send(socket) @@ -157,6 +171,7 @@ module LavinMQ Log.info { "Disconnected" } wait_for_sync if synced_close @actions.close + @ack.close @lz4.close @socket.close rescue IO::Error diff --git a/src/lavinmq/replication/server.cr b/src/lavinmq/replication/server.cr index c285631f90..a0ad1710bf 100644 --- a/src/lavinmq/replication/server.cr +++ b/src/lavinmq/replication/server.cr @@ -24,13 +24,15 @@ module LavinMQ include FileIndex include Replicator Log = ::Log.for("replication") + getter followers_changed = Channel(Nil).new + getter? closing @lock = Mutex.new(:unchecked) @followers = Array(Follower).new @password : String @files = Hash(String, MFile?).new + @closing = false - def initialize - @password = password + def initialize(@password = password) @tcp = TCPServer.new @tcp.sync = false @tcp.reuse_address = true @@ -59,11 +61,19 @@ module LavinMQ def append(path : String, obj) Log.debug { "appending #{obj} to #{path}" } + unless closing? + wait_for_min_followers + wait_for_max_lag + end each_follower &.append(path, obj) end def delete_file(path : String) @files.delete(path) + unless closing? + wait_for_min_followers + wait_for_max_lag + end each_follower &.delete(path) end @@ -106,6 +116,21 @@ module LavinMQ end end + def wait_for_min_followers + return unless min_followers = Config.instance.min_followers + until closing? 
|| @followers.size >= min_followers + @followers_changed.receive + end + end + + def wait_for_max_lag + return unless Config.instance.max_lag + each_follower do |f| + f.wait_for_max_lag + end + rescue Channel::ClosedError + end + private def password : String path = File.join(Config.instance.data_dir, ".replication_secret") begin @@ -143,13 +168,8 @@ module LavinMQ follower.full_sync @followers << follower end - begin - follower.read_acks - ensure - @lock.synchronize do - @followers.delete(follower) - end - end + followers_changed.try_send nil + follower.read_acks rescue ex : AuthenticationError Log.warn { "Follower negotiation error" } rescue ex : InvalidStartHeaderError @@ -160,9 +180,14 @@ module LavinMQ Log.warn(exception: ex) { "Follower connection error" } unless socket.closed? ensure follower.try &.close + @lock.synchronize do + @followers.delete(follower) + end + followers_changed.try_send nil end def close + Log.debug { "closing" } @tcp.close @lock.synchronize do done = Channel({Follower, Bool}).new @@ -175,9 +200,16 @@ module LavinMQ end @followers.clear end + @followers_changed.close + Log.debug { "closed" } Fiber.yield # required for follower/listener fibers to actually finish end + def closing + @closing = true + @followers_changed.try_send? 
nil + end + private def each_follower(& : Follower -> Nil) : Nil @lock.synchronize do @followers.each do |f| diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index 411e4fad53..6c643999b7 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -53,6 +53,7 @@ module LavinMQ def restart stop Dir.mkdir_p @data_dir + # @replicator = Replication::Server.new Schema.migrate(@data_dir, @replicator) @users = UserStore.new(@data_dir, @replicator) @vhosts = VHostStore.new(@data_dir, @users, @replicator) @@ -171,10 +172,12 @@ module LavinMQ def close @closed = true + @replicator.closing @log.debug { "Closing listeners" } @listeners.each_key &.close @log.debug { "Closing vhosts" } @vhosts.close + @log.debug { "Closing replicator" } @replicator.close end diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index 62f3924b39..29e0028cc1 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -125,6 +125,7 @@ module LavinMQ headers = msg.properties.headers find_all_queues(ex, msg.routing_key, headers, visited, found_queues) headers.delete("BCC") if headers + if found_queues.empty? ex.unroutable_count += 1 return false @@ -456,8 +457,11 @@ module LavinMQ sleep 0.1 end # then force close the remaining (close tcp socket) + @log.debug { "force closing connection" } unless connections.empty? + @connections.each &.force_close Fiber.yield # yield so that Client read_loops can shutdown + @log.debug { "Closing queues" } @queues.each_value &.close Fiber.yield compact! 
diff --git a/static/js/nodes.js b/static/js/nodes.js index c90db1e3c2..ee9accff6a 100644 --- a/static/js/nodes.js +++ b/static/js/nodes.js @@ -30,6 +30,7 @@ function render (data) { document.querySelector('#version').textContent = data[0].applications[0].version for (const node of data) { updateDetails(node) + updateFollowerSettings(node) updateStats(node) } } @@ -72,6 +73,11 @@ const updateDetails = (nodeStats) => { document.getElementById('tr-disk').textContent = diskUsage } +const updateFollowerSettings = (nodeStats) => { + document.getElementById('tr-max-lag').textContent = nodeStats.max_lag || 'No max_lag value specified' + document.getElementById('tr-min-followers').textContent = nodeStats.min_followers +} + const stats = [ { heading: 'Connection', diff --git a/views/nodes.ecr b/views/nodes.ecr index f3e9b96b9b..bcd073cbe4 100644 --- a/views/nodes.ecr +++ b/views/nodes.ecr @@ -74,7 +74,7 @@
Minimum amount of followers | ++ |
---|---|
Maximum amount of lag | ++ |