Skip to content

Commit

Permalink
Make max deleted defintions a config option
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Sep 8, 2023
1 parent a35548a commit 9a03fa4
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 38 deletions.
12 changes: 4 additions & 8 deletions spec/vhost_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,15 @@ describe LavinMQ::VHost do
end

it "should compact definitions during runtime" do
Server.vhosts.create("test")
v = Server.vhosts["test"].not_nil!
(LavinMQ::VHost::DEFINITIONS_DIRT_COMPACT_THREASHOLD - 1).times do
v = Server.vhosts.create("test")
(LavinMQ::Config.instance.max_deleted_definitions - 1).times do
v.declare_queue("q", true, false)
v.delete_queue("q")
end
v.@definitions_dirt_counter.should eq(LavinMQ::VHost::DEFINITIONS_DIRT_COMPACT_THREASHOLD - 1)
definitions_file_pos_before = v.@definitions_file.pos
file_size = v.@definitions_file.size
v.declare_queue("q", true, false)
v.delete_queue("q")
wait_for(timeout: 1.second) { v.@definitions_dirt_counter.zero? }
v.@definitions_file.pos.should be < definitions_file_pos_before
v.@definitions_dirt_counter.should eq 0
v.@definitions_file.size.should be < file_size
end

describe "auto add permissions" do
Expand Down
44 changes: 23 additions & 21 deletions src/lavinmq/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ module LavinMQ
property replication_follow : URI? = nil
property replication_bind : String? = nil
property replication_port = 5679
property max_deleted_definitions = 8192 # number of deleted queues, unbinds etc that compacts the definitions file
@@instance : Config = self.new

def self.instance : LavinMQ::Config
Expand Down Expand Up @@ -80,27 +81,28 @@ module LavinMQ
private def parse_main(settings)
settings.each do |config, v|
case config
when "data_dir" then @data_dir = v
when "data_dir_lock" then @data_dir_lock = true?(v)
when "log_level" then @log_level = Log::Severity.parse(v)
when "log_file" then @log_file = v
when "stats_interval" then @stats_interval = v.to_i32
when "stats_log_size" then @stats_log_size = v.to_i32
when "segment_size" then @segment_size = v.to_i32
when "set_timestamp" then @set_timestamp = true?(v)
when "socket_buffer_size" then @socket_buffer_size = v.to_i32
when "tcp_nodelay" then @tcp_nodelay = true?(v)
when "tcp_keepalive" then @tcp_keepalive = tcp_keepalive?(v)
when "tcp_recv_buffer_size" then @tcp_recv_buffer_size = v.to_i32?
when "tcp_send_buffer_size" then @tcp_send_buffer_size = v.to_i32?
when "tls_cert" then @tls_cert_path = v
when "tls_key" then @tls_key_path = v
when "tls_ciphers" then @tls_ciphers = v
when "tls_min_version" then @tls_min_version = v
when "guest_only_loopback" then @guest_only_loopback = true?(v)
when "log_exchange" then @log_exchange = true?(v)
when "free_disk_min" then @free_disk_min = v.to_i64
when "free_disk_warn" then @free_disk_warn = v.to_i64
when "data_dir" then @data_dir = v
when "data_dir_lock" then @data_dir_lock = true?(v)
when "log_level" then @log_level = Log::Severity.parse(v)
when "log_file" then @log_file = v
when "stats_interval" then @stats_interval = v.to_i32
when "stats_log_size" then @stats_log_size = v.to_i32
when "segment_size" then @segment_size = v.to_i32
when "set_timestamp" then @set_timestamp = true?(v)
when "socket_buffer_size" then @socket_buffer_size = v.to_i32
when "tcp_nodelay" then @tcp_nodelay = true?(v)
when "tcp_keepalive" then @tcp_keepalive = tcp_keepalive?(v)
when "tcp_recv_buffer_size" then @tcp_recv_buffer_size = v.to_i32?
when "tcp_send_buffer_size" then @tcp_send_buffer_size = v.to_i32?
when "tls_cert" then @tls_cert_path = v
when "tls_key" then @tls_key_path = v
when "tls_ciphers" then @tls_ciphers = v
when "tls_min_version" then @tls_min_version = v
when "guest_only_loopback" then @guest_only_loopback = true?(v)
when "log_exchange" then @log_exchange = true?(v)
when "free_disk_min" then @free_disk_min = v.to_i64
when "free_disk_warn" then @free_disk_warn = v.to_i64
when "max_deleted_definitions" then @max_deleted_definitions = v.to_i
else
STDERR.puts "WARNING: Unrecognized configuration 'main/#{config}'"
end
Expand Down
13 changes: 4 additions & 9 deletions src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ module LavinMQ
include SortableJSON
include Stats

DEFINITIONS_DIRT_COMPACT_THREASHOLD = 10_000

rate_stats({"channel_closed", "channel_created", "connection_closed", "connection_created",
"queue_declared", "queue_deleted", "ack", "deliver", "get", "publish", "confirm",
"redeliver", "reject", "consumer_added", "consumer_removed"})
Expand All @@ -42,7 +40,7 @@ module LavinMQ
@definitions_file : File
@definitions_lock = Mutex.new(:reentrant)
@definitions_file_path : String
@definitions_dirt_counter = 0
@definitions_deletes = 0

def initialize(@name : String, @server_data_dir : String, @users : UserStore, @replicator : Replication::Server)
@log = Log.for "vhost[name=#{@name}]"
Expand Down Expand Up @@ -614,7 +612,6 @@ module LavinMQ
@replicator.replace_file @definitions_file_path
@definitions_file.close
@definitions_file = io
@definitions_dirt_counter = 0
end
end

Expand All @@ -625,11 +622,9 @@ module LavinMQ
@replicator.append @definitions_file_path, bytes
@definitions_file.fsync
if dirty
@definitions_dirt_counter += 1
# By doing equality comparision we'll only start one fiber
# without having to keep track if it's started or not
if @definitions_dirt_counter == DEFINITIONS_DIRT_COMPACT_THREASHOLD
spawn(name: "VHost #{name} definitions compact") { compact! }
if (@definitions_deletes += 1) >= Config.instance.max_deleted_definitions
compact!
@definitions_deletes = 0
end
end
end
Expand Down

0 comments on commit 9a03fa4

Please sign in to comment.