diff --git a/src/protocol.jl b/src/protocol.jl index 8459c6a..75f89cb 100644 --- a/src/protocol.jl +++ b/src/protocol.jl @@ -164,9 +164,19 @@ const CONN_STATE_OPENING = 1 const CONN_STATE_OPEN = 2 const CONN_STATE_CLOSING = 3 const CONN_MAX_QUEUED = 1024 #typemax(Int) +const DEFAULT_KEEPALIVE_SECS = 60 abstract type AbstractChannel end +function keepalive!(sock, enable::Bool; interval::Integer=DEFAULT_KEEPALIVE_SECS) + @debug("setting tcp keepalive on tcp socket", enable, interval) + err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint), sock.handle, enable, interval) + if err != 0 + throw(AMQPProtocolException("error setting keepalive on socket to $enable with interval $interval")) + end + return sock +end + mutable struct Connection virtualhost::String host::String @@ -178,6 +188,9 @@ mutable struct Connection channelmax::TAMQPShortInt framemax::TAMQPLongInt heartbeat::TAMQPShortInt + enable_heartbeat::Bool + keepalive::Integer + enable_keepalive::Bool state::UInt8 sendq::Channel{TAMQPGenericFrame} @@ -191,12 +204,22 @@ mutable struct Connection heartbeat_time_server::Float64 heartbeat_time_client::Float64 - function Connection(; virtualhost::String="/", host::String="localhost", port::Int=AMQP_DEFAULT_PORT, send_queue_size::Int=CONN_MAX_QUEUED) + function Connection(; + virtualhost::String="/", + host::String="localhost", + port::Int=AMQP_DEFAULT_PORT, + send_queue_size::Int=CONN_MAX_QUEUED, + heartbeat::Integer=0, + enable_heartbeat::Bool=true, + keepalive::Integer=DEFAULT_KEEPALIVE_SECS, + enable_keepalive::Bool=true, + ) sendq = Channel{TAMQPGenericFrame}(send_queue_size) sendlck = Channel{UInt8}(1) put!(sendlck, 1) new(virtualhost, host, port, nothing, - Dict{Symbol,Any}(), Dict{String,Any}(), 0, 0, 0, + Dict{Symbol,Any}(), Dict{String,Any}(), 0, 0, + heartbeat, enable_heartbeat, keepalive, enable_keepalive, CONN_STATE_CLOSED, sendq, sendlck, Dict{TAMQPChannel, AbstractChannel}(), nothing, nothing, nothing, 0.0, 0.0) @@ -502,6 +525,21 @@ function find_unused_channel(c::Connection) end throw(AMQPClientException("No free channel available (max: $maxid)")) end + +""" + channel(conn, id, create) + channel(f, args...) + +Create or return an existing a channel object. +Multiple channels can be multiplexed over a single connection. +Can be used with the Julia do block syntax to create a channel and close it afterwards. + +- `conn`: The connection over which to create the channel. +- `id`: Channels are identified by their numeric id. Specifying `AMQPClient.UNUSED_CHANNEL` as channel + id during creation will automatically assign an unused id. +- `create`: If true, a new channel will be created. Else an existing channel with the specified id + will be returned. +""" channel(c::MessageChannel, id::Integer) = channel(c.conn, id) channel(c::Connection, id::Integer) = c.channels[id] channel(c::MessageChannel, id::Integer, create::Bool) = channel(c.conn, id, create) @@ -546,26 +584,83 @@ function channel(f, args...; kwargs...) end end +""" + connection(f; kwargs...) + + connection(; + virtualhost = "/", + host = "localhost", + port = AMQPClient.AMQP_DEFAULT_PORT, + framemax = 0, + heartbeat = true, + keepalive = DEFAULT_KEEPALIVE_SECS, + send_queue_size = CONN_MAX_QUEUED, + auth_params = AMQPClient.DEFAULT_AUTH_PARAMS, + channelmax = AMQPClient.DEFAULT_CHANNELMAX, + connect_timeout = AMQPClient.DEFAULT_CONNECT_TIMEOUT, + amqps = nothing + ) + +Creates a fresh connection to the AMQP server. +Returns a connection that can be used to open channels subsequently. +Can be used with the Julia do block syntax to create a connection and close it afterwards. + +Keyword arguments: +- `host`: The message server host to connect to. Defaults to "localhost". +- `port`: The message server port to connect to. Defaults to the default AMQP port. +- `virtualhost`: The virtual host to connect to. Defaults to "/". +- `amqps`: If connection is to be done over AMQPS, the TLS options to use. See `amqps_configure`. +- `connect_timeout`: TCP connect timeout to impose. Default `AMQPClient.DEFAULT_CONNECT_TIMEOUT`, +- `framemax`: The maximum frame size to use. Defaults to 0, which means no limit. +- `heartbeat`: `true` to enable heartbeat, `false` to disable. Can also be set to a positive integer, + in which case it is the heartbeat interval in seconds. Defaults to `true`. If `false`, ensure + `keepalive` is enabled to detect dead connections. This parameter is negotiated with the server. +- `keepalive`: `true` to enable TCP keepalives, `false` to disable. Can also be set to a positive integer, + in which case it is the keepalive interval in seconds. Defaults to `DEFAULT_KEEPALIVE_SECS`. +- `send_queue_size`: Maximum number of items to buffer in memory before blocking the send API until + messages are drained. Defaults to CONN_MAX_QUEUED. +- `auth_params`: Parameters to use to authenticate the connection. Defaults to AMQPClient.DEFAULT_AUTH_PARAMS. +- `channelmax`: Maximum channel number to impose/negotiate with the server. Defaults to AMQPClient.DEFAULT_CHANNELMAX. + +""" function connection(; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, framemax=0, - heartbeat=0, + heartbeat::Union{Int,Bool}=true, + keepalive::Union{Int,Bool}=DEFAULT_KEEPALIVE_SECS, send_queue_size::Integer=CONN_MAX_QUEUED, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, channelmax::Integer=AMQPClient.DEFAULT_CHANNELMAX, connect_timeout=AMQPClient.DEFAULT_CONNECT_TIMEOUT, amqps::Union{MbedTLS.SSLConfig,Nothing}=nothing) @debug("connecting", host, port, virtualhost) - conn = Connection(; virtualhost=virtualhost, host=host, port=port, send_queue_size=send_queue_size) + + keepalive_interval = isa(keepalive, Bool) ? DEFAULT_KEEPALIVE_SECS : keepalive + enable_keepalive = isa(keepalive, Bool) ? keepalive : (keepalive_interval > 0) + + heartbeat_interval = isa(heartbeat, Bool) ? 0 : heartbeat + enable_heartbeat = isa(heartbeat, Bool) ? heartbeat : (heartbeat > 0) + + conn = Connection(; + virtualhost=virtualhost, + host=host, + port=port, + send_queue_size=send_queue_size, + heartbeat=heartbeat_interval, + enable_heartbeat=enable_heartbeat, + keepalive=keepalive_interval, + enable_keepalive=enable_keepalive,) chan = channel(conn, AMQPClient.DEFAULT_CHANNEL, true) # setup handler for Connection.Start - ctx = Dict(:auth_params=>auth_params, :channelmax=>channelmax, :framemax=>framemax, :heartbeat=>heartbeat) + ctx = Dict(:auth_params=>auth_params, :channelmax=>channelmax, :framemax=>framemax, :heartbeat=>heartbeat_interval) AMQPClient.handle(chan, :Connection, :Start, AMQPClient.on_connection_start, ctx) # open socket and start processor tasks sock = connect(conn.host, conn.port) isdefined(Sockets, :nagle) && Sockets.nagle(sock, false) isdefined(Sockets, :quickack) && Sockets.quickack(sock, true) + keepalive!(sock, enable_keepalive; interval=keepalive_interval) + conn.sock = (amqps !== nothing) ? setup_tls(sock, host, amqps) : sock conn.sender = @async AMQPClient.connection_processor(conn, "ConnectionSender", AMQPClient.connection_sender) conn.receiver = @async AMQPClient.connection_processor(conn, "ConnectionReceiver", AMQPClient.connection_receiver) @@ -1119,13 +1214,15 @@ function send_connection_tune_ok(chan::MessageChannel, channelmax=0, framemax=0, conn.channelmax = opt(channelmax, conn.channelmax) conn.framemax = opt(framemax, conn.framemax) - conn.heartbeat = opt(heartbeat, conn.heartbeat) + conn.heartbeat = conn.enable_heartbeat ? opt(heartbeat, conn.heartbeat) : 0 @debug("send_connection_tune_ok", channelmax=conn.channelmax, framemax=conn.framemax, heartbeat=conn.heartbeat) send(chan, TAMQPMethodPayload(:Connection, :TuneOk, (conn.channelmax, conn.framemax, conn.heartbeat))) - # start heartbeat timer - conn.heartbeater = @async connection_processor(conn, "HeartBeater", connection_heartbeater) + if conn.enable_heartbeat + # start heartbeat timer + conn.heartbeater = @async connection_processor(conn, "HeartBeater", connection_heartbeater) + end nothing end diff --git a/test/runtests.jl b/test/runtests.jl index aa79838..6388753 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,25 +1,63 @@ using AMQPClient +using Test include("test_coverage.jl") include("test_throughput.jl") include("test_rpc.jl") -AMQPTestCoverage.runtests() -AMQPTestThroughput.runtests() -AMQPTestRPC.runtests() +@testset "AMQPClient" begin + @testset "AMQP" begin + @testset "Functionality" begin + for keepalive in [true, false] + for heartbeat in (true, false) + @testset "keepalive=$keepalive,heartbeat=$heartbeat" begin + AMQPTestCoverage.runtests(; keepalive=keepalive, heartbeat=heartbeat) + end + end + end + end + # @testset "Throughput" begin + # AMQPTestThroughput.runtests() + # end + # @testset "RPC" begin + # AMQPTestRPC.runtests() + # end + end -if length(ARGS) > 0 - amqps_host = ARGS[1] - virtualhost = ARGS[2] - port = AMQPClient.AMQPS_DEFAULT_PORT + if length(ARGS) > 0 + @testset "AMQPS" begin + amqps_host = ARGS[1] + virtualhost = ARGS[2] + port = AMQPClient.AMQPS_DEFAULT_PORT - login = ENV["AMQPPLAIN_LOGIN"] - password = ENV["AMQPPLAIN_PASSWORD"] - auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>login, "PASSWORD"=>password) + login = ENV["AMQPPLAIN_LOGIN"] + password = ENV["AMQPPLAIN_PASSWORD"] + auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>login, "PASSWORD"=>password) - AMQPTestCoverage.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, virtualhost=virtualhost, amqps=amqps_configure(), auth_params=auth_params) - AMQPTestThroughput.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, tls=true) - AMQPTestRPC.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, amqps=amqps_configure()) + @testset "Functionality" begin + for keepalive in [true, false] + for heartbeat in (true, false) + @testset "keepalive=$keepalive,heartbeat=$heartbeat" begin + AMQPTestCoverage.runtests(; + host=amqps_host, + port=AMQPClient.AMQPS_DEFAULT_PORT, + virtualhost=virtualhost, + amqps=amqps_configure(), + auth_params=auth_params, + keepalive=keepalive, + heartbeat=heartbeat) + end + end + end + end + @testset "Throughput" begin + AMQPTestThroughput.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, tls=true) + end + @testset "RPC" begin + AMQPTestRPC.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, amqps=amqps_configure()) + end + end + end end exit(0) diff --git a/test/test_coverage.jl b/test/test_coverage.jl index 9f27dd3..e518db1 100644 --- a/test/test_coverage.jl +++ b/test/test_coverage.jl @@ -10,7 +10,7 @@ const QUEUE1 = "queue1" const ROUTE1 = "key1" const invalid_auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>randstring(10), "PASSWORD"=>randstring(10)) -function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, amqps=nothing) +function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, amqps=nothing, keepalive=true, heartbeat=true) verify_spec() test_types() @test default_exchange_name("direct") == "amq.direct" @@ -24,7 +24,7 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU # open a connection @info("opening connection") - connection(;virtualhost=virtualhost, host=host, port=port, amqps=amqps, auth_params=auth_params, send_queue_size=512) do conn + connection(;virtualhost=virtualhost, host=host, port=port, amqps=amqps, auth_params=auth_params, send_queue_size=512, keepalive=keepalive, heartbeat=heartbeat) do conn @test conn.conn.sendq.sz_max == 512 # open a channel @@ -159,6 +159,8 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU end @test c.heartbeat_time_server > ts1 @test c.heartbeat_time_client > tc1 + elseif conn.conn.heartbeat == 0 + @info("heartbeat disabled") else @info("not testing heartbeats (wait too long at $(3*conn.conn.heartbeat) secs)") end