Skip to content

Commit

Permalink
use channels for blocking max_lag
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Sep 11, 2023
1 parent 2d5058a commit 6ff9af3
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 10 deletions.
4 changes: 2 additions & 2 deletions extras/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ tcp_proxy_protocol = false
tls_port = 5671
unix_path = /tmp/lavinmq.sock
unix_proxy_protocol = true
min_followers = 0
max_lag = 0
min_followers = 1
max_lag = 100000
4 changes: 2 additions & 2 deletions src/lavinmq/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module LavinMQ
DEFAULT_LOG_LEVEL = Log::Severity::Info

property data_dir : String = ENV.fetch("STATE_DIRECTORY", "/var/lib/lavinmq")
property config_file = File.exists?(File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini")) ? File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini") : ""
property config_file = File.exists?(File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini")) ? File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini") : "/Users/christinadahlen/84codes/lavinmq/extras/config.ini"
property log_file : String? = nil
property log_level : Log::Severity = DEFAULT_LOG_LEVEL
property amqp_bind = "127.0.0.1"
Expand Down Expand Up @@ -48,7 +48,7 @@ module LavinMQ
property replication_bind : String? = nil
property replication_port = 5679
property min_followers : Int64 = 0
property max_lag : Int64 = 0
property max_lag : Int64? = nil
@@instance : Config = self.new

def self.instance : LavinMQ::Config
Expand Down
31 changes: 28 additions & 3 deletions src/lavinmq/replication/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ module LavinMQ
@password : String
@files = Hash(String, MFile?).new
@min_followers : Int64
@max_lag : Int64
@max_lag : Int64? = nil
@resume_publishing : Channel(Nil)
@msg_action : Channel(Nil)

def initialize
@password = password
Expand All @@ -34,6 +36,9 @@ module LavinMQ
@tcp.reuse_address = true
@min_followers = Config.instance.min_followers
@max_lag = Config.instance.max_lag
@resume_publishing = Channel(Nil).new
@msg_action = Channel(Nil).new
spawn check_max_lag_loop, name: "Replication#check_max_lag_loop"
end

def clear
Expand Down Expand Up @@ -108,14 +113,33 @@ module LavinMQ

getter min_followers

def has_max_lag?
def max_lag?
return false if @followers.empty?
return false unless has_min_followers?
return false unless max_lag = @max_lag
@followers.all? do |f|
f.lag <= @max_lag
f.lag > max_lag
end
end

getter resume_publishing
getter msg_action
getter max_lag

def check_max_lag_loop
loop do
select
when @msg_action.receive
when timeout(5.seconds)
end
has_max_lag = max_lag?
if !has_max_lag
@resume_publishing.try_send nil
end
Fiber.yield
end
end

private def password : String
path = File.join(Config.instance.data_dir, ".replication_secret")
begin
Expand Down Expand Up @@ -231,6 +255,7 @@ module LavinMQ
len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian)
@acked_bytes += len
@ack.try_send nil
@server.msg_action.try_send nil
end
rescue IO::Error
end
Expand Down
10 changes: 8 additions & 2 deletions src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module LavinMQ
"redeliver", "reject", "consumer_added", "consumer_removed"})

getter name, exchanges, queues, data_dir, operator_policies, policies, parameters, shovels,
direct_reply_consumers, connections, dir, users
direct_reply_consumers, connections, dir, users, replicator
property? flow = true
getter? closed = false
property max_connections : Int32?
Expand Down Expand Up @@ -111,7 +111,13 @@ module LavinMQ
headers = msg.properties.headers
find_all_queues(ex, msg.routing_key, headers, visited, found_queues)
headers.delete("BCC") if headers
return false unless @replicator.has_min_followers? || @replicator.has_max_lag?

if @replicator.max_lag?
pp "blocking publish"
@replicator.resume_publishing.receive
pp "publish resumed"
end

if found_queues.empty?
ex.unroutable_count += 1
return false
Expand Down
2 changes: 1 addition & 1 deletion static/js/nodes.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const updateDetails = (nodeStats) => {
}

const updateFollowerSettings = (nodeStats) => {
document.getElementById('tr-max-lag').textContent = nodeStats.max_lag
document.getElementById('tr-max-lag').textContent = nodeStats.max_lag || 'No max_lag value specified'
document.getElementById('tr-min-followers').textContent = nodeStats.min_followers
}

Expand Down

0 comments on commit 6ff9af3

Please sign in to comment.