diff --git a/extras/config.ini b/extras/config.ini index 56d2527693..549bda723e 100644 --- a/extras/config.ini +++ b/extras/config.ini @@ -22,5 +22,5 @@ tcp_proxy_protocol = false tls_port = 5671 unix_path = /tmp/lavinmq.sock unix_proxy_protocol = true -min_followers = 0 -max_lag = 0 +min_followers = 1 +max_lag = 100000 diff --git a/src/lavinmq/config.cr b/src/lavinmq/config.cr index fabc517d0c..7dd335eb64 100644 --- a/src/lavinmq/config.cr +++ b/src/lavinmq/config.cr @@ -6,7 +6,7 @@ module LavinMQ DEFAULT_LOG_LEVEL = Log::Severity::Info property data_dir : String = ENV.fetch("STATE_DIRECTORY", "/var/lib/lavinmq") - property config_file = File.exists?(File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini")) ? File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini") : "" + property config_file = File.exists?(File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini")) ? File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini") : "/Users/christinadahlen/84codes/lavinmq/extras/config.ini" property log_file : String? = nil property log_level : Log::Severity = DEFAULT_LOG_LEVEL property amqp_bind = "127.0.0.1" @@ -48,7 +48,7 @@ module LavinMQ property replication_bind : String? = nil property replication_port = 5679 property min_followers : Int64 = 0 - property max_lag : Int64 = 0 + property max_lag : Int64? = nil @@instance : Config = self.new def self.instance : LavinMQ::Config diff --git a/src/lavinmq/replication/server.cr b/src/lavinmq/replication/server.cr index b546dcef80..dcd37b9541 100644 --- a/src/lavinmq/replication/server.cr +++ b/src/lavinmq/replication/server.cr @@ -25,7 +25,9 @@ module LavinMQ @password : String @files = Hash(String, MFile?).new @min_followers : Int64 - @max_lag : Int64 + @max_lag : Int64? = nil + @resume_publishing : Channel(Nil) + @msg_action : Channel(Nil) def initialize @password = password @@ -34,6 +36,9 @@ module LavinMQ @tcp.reuse_address = true @min_followers = Config.instance.min_followers @max_lag = Config.instance.max_lag + @resume_publishing = Channel(Nil).new + @msg_action = Channel(Nil).new + spawn check_max_lag_loop, name: "Replication#check_max_lag_loop" end def clear @@ -108,14 +113,33 @@ module LavinMQ getter min_followers - def has_max_lag? + def max_lag? + return false if @followers.empty? + return false unless has_min_followers? + return false unless max_lag = @max_lag @followers.all? do |f| - f.lag <= @max_lag + f.lag > max_lag end end + getter resume_publishing + getter msg_action getter max_lag + def check_max_lag_loop + loop do + select + when @msg_action.receive + when timeout(5.seconds) + end + has_max_lag = max_lag? + if !has_max_lag + @resume_publishing.try_send nil + end + Fiber.yield + end + end + private def password : String path = File.join(Config.instance.data_dir, ".replication_secret") begin @@ -231,6 +255,7 @@ module LavinMQ len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian) @acked_bytes += len @ack.try_send nil + @server.msg_action.try_send nil end rescue IO::Error end diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index 9961b8fc4b..c4b7619673 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -24,7 +24,7 @@ module LavinMQ "redeliver", "reject", "consumer_added", "consumer_removed"}) getter name, exchanges, queues, data_dir, operator_policies, policies, parameters, shovels, - direct_reply_consumers, connections, dir, users + direct_reply_consumers, connections, dir, users, replicator property? flow = true getter? closed = false property max_connections : Int32? @@ -111,7 +111,13 @@ module LavinMQ headers = msg.properties.headers find_all_queues(ex, msg.routing_key, headers, visited, found_queues) headers.delete("BCC") if headers - return false unless @replicator.has_min_followers? || @replicator.has_max_lag? + + if @replicator.max_lag? + pp "blocking publish" + @replicator.resume_publishing.receive + pp "publish resumed" + end + if found_queues.empty? ex.unroutable_count += 1 return false diff --git a/static/js/nodes.js b/static/js/nodes.js index 0fae7ec6c8..ee9accff6a 100644 --- a/static/js/nodes.js +++ b/static/js/nodes.js @@ -74,7 +74,7 @@ const updateDetails = (nodeStats) => { } const updateFollowerSettings = (nodeStats) => { - document.getElementById('tr-max-lag').textContent = nodeStats.max_lag + document.getElementById('tr-max-lag').textContent = nodeStats.max_lag || 'No max_lag value specified' document.getElementById('tr-min-followers').textContent = nodeStats.min_followers }