-
Notifications
You must be signed in to change notification settings - Fork 6
/
amqpcat.cr
179 lines (169 loc) · 4.66 KB
/
amqpcat.cr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
require "amqp-client"
require "compress/deflate"
require "compress/gzip"
require "./version"
# Pipes data between the standard streams and an AMQP broker: publishes lines
# read from STDIN, prints consumed messages to STDOUT, or does both for simple
# request/reply (RPC) exchanges.
class AMQPCat
  # Pause before reconnecting after a failure.
  RECONNECT_DELAY = 2.seconds

  # Builds the AMQP client from *uri*, overriding the `name` query parameter
  # with "AMQPCat <VERSION>".
  def initialize(uri)
    parsed = URI.parse(uri)
    params = parsed.query_params
    params["name"] = "AMQPCat #{VERSION}"
    parsed.query = params.to_s
    @client = AMQP::Client.new(parsed)
  end

  # Publishes every line read from STDIN to *exchange* with *routing_key*.
  #
  # When *exchange* is non-empty it is declared (passively first, then
  # actively with *exchange_type*) before publishing. With *publish_confirm*
  # each message is sent through `basic_publish_confirm`. On any error the
  # message is printed to STDERR, the connection is closed and, after
  # `RECONNECT_DELAY`, a new connection is made. The line that was being
  # published when the error occurred is not retried.
  def produce(exchange : String, routing_key : String, exchange_type : String, publish_confirm : Bool, props : AMQP::Client::Properties)
    STDIN.blocking = false
    loop do
      # Reset so the rescue clause never sees a connection from a previous iteration.
      connection = nil
      connection = @client.connect
      # Publish on the channel used for the declare; open one only if no declare happened.
      channel = open_channel_declare_exchange(connection, exchange, exchange_type) || connection.channel
      while line = STDIN.gets
        if publish_confirm
          channel.basic_publish_confirm line, exchange, routing_key, props: props
        else
          channel.basic_publish line, exchange, routing_key, props: props
        end
      end
      connection.close
      break
    rescue ex
      STDERR.puts ex.message
      close_quietly(connection)
      sleep RECONNECT_DELAY
    end
  end

  # Consumes messages forever and prints each one to STDOUT using *format*
  # (see `#format_output` for the directives), acknowledging it afterwards.
  #
  # A *queue_type* of "stream" declares a stream queue; any other value
  # declares a regular auto-delete queue. The queue is bound to
  # *exchange_name* with *routing_key* unless both are empty. *offset*, when
  # given, is passed as the `x-stream-offset` consumer argument. On any error
  # the message is printed to STDERR and the connection is re-established
  # after `RECONNECT_DELAY`.
  def consume(exchange_name : String, routing_key : String?, queue_name : String?, queue_type : String, format : String, offset = nil)
    routing_key ||= ""
    queue_name ||= ""
    loop do
      # Reset so the rescue clause never sees a connection from a previous iteration.
      connection = nil
      connection = @client.connect
      q = if queue_type == "stream"
            stream_queue(connection, queue_name)
          else
            queue(connection, queue_name)
          end
      q.bind(exchange_name, routing_key) unless exchange_name.empty? && routing_key.empty?
      args = AMQP::Client::Arguments.new
      args["x-stream-offset"] = offset if offset
      q.subscribe(block: true, no_ack: false, args: args) do |msg|
        format_output(STDOUT, format, msg)
        msg.ack
      end
      # The subscription ended without an error; drop this connection before reconnecting.
      close_quietly(connection)
    rescue ex
      STDERR.puts ex.message
      close_quietly(connection)
      sleep RECONNECT_DELAY
    end
  end

  # Publishes every line read from STDIN with `reply_to` set to
  # "amq.rabbitmq.reply-to" and prints each reply to STDOUT using *format*.
  #
  # After STDIN is exhausted it waits one second for the last reply, then
  # closes the connection. Errors are printed to STDERR and followed by a
  # reconnect after `RECONNECT_DELAY`.
  def rpc(exchange : String, routing_key : String, exchange_type : String, format : String)
    STDIN.blocking = false
    props = AMQP::Client::Properties.new(reply_to: "amq.rabbitmq.reply-to")
    loop do
      # Reset so the rescue clause never sees a connection from a previous iteration.
      connection = nil
      connection = @client.connect
      # Consume and publish on the same channel that was used for the declare.
      channel = open_channel_declare_exchange(connection, exchange, exchange_type) || connection.channel
      channel.basic_consume("amq.rabbitmq.reply-to") do |msg|
        format_output(STDOUT, format, msg)
      end
      while line = STDIN.gets
        channel.basic_publish line, exchange, routing_key, props: props
      end
      sleep 1.second # wait for the last reply
      connection.close
      break
    rescue ex
      STDERR.puts ex.message
      close_quietly(connection)
      sleep RECONNECT_DELAY
    end
  end

  # Closes *connection* if there is one, ignoring any error raised while
  # closing; used on retry paths where the connection may already be broken.
  private def close_quietly(connection)
    connection.try &.close
  rescue
    nil
  end

  # Declares the auto-delete queue *name* on a new channel. If that raises,
  # falls back to a passive declare on another new channel.
  #
  # NOTE(review): prefetch 10 is only applied on the fallback path - confirm
  # whether the primary path should set it as well.
  private def queue(connection, name)
    channel = connection.channel
    channel.queue(name, auto_delete: true)
  rescue
    channel = connection.channel
    channel.prefetch 10
    channel.queue(name, passive: true)
  end

  # Declares *name* with the `x-queue-type: stream` argument on a new channel
  # with a prefetch of 10.
  private def stream_queue(connection, name)
    args = AMQP::Client::Arguments.new({
      "x-queue-type": "stream",
    })
    channel = connection.channel
    channel.prefetch 10
    channel.queue(name, auto_delete: false, args: args)
  end

  # Makes sure *exchange* exists and returns the channel used for the declare,
  # or nil when *exchange* is empty (nothing is declared in that case).
  #
  # A passive declare is tried first; if it raises, the exchange is declared
  # actively with *exchange_type* on a fresh channel.
  private def open_channel_declare_exchange(connection, exchange, exchange_type)
    return if exchange == ""
    channel = connection.channel
    channel.exchange_declare exchange, exchange_type, passive: true
    channel
  rescue
    channel = connection.channel
    channel.exchange_declare exchange, exchange_type, passive: false
    channel
  end

  # Copies the body of *msg* to *io*, inflating it first when the
  # content-encoding property is "deflate" or "gzip".
  private def decode_payload(msg, io)
    case msg.properties.content_encoding
    when "deflate"
      Compress::Deflate::Reader.open(msg.body_io) do |r|
        IO.copy(r, io)
      end
    when "gzip"
      Compress::Gzip::Reader.open(msg.body_io) do |r|
        IO.copy(r, io)
      end
    else
      IO.copy(msg.body_io, io)
    end
  end

  # Writes each header to *io* as a `key=value` line.
  private def print_headers(io, headers)
    headers.each do |k, v|
      io << k << "=" << v << "\n"
    end
  end

  # Renders *msg* to *io* according to *format_str* and flushes *io*.
  #
  # Directives: `%s` payload, `%e` exchange, `%r` routing key, `%h` headers
  # (one `key=value` per line), `%t` content type, `%%` literal percent sign.
  # Escapes: `\n` newline, `\t` tab, `\\` literal backslash.
  # Every other character is copied verbatim.
  #
  # Raises on an unknown directive or escape character.
  private def format_output(io, format_str, msg)
    io.sync = false
    match = false
    escape = false
    format_str.each_char do |c|
      # A pending directive/escape must be resolved before looking for a new
      # '%' or '\', otherwise "%%" and "\\" could never be matched.
      if match
        case c
        when 's'
          decode_payload(msg, io)
        when 'e'
          io << msg.exchange
        when 'r'
          io << msg.routing_key
        when 'h'
          if headers = msg.properties.headers
            print_headers(io, headers)
          end
        when 't'
          io << msg.properties.content_type
        when '%'
          io << '%'
        else
          raise "Invalid substitution argument '%#{c}'"
        end
        match = false
      elsif escape
        case c
        when 'n'
          io << '\n'
        when 't'
          io << '\t'
        when '\\'
          io << '\\'
        else
          raise "Invalid escape character '\\#{c}'"
        end
        escape = false
      elsif c == '%'
        match = true
      elsif c == '\\'
        escape = true
      else
        io << c
      end
    end
    io.flush
  end
end