diff --git a/extras/config.ini b/extras/config.ini index 5a64646b94..99059bcb8e 100644 --- a/extras/config.ini +++ b/extras/config.ini @@ -22,3 +22,7 @@ tcp_proxy_protocol = false tls_port = 5671 unix_path = /tmp/lavinmq.sock unix_proxy_protocol = true + +[replication] +bind = 0.0.0.0 +port = 5679 diff --git a/src/lavinmq/replication/server.cr b/src/lavinmq/replication/server.cr index 4ac0e09d0e..0f208b90f3 100644 --- a/src/lavinmq/replication/server.cr +++ b/src/lavinmq/replication/server.cr @@ -24,6 +24,7 @@ module LavinMQ @followers = Array(Follower).new @password : String @files = Hash(String, MFile?).new + @min_followers = 5_i64 def initialize @password = password @@ -98,6 +99,12 @@ module LavinMQ end end + def min_followers : Int64 + @lock.synchronize do + @min_followers.dup # for thread safety + end + end + private def password : String path = File.join(Config.instance.data_dir, ".replication_secret") begin diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index 23a582825e..154f7ac4f4 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -113,6 +113,9 @@ module LavinMQ headers = properties.headers find_all_queues(ex, msg.routing_key, headers, visited, found_queues) headers.try(&.delete("BCC")) + puts @replicator.min_followers + puts @replicator.followers.size + return false if @replicator.followers.size < @replicator.min_followers # @log.debug { "publish queues#found=#{found_queues.size}" } if found_queues.empty? ex.unroutable_count += 1