Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow using tcp keepalives instead of heartbeats #47

Merged
merged 1 commit into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 105 additions & 8 deletions src/protocol.jl
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,19 @@ const CONN_STATE_OPENING = 1
const CONN_STATE_OPEN = 2
const CONN_STATE_CLOSING = 3
const CONN_MAX_QUEUED = 1024 #typemax(Int)
const DEFAULT_KEEPALIVE_SECS = 60

abstract type AbstractChannel end

function keepalive!(sock, enable::Bool; interval::Integer=DEFAULT_KEEPALIVE_SECS)
@debug("setting tcp keepalive on tcp socket", enable, interval)
err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint), sock.handle, enable, interval)
if err != 0
throw(AMQPProtocolException("error setting keepalive on socket to $enable with interval $interval"))
end
return sock
end

mutable struct Connection
virtualhost::String
host::String
Expand All @@ -178,6 +188,9 @@ mutable struct Connection
channelmax::TAMQPShortInt
framemax::TAMQPLongInt
heartbeat::TAMQPShortInt
enable_heartbeat::Bool
keepalive::Integer
enable_keepalive::Bool

state::UInt8
sendq::Channel{TAMQPGenericFrame}
Expand All @@ -191,12 +204,22 @@ mutable struct Connection
heartbeat_time_server::Float64
heartbeat_time_client::Float64

function Connection(; virtualhost::String="/", host::String="localhost", port::Int=AMQP_DEFAULT_PORT, send_queue_size::Int=CONN_MAX_QUEUED)
function Connection(;
virtualhost::String="/",
host::String="localhost",
port::Int=AMQP_DEFAULT_PORT,
send_queue_size::Int=CONN_MAX_QUEUED,
heartbeat::Integer=0,
enable_heartbeat::Bool=true,
keepalive::Integer=DEFAULT_KEEPALIVE_SECS,
enable_keepalive::Bool=true,
)
sendq = Channel{TAMQPGenericFrame}(send_queue_size)
sendlck = Channel{UInt8}(1)
put!(sendlck, 1)
new(virtualhost, host, port, nothing,
Dict{Symbol,Any}(), Dict{String,Any}(), 0, 0, 0,
Dict{Symbol,Any}(), Dict{String,Any}(), 0, 0,
heartbeat, enable_heartbeat, keepalive, enable_keepalive,
CONN_STATE_CLOSED, sendq, sendlck, Dict{TAMQPChannel, AbstractChannel}(),
nothing, nothing, nothing,
0.0, 0.0)
Expand Down Expand Up @@ -502,6 +525,21 @@ function find_unused_channel(c::Connection)
end
throw(AMQPClientException("No free channel available (max: $maxid)"))
end

"""
channel(conn, id, create)
channel(f, args...)

Create or return an existing a channel object.
Multiple channels can be multiplexed over a single connection.
Can be used with the Julia do block syntax to create a channel and close it afterwards.

- `conn`: The connection over which to create the channel.
- `id`: Channels are identified by their numeric id. Specifying `AMQPClient.UNUSED_CHANNEL` as channel
id during creation will automatically assign an unused id.
- `create`: If true, a new channel will be created. Else an existing channel with the specified id
will be returned.
"""
channel(c::MessageChannel, id::Integer) = channel(c.conn, id)
channel(c::Connection, id::Integer) = c.channels[id]
channel(c::MessageChannel, id::Integer, create::Bool) = channel(c.conn, id, create)
Expand Down Expand Up @@ -546,26 +584,83 @@ function channel(f, args...; kwargs...)
end
end

"""
connection(f; kwargs...)

connection(;
virtualhost = "/",
host = "localhost",
port = AMQPClient.AMQP_DEFAULT_PORT,
framemax = 0,
heartbeat = true,
keepalive = DEFAULT_KEEPALIVE_SECS,
send_queue_size = CONN_MAX_QUEUED,
auth_params = AMQPClient.DEFAULT_AUTH_PARAMS,
channelmax = AMQPClient.DEFAULT_CHANNELMAX,
connect_timeout = AMQPClient.DEFAULT_CONNECT_TIMEOUT,
amqps = nothing
)

Creates a fresh connection to the AMQP server.
Returns a connection that can be used to open channels subsequently.
Can be used with the Julia do block syntax to create a connection and close it afterwards.

Keyword arguments:
- `host`: The message server host to connect to. Defaults to "localhost".
- `port`: The message server port to connect to. Defaults to the default AMQP port.
- `virtualhost`: The virtual host to connect to. Defaults to "/".
- `amqps`: If connection is to be done over AMQPS, the TLS options to use. See `amqps_configure`.
- `connect_timeout`: TCP connect timeout to impose. Default `AMQPClient.DEFAULT_CONNECT_TIMEOUT`,
- `framemax`: The maximum frame size to use. Defaults to 0, which means no limit.
- `heartbeat`: `true` to enable heartbeat, `false` to disable. Can also be set to a positive integer,
in which case it is the heartbeat interval in seconds. Defaults to `true`. If `false`, ensure
`keepalive` is enabled to detect dead connections. This parameter is negotiated with the server.
- `keepalive`: `true` to enable TCP keepalives, `false` to disable. Can also be set to a positive integer,
in which case it is the keepalive interval in seconds. Defaults to `DEFAULT_KEEPALIVE_SECS`.
- `send_queue_size`: Maximum number of items to buffer in memory before blocking the send API until
messages are drained. Defaults to CONN_MAX_QUEUED.
- `auth_params`: Parameters to use to authenticate the connection. Defaults to AMQPClient.DEFAULT_AUTH_PARAMS.
- `channelmax`: Maximum channel number to impose/negotiate with the server. Defaults to AMQPClient.DEFAULT_CHANNELMAX.

"""
function connection(; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT,
framemax=0,
heartbeat=0,
heartbeat::Union{Int,Bool}=true,
keepalive::Union{Int,Bool}=DEFAULT_KEEPALIVE_SECS,
send_queue_size::Integer=CONN_MAX_QUEUED,
auth_params=AMQPClient.DEFAULT_AUTH_PARAMS,
channelmax::Integer=AMQPClient.DEFAULT_CHANNELMAX,
connect_timeout=AMQPClient.DEFAULT_CONNECT_TIMEOUT,
amqps::Union{MbedTLS.SSLConfig,Nothing}=nothing)
@debug("connecting", host, port, virtualhost)
conn = Connection(; virtualhost=virtualhost, host=host, port=port, send_queue_size=send_queue_size)

keepalive_interval = isa(keepalive, Bool) ? DEFAULT_KEEPALIVE_SECS : keepalive
enable_keepalive = isa(keepalive, Bool) ? keepalive : (keepalive_interval > 0)

heartbeat_interval = isa(heartbeat, Bool) ? 0 : heartbeat
enable_heartbeat = isa(heartbeat, Bool) ? heartbeat : (heartbeat > 0)

conn = Connection(;
virtualhost=virtualhost,
host=host,
port=port,
send_queue_size=send_queue_size,
heartbeat=heartbeat_interval,
enable_heartbeat=enable_heartbeat,
keepalive=keepalive_interval,
enable_keepalive=enable_keepalive,)
chan = channel(conn, AMQPClient.DEFAULT_CHANNEL, true)

# setup handler for Connection.Start
ctx = Dict(:auth_params=>auth_params, :channelmax=>channelmax, :framemax=>framemax, :heartbeat=>heartbeat)
ctx = Dict(:auth_params=>auth_params, :channelmax=>channelmax, :framemax=>framemax, :heartbeat=>heartbeat_interval)
AMQPClient.handle(chan, :Connection, :Start, AMQPClient.on_connection_start, ctx)

# open socket and start processor tasks
sock = connect(conn.host, conn.port)
isdefined(Sockets, :nagle) && Sockets.nagle(sock, false)
isdefined(Sockets, :quickack) && Sockets.quickack(sock, true)
keepalive!(sock, enable_keepalive; interval=keepalive_interval)

conn.sock = (amqps !== nothing) ? setup_tls(sock, host, amqps) : sock
conn.sender = @async AMQPClient.connection_processor(conn, "ConnectionSender", AMQPClient.connection_sender)
conn.receiver = @async AMQPClient.connection_processor(conn, "ConnectionReceiver", AMQPClient.connection_receiver)
Expand Down Expand Up @@ -1119,13 +1214,15 @@ function send_connection_tune_ok(chan::MessageChannel, channelmax=0, framemax=0,

conn.channelmax = opt(channelmax, conn.channelmax)
conn.framemax = opt(framemax, conn.framemax)
conn.heartbeat = opt(heartbeat, conn.heartbeat)
conn.heartbeat = conn.enable_heartbeat ? opt(heartbeat, conn.heartbeat) : 0

@debug("send_connection_tune_ok", channelmax=conn.channelmax, framemax=conn.framemax, heartbeat=conn.heartbeat)
send(chan, TAMQPMethodPayload(:Connection, :TuneOk, (conn.channelmax, conn.framemax, conn.heartbeat)))

# start heartbeat timer
conn.heartbeater = @async connection_processor(conn, "HeartBeater", connection_heartbeater)
if conn.enable_heartbeat
# start heartbeat timer
conn.heartbeater = @async connection_processor(conn, "HeartBeater", connection_heartbeater)
end
nothing
end

Expand Down
64 changes: 51 additions & 13 deletions test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,25 +1,63 @@
using AMQPClient
using Test

include("test_coverage.jl")
include("test_throughput.jl")
include("test_rpc.jl")

AMQPTestCoverage.runtests()
AMQPTestThroughput.runtests()
AMQPTestRPC.runtests()
@testset "AMQPClient" begin
@testset "AMQP" begin
@testset "Functionality" begin
for keepalive in [true, false]
for heartbeat in (true, false)
@testset "keepalive=$keepalive,heartbeat=$heartbeat" begin
AMQPTestCoverage.runtests(; keepalive=keepalive, heartbeat=heartbeat)
end
end
end
end
# @testset "Throughput" begin
# AMQPTestThroughput.runtests()
# end
# @testset "RPC" begin
# AMQPTestRPC.runtests()
# end
end

if length(ARGS) > 0
amqps_host = ARGS[1]
virtualhost = ARGS[2]
port = AMQPClient.AMQPS_DEFAULT_PORT
if length(ARGS) > 0
@testset "AMQPS" begin
amqps_host = ARGS[1]
virtualhost = ARGS[2]
port = AMQPClient.AMQPS_DEFAULT_PORT

login = ENV["AMQPPLAIN_LOGIN"]
password = ENV["AMQPPLAIN_PASSWORD"]
auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>login, "PASSWORD"=>password)
login = ENV["AMQPPLAIN_LOGIN"]
password = ENV["AMQPPLAIN_PASSWORD"]
auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>login, "PASSWORD"=>password)

AMQPTestCoverage.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, virtualhost=virtualhost, amqps=amqps_configure(), auth_params=auth_params)
AMQPTestThroughput.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, tls=true)
AMQPTestRPC.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, amqps=amqps_configure())
@testset "Functionality" begin
for keepalive in [true, false]
for heartbeat in (true, false)
@testset "keepalive=$keepalive,heartbeat=$heartbeat" begin
AMQPTestCoverage.runtests(;
host=amqps_host,
port=AMQPClient.AMQPS_DEFAULT_PORT,
virtualhost=virtualhost,
amqps=amqps_configure(),
auth_params=auth_params,
keepalive=keepalive,
heartbeat=heartbeat)
end
end
end
end
@testset "Throughput" begin
AMQPTestThroughput.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, tls=true)
end
@testset "RPC" begin
AMQPTestRPC.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, amqps=amqps_configure())
end
end
end
end

exit(0)
6 changes: 4 additions & 2 deletions test/test_coverage.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const QUEUE1 = "queue1"
const ROUTE1 = "key1"
const invalid_auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>randstring(10), "PASSWORD"=>randstring(10))

function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, amqps=nothing)
function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, amqps=nothing, keepalive=true, heartbeat=true)
verify_spec()
test_types()
@test default_exchange_name("direct") == "amq.direct"
Expand All @@ -24,7 +24,7 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU

# open a connection
@info("opening connection")
connection(;virtualhost=virtualhost, host=host, port=port, amqps=amqps, auth_params=auth_params, send_queue_size=512) do conn
connection(;virtualhost=virtualhost, host=host, port=port, amqps=amqps, auth_params=auth_params, send_queue_size=512, keepalive=keepalive, heartbeat=heartbeat) do conn
@test conn.conn.sendq.sz_max == 512

# open a channel
Expand Down Expand Up @@ -159,6 +159,8 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU
end
@test c.heartbeat_time_server > ts1
@test c.heartbeat_time_client > tc1
elseif conn.conn.heartbeat == 0
@info("heartbeat disabled")
else
@info("not testing heartbeats (wait too long at $(3*conn.conn.heartbeat) secs)")
end
Expand Down