Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement isConnected method #224

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions json_rpc/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ method call*(client: RpcClient, name: string,

await client.call(name, params.paramsTx)

method isConnected*(client: RpcClient): bool {.base, gcsafe.} =
doAssert(false, "`RpcClient.isConnected` not implemented")

method close*(client: RpcClient): Future[void] {.base, gcsafe, async.} =
doAssert(false, "`RpcClient.close` not implemented")

Expand Down
4 changes: 4 additions & 0 deletions json_rpc/clients/httpclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,12 @@ proc connect*(client: RpcHttpClient, address: string, port: Port, secure: bool)
else:
raise newException(RpcAddressUnresolvableError, res.error)

method isConnected*(client: RpcHttpClient): bool =
(not client.httpSession.isNil()) and client.httpAddress.isOk()

method close*(client: RpcHttpClient) {.async.} =
if not client.httpSession.isNil:
await client.httpSession.closeWait()
client.httpSession = nil
jangko marked this conversation as resolved.
Show resolved Hide resolved

{.pop.}
10 changes: 9 additions & 1 deletion json_rpc/clients/socketclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ method callBatch*(client: RpcSocketClient,
proc processData(client: RpcSocketClient) {.async: (raises: []).} =
while true:
var localException: ref JsonRpcError
var stopping = false
while true:
try:
var value = await client.transport.readLine(defaultMaxRequestLength)
Expand All @@ -99,6 +100,7 @@ proc processData(client: RpcSocketClient) {.async: (raises: []).} =
break
except CancelledError as exc:
localException = newException(JsonRpcError, exc.msg)
stopping = true
await client.transport.closeWait()
break

Expand All @@ -108,6 +110,9 @@ proc processData(client: RpcSocketClient) {.async: (raises: []).} =
if client.batchFut.isNil.not and not client.batchFut.completed():
client.batchFut.fail(localException)

if stopping:
break

# async loop reconnection and waiting
try:
info "Reconnect to server", address=`$`(client.address)
Expand All @@ -130,8 +135,11 @@ proc connect*(client: RpcSocketClient, address: TransportAddress) {.async.} =
client.address = address
client.loop = processData(client)

method isConnected*(client: RpcSocketClient): bool =
not client.transport.isNil()

method close*(client: RpcSocketClient) {.async.} =
await client.loop.cancelAndWait()
if not client.transport.isNil:
client.transport.close()
await client.transport.closeWait()
client.transport = nil
3 changes: 3 additions & 0 deletions json_rpc/clients/websocketclientimpl.nim
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ proc connect*(
client.uri = uri
client.loop = processData(client)

method isConnected*(client: RpcWebSocketClient): bool =
not client.transport.isNil()

method close*(client: RpcWebSocketClient) {.async.} =
await client.loop.cancelAndWait()
if not client.transport.isNil:
Expand Down
2 changes: 1 addition & 1 deletion tests/testhttp.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ proc continuousTest(address: string): Future[int] {.async.} =
var r = await client.call("myProc", %[%"abc", %[1, 2, 3, i]])
if r.string == "\"Hello abc data: [1, 2, 3, " & $i & "]\"":
result += 1
await client.close()
await client.close()

proc invalidTest(address: string): Future[bool] {.async.} =
var client = newRpcHttpClient()
Expand Down
2 changes: 1 addition & 1 deletion tests/testhttps.nim
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ proc continuousTest(address: string): Future[int] {.async.} =
var r = await client.call("myProc", %[%"abc", %[1, 2, 3, i]])
if r.string == "\"Hello abc data: [1, 2, 3, " & $i & "]\"":
result += 1
await client.close()
await client.close()

proc invalidTest(address: string): Future[bool] {.async.} =
var client = newRpcHttpClient(secure=true)
Expand Down
56 changes: 54 additions & 2 deletions tests/testserverclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,51 @@ suite "Socket Server/Client RPC":
except CatchableError as e:
check e.msg == """{"code":-32001,"message":"Unknown payload"}"""

test "Client close and isConnected":
check client.isConnected() == true
# Is socket server close broken?
waitFor client.close()
check client.isConnected() == false

srv.stop()
waitFor srv.closeWait()

suite "HTTP Server/Client RPC":
var srv = newRpcHttpServer([initTAddress("127.0.0.1", Port(0))])
var client = newRpcHttpClient()

echo "address: ", $srv.localAddress()
srv.setupServer()
srv.start()
waitFor client.connect("http://" & $(srv.localAddress()[0]))

test "Successful RPC call":
let r = waitFor client.call("myProc", %[%"abc", %[1, 2, 3, 4]])
check r.string == "\"Hello abc data: [1, 2, 3, 4]\""

test "Missing params":
expect(CatchableError):
discard waitFor client.call("myProc", %[%"abc"])

test "Error RPC call":
expect(CatchableError): # The error type wont be translated
discard waitFor client.call("myError", %[%"abc", %[1, 2, 3, 4]])

test "Invalid request exception":
try:
discard waitFor client.call("invalidRequest", %[])
check false
except CatchableError as e:
check e.msg == """{"code":-32001,"message":"Unknown payload"}"""

test "Client close and isConnected":
check client.isConnected() == true
waitFor client.close()
check client.isConnected() == false

waitFor srv.stop()
waitFor srv.closeWait()

suite "Websocket Server/Client RPC":
var srv = newRpcWebSocketServer("127.0.0.1", Port(0))
var client = newRpcWebSocketClient()
Expand Down Expand Up @@ -79,6 +121,11 @@ suite "Websocket Server/Client RPC":
except CatchableError as e:
check e.msg == """{"code":-32001,"message":"Unknown payload"}"""

test "Client close and isConnected":
check client.isConnected() == true
waitFor client.close()
check client.isConnected() == false

srv.stop()
waitFor srv.closeWait()

Expand Down Expand Up @@ -111,16 +158,21 @@ suite "Websocket Server/Client RPC with Compression":
except CatchableError as e:
check e.msg == """{"code":-32001,"message":"Unknown payload"}"""

test "Client close and isConnected":
check client.isConnected() == true
waitFor client.close()
check client.isConnected() == false

srv.stop()
waitFor srv.closeWait()

suite "Custom processClient":
test "Should be able to use custom processClient":
var wasCalled: bool = false

proc processClientHook(server: StreamServer, transport: StreamTransport) {.async: (raises: []), gcsafe.} =
wasCalled = true

var srv = newRpcSocketServer(processClientHook)
srv.addStreamServer("localhost", Port(8888))
var client = newRpcSocketClient()
Expand Down