Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC #30

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open

RPC #30

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ Style/StringLiteralsInInterpolation:
Layout/LineLength:
Max: 130

Metrics/ClassLength:
Max: 500

Metrics/MethodLength:
Max: 90

Naming/FileName:
Exclude:
- 'lib/amqp-client.rb'
Expand Down
47 changes: 10 additions & 37 deletions .rubocop_todo.yml
Original file line number Diff line number Diff line change
@@ -1,31 +1,24 @@
# This configuration was generated by
# `rubocop --auto-gen-config`
# on 2021-10-15 13:44:24 UTC using RuboCop version 1.19.1.
# on 2024-06-13 23:26:40 UTC using RuboCop version 1.62.1.
# The point is for the user to remove these configuration records
# one by one as the offenses are removed from the code base.
# Note that changes in the inspected code, or installation of new
# versions of RuboCop, may require this file to be generated again.

# Offense count: 1
# Cop supports --auto-correct.
# Configuration parameters: AutoCorrect, AllowHeredoc, AllowURI, URISchemes, IgnoreCopDirectives, IgnoredPatterns.
# URISchemes: http, https
Layout/LineLength:
Max: 132

# Offense count: 1
Lint/RescueException:
Exclude:
- 'lib/amqp/client/connection.rb'

# Offense count: 32
# Configuration parameters: IgnoredMethods, CountRepeatedAttributes.
# Offense count: 19
# Configuration parameters: AllowedMethods, AllowedPatterns, CountRepeatedAttributes.
Metrics/AbcSize:
Max: 175
Max: 158

# Offense count: 1
# Configuration parameters: CountComments, CountAsOne, ExcludedMethods, IgnoredMethods.
# IgnoredMethods: refine
# Configuration parameters: CountComments, CountAsOne, AllowedMethods, AllowedPatterns.
# AllowedMethods: refine
Metrics/BlockLength:
Max: 40

Expand All @@ -34,32 +27,12 @@ Metrics/BlockLength:
Metrics/BlockNesting:
Max: 4

# Offense count: 6
# Configuration parameters: CountComments, CountAsOne.
Metrics/ClassLength:
Max: 497

# Offense count: 10
# Configuration parameters: IgnoredMethods.
# Offense count: 9
# Configuration parameters: AllowedMethods, AllowedPatterns.
Metrics/CyclomaticComplexity:
Max: 46

# Offense count: 67
# Configuration parameters: CountComments, CountAsOne, ExcludedMethods, IgnoredMethods.
Metrics/MethodLength:
Max: 169

# Offense count: 2
# Configuration parameters: CountComments, CountAsOne.
Metrics/ModuleLength:
Max: 486

# Offense count: 1
# Configuration parameters: CountKeywordArgs, MaxOptionalParameters.
Metrics/ParameterLists:
Max: 13
Max: 47

# Offense count: 5
# Configuration parameters: IgnoredMethods.
# Configuration parameters: AllowedMethods, AllowedPatterns.
Metrics/PerceivedComplexity:
Max: 23
75 changes: 75 additions & 0 deletions lib/amqp/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,81 @@ def delete_exchange(name)
end
end

# Create a RPC server for a single method/function/procedure
# @param queue [String] name of the queue that RPC calls will be sent to
# @param worker_threads [Integer] number of threads that process requests
# @yield [String] The body of the request message, return a response String
# @return [Array<(String, Array<Thread>)>] Returns consumer_tag and an array of worker threads
def rpc_server(queue, worker_threads: 1, &_)
queue(queue)
subscribe(queue, prefetch: worker_threads, worker_threads: worker_threads) do |msg|
result = yield msg.body
msg.channel.basic_publish(result, "", msg.properties.reply_to, correlation_id: msg.properties.correlation_id)
msg.ack
rescue StandardError => e
msg.reject
raise e
end
end

# Do a RPC call, sends a messages, waits for a response
# @param queue [String] name of the queue that RPC calls will be sent to
# @param arguments [String] arguments/body to the call
# @return [String] Returns the result from the call
def rpc_call(queue, arguments)
ch = with_connection(&:channel)
begin
msg = ch.basic_consume_once("amq.rabbitmq.reply-to") do
ch.basic_publish(arguments, "", queue, reply_to: "amq.rabbitmq.reply-to")
end
msg.body
ensure
ch.close
end
end

# Create a reusable RPC client
# @return [RPCClient]
def rpc_client
ch = with_connection(&:channel)
RPCClient.new(ch)
end

# Reusable RPC client, when RPC performance is important
class RPCClient
# @param channel [AMQP::Client::Connection::Channel] the channel to use for the RPC calls
def initialize(channel)
@ch = channel
@correlation_id = 0
@lock = Mutex.new
@messages = ::Queue.new
@ch.basic_consume("amq.rabbitmq.reply-to") do |msg|
@messages.push msg
end
end

# Do a RPC call, sends a messages, waits for a response
# @param queue [String] name of the queue that RPC call will be sent to
# @param arguments [String] arguments/body to the call
# @return [String] Returns the result from the call
def call(queue, arguments)
correlation_id = (@lock.synchronize { @correlation_id += 1 }).to_s(36)
@ch.basic_publish(arguments, "", queue, reply_to: "amq.rabbitmq.reply-to", correlation_id: correlation_id)
loop do
msg = @messages.pop
return msg.body if msg.properties.correlation_id == correlation_id

@messages.push msg
end
end

# Closes the channel used by the RPCClient
def close
@ch.close
@messages.close
end
end

# @!endgroup

private
Expand Down
19 changes: 16 additions & 3 deletions lib/amqp/client/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,11 @@ def basic_publish_confirm(body, exchange, routing_key, **properties)
# @yield [Message] Delivered message from the queue
# @return [Array<(String, Array<Thread>)>] Returns consumer_tag and an array of worker threads
# @return [nil] When `worker_threads` is 0 the method will return when the consumer is cancelled
def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1, &blk)
write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, arguments)
tag, = expect(:basic_consume_ok)
def basic_consume(queue, tag: "", no_ack: true, exclusive: false, no_wait: false, arguments: {}, worker_threads: 1, &blk)
raise ArgumentError, "consumer_tag required when no_wait" if no_wait && tag.empty?

write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, no_wait, arguments)
tag, = expect(:basic_consume_ok) unless no_wait
@consumers[tag] = q = ::Queue.new
if worker_threads.zero?
consume_loop(q, tag, &blk)
Expand All @@ -330,6 +332,17 @@ def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {},
end
end

def basic_consume_once(queue, &_)
tag = "consume-once-#{rand(1024)}"
write_bytes FrameBytes.basic_consume(@id, queue, tag, true, false, true, nil)
@consumers[tag] = q = ::Queue.new
yield
msg = q.pop
write_bytes FrameBytes.basic_cancel(@id, tag, no_wait: true)
@consumers.delete tag
msg
end

# Cancel/abort/stop a consumer
# @param consumer_tag [String] Tag of the consumer to cancel
# @param no_wait [Boolean] Will wait for a confirmation from the broker that the consumer is cancelled
Expand Down
3 changes: 2 additions & 1 deletion lib/amqp/client/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def read_loop
READ_EXCEPTIONS = [IOError, OpenSSL::OpenSSLError, SystemCallError,
RUBY_ENGINE == "jruby" ? java.lang.NullPointerException : nil].compact.freeze

def parse_frame(type, channel_id, buf) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
def parse_frame(type, channel_id, buf) # rubocop:disable Metrics/MethodLength
channel = @channels[channel_id]
case type
when 1 # method frame
Expand Down Expand Up @@ -433,6 +433,7 @@ def expect(expected_frame_type)
def open_socket(host, port, tls, options)
connect_timeout = options.fetch(:connect_timeout, 30).to_f
socket = Socket.tcp host, port, connect_timeout: connect_timeout
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
keepalive = options.fetch(:keepalive, "").split(":", 3).map!(&:to_i)
enable_tcp_keepalive(socket, *keepalive)
if tls
Expand Down
3 changes: 1 addition & 2 deletions lib/amqp/client/frame_bytes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,8 @@ def self.body(id, body_part)
].pack("C S> L> a* C")
end

def self.basic_consume(id, queue, tag, no_ack, exclusive, arguments)
def self.basic_consume(id, queue, tag, no_ack, exclusive, no_wait, arguments)
no_local = false
no_wait = false
bits = 0
bits |= (1 << 0) if no_local
bits |= (1 << 1) if no_ack
Expand Down
4 changes: 2 additions & 2 deletions lib/amqp/client/table.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ module AMQP
class Client
# Encode and decode an AMQP table to/from hash, only used internally
# @api private
module Table
module Table # rubocop:disable Metrics/ModuleLength
# Encodes a hash into a byte array
# @param hash [Hash]
# @return [String] Byte array
def self.encode(hash)
return "" if hash.empty?
return "" if hash.nil? || hash.empty?

arr = []
fmt = String.new
Expand Down
27 changes: 27 additions & 0 deletions test/amqp/rpc_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true

require_relative "../test_helper"

class RPCTest < Minitest::Test
def test_that_rpc_server_responds_to_rpc_calls
client = AMQP::Client.new("amqp://localhost").start
client.rpc_server("rpc-test-method") do |request|
"foo #{request}"
end
result = client.rpc_call("rpc-test-method", "bar")
assert_equal "foo bar", result
end

def test_rpc_client_is_reusable
client = AMQP::Client.new("amqp://localhost").start
client.rpc_server("rpc-test-method") do |request|
"foo #{request}"
end

rpc_client = client.rpc_client
result = rpc_client.call("rpc-test-method", "bar")
assert_equal "foo bar", result
result = rpc_client.call("rpc-test-method", "foo")
assert_equal "foo foo", result
end
end
Loading