Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Log#emit and Log::Metadata #573

Merged
merged 2 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/lavinmq/client/amqp_connection.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ module LavinMQ

def self.start(socket, connection_info, vhosts, users) : Client?
remote_address = connection_info.src
Log.context.set(address: remote_address.to_s)
socket.read_timeout = 15.seconds
if confirm_header(socket)
if start_ok = start(socket)
Expand Down
29 changes: 15 additions & 14 deletions src/lavinmq/client/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ module LavinMQ
getter id, name
property? running = true
getter? flow = true
getter log : Log
getter consumers = Array(Consumer).new
getter prefetch_count = 0_u16
getter global_prefetch_count = 0_u16
Expand All @@ -39,8 +38,10 @@ module LavinMQ

rate_stats({"ack", "get", "publish", "deliver", "redeliver", "reject", "confirm", "return_unroutable"})

Log = ::Log.for("channel")

def initialize(@client : Client, @id : UInt16)
@log = Log.for "channel[client=#{@client.remote_address} id=#{@id}]"
@metadata = ::Log::Metadata.new(nil, {client: @client.remote_address.to_s, channel: @id.to_i})
@name = "#{@client.channel_name_prefix}[#{@id}]"
end

Expand Down Expand Up @@ -81,7 +82,7 @@ module LavinMQ

def send(frame)
unless @running
@log.debug { "Channel is closed so is not sending #{frame.inspect}" }
Log.debug &.emit "Channel is closed so is not sending #{frame.inspect}", @metadata
return false
end
@client.send frame, true
Expand Down Expand Up @@ -138,7 +139,7 @@ module LavinMQ
if frame.body_size > Config.instance.max_message_size
error = "message size #{frame.body_size} larger than max size #{Config.instance.max_message_size}"
@client.send_precondition_failed(frame, error)
@log.warn { "Message size exceeded, #{frame.body_size}/#{Config.instance.max_message_size}" }
Log.warn &.emit "Message size exceeded, #{frame.body_size}/#{Config.instance.max_message_size}", @metadata
return
end
@next_msg_size = frame.body_size
Expand Down Expand Up @@ -257,7 +258,7 @@ module LavinMQ
current_user = @client.user
if user_id && user_id != current_user.name && !current_user.can_impersonate?
text = "Message's user_id property '#{user_id}' doesn't match actual user '#{current_user.name}'"
@log.error { text }
Log.error &.emit text, @metadata
raise Error::PreconditionFailed.new(text)
end
end
Expand Down Expand Up @@ -340,7 +341,7 @@ module LavinMQ
@client.send_precondition_failed(frame, "Direct replys must be consumed in no-ack mode")
return
end
@log.debug { "Saving direct reply consumer #{frame.consumer_tag}" }
Log.debug &.emit "Saving direct reply consumer #{frame.consumer_tag}", @metadata
@direct_reply_consumer = frame.consumer_tag
@client.vhost.direct_reply_consumers[frame.consumer_tag] = self
unless frame.no_wait
Expand Down Expand Up @@ -402,11 +403,11 @@ module LavinMQ
# @unacked is always sorted so can do a binary search
# optimization for acking first unacked
if @unacked[0]?.try(&.tag) == delivery_tag
# @log.debug { "Unacked found tag:#{delivery_tag} at front" }
# Log.debug &.emit "Unacked found tag:#{delivery_tag} at front", @metadata
found = @unacked.shift
elsif idx = @unacked.bsearch_index { |unack, _| unack.tag >= delivery_tag }
return nil unless @unacked[idx].tag == delivery_tag
# @log.debug { "Unacked bsearch found tag:#{delivery_tag} at index:#{idx}" }
# Log.debug &.emit "Unacked bsearch found tag:#{delivery_tag} at index:#{idx}", @metadata
found = @unacked.delete_at(idx)
end
end
Expand All @@ -426,7 +427,7 @@ module LavinMQ
idx = @unacked.bsearch_index { |unack, _| unack.tag >= delivery_tag }
return nil unless idx
return nil unless @unacked[idx].tag == delivery_tag
# @log.debug { "Unacked bsearch found tag:#{delivery_tag} at index:#{idx}" }
# Log.debug &.emit "Unacked bsearch found tag:#{delivery_tag} at index:#{idx}", @metadata
(idx + 1).times do
yield @unacked.shift
count += 1
Expand Down Expand Up @@ -499,7 +500,7 @@ module LavinMQ
return
end

@log.debug { "Rejecting #{frame.inspect}" }
Log.debug &.emit "Rejecting #{frame.inspect}", @metadata
if unack = delete_unacked(frame.delivery_tag)
do_reject(frame.requeue, unack)
else
Expand Down Expand Up @@ -630,15 +631,15 @@ module LavinMQ
end
@unack_lock.synchronize do
@unacked.each do |unack|
@log.debug { "Requeing unacked msg #{unack.sp}" }
Log.debug &.emit "Requeing unacked msg #{unack.sp}", @metadata
unack.queue.reject(unack.sp, true)
end
@unacked.clear
end
@has_capacity.close
@next_msg_body_file.try &.close
@client.vhost.event_tick(EventType::ChannelClosed)
@log.debug { "Closed" }
Log.debug &.emit "Closed", @metadata
end

protected def next_delivery_tag(queue : Queue, sp, no_ack, consumer) : UInt64
Expand Down Expand Up @@ -690,7 +691,7 @@ module LavinMQ
end

def cancel_consumer(frame)
@log.debug { "Cancelling consumer '#{frame.consumer_tag}'" }
Log.debug &.emit "Cancelling consumer '#{frame.consumer_tag}'", @metadata
if idx = @consumers.index { |cons| cons.tag == frame.consumer_tag }
c = @consumers.delete_at idx
c.close
Expand Down Expand Up @@ -751,7 +752,7 @@ module LavinMQ
@tx_acks.each do |tx_ack|
if idx = @unacked.bsearch_index { |u, _| u.tag >= tx_ack.delivery_tag }
raise "BUG: Delivery tag not found" unless @unacked[idx].tag == tx_ack.delivery_tag
@log.debug { "Unacked bsearch found tag:#{tx_ack.delivery_tag} at index:#{idx}" }
Log.debug &.emit "Unacked bsearch found tag:#{tx_ack.delivery_tag} at index:#{idx}", @metadata
if tx_ack.multiple
(idx + 1).times do
unack = @unacked.shift
Expand Down
35 changes: 18 additions & 17 deletions src/lavinmq/client/channel/consumer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module LavinMQ
class Channel
class Consumer
include SortableJSON
Log = ::Log.for("consumer")
getter tag : String
getter priority : Int32
getter? exclusive : Bool
Expand All @@ -15,8 +16,8 @@ module LavinMQ
getter prefetch_count = 0u16
getter unacked = 0_u32
getter? closed = false
@log : Log
@flow : Bool
@metadata : ::Log::Metadata

def initialize(@channel : Client::Channel, @queue : Queue, frame : AMQP::Frame::Basic::Consume)
@tag = frame.consumer_tag
Expand All @@ -25,7 +26,7 @@ module LavinMQ
@priority = consumer_priority(frame) # Must be before ConsumeOk, can close channel
@prefetch_count = @channel.prefetch_count
@flow = @channel.flow?
@log = @channel.log.for "consumer=#{@tag}"
@metadata = @channel.@metadata.extend({consumer: @tag})
spawn deliver_loop, name: "Consumer deliver loop", same_thread: true
end

Expand Down Expand Up @@ -66,21 +67,21 @@ module LavinMQ
break
end
{% unless flag?(:release) %}
@log.debug { "Getting a new message" }
Log.debug &.emit "Getting a new message", @metadata
{% end %}
queue.consume_get(self) do |env|
deliver(env.message, env.segment_position, env.redelivered)
end
Fiber.yield if (i &+= 1) % 32768 == 0
end
rescue ex : ClosedError | Queue::ClosedError | Client::Channel::ClosedError | ::Channel::ClosedError
@log.debug { "deliver loop exiting: #{ex.inspect}" }
Log.debug &.emit "deliver loop exiting: #{ex.inspect}", @metadata
end

private def wait_for_global_capacity
ch = @channel
return if ch.has_capacity?
@log.debug { "Waiting for global prefetch capacity" }
Log.debug &.emit "Waiting for global prefetch capacity", @metadata
select
when ch.has_capacity.receive
when @notify_closed.receive
Expand All @@ -91,18 +92,18 @@ module LavinMQ
private def wait_for_single_active_consumer
case @queue.single_active_consumer
when self
@log.debug { "This consumer is the single active consumer" }
Log.debug &.emit "This consumer is the single active consumer", @metadata
when nil
@log.debug { "The queue isn't a single active consumer queue" }
Log.debug &.emit "The queue isn't a single active consumer queue", @metadata
else
@log.debug { "Waiting for this consumer to become the single active consumer" }
Log.debug &.emit "Waiting for this consumer to become the single active consumer", @metadata
loop do
select
when sca = @queue.single_active_consumer_change.receive
if sca == self
break
else
@log.debug { "New single active consumer, but not me" }
Log.debug &.emit "New single active consumer, but not me", @metadata
end
when @notify_closed.receive
break
Expand All @@ -116,7 +117,7 @@ module LavinMQ
# single active consumer queues can't have priority consumers
if @queue.has_priority_consumers? && @queue.single_active_consumer.nil?
if @queue.consumers.any? { |c| c.priority > @priority && c.accepts? }
@log.debug { "Waiting for higher priority consumers to not have capacity" }
Log.debug &.emit "Waiting for higher priority consumers to not have capacity", @metadata
begin
::Channel.receive_first(@queue.consumers.map(&.has_capacity))
rescue ::Channel::ClosedError
Expand All @@ -128,10 +129,10 @@ module LavinMQ

private def wait_for_queue_ready
if @queue.empty?
@log.debug { "Waiting for queue not to be empty" }
Log.debug &.emit "Waiting for queue not to be empty", @metadata
select
when is_empty = @queue.empty_change.receive
@log.debug { "Queue is #{is_empty ? "" : "not"} empty" }
Log.debug &.emit "Queue is #{is_empty ? "" : "not"} empty"
when @notify_closed.receive
end
return true
Expand All @@ -140,10 +141,10 @@ module LavinMQ

private def wait_for_paused_queue
if @queue.paused?
@log.debug { "Waiting for queue not to be paused" }
Log.debug &.emit "Waiting for queue not to be paused", @metadata
select
when is_paused = @queue.paused_change.receive
@log.debug { "Queue is #{is_paused ? "" : "not"} paused" }
Log.debug &.emit "Queue is #{is_paused ? "" : "not"} paused"
when @notify_closed.receive
end
return true
Expand All @@ -152,9 +153,9 @@ module LavinMQ

private def wait_for_flow
unless @flow
@log.debug { "Waiting for flow" }
Log.debug &.emit "Waiting for flow", @metadata
is_flow = @flow_change.receive
@log.debug { "Channel flow=#{is_flow}" }
Log.debug &.emit "Channel flow=#{is_flow}", @metadata
return true
end
end
Expand All @@ -163,7 +164,7 @@ module LavinMQ
private def wait_for_capacity : Nil
if @prefetch_count > 0
until @unacked < @prefetch_count
@log.debug { "Waiting for prefetch capacity" }
Log.debug &.emit "Waiting for prefetch capacity", @metadata
@has_capacity.receive
end
end
Expand Down
10 changes: 5 additions & 5 deletions src/lavinmq/client/channel/stream_consumer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,25 @@ module LavinMQ
break
end
{% unless flag?(:release) %}
@log.debug { "Getting a new message" }
Log.debug &.emit "Getting a new message", @metadata
{% end %}
stream_queue.consume_get(self) do |env|
deliver(env.message, env.segment_position, env.redelivered)
end
Fiber.yield if (i &+= 1) % 32768 == 0
end
rescue ex : ClosedError | Queue::ClosedError | Client::Channel::ClosedError | ::Channel::ClosedError
@log.debug { "deliver loop exiting: #{ex.inspect}" }
Log.debug &.emit "deliver loop exiting: #{ex.inspect}", @metadata
end

private def wait_for_queue_ready
if @offset > stream_queue.last_offset && @requeued.empty?
@log.debug { "Waiting for queue not to be empty" }
Log.debug &.emit "Waiting for queue not to be empty", @metadata
select
when stream_queue.new_messages.receive
@log.debug { "Queue is not empty" }
Log.debug &.emit "Queue is not empty", @metadata
when @has_requeued.receive
@log.debug { "Got a requeued message" }
Log.debug &.emit "Got a requeued message", @metadata
when @notify_closed.receive
end
return true
Expand Down
Loading
Loading