Skip to content

Commit 5c7c726

Browse files
committed
add Elixir source for tutorial 6
1 parent 5757dfd commit 5c7c726

File tree

2 files changed

+69
-0
lines changed

2 files changed

+69
-0
lines changed

elixir/rpc_client.exs

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
defmodule FibonacciRpcClient do
2+
def wait_for_messages(_channel, correlation_id) do
3+
receive do
4+
{:basic_deliver, payload, %{correlation_id: ^correlation_id}} ->
5+
{n, _} = Integer.parse(payload)
6+
n
7+
end
8+
end
9+
def call(n) do
10+
{:ok, connection} = AMQP.Connection.open
11+
{:ok, channel} = AMQP.Channel.open(connection)
12+
13+
{:ok, %{queue: queue_name}} = AMQP.Queue.declare(channel, "", exclusive: true)
14+
AMQP.Basic.consume(channel, queue_name, nil, no_ack: true)
15+
correlation_id = :erlang.unique_integer |> :erlang.integer_to_binary |> Base.encode64
16+
request = to_string(n)
17+
AMQP.Basic.publish(channel, "", "rpc_queue", request, reply_to: queue_name, correlation_id: correlation_id)
18+
19+
FibonacciRpcClient.wait_for_messages(channel, correlation_id)
20+
end
21+
end
22+
23+
num =
24+
case System.argv do
25+
[] -> 30
26+
param ->
27+
{x, _} =
28+
param
29+
|> Enum.join(" ")
30+
|> Integer.parse
31+
x
32+
end
33+
34+
IO.puts " [x] Requesting fib(#{num})"
35+
response = FibonacciRpcClient.call(num)
36+
IO.puts " [.] Got #{response}"

elixir/rpc_server.exs

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
defmodule FibServer do
2+
def fib(0), do: 0
3+
def fib(1), do: 1
4+
def fib(n) when n > 1, do: fib(n-1) + fib(n-2)
5+
6+
def wait_for_messages(channel) do
7+
receive do
8+
{:basic_deliver, payload, meta} ->
9+
{n, _} = Integer.parse(payload)
10+
IO.puts " [.] fib(#{n})"
11+
response = fib(n)
12+
13+
AMQP.Basic.publish(channel, "", meta.reply_to, "#{response}", correlation_id: meta.correlation_id)
14+
AMQP.Basic.ack(channel, meta.delivery_tag)
15+
16+
wait_for_messages(channel)
17+
end
18+
end
19+
end
20+
21+
{:ok, connection} = AMQP.Connection.open
22+
{:ok, channel} = AMQP.Channel.open(connection)
23+
24+
AMQP.Queue.declare(channel, "rpc_queue")
25+
26+
AMQP.Basic.qos(channel, prefetch_count: 1)
27+
28+
AMQP.Basic.consume(channel, "rpc_queue")
29+
30+
IO.puts " [x] Awaiting RPC requests"
31+
32+
FibServer.wait_for_messages(channel)
33+

0 commit comments

Comments
 (0)