From d057d0b5261b8157c4755f2d3f96f3e902e7f3c5 Mon Sep 17 00:00:00 2001 From: tan Date: Mon, 18 Jan 2021 21:40:32 +0530 Subject: [PATCH] add AMQPS support This adds AMQPS (TLS) connections support. The IANA assigned port number for AMQPS is 5671. It is available as the constant `AMQPClient.AMQPS_DEFAULT_PORT`. An example of making an AMQPS connection: ```julia conn = connection(; virtualhost="/", host = "amqps.example.com", port = AMQPFlient.AMQPS_DEFAULT_PORT auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>"guest", "PASSWORD"=>"guest"), amqps = amqps_configure() ) ``` The `amqps_configure` method can be provided additional parameters for TLS connections: - cacerts: A CA certificate file (or it's contents) to use for certificate verification. - verify: Whether to verify server certificate. Default is false if cacerts is not provided and true if it is. - client_cert and client_key: The client certificate and corresponding private key to use. Default is nothing (no client certificate). Values can either be the file name or certificate/key contents. ```julia amqps_configure(; cacerts = nothing, verify = MbedTLS.MBEDTLS_SSL_VERIFY_NONE, client_cert = nothing, client_key = nothing ) ``` --- CONNECTIONS.md | 63 ++++++-- Project.toml | 4 +- src/AMQPClient.jl | 5 +- src/amqps.jl | 74 +++++++++ src/buffered_socket.jl | 77 ++++++++++ src/protocol.jl | 98 +++++++++--- src/spec.jl | 1 + test/runtests.jl | 10 ++ test/test_coverage.jl | 327 ++++++++++++++++++++-------------------- test/test_rpc.jl | 115 ++++++++------ test/test_throughput.jl | 71 ++++----- 11 files changed, 562 insertions(+), 283 deletions(-) create mode 100644 src/amqps.jl create mode 100644 src/buffered_socket.jl diff --git a/CONNECTIONS.md b/CONNECTIONS.md index 68d3aed..95d6aec 100644 --- a/CONNECTIONS.md +++ b/CONNECTIONS.md @@ -3,10 +3,11 @@ More than one connection can be made to a single server, though one is sufficient for most cases. The IANA assigned port number for AMQP is 5672. It is available as the constant `AMQPClient.AMQP_DEFAULT_PORT`. +The IANA assigned port number for AMQPS is 5671. It is available as the constant `AMQPClient.AMQPS_DEFAULT_PORT`. The `AMQPPLAIN` authentication mechanism is supported as of now. -````julia +```julia using AMQPClient port = AMQPClient.AMQP_DEFAULT_PORT @@ -14,8 +15,36 @@ login = get_userid() # default is usually "guest" password = get_password() # default is usually "guest" auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>login, "PASSWORD"=>password) -conn = connection(;virtualhost="/", host="localhost", port=port, auth_params=auth_params) -```` +conn = connection(; virtualhost="/", host="localhost", port=port, auth_params=auth_params) +``` + +An example of making an AMQPS connection: + +```julia +using AMQPClient + +port = AMQPFlient.AMQPS_DEFAULT_PORT +login = get_userid() # default is usually "guest" +password = get_password() # default is usually "guest" +auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>login, "PASSWORD"=>password) +amqps = amqps_configure() + +conn = connection(; virtualhost="/", host="amqps.example.com", port=port, auth_params=auth_params, amqps=amqps) +``` + +The `amqps_configure` method can be provided additional parameters for TLS connections: +- cacerts: A CA certificate file (or it's contents) to use for certificate verification. +- verify: Whether to verify server certificate. Default is false if cacerts is not provided and true if it is. +- client_cert and client_key: The client certificate and corresponding private key to use. Default is nothing (no client certificate). Values can either be the file name or certificate/key contents. + +```julia +amqps_configure(; + cacerts = nothing, + verify = MbedTLS.MBEDTLS_SSL_VERIFY_NONE, + client_cert = nothing, + client_key = nothing +) +``` Multiple channels can be multiplexed over a single connection. Channels are identified by their numeric id. @@ -23,7 +52,7 @@ An existing channel can be attached to, or a new one created if it does not exis Specifying `AMQPClient.UNUSED_CHANNEL` as channel id during creation will automatically assign an unused id. -````julia +```julia chan1 = channel(conn, AMQPClient.UNUSED_CHANNEL, true) # to attach to a channel only if it already exists: @@ -33,28 +62,36 @@ chan2 = channel(conn, chanid) # to specify a channel id and create if it does not exists yet: chanid = 3 chan3 = channel(conn, chanid, true) -```` +``` Channels and connections remain open until they are closed or they run into an error. The server can also initiate a close in some cases. Channels represent logical multiplexing over a single connection, so closing a connection implicitly closes all its channels. -````julia +```julia if isopen(conn) close(conn) # close is an asynchronous operation. To wait for the negotiation to complete: AMQPClient.wait_for_state(conn, AMQPClient.CONN_STATE_CLOSED) end # an individual channel can be closed similarly too -```` +``` + +The `connection` and `channel` methods can also be used with Julia's do-block syntax, which ensures it's closure when the block exits. + +```julia +connection(; virtualhost="/", host="localhost", port=port, auth_params=auth_params) do conn + channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan + # use channel + end +end +``` If a channel or connection is closed due to an error or by the server, the `closereason` attribute (type `CloseReason`) of the channel or connection object may contain the error code and diagnostic message. -````julia -if !isnull(conn.closereason) - reason = get(conn.closereason) - println("Error code: ", reason.code) - println("Message: ", reason.msg) +```julia +if conn.closereason !== nothing + @error("connection has errors", code=conn.closereason.code, message=conn.closereason.msg) end -```` +``` diff --git a/Project.toml b/Project.toml index b7b38cb..0f7c0df 100644 --- a/Project.toml +++ b/Project.toml @@ -3,14 +3,16 @@ uuid = "79c8b4cd-a41a-55fa-907c-fab5288e1383" keywords = ["amqpclient", "rabbitmq", "amqp", "amqp-client", "message-queue"] license = "MIT" desc = "A Julia AMQP (Advanced Message Queuing Protocol) / RabbitMQ Client." -version = "0.3.1" +version = "0.4.0" [deps] Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" +MbedTLS = "739be429-bea8-5141-9913-cc70e7f3736d" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" [compat] julia = "1" +MbedTLS = "0.6.8, 0.7, 1" [extras] Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" diff --git a/src/AMQPClient.jl b/src/AMQPClient.jl index 2c0b656..1b9db69 100644 --- a/src/AMQPClient.jl +++ b/src/AMQPClient.jl @@ -3,6 +3,7 @@ module AMQPClient import Base: write, read, read!, close, convert, show, isopen, flush using Sockets +using MbedTLS # Client property info that gets sent to the server on connection startup const CLIENT_IDENTIFICATION = Dict{String,Any}( @@ -15,11 +16,13 @@ include("types.jl") include("spec.jl") include("message.jl") include("auth.jl") +include("buffered_socket.jl") +include("amqps.jl") include("protocol.jl") include("convert.jl") include("show.jl") -export connection, channel, CloseReason +export connection, channel, CloseReason, amqps_configure export exchange_declare, exchange_delete, exchange_bind, exchange_unbind, default_exchange_name export queue_declare, queue_bind, queue_unbind, queue_purge, queue_delete export tx_select, tx_commit, tx_rollback diff --git a/src/amqps.jl b/src/amqps.jl new file mode 100644 index 0000000..a0169dd --- /dev/null +++ b/src/amqps.jl @@ -0,0 +1,74 @@ +function default_tls_debug(level, filename, number, msg) + @debug(level, filename, number, msg) +end + +function default_tls_rng() + entropy = MbedTLS.Entropy() + rng = MbedTLS.CtrDrbg() + MbedTLS.seed!(rng, entropy) + rng +end + +""" + amqps_configure(; + cacerts = nothing, + verify = MbedTLS.MBEDTLS_SSL_VERIFY_NONE, + client_cert = nothing, + client_key = nothing + ) + +Creates and returns a configuration for making AMQPS connections. +- cacerts: A CA certificate file (or it's contents) to use for certificate verification. +- verify: Whether to verify server certificate. Default is false if cacerts is not provided and true if it is. +- client_cert and client_key: The client certificate and corresponding private key to use. Default is nothing (no client certificate). Values can either be the file name or certificate/key contents. +""" +function amqps_configure(; + rng = default_tls_rng(), + cacerts::Union{String,Nothing} = nothing, + verify::Int64 = (cacerts === nothing) ? MbedTLS.MBEDTLS_SSL_VERIFY_NONE : MbedTLS.MBEDTLS_SSL_VERIFY_REQUIRED, + client_cert::Union{String,Nothing} = nothing, + client_key::Union{String,Nothing} = nothing, + debug::Union{Function,Nothing} = nothing) + + conf = MbedTLS.SSLConfig() + MbedTLS.config_defaults!(conf) + MbedTLS.rng!(conf, rng) + (debug === nothing) || MbedTLS.dbg!(conf, debug) + + if cacerts !== nothing + if isfile(cacerts) + # if it is a file name instead of certificate contents, read the contents + cacerts = read(cacerts, String) + end + MbedTLS.ca_chain!(conf, MbedTLS.crt_parse(cacerts)) + end + MbedTLS.authmode!(conf, verify) + + if (client_cert !== nothing) && (client_key !== nothing) + if isfile(client_cert) + # if it is a file name instead of certificate contents, read the contents + client_cert = read(client_cert, String) + end + if isfile(client_key) + client_key = read(client_key, String) + end + key = MbedTLS.PKContext() + MbedTLS.parse_key!(key, client_key) + MbedTLS.own_cert!(conf, MbedTLS.crt_parse(client_cert), key) + end + + conf +end + +function setup_tls(sock::TCPSocket, hostname::String, ssl_options::MbedTLS.SSLConfig) + @debug("setting up TLS") + + ctx = MbedTLS.SSLContext() + MbedTLS.setup!(ctx, ssl_options) + MbedTLS.set_bio!(ctx, sock) + MbedTLS.hostname!(ctx, hostname) + MbedTLS.handshake(ctx) + @debug("TLS setup done") + + BufferedTLSSocket(ctx) +end \ No newline at end of file diff --git a/src/buffered_socket.jl b/src/buffered_socket.jl new file mode 100644 index 0000000..5cb3885 --- /dev/null +++ b/src/buffered_socket.jl @@ -0,0 +1,77 @@ +const TLS_BUSY_READ_SECS = 1 +const TLS_BUSY_READ_YIELD_SECS = 0.001 +const TLS_READBUFF_SIZE = MbedTLS.MBEDTLS_SSL_MAX_CONTENT_LEN * 5 +const TLS_MIN_WRITEBUFF_SIZE = MbedTLS.MBEDTLS_SSL_MAX_CONTENT_LEN +const TCP_MAX_WRITEBUFF_SIZE = 1024*512 +const TCP_MIN_WRITEBUFF_SIZE = 1024*64 + +struct BufferedTLSSocket <: IO + in::IOBuffer # no read lock, single task reads socket and distributes messages to channels + out::IOBuffer + sock::MbedTLS.SSLContext + readbuff::Vector{UInt8} + out_lck::ReentrantLock # protect out::IOBuffer when there are multiple channels on the connection + + function BufferedTLSSocket(sock::MbedTLS.SSLContext; readbuff_size::Int=TLS_READBUFF_SIZE) + new(PipeBuffer(), PipeBuffer(), sock, Vector{UInt8}(undef, readbuff_size), ReentrantLock()) + end +end + +isopen(bio::BufferedTLSSocket) = isopen(bio.sock) +close(bio::BufferedTLSSocket) = close(bio.sock) + +function read(bio::BufferedTLSSocket, ::Type{UInt8}) + fill_in(bio, 1) + read(bio.in, UInt8) +end + +function read(bio::BufferedTLSSocket, T::Union{Type{Int16},Type{UInt16},Type{Int32},Type{UInt32},Type{Int64},Type{UInt64},Type{Int128},Type{UInt128},Type{Float16},Type{Float32},Type{Float64}}) + fill_in(bio, sizeof(T)) + read(bio.in, T) +end + +function read!(bio::BufferedTLSSocket, buff::Vector{UInt8}) + fill_in(bio, length(buff)) + read!(bio.in, buff) +end + +function peek(bio::BufferedTLSSocket, T::Union{Type{Int16},Type{UInt16},Type{Int32},Type{UInt32},Type{Int64},Type{UInt64},Type{Int128},Type{UInt128},Type{Float16},Type{Float32},Type{Float64}}) + fill_in(bio, sizeof(T)) + peek(bio.in, T) +end + +function fill_in(bio::BufferedTLSSocket, atleast::Int) + avail = bytesavailable(bio.in) + if atleast > avail + while (atleast > avail) && isopen(bio.sock) + bytes_read = isreadable(bio.sock) ? readbytes!(bio.sock, bio.readbuff; all=false) : 0 + if bytes_read > 0 + avail += Base.write_sub(bio.in, bio.readbuff, 1, bytes_read) + else + MbedTLS.wait_for_decrypted_data(bio.sock) + end + end + end +end + +function write(bio::BufferedTLSSocket, data::UInt8) + lock(bio.out_lck) do + write(bio.out, data) + end +end +function write(bio::BufferedTLSSocket, data::Union{Int16,UInt16,Int32,UInt32,Int64,UInt64,Int128,UInt128,Float16,Float32,Float64}) + lock(bio.out_lck) do + write(bio.out, data) + end +end +function write(bio::BufferedTLSSocket, data::Array) + lock(bio.out_lck) do + write(bio.out, data) + end +end +function flush(bio::BufferedTLSSocket) + lock(bio.out_lck) do + write(bio.sock, take!(bio.out)) + end + nothing +end \ No newline at end of file diff --git a/src/protocol.jl b/src/protocol.jl index 69182a2..900129c 100644 --- a/src/protocol.jl +++ b/src/protocol.jl @@ -80,7 +80,7 @@ function write(io::IO, ft::TAMQPFieldTable) end buff = take!(iob) len = TAMQPLongUInt(length(buff)) - @debug("write fieldtable", len, type=typeof(len)) + @debug("write fieldtable", len) l = write(io, hton(len)) if len > 0 l += write(io, buff) @@ -171,7 +171,7 @@ mutable struct Connection virtualhost::String host::String port::Int - sock::Union{TCPSocket, Nothing} + sock::Union{TCPSocket, BufferedTLSSocket, Nothing} properties::Dict{Symbol,Any} capabilities::Dict{String,Any} @@ -191,7 +191,7 @@ mutable struct Connection heartbeat_time_server::Float64 heartbeat_time_client::Float64 - function Connection(virtualhost::String="/", host::String="localhost", port::Int=AMQP_DEFAULT_PORT; send_queue_size::Int=CONN_MAX_QUEUED) + function Connection(; virtualhost::String="/", host::String="localhost", port::Int=AMQP_DEFAULT_PORT, send_queue_size::Int=CONN_MAX_QUEUED) sendq = Channel{TAMQPGenericFrame}(send_queue_size) sendlck = Channel{UInt8}(1) put!(sendlck, 1) @@ -326,10 +326,11 @@ function connection_processor(c, name, fn) catch err reason = "$name task exiting." if isa(c, MessageConsumer) + if !(c.state in (CONN_STATE_CLOSING, CONN_STATE_CLOSED)) + reason = reason * " Unhandled exception: $err" + @warn(reason, exception=(err,catch_backtrace())) + end close(c) - reason = reason * " Unhandled exception: $err" - #showerror(STDERR, err) - @debug(reason) else isconnclosed = !isopen(c) ischanclosed = isa(c, MessageChannel) && isa(err, InvalidStateException) && err.state == :closed @@ -339,30 +340,51 @@ function connection_processor(c, name, fn) reason = reason * " by peer" close(c, false, true) end - @debug(reason) + @debug(reason, exception=(err,catch_backtrace())) else - reason = reason * " Unhandled exception: $err" - #showerror(stderr, err) - @debug(reason) + if !(c.state in (CONN_STATE_CLOSING, CONN_STATE_CLOSED)) + reason = reason * " Unhandled exception: $err" + @warn(reason, exception=(err,catch_backtrace())) + end close(c, false, true) - #rethrow(err) end end end end function connection_sender(c::Connection) - msg = take!(c.sendq) @debug("==> sending on conn", host=c.virtualhost) - nbytes = write(sock(c), msg) + nbytes = sendq_to_stream(sock(c), c.sendq) @debug("==> sent", nbytes) - - # update heartbeat time for client - c.heartbeat_time_client = time() - + c.heartbeat_time_client = time() # update heartbeat time for client nothing end +function sendq_to_stream(conn::TCPSocket, sendq::Channel{TAMQPGenericFrame}) + msg = take!(sendq) + if length(msg.payload.data) > TCP_MIN_WRITEBUFF_SIZE # write large messages directly + nbytes = write(conn, msg) + else # coalesce short messages and do single write + buff = IOBuffer() + nbytes = write(buff, msg) + while isready(sendq) && (nbytes < TCP_MAX_WRITEBUFF_SIZE) + nbytes += write(buff, take!(sendq)) + end + write(conn, take!(buff)) + end + nbytes +end +function sendq_to_stream(conn::BufferedTLSSocket, sendq::Channel{TAMQPGenericFrame}) + # avoid multiple small writes to TLS layer + nbytes = write(conn, take!(sendq)) + while isready(sendq) && (nbytes < MbedTLS.MBEDTLS_SSL_MAX_CONTENT_LEN) + nbytes += write(conn, take!(sendq)) + end + # flush does a single write of accumulated buffer + flush(conn) + nbytes +end + function connection_receiver(c::Connection) f = read(sock(c), TAMQPGenericFrame) @@ -372,7 +394,7 @@ function connection_receiver(c::Connection) channelid = f.props.channel @debug("<== read message on conn", host=c.virtualhost, channelid) if !(channelid in keys(c.channels)) - @debug("Discarding message for unknown channel", channelid) + @warn("Discarding message for unknown channel", channelid) end chan = channel(c, channelid) put!(chan.recvq, f) @@ -390,7 +412,7 @@ function connection_heartbeater(c::Connection) end if (now - c.heartbeat_time_server) > (2 * c.heartbeat) - @debug("server heartbeat missed", secs=(now - c.heartbeat_time_server)) + @warn("server heartbeat missed", secs=(now - c.heartbeat_time_server)) close(c, false, false) end nothing @@ -416,7 +438,7 @@ function channel_receiver(c::MessageChannel) cbkey = (f.hdr,) else m = f - @debug("<== received unhandled frame type", channel=f.props.channel, type=f.hdr) + @warn("<== received unhandled frame type", channel=f.props.channel, type=f.hdr) cbkey = (f.hdr,) end (cb,ctx) = get(c.callbacks, cbkey, (on_unexpected_message, nothing)) @@ -498,6 +520,16 @@ function channel(c::Connection, id::Integer, create::Bool; connect_timeout=DEFAU end chan end +function channel(f, args...; kwargs...) + chan = channel(args...; kwargs...) + try + f(chan) + catch + rethrow() + finally + close(chan) + end +end function connection(; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, framemax=0, @@ -505,9 +537,10 @@ function connection(; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DE send_queue_size::Integer=CONN_MAX_QUEUED, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, channelmax::Integer=AMQPClient.DEFAULT_CHANNELMAX, - connect_timeout=AMQPClient.DEFAULT_CONNECT_TIMEOUT) - @debug("connecting", host, port, virtualhost) - conn = AMQPClient.Connection(virtualhost, host, port; send_queue_size=send_queue_size) + connect_timeout=AMQPClient.DEFAULT_CONNECT_TIMEOUT, + amqps::Union{MbedTLS.SSLConfig,Nothing}) + @debug("connecting", host, port, virtualhost, tls) + conn = Connection(; virtualhost=virtualhost, host=host, port=port, send_queue_size=send_queue_size) chan = channel(conn, AMQPClient.DEFAULT_CHANNEL, true) # setup handler for Connection.Start @@ -515,7 +548,10 @@ function connection(; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DE AMQPClient.handle(chan, :Connection, :Start, AMQPClient.on_connection_start, ctx) # open socket and start processor tasks - conn.sock = connect(conn.host, conn.port) + sock = connect(conn.host, conn.port) + isdefined(Sockets, :nagle) && Sockets.nagle(sock, false) + isdefined(Sockets, :quickack) && Sockets.quickack(sock, true) + conn.sock = (amqps !== nothing) ? setup_tls(sock, host, amqps) : sock conn.sender = @async AMQPClient.connection_processor(conn, "ConnectionSender", AMQPClient.connection_sender) conn.receiver = @async AMQPClient.connection_processor(conn, "ConnectionReceiver", AMQPClient.connection_receiver) chan.receiver = @async AMQPClient.connection_processor(chan, "ChannelReceiver($(chan.id))", AMQPClient.channel_receiver) @@ -523,6 +559,7 @@ function connection(; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DE # initiate handshake conn.state = chan.state = AMQPClient.CONN_STATE_OPENING write(AMQPClient.sock(chan), AMQPClient.ProtocolHeader) + flush(AMQPClient.sock(chan)) if !AMQPClient.wait_for_state(conn, AMQPClient.CONN_STATE_OPEN; timeout=connect_timeout) || !AMQPClient.wait_for_state(chan, AMQPClient.CONN_STATE_OPEN; timeout=connect_timeout) throw(AMQPClientException("Connection handshake failed")) @@ -530,6 +567,17 @@ function connection(; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DE chan end +function connection(f; kwargs...) + conn = connection(; kwargs...) + + try + f(conn) + catch + rethrow() + finally + close(conn) + end +end # ---------------------------------------- # Open channel / connection end @@ -1250,4 +1298,4 @@ on_confirm_select_ok(chan::MessageChannel, m::TAMQPMethodFrame, ctx) = _on_ack(c # ---------------------------------------- # send and recv for methods end -# ---------------------------------------- \ No newline at end of file +# ---------------------------------------- diff --git a/src/spec.jl b/src/spec.jl index 5894f1b..72efffd 100644 --- a/src/spec.jl +++ b/src/spec.jl @@ -5,6 +5,7 @@ const AMQP_VERSION = v"0.9.1" const AMQP_DEFAULT_PORT = 5672 +const AMQPS_DEFAULT_PORT = 5671 # Constants const FrameMethod = 1 diff --git a/test/runtests.jl b/test/runtests.jl index 7cfd4dc..2f3d5af 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,3 +1,5 @@ +using AMQPClient + include("test_coverage.jl") include("test_throughput.jl") include("test_rpc.jl") @@ -5,4 +7,12 @@ include("test_rpc.jl") AMQPTestCoverage.runtests() AMQPTestThroughput.runtests() AMQPTestRPC.runtests() + +if length(ARGS) > 0 + amqps_host = ARGS[1] + AMQPTestCoverage.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, amqps=amqps_configure()) + AMQPTestThroughput.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, tls=true) + AMQPTestRPC.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, amqps=amqps_configure()) +end + exit(0) diff --git a/test/test_coverage.jl b/test/test_coverage.jl index 63185e2..1d1a10a 100644 --- a/test/test_coverage.jl +++ b/test/test_coverage.jl @@ -9,179 +9,186 @@ const EXCG_FANOUT = "ExcgFanout" const QUEUE1 = "queue1" const ROUTE1 = "key1" -testlog(msg) = println(msg) - -function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS) +function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, amqps=nothing) verify_spec() test_types() @test default_exchange_name("direct") == "amq.direct" @test default_exchange_name() == "" @test AMQPClient.method_name(AMQPClient.TAMQPMethodPayload(:Basic, :Ack, (1, false))) == "Basic.Ack" - # open a connection - testlog("opening connection...") - conn = connection(;virtualhost=virtualhost, host=host, port=port, auth_params=auth_params, send_queue_size=512) - @test conn.conn.sendq.sz_max == 512 - - # open a channel - testlog("opening channel...") - chan1 = channel(conn, AMQPClient.UNUSED_CHANNEL, true) - @test chan1.id == 1 - @test conn.conn.sendq.sz_max == 512 - - # test default exchange names - @test default_exchange_name() == "" - @test default_exchange_name(EXCHANGE_TYPE_DIRECT) == "amq.direct" - - # create exchanges - testlog("creating exchanges...") - @test exchange_declare(chan1, EXCG_DIRECT, EXCHANGE_TYPE_DIRECT; arguments=Dict{String,Any}("Hello"=>"World", "Foo"=>"bar")) - @test exchange_declare(chan1, EXCG_FANOUT, EXCHANGE_TYPE_FANOUT) - # redeclaring the exchange with same attributes should be fine - @test exchange_declare(chan1, EXCG_FANOUT, EXCHANGE_TYPE_FANOUT) - # redeclaring an existing exchange with different attributes should fail - @test_throws AMQPClient.AMQPClientException exchange_declare(chan1, EXCG_FANOUT, EXCHANGE_TYPE_DIRECT) - - # must reconnect as channel gets closed after a channel exception - close(chan1) # closing an already closed channel should be fine - chan1 = channel(conn, AMQPClient.UNUSED_CHANNEL, true) - @test chan1.id == 1 - - # create and bind queues - testlog("creating queues...") - success, queue_name, message_count, consumer_count = queue_declare(chan1, QUEUE1) - @test success - @test message_count == 0 - @test consumer_count == 0 - - @test queue_bind(chan1, QUEUE1, EXCG_DIRECT, ROUTE1) - - # rabbitmq 3.6.5 does not support qos - # basic_qos(chan1, 1024*10, 10, false) - - M = Message(Vector{UInt8}("hello world"), content_type="text/plain", delivery_mode=PERSISTENT) - - testlog("testing basic publish and get...") - # publish 10 messages - for idx in 1:10 - basic_publish(chan1, M; exchange=EXCG_DIRECT, routing_key=ROUTE1) - flush(chan1) - @test !isready(chan1.conn.sendq) - end - - # basic get 10 messages - for idx in 1:10 - result = basic_get(chan1, QUEUE1, false) - @test result !== nothing - rcvd_msg = result - basic_ack(chan1, rcvd_msg.delivery_tag) - @test rcvd_msg.remaining == (10-idx) - @test rcvd_msg.exchange == EXCG_DIRECT - @test rcvd_msg.redelivered == false - @test rcvd_msg.routing_key == ROUTE1 - @test rcvd_msg.data == M.data - @test :content_type in keys(rcvd_msg.properties) - @test convert(String, rcvd_msg.properties[:content_type]) == "text/plain" - end - - # basic get returns null if no more messages - @test basic_get(chan1, QUEUE1, false) === nothing - - ## test reject and requeue - basic_publish(chan1, M; exchange=EXCG_DIRECT, routing_key=ROUTE1) - - result = basic_get(chan1, QUEUE1, false) - @test result !== nothing - rcvd_msg = result - @test rcvd_msg.redelivered == false - - basic_reject(chan1, rcvd_msg.delivery_tag; requeue=true) - - result = basic_get(chan1, QUEUE1, false) - @test result !== nothing - rcvd_msg = result - @test rcvd_msg.redelivered == true - - basic_ack(chan1, rcvd_msg.delivery_tag) - - testlog("testing basic consumer...") - # start a consumer task - global msg_count = 0 - consumer_fn = (rcvd_msg) -> begin - @test rcvd_msg.exchange == EXCG_DIRECT - @test rcvd_msg.redelivered == false - @test rcvd_msg.routing_key == ROUTE1 - @test rcvd_msg.data == M.data - global msg_count - msg_count += 1 - println("received msg $(msg_count): $(String(rcvd_msg.data))") - basic_ack(chan1, rcvd_msg.delivery_tag) - end - success, consumer_tag = basic_consume(chan1, QUEUE1, consumer_fn) - @test success - - # publish 10 messages - for idx in 1:10 - basic_publish(chan1, M; exchange=EXCG_DIRECT, routing_key=ROUTE1) - end + conn_ref = nothing - # wait for a reasonable time to receive all messages - for idx in 1:10 - (msg_count == 10) && break - sleep(1) - end - @test msg_count == 10 - - # cancel the consumer task - @test basic_cancel(chan1, consumer_tag) - - # test transactions - testlog("testing tx...") - @test tx_select(chan1) - @test tx_commit(chan1) - @test tx_rollback(chan1) - - # test heartbeats - if 120 >= conn.conn.heartbeat > 0 - c = conn.conn - testlog("testing heartbeats (waiting $(3*c.heartbeat) secs)...") - ts1 = c.heartbeat_time_server - tc1 = c.heartbeat_time_client - sleeptime = c.heartbeat/2 - for idx in 1:6 - (c.heartbeat_time_server > ts1) && (c.heartbeat_time_client > tc1) && break - sleep(sleeptime) + # open a connection + @info("opening connection") + connection(;virtualhost=virtualhost, host=host, port=port, amqps=amqps, auth_params=auth_params, send_queue_size=512) do conn + @test conn.conn.sendq.sz_max == 512 + + # open a channel + @info("opening channel") + channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan1 + @test chan1.id == 1 + @test conn.conn.sendq.sz_max == 512 + + # test default exchange names + @test default_exchange_name() == "" + @test default_exchange_name(EXCHANGE_TYPE_DIRECT) == "amq.direct" + + # create exchanges + @info("creating exchanges") + @test exchange_declare(chan1, EXCG_DIRECT, EXCHANGE_TYPE_DIRECT; arguments=Dict{String,Any}("Hello"=>"World", "Foo"=>"bar")) + @test exchange_declare(chan1, EXCG_FANOUT, EXCHANGE_TYPE_FANOUT) + # redeclaring the exchange with same attributes should be fine + @test exchange_declare(chan1, EXCG_FANOUT, EXCHANGE_TYPE_FANOUT) + # redeclaring an existing exchange with different attributes should fail + @test_throws AMQPClient.AMQPClientException exchange_declare(chan1, EXCG_FANOUT, EXCHANGE_TYPE_DIRECT) end - @test c.heartbeat_time_server > ts1 - @test c.heartbeat_time_client > tc1 - else - testlog("not testing heartbeats (wait too long at $(3*conn.conn.heartbeat) secs)") - end - - testlog("closing down...") - success, message_count = queue_purge(chan1, QUEUE1) - @test success - @test message_count == 0 - @test queue_unbind(chan1, QUEUE1, EXCG_DIRECT, ROUTE1) - success, message_count = queue_delete(chan1, QUEUE1) - @test success - @test message_count == 0 + chan_ref = nothing + # must reconnect as channel gets closed after a channel exception + channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan1 + @test chan1.id == 1 + + # create and bind queues + @info("creating queues") + success, queue_name, message_count, consumer_count = queue_declare(chan1, QUEUE1) + @test success + @test message_count == 0 + @test consumer_count == 0 + + @test queue_bind(chan1, QUEUE1, EXCG_DIRECT, ROUTE1) + + # rabbitmq 3.6.5 does not support qos + # basic_qos(chan1, 1024*10, 10, false) + + M = Message(Vector{UInt8}("hello world"), content_type="text/plain", delivery_mode=PERSISTENT) + + @info("testing basic publish and get") + # publish 10 messages + for idx in 1:10 + basic_publish(chan1, M; exchange=EXCG_DIRECT, routing_key=ROUTE1) + flush(chan1) + @test !isready(chan1.conn.sendq) + end + + # basic get 10 messages + for idx in 1:10 + result = basic_get(chan1, QUEUE1, false) + @test result !== nothing + rcvd_msg = result + basic_ack(chan1, rcvd_msg.delivery_tag) + @test rcvd_msg.remaining == (10-idx) + @test rcvd_msg.exchange == EXCG_DIRECT + @test rcvd_msg.redelivered == false + @test rcvd_msg.routing_key == ROUTE1 + @test rcvd_msg.data == M.data + @test :content_type in keys(rcvd_msg.properties) + @test convert(String, rcvd_msg.properties[:content_type]) == "text/plain" + end + + # basic get returns null if no more messages + @test basic_get(chan1, QUEUE1, false) === nothing + + ## test reject and requeue + basic_publish(chan1, M; exchange=EXCG_DIRECT, routing_key=ROUTE1) + + result = basic_get(chan1, QUEUE1, false) + @test result !== nothing + rcvd_msg = result + @test rcvd_msg.redelivered == false + + basic_reject(chan1, rcvd_msg.delivery_tag; requeue=true) + + result = basic_get(chan1, QUEUE1, false) + @test result !== nothing + rcvd_msg = result + @test rcvd_msg.redelivered == true + + basic_ack(chan1, rcvd_msg.delivery_tag) + + @info("testing basic consumer") + # start a consumer task + global msg_count = 0 + consumer_fn = (rcvd_msg) -> begin + @test rcvd_msg.exchange == EXCG_DIRECT + @test rcvd_msg.redelivered == false + @test rcvd_msg.routing_key == ROUTE1 + @test rcvd_msg.data == M.data + global msg_count + msg_count += 1 + println("received msg $(msg_count): $(String(rcvd_msg.data))") + basic_ack(chan1, rcvd_msg.delivery_tag) + end + success, consumer_tag = basic_consume(chan1, QUEUE1, consumer_fn) + @test success + + # publish 10 messages + for idx in 1:10 + basic_publish(chan1, M; exchange=EXCG_DIRECT, routing_key=ROUTE1) + end + + # wait for a reasonable time to receive all messages + for idx in 1:10 + (msg_count == 10) && break + sleep(1) + end + @test msg_count == 10 + + # cancel the consumer task + @test basic_cancel(chan1, consumer_tag) + + # test transactions + @info("testing tx") + @test tx_select(chan1) + @test tx_commit(chan1) + @test tx_rollback(chan1) + + # test heartbeats + if 120 >= conn.conn.heartbeat > 0 + c = conn.conn + @info("testing heartbeats (waiting $(3*c.heartbeat) secs)...") + ts1 = c.heartbeat_time_server + tc1 = c.heartbeat_time_client + sleeptime = c.heartbeat/2 + for idx in 1:6 + (c.heartbeat_time_server > ts1) && (c.heartbeat_time_client > tc1) && break + sleep(sleeptime) + end + @test c.heartbeat_time_server > ts1 + @test c.heartbeat_time_client > tc1 + else + @info("not testing heartbeats (wait too long at $(3*conn.conn.heartbeat) secs)") + end + + @info("closing down") + success, message_count = queue_purge(chan1, QUEUE1) + @test success + @test message_count == 0 + + @test queue_unbind(chan1, QUEUE1, EXCG_DIRECT, ROUTE1) + success, message_count = queue_delete(chan1, QUEUE1) + @test success + @test message_count == 0 + + # delete exchanges + @test exchange_delete(chan1, EXCG_DIRECT; nowait=true) + @test exchange_delete(chan1, EXCG_FANOUT) + + chan_ref = chan1 # to do additional tests on a closed channel + end - # delete exchanges - @test exchange_delete(chan1, EXCG_DIRECT; nowait=true) - @test exchange_delete(chan1, EXCG_FANOUT) + close(chan_ref) # closing a closed channel should not be an issue + AMQPClient.wait_for_state(chan_ref, AMQPClient.CONN_STATE_CLOSED) + @test !isopen(chan_ref) - # close channels and connection - close(chan1) - AMQPClient.wait_for_state(chan1, AMQPClient.CONN_STATE_CLOSED) - @test !isopen(chan1) + conn_ref = conn # to do additional tests on a closed connection + end - close(conn) - AMQPClient.wait_for_state(conn, AMQPClient.CONN_STATE_CLOSED) - @test !isopen(conn) + # closing a closed connection should not be an issue + close(conn_ref) + AMQPClient.wait_for_state(conn_ref, AMQPClient.CONN_STATE_CLOSED) + @test !isopen(conn_ref) - testlog("done.") + @info("done") nothing end diff --git a/test/test_rpc.jl b/test/test_rpc.jl index 459f7bc..47ae578 100644 --- a/test/test_rpc.jl +++ b/test/test_rpc.jl @@ -8,32 +8,32 @@ const NRPC_MSGS = 100 const NRPC_CLNTS = 4 const NRPC_SRVRS = 4 const server_lck = Ref(ReentrantLock()) -const queue_declared = Ref(false) const servers_done = Channel{Int}(NRPC_SRVRS) +const server_rpc_count = Ref(0) -testlog(msg) = println(msg) - -function test_rpc_client(reply_queue_id; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS) +function test_rpc_client(reply_queue_id; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, amqps=amqps) + rpc_queue_name = QUEUE_RPC * ((amqps === nothing) ? "amqp" : "amqps") # open a connection - testlog("client opening connection...") - conn = connection(;virtualhost=virtualhost, host=host, port=port, auth_params=auth_params) + @info("client opening connection", reply_queue_id) + conn = connection(;virtualhost=virtualhost, host=host, port=port, auth_params=auth_params, amqps=amqps) # open a channel - testlog("client opening channel...") + @debug("client opening channel") chan1 = channel(conn, AMQPClient.UNUSED_CHANNEL, true) # create a reply queue for a client - queue_name = QUEUE_RPC * "_" * string(reply_queue_id) * "_" * string(getpid()) - testlog("client creating queue " * queue_name * "...") + queue_name = rpc_queue_name * "_" * string(reply_queue_id) * "_" * string(getpid()) + @debug("client creating queue", queue_name) success, queue_name, message_count, consumer_count = queue_declare(chan1, queue_name; exclusive=true) + @test success - testlog("client testing rpc...") + @debug("client testing rpc") rpc_reply_count = 0 rpc_fn = (rcvd_msg) -> begin rpc_reply_count += 1 msg_str = String(rcvd_msg.data) - println("client ", msg_str) + @debug("client", reply_quque_id, msg_str) basic_ack(chan1, rcvd_msg.delivery_tag) end @@ -48,7 +48,7 @@ function test_rpc_client(reply_queue_id; virtualhost="/", host="localhost", port while correlation_id < NRPC_MSGS correlation_id += 1 M = Message(Vector{UInt8}("hello from " * queue_name), content_type="text/plain", delivery_mode=PERSISTENT, reply_to=queue_name, correlation_id=string(correlation_id)) - basic_publish(chan1, M; exchange=default_exchange_name(), routing_key=QUEUE_RPC) + basic_publish(chan1, M; exchange=default_exchange_name(), routing_key=rpc_queue_name) # sleep a random time between 1 and 5 seconds between requests sleep(rand()) end @@ -57,7 +57,7 @@ function test_rpc_client(reply_queue_id; virtualhost="/", host="localhost", port sleep(1) end - testlog("client closing down...") + @debug("client closing down", reply_queue_id) success, message_count = queue_purge(chan1, queue_name) @test success @test message_count == 0 @@ -77,42 +77,40 @@ function test_rpc_client(reply_queue_id; virtualhost="/", host="localhost", port AMQPClient.wait_for_state(conn, AMQPClient.CONN_STATE_CLOSED) @test !isopen(conn) - testlog("client done.") + @info("client done", reply_queue_id, rpc_reply_count) end -function test_rpc_server(my_server_id; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS) + +function test_rpc_server(my_server_id; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, amqps=amqps) + rpc_queue_name = QUEUE_RPC * ((amqps === nothing) ? "amqp" : "amqps") # open a connection - testlog("server $my_server_id opening connection...") - conn = connection(;virtualhost=virtualhost, host=host, port=port, auth_params=auth_params) + @info("server opening connection", my_server_id) + conn = connection(;virtualhost=virtualhost, host=host, port=port, auth_params=auth_params, amqps=amqps) # open a channel - testlog("server $my_server_id opening channel...") + @debug("server opening channel", my_server_id) chan1 = channel(conn, AMQPClient.UNUSED_CHANNEL, true) # create queues (no need to bind if we are using the default exchange) lock(server_lck[]) do - if !(queue_declared[]) - testlog("server $my_server_id creating queues...") - # this is the callback queue - success, message_count, consumer_count = queue_declare(chan1, QUEUE_RPC) - @test success - queue_declared[] = true - end + @debug("server creating queues", my_server_id) + # this is the callback queue + success, message_count, consumer_count = queue_declare(chan1, rpc_queue_name) + @test success end # test RPC - testlog("server $my_server_id testing rpc...") - global rpc_count = 0 + @debug("server testing rpc", my_server_id) rpc_fn = (rcvd_msg) -> begin - global rpc_count - rpc_count += 1 - + rpc_count = lock(server_lck[]) do + server_rpc_count[] = server_rpc_count[] + 1 + end @test :reply_to in keys(rcvd_msg.properties) reply_to = convert(String, rcvd_msg.properties[:reply_to]) correlation_id = convert(String, rcvd_msg.properties[:correlation_id]) resp_str = "$(my_server_id) received msg $(rpc_count) - $(reply_to): $(String(rcvd_msg.data))" - println("server ", resp_str) + @debug("server response", resp_str) M = Message(Vector{UInt8}(resp_str), content_type="text/plain", delivery_mode=PERSISTENT, correlation_id=correlation_id) basic_publish(chan1, M; exchange=default_exchange_name(), routing_key=reply_to) @@ -121,30 +119,35 @@ function test_rpc_server(my_server_id; virtualhost="/", host="localhost", port=A end # start a consumer task - success, consumer_tag = basic_consume(chan1, QUEUE_RPC, rpc_fn) + success, consumer_tag = basic_consume(chan1, rpc_queue_name, rpc_fn) @test success - while (rpc_count < NRPC_MSGS*NRPC_CLNTS) - sleep(1) + server_done = false + while !server_done + sleep(5) + lock(server_lck[]) do + server_done = (server_rpc_count[] >= NRPC_MSGS*NRPC_CLNTS) + @debug("rpc_count", server_rpc_count[], my_server_id) + end end - testlog("server $my_server_id closing down...") + @debug("server closing down", my_server_id) @test basic_cancel(chan1, consumer_tag) - testlog("server $my_server_id cancelled consumer...") + @debug("server cancelled consumer", my_server_id) lock(server_lck[]) do take!(servers_done) # the last server to finish will purge and delete the queue if length(servers_done.data) == 0 - success, message_count = queue_purge(chan1, QUEUE_RPC) + success, message_count = queue_purge(chan1, rpc_queue_name) @test success @test message_count == 0 - testlog("server $my_server_id purged queue...") + @debug("server purged queue", my_server_id) - success, message_count = queue_delete(chan1, QUEUE_RPC) + success, message_count = queue_delete(chan1, rpc_queue_name) @test success @test message_count == 0 - testlog("server $my_server_id deleted rpc queue") + @debug("server deleted rpc queue", my_server_id) end end @@ -157,29 +160,43 @@ function test_rpc_server(my_server_id; virtualhost="/", host="localhost", port=A AMQPClient.wait_for_state(conn, AMQPClient.CONN_STATE_CLOSED) @test !isopen(conn) - testlog("server $my_server_id done.") + @info("server done", my_server_id) nothing end -function runtests() - testlog("testing multiple client server rpc") - +function runtests(; host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, amqps=nothing) + @info("testing multiple client server rpc") + server_rpc_count[] = 0 + for idx in 1:NRPC_SRVRS put!(servers_done, idx) end @sync begin for idx in 1:NRPC_SRVRS - @async test_rpc_server(idx) + @async begin + try + test_rpc_server(idx, host=host, port=port, amqps=amqps) + catch ex + @error("server exception", exception=(ex,catch_backtrace())) + rethrow() + end + end end for idx in 1:NRPC_CLNTS - @async test_rpc_client(idx) + @async begin + try + test_rpc_client(idx, host=host, port=port, amqps=amqps) + catch ex + @error("client exception", exception=(ex,catch_backtrace())) + rethrow() + end + end end end - testlog("done") + @info("testing multiple client server rpc done") end -end # module AMQPTestRPC - +end # module AMQPTestRPC \ No newline at end of file diff --git a/test/test_throughput.jl b/test/test_throughput.jl index 63f4dd0..f65e506 100644 --- a/test/test_throughput.jl +++ b/test/test_throughput.jl @@ -13,20 +13,19 @@ const no_ack = true const M = Message(rand(UInt8, 1024), content_type="application/octet-stream", delivery_mode=PERSISTENT) -testlog(msg) = println(msg) - -function setup(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS) +function setup(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, tls=false) # open a connection - testlog("opening connection...") - conn = connection(;virtualhost=virtualhost, host=host, port=port, auth_params=auth_params) + @debug("opening connection") + amqps = tls ? amqps_configure() : nothing + conn = connection(;virtualhost=virtualhost, host=host, port=port, auth_params=auth_params, amqps=amqps) # open a channel - testlog("opening channel...") + @debug("opening channel") chan1 = channel(conn, AMQPClient.UNUSED_CHANNEL, true) @test chan1.id == 1 # create and bind queues - testlog("creating queues...") + @debug("creating queues") success, name, message_count, consumer_count = queue_declare(chan1, QUEUE1) @test success @test message_count == 0 @@ -37,7 +36,7 @@ function setup(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_ end function teardown(conn, chan1, delete=false) - testlog("closing down...") + @info("closing down") if delete success, message_count = queue_purge(chan1, QUEUE1) @test success @@ -60,30 +59,28 @@ function teardown(conn, chan1, delete=false) end function publish(conn, chan1) - testlog("starting basic publisher...") + @info("starting basic publisher") # publish N messages for idx in 1:NMSGS basic_publish(chan1, M; exchange=EXCG_DIRECT, routing_key=ROUTE1) if (idx % 10000) == 0 - println("publishing $idx ...") + @info("publishing", idx) sleep(1) end end end function consume(conn, chan1) - testlog("starting basic consumer...") + @info("starting basic consumer") # start a consumer task - global msg_count = 0 - global start_time = time() - global end_time = 0 + msg_count = 0 + start_time = time() + end_time = 0 consumer_fn = (rcvd_msg) -> begin - global msg_count - global end_time msg_count += 1 if ((msg_count % 10000) == 0) || (msg_count == NMSGS) #basic_ack(chan1, 0; all_upto=true) - println("ack sent $msg_count ...") + @info("ack sent", msg_count) end no_ack || basic_ack(chan1, rcvd_msg.delivery_tag) if msg_count == NMSGS @@ -95,7 +92,7 @@ function consume(conn, chan1) # wait to receive all messages while msg_count < NMSGS - println("$msg_count of $NMSGS messages processed") + @info("$msg_count of $NMSGS messages processed") sleep(2) end @@ -104,47 +101,53 @@ function consume(conn, chan1) # time to send and receive total_time = max(end_time - start_time, 1) - println("time to send and receive $NMSGS messages: $(end_time - start_time) secs @ $(NMSGS/total_time) msgs per second") + @info("time to send and receive", message_count=NMSGS, total_time, rate=NMSGS/total_time) end function run_publisher() - conn, chan1 = AMQPTestThroughput.setup() + host = ARGS[2] + port = parse(Int, ARGS[3]) + tls = parse(Bool, ARGS[4]) + conn, chan1 = AMQPTestThroughput.setup(; host=host, port=port, tls=tls) AMQPTestThroughput.publish(conn, chan1) AMQPTestThroughput.teardown(conn, chan1, false) # exit without destroying queue nothing end function run_consumer() - conn, chan1 = AMQPTestThroughput.setup() + host = ARGS[2] + port = parse(Int, ARGS[3]) + tls = parse(Bool, ARGS[4]) + conn, chan1 = AMQPTestThroughput.setup(; host=host, port=port, tls=tls) AMQPTestThroughput.consume(conn, chan1) - println("waiting for publisher to exit gracefully...") + @debug("waiting for publisher to exit gracefully...") sleep(10) # wait for publisher to exit gracefully AMQPTestThroughput.teardown(conn, chan1, true) nothing end -function spawn_test(script, flags) +function spawn_test(script, flags, host, port, tls) opts = Base.JLOptions() inline_flag = opts.can_inline == 1 ? `` : `--inline=no` cov_flag = (opts.code_coverage == 1) ? `--code-coverage=user` : (opts.code_coverage == 2) ? `--code-coverage=all` : `` srvrscript = joinpath(dirname(@__FILE__), script) - srvrcmd = `$(joinpath(JULIA_HOME, "julia")) $cov_flag $inline_flag $srvrscript $flags` - println("Running tests from ", script, "\n", "="^60) + srvrcmd = `$(joinpath(JULIA_HOME, "julia")) $cov_flag $inline_flag $srvrscript $flags $host $port $tls` + @debug("Running tests from ", script, flags, host, port, tls) ret = run(srvrcmd) - println("Finished ", script, "\n", "="^60) + @debug("Finished ", script, flags, host, port, tls) nothing end -function runtests() - println("starting consumer") - consumer = @async spawn_test("test_throughput.jl", "--runconsumer") - sleep(10) - println("starting publisher") - publisher = @async spawn_test("test_throughput.jl", "--runpublisher") - wait(consumer) - wait(publisher) +function runtests(; host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, tls=false) + @sync begin + @info("starting consumer") + consumer = @async spawn_test("test_throughput.jl", "--runconsumer", host, port, tls) + sleep(10) + @info("starting publisher") + publisher = @async spawn_test("test_throughput.jl", "--runpublisher", host, port, tls) + end nothing end