Skip to content

Commit

Permalink
Options for different publish properties
Browse files Browse the repository at this point in the history
Such as content_type, content_encoding etc.

Fixes #13
  • Loading branch information
carlhoerberg committed Apr 17, 2024
1 parent c798423 commit b38aab3
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
7 changes: 3 additions & 4 deletions src/amqpcat.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ class AMQPCat
@client = AMQP::Client.new(u)
end

def produce(exchange : String, routing_key : String, exchange_type : String, publish_confirm = false)
def produce(exchange : String, routing_key : String, exchange_type : String, publish_confirm : Bool, props : AMQP::Client::Properties)
STDIN.blocking = false
loop do
connection = @client.connect
channel = connection.channel
open_channel_declare_exchange(connection, exchange, exchange_type)
props = AMQP::Client::Properties.new(delivery_mode: 2_u8)
while line = STDIN.gets
if publish_confirm
channel.basic_publish_confirm line, exchange, routing_key, props: props
Expand Down Expand Up @@ -104,7 +103,7 @@ class AMQPCat
end
end

private def write_headers(io, headers)
private def print_headers(io, headers)
headers.each do |k, v|
io << k << "=" << v << "\n"
end
Expand All @@ -127,7 +126,7 @@ class AMQPCat
io << msg.routing_key
when 'h'
if headers = msg.properties.headers
write_headers(io, headers)
print_headers(io, headers)
end
when 't'
io << msg.properties.content_type
Expand Down
7 changes: 6 additions & 1 deletion src/cli.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ routing_key : String? = nil
format = "%s\n"
publish_confirm = false
offset = "next"
props = AMQP::Client::Properties.new(delivery_mode: 2_u8)

FORMAT_STRING_HELP = <<-HELP
Format string (default "%s\\n")
Expand Down Expand Up @@ -46,6 +47,10 @@ p = OptionParser.parse do |parser|
end
end
parser.on("-f FORMAT", "--format=FORMAT", FORMAT_STRING_HELP) { |v| format = v }
parser.on("--content-type=TYPE", "Content type header") { |v| props.content_type = v }
parser.on("--content-encoding=ENC", "Content encoding header") { |v| props.content_encoding = v }
parser.on("--priority=LEVEL", "Priority header") { |v| props.priority = v.to_u8? || abort "Priority must be between 0 and 255" }
parser.on("--expiration=TIME", "Expiration header (ms before msg is dead lettered)") { |v| props.expiration = v }
parser.on("-v", "--version", "Display version") { puts AMQPCat::VERSION; exit 0 }
parser.on("-h", "--help", "Show this help message") { puts parser; exit 0 }
parser.invalid_option do |flag|
Expand All @@ -61,7 +66,7 @@ when :producer
STDERR.puts "Error: Missing exchange or queue argument."
abort p
end
cat.produce(exchange, routing_key || queue || "", exchange_type, publish_confirm)
cat.produce(exchange, routing_key || queue || "", exchange_type, publish_confirm, props)
when :consumer
unless routing_key || queue
STDERR.puts "Error: Missing routing key or queue argument."
Expand Down

0 comments on commit b38aab3

Please sign in to comment.