-
Notifications
You must be signed in to change notification settings - Fork 28
/
socketclient.nim
137 lines (113 loc) · 4.22 KB
/
socketclient.nim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# json-rpc
# Copyright (c) 2019-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import
std/tables,
chronicles,
results,
chronos,
../client,
../errors,
../private/jrpc_sys
export client, errors
# Attaches the `topics` property below to the log statements (`info`,
# `error`) emitted from this module.
logScope:
  topics = "JSONRPC-SOCKET-CLIENT"
type
  RpcSocketClient* = ref object of RpcClient
    ## JSON-RPC client that exchanges line-delimited messages with a server
    ## over a stream transport.

    # Connection used to write requests and read responses. It is nil until
    # `connect` succeeds and is reset to nil by `close`.
    transport*: StreamTransport
    # Address stored by `connect`; `processData` reconnects to it after the
    # connection ends.
    address*: TransportAddress
    # Future of the background `processData` read loop started by `connect`
    # and cancelled by `close`.
    loop*: Future[void]
# Limit handed to `readLine` when `processData` reads one incoming message
# (131072; presumably bytes - confirm against the transport API).
const defaultMaxRequestLength* = 1024 * 128
# Default pragmas for the routines declared below: `gcsafe` and an empty
# `raises` list.
{.push gcsafe, raises: [].}
proc new*(T: type RpcSocketClient): T =
  ## Allocates a client with every field at its default value. No connection
  ## is opened here; that happens in `connect`.
  result = T()
proc newRpcSocketClient*: RpcSocketClient =
  ## Creates a new client instance.
  ##
  ## Shorthand for `RpcSocketClient.new()`.
  result = RpcSocketClient.new()
method call*(client: RpcSocketClient, name: string,
             params: RequestParamsTx): Future[JsonString] {.async, gcsafe.} =
  ## Remotely calls the specified RPC method.
  ##
  ## The request is written to the transport as one line terminated by
  ## `"\r\n"`; the returned future yields the value the response future is
  ## completed with.
  ##
  ## Raises `JsonRpcError` when the client has no transport (missing call to
  ## `connect`). Errors from the transport write, and errors set on the
  ## response future, propagate to the caller.

  # Fail fast: do not consume a request id or encode anything when there is
  # no transport to send on.
  if client.transport.isNil:
    raise newException(JsonRpcError,
      "Transport is not initialised (missing a call to connect?)")

  let
    id = client.getNextId()
    jsonBytes = requestTxEncode(name, params, id) & "\r\n"
    # completed by processMessage.
    newFut = newFuture[JsonString]()

  # add to awaiting responses
  client.awaiting[id] = newFut

  try:
    let res = await client.transport.write(jsonBytes)
    # TODO: Add actions when the full packet was not sent, e.g. disconnect
    # peer.
    doAssert(res == jsonBytes.len)
  except TransportError as exc:
    # The request could not be written, so no response is expected for it:
    # drop the registration instead of leaving a stale future in the table.
    client.awaiting.del(id)
    raise exc

  return await newFut
method callBatch*(client: RpcSocketClient,
                  calls: RequestBatchTx): Future[ResponseBatchRx]
                    {.gcsafe, async.} =
  ## Sends `calls` as a single batch request and waits for the batch
  ## response.
  ##
  ## Raises `JsonRpcError` when the client has no transport (missing call to
  ## `connect`).
  if client.transport.isNil:
    raise newException(JsonRpcError,
      "Transport is not initialised (missing a call to connect?)")

  # A batch future that is still pending is reused; a fresh one is created
  # only when there is none or the previous one has finished.
  let needsFreshFuture =
    client.batchFut.isNil or client.batchFut.finished()
  if needsFreshFuture:
    client.batchFut = newFuture[ResponseBatchRx]()

  let payload = requestBatchEncode(calls) & "\r\n"
  let written = await client.transport.write(payload)

  # TODO: Add actions when the full packet was not sent, e.g. disconnect peer.
  doAssert(written == payload.len)

  return await client.batchFut
proc failPendingRequests(client: RpcSocketClient, exc: ref JsonRpcError) =
  ## Fails every request future that is still pending, and the pending batch
  ## future if there is one, with `exc`; then forgets the failed requests.
  for _, fut in client.awaiting:
    # Only pending futures may be failed: chronos treats failing a future
    # that already completed or failed as a defect.
    if not fut.finished():
      fut.fail(exc)
  # Keeping the entries would make the next failure round touch the same,
  # now finished, futures again.
  client.awaiting.clear()
  if not client.batchFut.isNil and not client.batchFut.finished():
    client.batchFut.fail(exc)

proc processData(client: RpcSocketClient) {.async: (raises: []).} =
  ## Background read loop of the client.
  ##
  ## Reads one line at a time from the transport and hands it to
  ## `processMessage`. When the connection ends (empty read, transport error
  ## or a message that cannot be processed) the transport is closed, all
  ## pending requests are failed with a `JsonRpcError`, and a new connection
  ## to `client.address` is attempted. The loop stops when that reconnection
  ## fails or when the loop itself is cancelled.
  while true:
    var localException: ref JsonRpcError

    while true:
      try:
        let value =
          await client.transport.readLine(defaultMaxRequestLength)
        if value == "":
          # transmission ends: requests still waiting on this connection can
          # no longer be answered.
          localException = newException(JsonRpcError,
            "Transmission ended before a response was received")
          await client.transport.closeWait()
          break

        let res = client.processMessage(value)
        if res.isErr:
          error "Error when processing RPC message", msg=res.error
          localException = newException(JsonRpcError, res.error)
          # The transport is replaced by the reconnect below; close it so
          # the old socket is not leaked.
          await client.transport.closeWait()
          break
      except TransportError as exc:
        localException = newException(JsonRpcError, exc.msg)
        await client.transport.closeWait()
        break
      except CancelledError as exc:
        # The loop was cancelled: release the connection, fail whatever is
        # still pending and stop without reconnecting.
        await client.transport.closeWait()
        client.failPendingRequests(newException(JsonRpcError, exc.msg))
        return

    # Every way out of the read loop above sets `localException`.
    client.failPendingRequests(localException)

    # async loop reconnection and waiting
    try:
      info "Reconnect to server", address=`$`(client.address)
      client.transport = await connect(client.address)
    except TransportError as exc:
      error "Error when reconnecting to server", msg=exc.msg
      break
    except CancelledError as exc:
      error "Error when reconnecting to server", msg=exc.msg
      break
proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} =
  ## Resolves `address` and `port`, connects to the first resolved address,
  ## stores that address on the client and starts the `processData` read
  ## loop.
  let target = resolveTAddress(address, port)[0]
  client.transport = await connect(target)
  client.address = target
  client.loop = client.processData()
proc connect*(client: RpcSocketClient, address: TransportAddress) {.async.} =
  ## Connects to `address`, stores it on the client and starts the
  ## `processData` read loop.
  let transp = await connect(address)
  client.transport = transp
  client.address = address
  client.loop = client.processData()
method close*(client: RpcSocketClient) {.async.} =
  ## Stops the background read loop and closes the transport.
  ##
  ## Does nothing for the parts that were never set up, so it can be called
  ## on a client that was never connected.

  # `loop` is only assigned by `connect`; guard against a nil future.
  if not client.loop.isNil:
    await client.loop.cancelAndWait()
    client.loop = nil

  if not client.transport.isNil:
    # Wait for the close to finish so the socket is released on return.
    await client.transport.closeWait()
    client.transport = nil