Skip to content

Commit

Permalink
Make purge yield interval a constant, not a config option
Browse files Browse the repository at this point in the history
  • Loading branch information
spuun committed Sep 27, 2023
1 parent 06d1ec1 commit d8f10be
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 32 deletions.
6 changes: 2 additions & 4 deletions spec/queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,10 @@ describe LavinMQ::Queue do
Dir.mkdir_p tmpdir
store = LavinMQ::Queue::MessageStore.new(tmpdir, nil)

10.times do
(LavinMQ::Queue::MessageStore::PURGE_YIELD_INTERVAL * 2 + 1).times do
store.push(LavinMQ::Message.new(0i64, "a", "b", AMQ::Protocol::Properties.new, 0u64, IO::Memory.new(0)))
end

LavinMQ::Config.instance.queue_purge_yield_interval = 2

yields = 0
done = Channel(Nil).new
spawn(name: "yield counter", same_thread: true) do
Expand All @@ -361,7 +359,7 @@ describe LavinMQ::Queue do

done.receive

yields.should eq 5
yields.should eq 2
ensure
store.try &.delete
end
Expand Down
48 changes: 23 additions & 25 deletions src/lavinmq/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ module LavinMQ
property replication_follow : URI? = nil
property replication_bind : String? = nil
property replication_port = 5679
property max_deleted_definitions = 8192 # number of deleted queues, unbinds etc that compacts the definitions file
property queue_purge_yield_interval = 1024 # When purging queues, yield every n:th message
property max_deleted_definitions = 8192 # number of deleted queues, unbinds etc that compacts the definitions file
@@instance : Config = self.new

def self.instance : LavinMQ::Config
Expand Down Expand Up @@ -82,29 +81,28 @@ module LavinMQ
private def parse_main(settings)
settings.each do |config, v|
case config
when "data_dir" then @data_dir = v
when "data_dir_lock" then @data_dir_lock = true?(v)
when "log_level" then @log_level = Log::Severity.parse(v)
when "log_file" then @log_file = v
when "stats_interval" then @stats_interval = v.to_i32
when "stats_log_size" then @stats_log_size = v.to_i32
when "segment_size" then @segment_size = v.to_i32
when "set_timestamp" then @set_timestamp = true?(v)
when "socket_buffer_size" then @socket_buffer_size = v.to_i32
when "tcp_nodelay" then @tcp_nodelay = true?(v)
when "tcp_keepalive" then @tcp_keepalive = tcp_keepalive?(v)
when "tcp_recv_buffer_size" then @tcp_recv_buffer_size = v.to_i32?
when "tcp_send_buffer_size" then @tcp_send_buffer_size = v.to_i32?
when "tls_cert" then @tls_cert_path = v
when "tls_key" then @tls_key_path = v
when "tls_ciphers" then @tls_ciphers = v
when "tls_min_version" then @tls_min_version = v
when "guest_only_loopback" then @guest_only_loopback = true?(v)
when "log_exchange" then @log_exchange = true?(v)
when "free_disk_min" then @free_disk_min = v.to_i64
when "free_disk_warn" then @free_disk_warn = v.to_i64
when "max_deleted_definitions" then @max_deleted_definitions = v.to_i
when "queue_purge_yield_interval" then @queue_purge_yield_interval = v.to_i
when "data_dir" then @data_dir = v
when "data_dir_lock" then @data_dir_lock = true?(v)
when "log_level" then @log_level = Log::Severity.parse(v)
when "log_file" then @log_file = v
when "stats_interval" then @stats_interval = v.to_i32
when "stats_log_size" then @stats_log_size = v.to_i32
when "segment_size" then @segment_size = v.to_i32
when "set_timestamp" then @set_timestamp = true?(v)
when "socket_buffer_size" then @socket_buffer_size = v.to_i32
when "tcp_nodelay" then @tcp_nodelay = true?(v)
when "tcp_keepalive" then @tcp_keepalive = tcp_keepalive?(v)
when "tcp_recv_buffer_size" then @tcp_recv_buffer_size = v.to_i32?
when "tcp_send_buffer_size" then @tcp_send_buffer_size = v.to_i32?
when "tls_cert" then @tls_cert_path = v
when "tls_key" then @tls_key_path = v
when "tls_ciphers" then @tls_ciphers = v
when "tls_min_version" then @tls_min_version = v
when "guest_only_loopback" then @guest_only_loopback = true?(v)
when "log_exchange" then @log_exchange = true?(v)
when "free_disk_min" then @free_disk_min = v.to_i64
when "free_disk_warn" then @free_disk_warn = v.to_i64
when "max_deleted_definitions" then @max_deleted_definitions = v.to_i
else
STDERR.puts "WARNING: Unrecognized configuration 'main/#{config}'"
end
Expand Down
6 changes: 3 additions & 3 deletions src/lavinmq/queue/message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ module LavinMQ
# Messages are refered to as SegmentPositions
# Deleted messages are written to acks.#{segment}
class MessageStore
Log = ::Log.for("MessageStore")
PURGE_YIELD_INTERVAL = 16_384
Log = ::Log.for("MessageStore")
@segments = Hash(UInt32, MFile).new
@deleted = Hash(UInt32, Array(UInt32)).new
@segment_msg_count = Hash(UInt32, UInt32).new(0u32)
Expand Down Expand Up @@ -177,12 +178,11 @@ module LavinMQ
def purge(max_count : Int = UInt32::MAX) : UInt32
raise ClosedError.new if @closed
count = 0u32
yield_interval = {1, Config.instance.queue_purge_yield_interval}.max
while count < max_count && (env = shift?)
delete(env.segment_position)
count += 1
break if count >= max_count
Fiber.yield if (count % yield_interval).zero?
Fiber.yield if (count % PURGE_YIELD_INTERVAL).zero?
end
count
end
Expand Down

0 comments on commit d8f10be

Please sign in to comment.