forked from rabbitmq/rabbitmq-tutorials
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrpc_client.rb
66 lines (48 loc) · 1.31 KB
/
rpc_client.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#!/usr/bin/env ruby
# encoding: utf-8
require "bunny"
require "thread"
# Build a Bunny session with automatic recovery switched off, start it, and
# open the single channel used by the rest of this script.
conn = Bunny.new(automatically_recover: false).tap(&:start)
ch = conn.create_channel
# Blocking RPC client for a Fibonacci server reachable over a channel.
#
# A request is published through the channel's default exchange with the
# server queue name as routing key, a fresh correlation id, and the name of an
# exclusive reply queue owned by this client. #call then blocks until a reply
# carrying the same correlation id has been consumed from that reply queue.
class FibonacciClient
  attr_reader :reply_queue
  attr_accessor :response, :call_id
  attr_reader :lock, :condition

  # @param ch [#default_exchange, #queue] channel used to publish requests and
  #   to declare the exclusive reply queue
  # @param server_queue [String] routing key the requests are published with
  def initialize(ch, server_queue)
    @ch = ch
    @x = ch.default_exchange
    @server_queue = server_queue
    @reply_queue = ch.queue("", :exclusive => true)

    @lock = Mutex.new
    @condition = ConditionVariable.new

    # Captured in a local so the block does not depend on what `self` is at
    # the moment the consumer invokes it.
    that = self

    @reply_queue.subscribe do |_delivery_info, properties, payload|
      # The response is stored and signalled under the lock so that a waiter
      # in #call can never observe the signal without the value (or miss the
      # signal after having checked the value).
      that.lock.synchronize do
        if properties[:correlation_id] == that.call_id
          that.response = payload.to_i
          that.condition.signal
        end
      end
    end
  end

  # Requests fib(n) from the server and blocks until the reply arrives.
  #
  # @param n [#to_s] argument sent to the server as the message payload
  # @return [Integer] the reply payload converted with String#to_i
  def call(n)
    lock.synchronize do
      # Forget any previous answer so the wait below cannot return stale data.
      self.response = nil
      self.call_id = generate_uuid
    end

    @x.publish(n.to_s,
               :routing_key => @server_queue,
               :correlation_id => call_id,
               :reply_to => @reply_queue.name)

    lock.synchronize do
      # Wait on a predicate, not on the bare signal: the reply may already
      # have been handled before this point (NOTE(review): presumably on a
      # consumer thread owned by the AMQP library — confirm), in which case
      # the signal was sent with nobody waiting. The loop also covers
      # spurious wakeups.
      condition.wait(lock) while response.nil?
      response
    end
  end

  protected

  def generate_uuid
    # very naive but good enough for code
    # examples
    "#{rand}#{rand}#{rand}"
  end
end
# Issue a single fib(30) request and print the answer. The channel and the
# connection are released in `ensure` so they are closed even when creating
# the client or performing the call raises.
begin
  client = FibonacciClient.new(ch, "rpc_queue")

  puts " [x] Requesting fib(30)"
  response = client.call(30)
  puts " [.] Got #{response}"
ensure
  # Skip closing a channel that is no longer open, so that a close error does
  # not replace the exception that is already propagating.
  ch.close if ch.open?
  conn.close
end