Skip to content

Commit

Permalink
Merge pull request #47 from JuliaComputing/tan/checkstaleconn
Browse files Browse the repository at this point in the history
allow using tcp keepalives instead of heartbeats
  • Loading branch information
tanmaykm authored Nov 16, 2021
2 parents 6145773 + 655ac4a commit 4388aec
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 23 deletions.
113 changes: 105 additions & 8 deletions src/protocol.jl
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,19 @@ const CONN_STATE_OPENING = 1
const CONN_STATE_OPEN = 2
const CONN_STATE_CLOSING = 3
const CONN_MAX_QUEUED = 1024 #typemax(Int)
const DEFAULT_KEEPALIVE_SECS = 60

abstract type AbstractChannel end

function keepalive!(sock, enable::Bool; interval::Integer=DEFAULT_KEEPALIVE_SECS)
@debug("setting tcp keepalive on tcp socket", enable, interval)
err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint), sock.handle, enable, interval)
if err != 0
throw(AMQPProtocolException("error setting keepalive on socket to $enable with interval $interval"))
end
return sock
end

mutable struct Connection
virtualhost::String
host::String
Expand All @@ -178,6 +188,9 @@ mutable struct Connection
channelmax::TAMQPShortInt
framemax::TAMQPLongInt
heartbeat::TAMQPShortInt
enable_heartbeat::Bool
keepalive::Integer
enable_keepalive::Bool

state::UInt8
sendq::Channel{TAMQPGenericFrame}
Expand All @@ -191,12 +204,22 @@ mutable struct Connection
heartbeat_time_server::Float64
heartbeat_time_client::Float64

function Connection(; virtualhost::String="/", host::String="localhost", port::Int=AMQP_DEFAULT_PORT, send_queue_size::Int=CONN_MAX_QUEUED)
function Connection(;
virtualhost::String="/",
host::String="localhost",
port::Int=AMQP_DEFAULT_PORT,
send_queue_size::Int=CONN_MAX_QUEUED,
heartbeat::Integer=0,
enable_heartbeat::Bool=true,
keepalive::Integer=DEFAULT_KEEPALIVE_SECS,
enable_keepalive::Bool=true,
)
sendq = Channel{TAMQPGenericFrame}(send_queue_size)
sendlck = Channel{UInt8}(1)
put!(sendlck, 1)
new(virtualhost, host, port, nothing,
Dict{Symbol,Any}(), Dict{String,Any}(), 0, 0, 0,
Dict{Symbol,Any}(), Dict{String,Any}(), 0, 0,
heartbeat, enable_heartbeat, keepalive, enable_keepalive,
CONN_STATE_CLOSED, sendq, sendlck, Dict{TAMQPChannel, AbstractChannel}(),
nothing, nothing, nothing,
0.0, 0.0)
Expand Down Expand Up @@ -502,6 +525,21 @@ function find_unused_channel(c::Connection)
end
throw(AMQPClientException("No free channel available (max: $maxid)"))
end

"""
channel(conn, id, create)
channel(f, args...)
Create or return an existing a channel object.
Multiple channels can be multiplexed over a single connection.
Can be used with the Julia do block syntax to create a channel and close it afterwards.
- `conn`: The connection over which to create the channel.
- `id`: Channels are identified by their numeric id. Specifying `AMQPClient.UNUSED_CHANNEL` as channel
id during creation will automatically assign an unused id.
- `create`: If true, a new channel will be created. Else an existing channel with the specified id
will be returned.
"""
channel(c::MessageChannel, id::Integer) = channel(c.conn, id)
channel(c::Connection, id::Integer) = c.channels[id]
channel(c::MessageChannel, id::Integer, create::Bool) = channel(c.conn, id, create)
Expand Down Expand Up @@ -546,26 +584,83 @@ function channel(f, args...; kwargs...)
end
end

"""
connection(f; kwargs...)
connection(;
virtualhost = "/",
host = "localhost",
port = AMQPClient.AMQP_DEFAULT_PORT,
framemax = 0,
heartbeat = true,
keepalive = DEFAULT_KEEPALIVE_SECS,
send_queue_size = CONN_MAX_QUEUED,
auth_params = AMQPClient.DEFAULT_AUTH_PARAMS,
channelmax = AMQPClient.DEFAULT_CHANNELMAX,
connect_timeout = AMQPClient.DEFAULT_CONNECT_TIMEOUT,
amqps = nothing
)
Creates a fresh connection to the AMQP server.
Returns a connection that can be used to open channels subsequently.
Can be used with the Julia do block syntax to create a connection and close it afterwards.
Keyword arguments:
- `host`: The message server host to connect to. Defaults to "localhost".
- `port`: The message server port to connect to. Defaults to the default AMQP port.
- `virtualhost`: The virtual host to connect to. Defaults to "/".
- `amqps`: If connection is to be done over AMQPS, the TLS options to use. See `amqps_configure`.
- `connect_timeout`: TCP connect timeout to impose. Default `AMQPClient.DEFAULT_CONNECT_TIMEOUT`,
- `framemax`: The maximum frame size to use. Defaults to 0, which means no limit.
- `heartbeat`: `true` to enable heartbeat, `false` to disable. Can also be set to a positive integer,
in which case it is the heartbeat interval in seconds. Defaults to `true`. If `false`, ensure
`keepalive` is enabled to detect dead connections. This parameter is negotiated with the server.
- `keepalive`: `true` to enable TCP keepalives, `false` to disable. Can also be set to a positive integer,
in which case it is the keepalive interval in seconds. Defaults to `DEFAULT_KEEPALIVE_SECS`.
- `send_queue_size`: Maximum number of items to buffer in memory before blocking the send API until
messages are drained. Defaults to CONN_MAX_QUEUED.
- `auth_params`: Parameters to use to authenticate the connection. Defaults to AMQPClient.DEFAULT_AUTH_PARAMS.
- `channelmax`: Maximum channel number to impose/negotiate with the server. Defaults to AMQPClient.DEFAULT_CHANNELMAX.
"""
function connection(; virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT,
framemax=0,
heartbeat=0,
heartbeat::Union{Int,Bool}=true,
keepalive::Union{Int,Bool}=DEFAULT_KEEPALIVE_SECS,
send_queue_size::Integer=CONN_MAX_QUEUED,
auth_params=AMQPClient.DEFAULT_AUTH_PARAMS,
channelmax::Integer=AMQPClient.DEFAULT_CHANNELMAX,
connect_timeout=AMQPClient.DEFAULT_CONNECT_TIMEOUT,
amqps::Union{MbedTLS.SSLConfig,Nothing}=nothing)
@debug("connecting", host, port, virtualhost)
conn = Connection(; virtualhost=virtualhost, host=host, port=port, send_queue_size=send_queue_size)

keepalive_interval = isa(keepalive, Bool) ? DEFAULT_KEEPALIVE_SECS : keepalive
enable_keepalive = isa(keepalive, Bool) ? keepalive : (keepalive_interval > 0)

heartbeat_interval = isa(heartbeat, Bool) ? 0 : heartbeat
enable_heartbeat = isa(heartbeat, Bool) ? heartbeat : (heartbeat > 0)

conn = Connection(;
virtualhost=virtualhost,
host=host,
port=port,
send_queue_size=send_queue_size,
heartbeat=heartbeat_interval,
enable_heartbeat=enable_heartbeat,
keepalive=keepalive_interval,
enable_keepalive=enable_keepalive,)
chan = channel(conn, AMQPClient.DEFAULT_CHANNEL, true)

# setup handler for Connection.Start
ctx = Dict(:auth_params=>auth_params, :channelmax=>channelmax, :framemax=>framemax, :heartbeat=>heartbeat)
ctx = Dict(:auth_params=>auth_params, :channelmax=>channelmax, :framemax=>framemax, :heartbeat=>heartbeat_interval)
AMQPClient.handle(chan, :Connection, :Start, AMQPClient.on_connection_start, ctx)

# open socket and start processor tasks
sock = connect(conn.host, conn.port)
isdefined(Sockets, :nagle) && Sockets.nagle(sock, false)
isdefined(Sockets, :quickack) && Sockets.quickack(sock, true)
keepalive!(sock, enable_keepalive; interval=keepalive_interval)

conn.sock = (amqps !== nothing) ? setup_tls(sock, host, amqps) : sock
conn.sender = @async AMQPClient.connection_processor(conn, "ConnectionSender", AMQPClient.connection_sender)
conn.receiver = @async AMQPClient.connection_processor(conn, "ConnectionReceiver", AMQPClient.connection_receiver)
Expand Down Expand Up @@ -1119,13 +1214,15 @@ function send_connection_tune_ok(chan::MessageChannel, channelmax=0, framemax=0,

conn.channelmax = opt(channelmax, conn.channelmax)
conn.framemax = opt(framemax, conn.framemax)
conn.heartbeat = opt(heartbeat, conn.heartbeat)
conn.heartbeat = conn.enable_heartbeat ? opt(heartbeat, conn.heartbeat) : 0

@debug("send_connection_tune_ok", channelmax=conn.channelmax, framemax=conn.framemax, heartbeat=conn.heartbeat)
send(chan, TAMQPMethodPayload(:Connection, :TuneOk, (conn.channelmax, conn.framemax, conn.heartbeat)))

# start heartbeat timer
conn.heartbeater = @async connection_processor(conn, "HeartBeater", connection_heartbeater)
if conn.enable_heartbeat
# start heartbeat timer
conn.heartbeater = @async connection_processor(conn, "HeartBeater", connection_heartbeater)
end
nothing
end

Expand Down
64 changes: 51 additions & 13 deletions test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,25 +1,63 @@
using AMQPClient
using Test

include("test_coverage.jl")
include("test_throughput.jl")
include("test_rpc.jl")

AMQPTestCoverage.runtests()
AMQPTestThroughput.runtests()
AMQPTestRPC.runtests()
@testset "AMQPClient" begin
@testset "AMQP" begin
@testset "Functionality" begin
for keepalive in [true, false]
for heartbeat in (true, false)
@testset "keepalive=$keepalive,heartbeat=$heartbeat" begin
AMQPTestCoverage.runtests(; keepalive=keepalive, heartbeat=heartbeat)
end
end
end
end
# @testset "Throughput" begin
# AMQPTestThroughput.runtests()
# end
# @testset "RPC" begin
# AMQPTestRPC.runtests()
# end
end

if length(ARGS) > 0
amqps_host = ARGS[1]
virtualhost = ARGS[2]
port = AMQPClient.AMQPS_DEFAULT_PORT
if length(ARGS) > 0
@testset "AMQPS" begin
amqps_host = ARGS[1]
virtualhost = ARGS[2]
port = AMQPClient.AMQPS_DEFAULT_PORT

login = ENV["AMQPPLAIN_LOGIN"]
password = ENV["AMQPPLAIN_PASSWORD"]
auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>login, "PASSWORD"=>password)
login = ENV["AMQPPLAIN_LOGIN"]
password = ENV["AMQPPLAIN_PASSWORD"]
auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>login, "PASSWORD"=>password)

AMQPTestCoverage.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, virtualhost=virtualhost, amqps=amqps_configure(), auth_params=auth_params)
AMQPTestThroughput.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, tls=true)
AMQPTestRPC.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, amqps=amqps_configure())
@testset "Functionality" begin
for keepalive in [true, false]
for heartbeat in (true, false)
@testset "keepalive=$keepalive,heartbeat=$heartbeat" begin
AMQPTestCoverage.runtests(;
host=amqps_host,
port=AMQPClient.AMQPS_DEFAULT_PORT,
virtualhost=virtualhost,
amqps=amqps_configure(),
auth_params=auth_params,
keepalive=keepalive,
heartbeat=heartbeat)
end
end
end
end
@testset "Throughput" begin
AMQPTestThroughput.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, tls=true)
end
@testset "RPC" begin
AMQPTestRPC.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, amqps=amqps_configure())
end
end
end
end

exit(0)
6 changes: 4 additions & 2 deletions test/test_coverage.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const QUEUE1 = "queue1"
const ROUTE1 = "key1"
const invalid_auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>randstring(10), "PASSWORD"=>randstring(10))

function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, amqps=nothing)
function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, amqps=nothing, keepalive=true, heartbeat=true)
verify_spec()
test_types()
@test default_exchange_name("direct") == "amq.direct"
Expand All @@ -24,7 +24,7 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU

# open a connection
@info("opening connection")
connection(;virtualhost=virtualhost, host=host, port=port, amqps=amqps, auth_params=auth_params, send_queue_size=512) do conn
connection(;virtualhost=virtualhost, host=host, port=port, amqps=amqps, auth_params=auth_params, send_queue_size=512, keepalive=keepalive, heartbeat=heartbeat) do conn
@test conn.conn.sendq.sz_max == 512

# open a channel
Expand Down Expand Up @@ -159,6 +159,8 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU
end
@test c.heartbeat_time_server > ts1
@test c.heartbeat_time_client > tc1
elseif conn.conn.heartbeat == 0
@info("heartbeat disabled")
else
@info("not testing heartbeats (wait too long at $(3*conn.conn.heartbeat) secs)")
end
Expand Down

0 comments on commit 4388aec

Please sign in to comment.