diff --git a/src/amqpcat.cr b/src/amqpcat.cr index 86c7b9f..ceaf447 100644 --- a/src/amqpcat.cr +++ b/src/amqpcat.cr @@ -12,13 +12,12 @@ class AMQPCat @client = AMQP::Client.new(u) end - def produce(exchange : String, routing_key : String, exchange_type : String, publish_confirm = false) + def produce(exchange : String, routing_key : String, exchange_type : String, publish_confirm : Bool, props : AMQP::Client::Properties) STDIN.blocking = false loop do connection = @client.connect channel = connection.channel open_channel_declare_exchange(connection, exchange, exchange_type) - props = AMQP::Client::Properties.new(delivery_mode: 2_u8) while line = STDIN.gets if publish_confirm channel.basic_publish_confirm line, exchange, routing_key, props: props @@ -104,7 +103,7 @@ class AMQPCat end end - private def write_headers(io, headers) + private def print_headers(io, headers) headers.each do |k, v| io << k << "=" << v << "\n" end @@ -127,7 +126,7 @@ class AMQPCat io << msg.routing_key when 'h' if headers = msg.properties.headers - write_headers(io, headers) + print_headers(io, headers) end when 't' io << msg.properties.content_type diff --git a/src/cli.cr b/src/cli.cr index bf90664..41288a3 100644 --- a/src/cli.cr +++ b/src/cli.cr @@ -12,6 +12,7 @@ routing_key : String? = nil format = "%s\n" publish_confirm = false offset = "next" +props = AMQP::Client::Properties.new(delivery_mode: 2_u8) FORMAT_STRING_HELP = <<-HELP Format string (default "%s\\n") @@ -46,6 +47,10 @@ p = OptionParser.parse do |parser| end end parser.on("-f FORMAT", "--format=FORMAT", FORMAT_STRING_HELP) { |v| format = v } + parser.on("--content-type=TYPE", "Content type header") { |v| props.content_type = v } + parser.on("--content-encoding=ENC", "Content encoding header") { |v| props.content_encoding = v } + parser.on("--priority=LEVEL", "Priority header") { |v| props.priority = v.to_u8? || abort "Priority must be between 0 and 255" } + parser.on("--expiration=TIME", "Expiration header (ms before msg is dead lettered)") { |v| props.expiration = v } parser.on("-v", "--version", "Display version") { puts AMQPCat::VERSION; exit 0 } parser.on("-h", "--help", "Show this help message") { puts parser; exit 0 } parser.invalid_option do |flag| @@ -61,7 +66,7 @@ when :producer STDERR.puts "Error: Missing exchange or queue argument." abort p end - cat.produce(exchange, routing_key || queue || "", exchange_type, publish_confirm) + cat.produce(exchange, routing_key || queue || "", exchange_type, publish_confirm, props) when :consumer unless routing_key || queue STDERR.puts "Error: Missing routing key or queue argument."