From d9fc12381ab85f87983c032d266bcac81394eb46 Mon Sep 17 00:00:00 2001 From: Christina Date: Tue, 7 May 2024 10:58:17 +0200 Subject: [PATCH 1/9] add min_followers and max_lag --- extras/lavinmq.ini | 2 + spec/replication_spec.cr | 80 ++++++++++++++++++++++++++++ spec/spec_helper.cr | 2 +- src/lavinmq/config.cr | 6 ++- src/lavinmq/http/controller/nodes.cr | 2 + src/lavinmq/replication/client.cr | 4 +- src/lavinmq/replication/follower.cr | 22 ++++++-- src/lavinmq/replication/server.cr | 34 +++++++++++- src/lavinmq/server.cr | 2 + src/lavinmq/vhost.cr | 4 ++ static/js/nodes.js | 6 +++ views/nodes.ecr | 14 ++++- 12 files changed, 168 insertions(+), 10 deletions(-) diff --git a/extras/lavinmq.ini b/extras/lavinmq.ini index 5a64646b94..8d2a9386cb 100644 --- a/extras/lavinmq.ini +++ b/extras/lavinmq.ini @@ -22,3 +22,5 @@ tcp_proxy_protocol = false tls_port = 5671 unix_path = /tmp/lavinmq.sock unix_proxy_protocol = true +min_followers = 1 +max_lag = 10000000000 diff --git a/spec/replication_spec.cr b/spec/replication_spec.cr index e3851c52d0..6dbd3ba75f 100644 --- a/spec/replication_spec.cr +++ b/spec/replication_spec.cr @@ -65,3 +65,83 @@ describe LavinMQ::Replication::Client do end end end + +describe LavinMQ::Replication::Server do + data_dir = "/tmp/lavinmq-follower" + + before_each do + FileUtils.rm_rf data_dir + Dir.mkdir_p data_dir + File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400 + Server.vhosts["/"].declare_queue("repli", true, false) + LavinMQ::Config.instance.min_followers = 1 + end + + after_each do + FileUtils.rm_rf data_dir + LavinMQ::Config.instance.min_followers = 0 + end + + + it "should shut down gracefully" do + repli = LavinMQ::Replication::Client.new(data_dir) + 3.times do + spawn do + repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) + end + end + + # repli.closing + + end + + it "should publish when min_followers is fulfilled" do + q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue) + repli = LavinMQ::Replication::Client.new(data_dir) + spawn do + repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) + end + with_channel do |ch| + ch.basic_publish "hello world", "", "repli" + end + q.basic_get(true) { }.should be_true + repli.close + end + + it "should not publish when min_followers is not fulfilled" do + done = Channel(Nil).new + client : AMQP::Client::Connection? = nil + spawn do + with_channel do |ch, conn| + client = conn + q = ch.queue("repli") + q.publish_confirm "hello world" + done.send nil + end + end + select + when done.receive + fail "Should not receive message" + when timeout(0.1.seconds) + client.try &.close(no_wait: true) + Server.close + end + end + + # it "should publish when max_lag is not reached" do + # LavinMQ::Config.instance.max_lag = 10000 + # q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue) + # repli = LavinMQ::Replication::Client.new(data_dir) + # spawn do + # repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) + # end + # with_channel do |ch| + # ch.basic_publish "hello world", "", "repli" + # end + # q.basic_get(true) { }.should be_true + # repli.close + # end + + # it "should not publish when max_lag is reached" do + # end +end diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index c395a9dbf1..bb1f28d1e5 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -43,7 +43,7 @@ def with_channel(file = __FILE__, line = __LINE__, **args, &) args = {port: LavinMQ::Config.instance.amqp_port, name: name}.merge(args) conn = AMQP::Client.new(**args).connect ch = conn.channel - yield ch + yield ch, conn ensure conn.try &.close(no_wait: false) end diff --git a/src/lavinmq/config.cr b/src/lavinmq/config.cr index de1cdc8d4b..c8f57c4f15 100644 --- a/src/lavinmq/config.cr +++ b/src/lavinmq/config.cr @@ -6,7 +6,7 @@ module LavinMQ DEFAULT_LOG_LEVEL = Log::Severity::Info property data_dir : String = ENV.fetch("STATE_DIRECTORY", "/var/lib/lavinmq") - property config_file = File.exists?(File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini")) ? File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini") : "" + property config_file = File.exists?(File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini")) ? File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini") : "/Users/christinadahlen/84codes/lavinmq/extras/lavinmq.ini" property log_file : String? = nil property log_level : Log::Severity = DEFAULT_LOG_LEVEL property amqp_bind = "127.0.0.1" @@ -50,6 +50,8 @@ module LavinMQ property max_deleted_definitions = 8192 # number of deleted queues, unbinds etc that compacts the definitions file property consumer_timeout : UInt64? = nil property consumer_timeout_loop_interval = 60 # seconds + property min_followers : Int64 = 0 + property max_lag : Int64? = nil @@instance : Config = self.new def self.instance : LavinMQ::Config @@ -141,6 +143,8 @@ module LavinMQ when "systemd_socket_name" then @amqp_systemd_socket_name = v when "unix_proxy_protocol" then @unix_proxy_protocol = true?(v) ? 1u8 : v.to_u8? || 0u8 when "tcp_proxy_protocol" then @tcp_proxy_protocol = true?(v) ? 1u8 : v.to_u8? || 0u8 + when "min_followers" then @min_followers = v.to_i64 + when "max_lag" then @max_lag = v.to_i64 else STDERR.puts "WARNING: Unrecognized configuration 'amqp/#{config}'" end diff --git a/src/lavinmq/http/controller/nodes.cr b/src/lavinmq/http/controller/nodes.cr index 01d42ae489..49f33691c9 100644 --- a/src/lavinmq/http/controller/nodes.cr +++ b/src/lavinmq/http/controller/nodes.cr @@ -77,6 +77,8 @@ module LavinMQ run_queue: 0, sockets_used: @amqp_server.vhosts.sum { |_, v| v.connections.size }, followers: @amqp_server.followers, + min_followers: LavinMQ::Config.instance.min_followers, + max_lag: LavinMQ::Config.instance.max_lag, } end diff --git a/src/lavinmq/replication/client.cr b/src/lavinmq/replication/client.cr index 67881cba70..d46f9ba9d9 100644 --- a/src/lavinmq/replication/client.cr +++ b/src/lavinmq/replication/client.cr @@ -10,13 +10,13 @@ module LavinMQ @data_dir_lock : DataDirLock? @closed = false - def initialize(@data_dir : String) + def initialize(@data_dir : String, pwd : String? = nil) System.maximize_fd_limit @socket = TCPSocket.new @socket.sync = true @socket.read_buffering = false @lz4 = Compress::LZ4::Reader.new(@socket) - @password = password + @password = pwd || password @files = Hash(String, File).new do |h, k| path = File.join(@data_dir, k) Dir.mkdir_p File.dirname(path) diff --git a/src/lavinmq/replication/follower.cr b/src/lavinmq/replication/follower.cr index 0c35da4fd6..454a85eeac 100644 --- a/src/lavinmq/replication/follower.cr +++ b/src/lavinmq/replication/follower.cr @@ -5,7 +5,7 @@ module LavinMQ module Replication class Follower Log = ::Log.for(self) - + @ack = Channel(Int64).new @acked_bytes = 0_i64 @sent_bytes = 0_i64 @actions = Channel(Action).new(4096) @@ -46,6 +46,11 @@ module LavinMQ spawn action_loop, name: "Follower#action_loop" loop do read_ack(socket) + if max_lag = Config.instance.max_lag + if lag < max_lag + @ack.try_send lag + end + end end rescue IO::Error end @@ -55,13 +60,23 @@ module LavinMQ @acked_bytes += len end + def wait_for_max_lag + if max_lag = Config.instance.max_lag + current_lag = lag + until current_lag < max_lag + break unless current_lag = @ack.receive? + end + end + end + private def action_loop(socket = @lz4) while action = @actions.receive? - action.send(socket) + sent_bytes = action.send(socket) while action2 = @actions.try_receive? - action2.send(socket) + sent_bytes += action2.send(socket) end socket.flush + @sent_bytes += sent_bytes end rescue IO::Error ensure @@ -157,6 +172,7 @@ module LavinMQ Log.info { "Disconnected" } wait_for_sync if synced_close @actions.close + @ack.close @lz4.close @socket.close rescue IO::Error diff --git a/src/lavinmq/replication/server.cr b/src/lavinmq/replication/server.cr index c285631f90..f0ca4e00cc 100644 --- a/src/lavinmq/replication/server.cr +++ b/src/lavinmq/replication/server.cr @@ -24,13 +24,15 @@ module LavinMQ include FileIndex include Replicator Log = ::Log.for("replication") + getter followers_changed = Channel(Nil).new + getter? closing @lock = Mutex.new(:unchecked) @followers = Array(Follower).new @password : String @files = Hash(String, MFile?).new + @closing = false - def initialize - @password = password + def initialize(@password = password) @tcp = TCPServer.new @tcp.sync = false @tcp.reuse_address = true @@ -59,11 +61,13 @@ module LavinMQ def append(path : String, obj) Log.debug { "appending #{obj} to #{path}" } + wait_for_max_lag unless closing? each_follower &.append(path, obj) end def delete_file(path : String) @files.delete(path) + wait_for_max_lag unless closing? each_follower &.delete(path) end @@ -106,6 +110,22 @@ module LavinMQ end end + def wait_for_max_lag + # was_closing = @closing + until @closing || @followers.size >= Config.instance.min_followers + @followers_changed.receive + end + # unless (!was_closing && @closing) || @followers.size >= Config.instance.min_followers + # raise Exception.new("Not enough followers") + # end + # use waitgroup instead l8er + @followers.each_with_index do |f, i| + break if i > Config.instance.min_followers + f.wait_for_max_lag + end + rescue Channel::ClosedError + end + private def password : String path = File.join(Config.instance.data_dir, ".replication_secret") begin @@ -143,12 +163,14 @@ module LavinMQ follower.full_sync @followers << follower end + followers_changed.try_send nil begin follower.read_acks ensure @lock.synchronize do @followers.delete(follower) end + followers_changed.try_send nil end rescue ex : AuthenticationError Log.warn { "Follower negotiation error" } @@ -163,6 +185,7 @@ module LavinMQ end def close + Log.debug { "closing" } @tcp.close @lock.synchronize do done = Channel({Follower, Bool}).new @@ -175,9 +198,16 @@ module LavinMQ end @followers.clear end + @followers_changed.close + Log.debug { "closed" } Fiber.yield # required for follower/listener fibers to actually finish end + def closing + @closing = true + @followers_changed.try_send? nil + end + private def each_follower(& : Follower -> Nil) : Nil @lock.synchronize do @followers.each do |f| diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index 411e4fad53..6a108bf4a9 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -171,10 +171,12 @@ module LavinMQ def close @closed = true + @replicator.closing @log.debug { "Closing listeners" } @listeners.each_key &.close @log.debug { "Closing vhosts" } @vhosts.close + @log.debug { "Closing replicator" } @replicator.close end diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index 62f3924b39..29e0028cc1 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -125,6 +125,7 @@ module LavinMQ headers = msg.properties.headers find_all_queues(ex, msg.routing_key, headers, visited, found_queues) headers.delete("BCC") if headers + if found_queues.empty? ex.unroutable_count += 1 return false @@ -456,8 +457,11 @@ module LavinMQ sleep 0.1 end # then force close the remaining (close tcp socket) + @log.debug { "force closing connection" } unless connections.empty? + @connections.each &.force_close Fiber.yield # yield so that Client read_loops can shutdown + @log.debug { "Closing queues" } @queues.each_value &.close Fiber.yield compact! diff --git a/static/js/nodes.js b/static/js/nodes.js index c90db1e3c2..ee9accff6a 100644 --- a/static/js/nodes.js +++ b/static/js/nodes.js @@ -30,6 +30,7 @@ function render (data) { document.querySelector('#version').textContent = data[0].applications[0].version for (const node of data) { updateDetails(node) + updateFollowerSettings(node) updateStats(node) } } @@ -72,6 +73,11 @@ const updateDetails = (nodeStats) => { document.getElementById('tr-disk').textContent = diskUsage } +const updateFollowerSettings = (nodeStats) => { + document.getElementById('tr-max-lag').textContent = nodeStats.max_lag || 'No max_lag value specified' + document.getElementById('tr-min-followers').textContent = nodeStats.min_followers +} + const stats = [ { heading: 'Connection', diff --git a/views/nodes.ecr b/views/nodes.ecr index f3e9b96b9b..bcd073cbe4 100644 --- a/views/nodes.ecr +++ b/views/nodes.ecr @@ -74,7 +74,7 @@

Queue churn

-
+

Followers @@ -94,6 +94,18 @@

+
+

Follower Settings

+ + + + + + + + + +
Minimum amount of followers
Maximum amount of lag
<% render "footer" %> From ed46e7575546b4fce8e6dc6c2de4ab783b961427 Mon Sep 17 00:00:00 2001 From: Christina Date: Tue, 14 May 2024 11:37:47 +0200 Subject: [PATCH 2/9] beginning of specs for max_lag --- extras/lavinmq.ini | 2 +- spec/replication_spec.cr | 149 +++++++++++++++++++-------- spec/spec_helper.cr | 6 ++ src/lavinmq/http/controller/nodes.cr | 1 - src/lavinmq/replication/server.cr | 27 +++-- 5 files changed, 128 insertions(+), 57 deletions(-) diff --git a/extras/lavinmq.ini b/extras/lavinmq.ini index 8d2a9386cb..9fe6336312 100644 --- a/extras/lavinmq.ini +++ b/extras/lavinmq.ini @@ -23,4 +23,4 @@ tls_port = 5671 unix_path = /tmp/lavinmq.sock unix_proxy_protocol = true min_followers = 1 -max_lag = 10000000000 +max_lag = 100 diff --git a/spec/replication_spec.cr b/spec/replication_spec.cr index 6dbd3ba75f..f973eebc0a 100644 --- a/spec/replication_spec.cr +++ b/spec/replication_spec.cr @@ -5,12 +5,14 @@ describe LavinMQ::Replication::Client do data_dir = "/tmp/lavinmq-follower" before_each do + puts "client bfe" FileUtils.rm_rf data_dir Dir.mkdir_p data_dir File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400 end after_each do + puts "client ae" FileUtils.rm_rf data_dir end @@ -70,19 +72,18 @@ describe LavinMQ::Replication::Server do data_dir = "/tmp/lavinmq-follower" before_each do + puts "before_each" FileUtils.rm_rf data_dir Dir.mkdir_p data_dir File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400 Server.vhosts["/"].declare_queue("repli", true, false) - LavinMQ::Config.instance.min_followers = 1 end after_each do + puts "after_each" FileUtils.rm_rf data_dir - LavinMQ::Config.instance.min_followers = 0 end - it "should shut down gracefully" do repli = LavinMQ::Replication::Client.new(data_dir) 3.times do @@ -92,56 +93,116 @@ describe LavinMQ::Replication::Server do end # repli.closing - end - it "should publish when min_followers is fulfilled" do - q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue) - repli = LavinMQ::Replication::Client.new(data_dir) - spawn do - repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) + describe "min_followers" do + before_each do + LavinMQ::Config.instance.min_followers = 1 end - with_channel do |ch| - ch.basic_publish "hello world", "", "repli" + + after_each do + LavinMQ::Config.instance.min_followers = 0 + end + it "should publish when min_followers is fulfilled" do + q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue) + repli = LavinMQ::Replication::Client.new(data_dir) + spawn do + repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) + end + with_channel do |ch| + ch.basic_publish "hello world", "", "repli" + end + q.basic_get(true) { }.should be_true + repli.close + end + + it "should not publish when min_followers is not fulfilled" do + done = Channel(Nil).new + client : AMQP::Client::Connection? = nil + spawn do + with_channel do |ch, conn| + client = conn + q = ch.queue("repli") + q.publish_confirm "hello world" + done.send nil + end + end + select + when done.receive + fail "Should not receive message" + when timeout(0.1.seconds) + client.try &.close(no_wait: true) + Server.close + end end - q.basic_get(true) { }.should be_true - repli.close end - it "should not publish when min_followers is not fulfilled" do - done = Channel(Nil).new - client : AMQP::Client::Connection? = nil - spawn do - with_channel do |ch, conn| - client = conn - q = ch.queue("repli") - q.publish_confirm "hello world" + describe "max_lag" do + before_each do + LavinMQ::Config.instance.max_lag = 1 + end + + after_each do + LavinMQ::Config.instance.max_lag = nil + end + # it "should publish when max_lag is not reached" do + # LavinMQ::Config.instance.max_lag = 10000 + # q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue) + # repli = LavinMQ::Replication::Client.new(data_dir) + # spawn do + # repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) + # end + # with_channel do |ch| + # ch.basic_publish "hello world", "", "repli" + # end + # q.basic_get(true) { }.should be_true + # repli.close + # end + + it "should not publish when max_lag is reached" do + pp "config: #{LavinMQ::Config.instance.max_lag}" + Server.vhosts["/"].declare_queue("test123", true, false) + repli = LavinMQ::Replication::Client.new(data_dir) + done = Channel(Nil).new + spawn(name: "repli_sync") do + repli.sync("127.0.0.1", LavinMQ::Config.instance.replication_port) done.send nil end - end - select - when done.receive - fail "Should not receive message" - when timeout(0.1.seconds) + done.receive + + client : AMQP::Client::Connection? = nil + spawn(name: "with_channel") do + pp 0 + with_channel do |ch, conn| + pp "-1" + client = conn + pp "-2" + pp 1 + ch.basic_publish_confirm "hello world", "", "test123" + Fiber.list { |f| puts f.inspect } + pp 2 + + ch.basic_publish_confirm "hello world2", "", "test123" + pp 3 + # done.send nil + rescue e + end + + end + sleep 1.seconds client.try &.close(no_wait: true) - Server.close + Server.vhosts["/"].queues["test123"].message_count.should eq 1 + + + # select + # when done.receive + # fail "should not receive mssage" + # when timeout(1.seconds) + # pp "7" + # Server.vhosts["/"].queues["test123"].message_count.should eq 1 + # end + # ensure + # client.try &.close(no_wait: true) end end - - # it "should publish when max_lag is not reached" do - # LavinMQ::Config.instance.max_lag = 10000 - # q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue) - # repli = LavinMQ::Replication::Client.new(data_dir) - # spawn do - # repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) - # end - # with_channel do |ch| - # ch.basic_publish "hello world", "", "repli" - # end - # q.basic_get(true) { }.should be_true - # repli.close - # end - - # it "should not publish when max_lag is reached" do - # end end diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index bb1f28d1e5..251c741a0f 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -160,9 +160,15 @@ start_http_server Spec.after_each do Server.stop FileUtils.rm_rf("/tmp/lavinmq-spec") +end + +Spec.before_each do + Server.stop + FileUtils.rm_rf("/tmp/lavinmq-spec") Server.restart end + class Invalid < Exception def initialize super("invalid input") diff --git a/src/lavinmq/http/controller/nodes.cr b/src/lavinmq/http/controller/nodes.cr index 49f33691c9..648e7a08e4 100644 --- a/src/lavinmq/http/controller/nodes.cr +++ b/src/lavinmq/http/controller/nodes.cr @@ -77,7 +77,6 @@ module LavinMQ run_queue: 0, sockets_used: @amqp_server.vhosts.sum { |_, v| v.connections.size }, followers: @amqp_server.followers, - min_followers: LavinMQ::Config.instance.min_followers, max_lag: LavinMQ::Config.instance.max_lag, } end diff --git a/src/lavinmq/replication/server.cr b/src/lavinmq/replication/server.cr index f0ca4e00cc..d6068fad6b 100644 --- a/src/lavinmq/replication/server.cr +++ b/src/lavinmq/replication/server.cr @@ -61,13 +61,19 @@ module LavinMQ def append(path : String, obj) Log.debug { "appending #{obj} to #{path}" } - wait_for_max_lag unless closing? + unless closing? + wait_for_min_followers + wait_for_max_lag + end each_follower &.append(path, obj) end def delete_file(path : String) @files.delete(path) - wait_for_max_lag unless closing? + unless closing? + wait_for_min_followers + wait_for_max_lag + end each_follower &.delete(path) end @@ -110,17 +116,16 @@ module LavinMQ end end - def wait_for_max_lag - # was_closing = @closing - until @closing || @followers.size >= Config.instance.min_followers + def wait_for_min_followers + return unless min_followers = Config.instance.min_followers + until closing? || @followers.size >= min_followers @followers_changed.receive end - # unless (!was_closing && @closing) || @followers.size >= Config.instance.min_followers - # raise Exception.new("Not enough followers") - # end - # use waitgroup instead l8er - @followers.each_with_index do |f, i| - break if i > Config.instance.min_followers + end + + def wait_for_max_lag + return unless max_lag = Config.instance.max_lag + each_follower do |f| f.wait_for_max_lag end rescue Channel::ClosedError From 7602dbf5e0df61d03362e39456233df2d8f93d69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Tue, 14 May 2024 14:59:49 +0200 Subject: [PATCH 3/9] Close replication to release client read_loop --- spec/replication_spec.cr | 36 +++++++++--------------------------- 1 file changed, 9 insertions(+), 27 deletions(-) diff --git a/spec/replication_spec.cr b/spec/replication_spec.cr index f973eebc0a..29bf6e010f 100644 --- a/spec/replication_spec.cr +++ b/spec/replication_spec.cr @@ -5,14 +5,12 @@ describe LavinMQ::Replication::Client do data_dir = "/tmp/lavinmq-follower" before_each do - puts "client bfe" FileUtils.rm_rf data_dir Dir.mkdir_p data_dir File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400 end after_each do - puts "client ae" FileUtils.rm_rf data_dir end @@ -72,7 +70,6 @@ describe LavinMQ::Replication::Server do data_dir = "/tmp/lavinmq-follower" before_each do - puts "before_each" FileUtils.rm_rf data_dir Dir.mkdir_p data_dir File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400 @@ -80,7 +77,6 @@ describe LavinMQ::Replication::Server do end after_each do - puts "after_each" FileUtils.rm_rf data_dir end @@ -160,7 +156,6 @@ describe LavinMQ::Replication::Server do # end it "should not publish when max_lag is reached" do - pp "config: #{LavinMQ::Config.instance.max_lag}" Server.vhosts["/"].declare_queue("test123", true, false) repli = LavinMQ::Replication::Client.new(data_dir) done = Channel(Nil).new @@ -172,37 +167,24 @@ describe LavinMQ::Replication::Server do client : AMQP::Client::Connection? = nil spawn(name: "with_channel") do - pp 0 with_channel do |ch, conn| - pp "-1" client = conn - pp "-2" - pp 1 ch.basic_publish_confirm "hello world", "", "test123" - Fiber.list { |f| puts f.inspect } - pp 2 - ch.basic_publish_confirm "hello world2", "", "test123" - pp 3 - # done.send nil + done.send nil rescue e end + end + select + when done.receive + fail "should not receive mssage" + when timeout(1.seconds) + Server.vhosts["/"].queues["test123"].message_count.should eq 1 end - sleep 1.seconds + ensure client.try &.close(no_wait: true) - Server.vhosts["/"].queues["test123"].message_count.should eq 1 - - - # select - # when done.receive - # fail "should not receive mssage" - # when timeout(1.seconds) - # pp "7" - # Server.vhosts["/"].queues["test123"].message_count.should eq 1 - # end - # ensure - # client.try &.close(no_wait: true) + repli.try &.close end end end From 5c77d1365c6a5ab6195f4da7e9f2c6cadbdfa6eb Mon Sep 17 00:00:00 2001 From: Christina Date: Thu, 16 May 2024 12:47:04 +0200 Subject: [PATCH 4/9] needs better stop/start for specs --- spec/replication_spec.cr | 40 ++++++++++++++++++------------- src/lavinmq/replication/server.cr | 13 ++++------ src/lavinmq/server.cr | 1 + 3 files changed, 29 insertions(+), 25 deletions(-) diff --git a/spec/replication_spec.cr b/spec/replication_spec.cr index 29bf6e010f..99b57b0c41 100644 --- a/spec/replication_spec.cr +++ b/spec/replication_spec.cr @@ -87,8 +87,6 @@ describe LavinMQ::Replication::Server do repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) end end - - # repli.closing end describe "min_followers" do @@ -128,7 +126,14 @@ describe LavinMQ::Replication::Server do fail "Should not receive message" when timeout(0.1.seconds) client.try &.close(no_wait: true) - Server.close + # Ugly hack to free us + # repli = LavinMQ::Replication::Client.new(data_dir) + # done = Channel(Nil).new + # spawn do + # repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) + # done.send nil + # end + # done.receive end end end @@ -141,26 +146,27 @@ describe LavinMQ::Replication::Server do after_each do LavinMQ::Config.instance.max_lag = nil end - # it "should publish when max_lag is not reached" do - # LavinMQ::Config.instance.max_lag = 10000 - # q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue) - # repli = LavinMQ::Replication::Client.new(data_dir) - # spawn do - # repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) - # end - # with_channel do |ch| - # ch.basic_publish "hello world", "", "repli" - # end - # q.basic_get(true) { }.should be_true - # repli.close - # end + + it "should publish when max_lag is not reached" do + LavinMQ::Config.instance.max_lag = 10000 + q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue) + repli = LavinMQ::Replication::Client.new(data_dir) + spawn do + repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) + end + with_channel do |ch| + ch.basic_publish "hello world", "", "repli" + end + q.basic_get(true) { }.should be_true + repli.close + end it "should not publish when max_lag is reached" do Server.vhosts["/"].declare_queue("test123", true, false) repli = LavinMQ::Replication::Client.new(data_dir) done = Channel(Nil).new spawn(name: "repli_sync") do - repli.sync("127.0.0.1", LavinMQ::Config.instance.replication_port) + repli.sync("127.0.0.1", LavinMQ::Config.instance.replication_port, true) done.send nil end done.receive diff --git a/src/lavinmq/replication/server.cr b/src/lavinmq/replication/server.cr index d6068fad6b..77aaeca776 100644 --- a/src/lavinmq/replication/server.cr +++ b/src/lavinmq/replication/server.cr @@ -169,14 +169,7 @@ module LavinMQ @followers << follower end followers_changed.try_send nil - begin - follower.read_acks - ensure - @lock.synchronize do - @followers.delete(follower) - end - followers_changed.try_send nil - end + follower.read_acks rescue ex : AuthenticationError Log.warn { "Follower negotiation error" } rescue ex : InvalidStartHeaderError @@ -187,6 +180,10 @@ module LavinMQ Log.warn(exception: ex) { "Follower connection error" } unless socket.closed? ensure follower.try &.close + @lock.synchronize do + @followers.delete(follower) + end + followers_changed.try_send nil end def close diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index 6a108bf4a9..6c643999b7 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -53,6 +53,7 @@ module LavinMQ def restart stop Dir.mkdir_p @data_dir + # @replicator = Replication::Server.new Schema.migrate(@data_dir, @replicator) @users = UserStore.new(@data_dir, @replicator) @vhosts = VHostStore.new(@data_dir, @users, @replicator) From 1fe2e677daf361b689af9cda436144556712485d Mon Sep 17 00:00:00 2001 From: Christina Date: Mon, 20 May 2024 10:01:14 +0200 Subject: [PATCH 5/9] passing min_followers and max_lag replication specs --- spec/replication_spec.cr | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/spec/replication_spec.cr b/spec/replication_spec.cr index 99b57b0c41..0fb89bfa5e 100644 --- a/spec/replication_spec.cr +++ b/spec/replication_spec.cr @@ -125,15 +125,16 @@ describe LavinMQ::Replication::Server do when done.receive fail "Should not receive message" when timeout(0.1.seconds) + #ugly hack to release replicator from waiting for lag + repli = LavinMQ::Replication::Client.new(data_dir) + done = Channel(Nil).new + spawn do + repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) + done.send nil + end + done.receive client.try &.close(no_wait: true) - # Ugly hack to free us - # repli = LavinMQ::Replication::Client.new(data_dir) - # done = Channel(Nil).new - # spawn do - # repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) - # done.send nil - # end - # done.receive + repli.try &.close end end end From 50ea08a27e21d4613cc184612fe59cfa1a1cfb87 Mon Sep 17 00:00:00 2001 From: Christina Date: Mon, 20 May 2024 10:40:34 +0200 Subject: [PATCH 6/9] merge in improved replication lag --- src/lavinmq/replication/follower.cr | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/lavinmq/replication/follower.cr b/src/lavinmq/replication/follower.cr index 454a85eeac..8c085a23dc 100644 --- a/src/lavinmq/replication/follower.cr +++ b/src/lavinmq/replication/follower.cr @@ -71,12 +71,11 @@ module LavinMQ private def action_loop(socket = @lz4) while action = @actions.receive? - sent_bytes = action.send(socket) + action.send(socket) while action2 = @actions.try_receive? - sent_bytes += action2.send(socket) + action2.send(socket) end socket.flush - @sent_bytes += sent_bytes end rescue IO::Error ensure From f67f61942e8246a09d8f2aab041451be5e729074 Mon Sep 17 00:00:00 2001 From: Christina Date: Mon, 20 May 2024 10:43:57 +0200 Subject: [PATCH 7/9] format --- spec/replication_spec.cr | 2 +- spec/spec_helper.cr | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/spec/replication_spec.cr b/spec/replication_spec.cr index 0fb89bfa5e..13ca66f07b 100644 --- a/spec/replication_spec.cr +++ b/spec/replication_spec.cr @@ -125,7 +125,7 @@ describe LavinMQ::Replication::Server do when done.receive fail "Should not receive message" when timeout(0.1.seconds) - #ugly hack to release replicator from waiting for lag + # ugly hack to release replicator from waiting for lag repli = LavinMQ::Replication::Client.new(data_dir) done = Channel(Nil).new spawn do diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 251c741a0f..b766a76df4 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -168,7 +168,6 @@ Spec.before_each do Server.restart end - class Invalid < Exception def initialize super("invalid input") From cd363d811ee93f231c4d050845504cd0ab0d86e9 Mon Sep 17 00:00:00 2001 From: Christina Date: Mon, 20 May 2024 10:56:28 +0200 Subject: [PATCH 8/9] lint --- src/lavinmq/replication/server.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lavinmq/replication/server.cr b/src/lavinmq/replication/server.cr index 77aaeca776..a0ad1710bf 100644 --- a/src/lavinmq/replication/server.cr +++ b/src/lavinmq/replication/server.cr @@ -124,7 +124,7 @@ module LavinMQ end def wait_for_max_lag - return unless max_lag = Config.instance.max_lag + return unless Config.instance.max_lag each_follower do |f| f.wait_for_max_lag end From ca9e85e68eec461482d2cb033817c683d971ddf3 Mon Sep 17 00:00:00 2001 From: Christina Date: Mon, 20 May 2024 11:53:33 +0200 Subject: [PATCH 9/9] fix --- extras/lavinmq.ini | 2 -- src/lavinmq/config.cr | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/extras/lavinmq.ini b/extras/lavinmq.ini index 9fe6336312..5a64646b94 100644 --- a/extras/lavinmq.ini +++ b/extras/lavinmq.ini @@ -22,5 +22,3 @@ tcp_proxy_protocol = false tls_port = 5671 unix_path = /tmp/lavinmq.sock unix_proxy_protocol = true -min_followers = 1 -max_lag = 100 diff --git a/src/lavinmq/config.cr b/src/lavinmq/config.cr index c8f57c4f15..1ac4999f09 100644 --- a/src/lavinmq/config.cr +++ b/src/lavinmq/config.cr @@ -6,7 +6,7 @@ module LavinMQ DEFAULT_LOG_LEVEL = Log::Severity::Info property data_dir : String = ENV.fetch("STATE_DIRECTORY", "/var/lib/lavinmq") - property config_file = File.exists?(File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini")) ? File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini") : "/Users/christinadahlen/84codes/lavinmq/extras/lavinmq.ini" + property config_file = File.exists?(File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini")) ? File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini") : "" property log_file : String? = nil property log_level : Log::Severity = DEFAULT_LOG_LEVEL property amqp_bind = "127.0.0.1"