Skip to content

Commit 288ee51

Browse files
authored
clients: reduce mem usage, refactor protocol handling (#247)
JSON-RPC is modelled as request-response where each response corresponds to one request, sent in the same order. In addition, batching is possible where several requests can be sent as one batch, receiving one batch of responses in return, but still maintaining the general 1:1 messaging structure. This PR exploits this 1:1 relationship between request and response to simplify the client implementation and improve efficieny (reduces mem usage from 1.2gb to 800mb to send a 128mb message - still a lot, but quite a bit better) while at the same time cleaning up error handling and moving the JSON-RPC specifics out of the transports so that they're equal and shared between each transport. The transports themselves now just provide the simple ability to transfer request-response pairs. In doing so, protocol adherence in edge cases is increased to more closely follow the semantics suggested in the spec. * match messages by order, getting rid of unnecessary matching table * move all JSON-RPC protocol implementation details such as encoding/decoding etc to `clients`, avoiding it being spread out and repeated in each transport - this also opens the door to using a different encoding than JSON in the future (ie CBOR) * stream-encode requests and use `seq[byte]` throughout to avoid costly `string` conversions / copies * clean up error raising - in particular, differentiate between transport and protocol errors more clearly and strive to raise similar exceptions in similar situations for each transport * add `maxMessageSize` parameter to each transport, make it work the same * remove socket client reconnect loop to match websockets - possibly it should be re-added to both instead in a future PR * add `raises` to `async`, where relevant * order request/response json fields the way they're ordered in the spec * this makes it more efficient to parse messages following the same order * make parser more spec-compliant, moving some of the validation to an earlier stage in the parsing pipeline * limit the length of string-based `id` fields to avoid having the log spammed * stream-write requests, avoiding an extra copy of the parameters * use raw async procs where applicable to avoid copies and async overhead
1 parent b6e40a7 commit 288ee51

17 files changed

+586
-669
lines changed

json_rpc/client.nim

Lines changed: 214 additions & 163 deletions
Large diffs are not rendered by default.

json_rpc/clients/httpclient.nim

Lines changed: 66 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# json-rpc
2-
# Copyright (c) 2019-2024 Status Research & Development GmbH
2+
# Copyright (c) 2019-2025 Status Research & Development GmbH
33
# Licensed under either of
44
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
55
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -10,21 +10,12 @@
1010
{.push raises: [], gcsafe.}
1111

1212
import
13-
std/[tables, uri],
14-
stew/byteutils,
15-
results,
13+
std/uri,
1614
chronos/apps/http/httpclient,
17-
chronicles, httputils,
18-
json_serialization/std/net as jsnet,
19-
../client,
20-
../errors,
21-
../private/jrpc_sys
15+
httputils,
16+
../[client, errors]
2217

23-
export
24-
client, errors, jsnet, HttpClientFlag, HttpClientFlags
25-
26-
logScope:
27-
topics = "JSONRPC-HTTP-CLIENT"
18+
export client, errors, HttpClientFlag, HttpClientFlags
2819

2920
type
3021
HttpClientOptions* = object
@@ -33,53 +24,32 @@ type
3324
RpcHttpClient* = ref object of RpcClient
3425
httpSession: HttpSessionRef
3526
httpAddress: HttpAddress
36-
maxBodySize: int
3727
getHeaders: GetJsonRpcRequestHeaders
3828

39-
# ------------------------------------------------------------------------------
40-
# Private helpers
41-
# ------------------------------------------------------------------------------
42-
43-
proc `$`(v: HttpAddress): string =
44-
v.id
45-
46-
proc new(
47-
T: type RpcHttpClient, maxBodySize = MaxMessageBodyBytes, secure = false,
48-
getHeaders: GetJsonRpcRequestHeaders = nil, flags: HttpClientFlags = {}): T =
49-
29+
proc new*(
30+
T: type RpcHttpClient,
31+
secure = false,
32+
getHeaders: GetJsonRpcRequestHeaders = nil,
33+
flags: HttpClientFlags = {},
34+
maxMessageSize = defaultMaxMessageSize,
35+
): T =
5036
var moreFlags: HttpClientFlags
5137
if secure:
5238
moreFlags.incl HttpClientFlag.NoVerifyHost
5339
moreFlags.incl HttpClientFlag.NoVerifyServerName
5440

5541
T(
56-
maxBodySize: maxBodySize,
42+
maxMessageSize: maxMessageSize,
5743
httpSession: HttpSessionRef.new(flags = flags + moreFlags),
58-
getHeaders: getHeaders
44+
getHeaders: getHeaders,
5945
)
6046

61-
template closeRefs(req, res: untyped) =
62-
# We can't trust try/finally in async/await in all nim versions, so we
63-
# do it manually instead
64-
if req != nil:
65-
try:
66-
await req.closeWait()
67-
except CatchableError as exc: # shouldn't happen
68-
debug "Error closing JSON-RPC HTTP resuest/response", err = exc.msg
69-
discard exc
70-
71-
if res != nil:
72-
try:
73-
await res.closeWait()
74-
except CatchableError as exc: # shouldn't happen
75-
debug "Error closing JSON-RPC HTTP resuest/response", err = exc.msg
76-
discard exc
77-
78-
proc callImpl(client: RpcHttpClient, reqBody: string): Future[string] {.async.} =
47+
method request(
48+
client: RpcHttpClient, reqData: seq[byte]
49+
): Future[seq[byte]] {.async: (raises: [CancelledError, JsonRpcError]).} =
7950
doAssert client.httpSession != nil
8051
if client.httpAddress.addresses.len == 0:
81-
raise newException(RpcPostError, "Not connected")
82-
52+
raise newException(RpcTransportError, "No remote addresses to connect to")
8353

8454
var headers =
8555
if not isNil(client.getHeaders):
@@ -88,137 +58,68 @@ proc callImpl(client: RpcHttpClient, reqBody: string): Future[string] {.async.}
8858
@[]
8959
headers.add(("Content-Type", "application/json"))
9060

91-
var req: HttpClientRequestRef
92-
var res: HttpClientResponseRef
93-
94-
req = HttpClientRequestRef.post(client.httpSession,
95-
client.httpAddress,
96-
body = reqBody.toOpenArrayByte(0, reqBody.len - 1),
97-
headers = headers)
98-
res =
99-
try:
100-
await req.send()
101-
except CancelledError as e:
102-
debug "Cancelled POST Request with JSON-RPC", e = e.msg
103-
closeRefs(req, res)
104-
raise e
105-
except CatchableError as e:
106-
debug "Failed to send POST Request with JSON-RPC", e = e.msg
107-
closeRefs(req, res)
108-
raise (ref RpcPostError)(msg: "Failed to send POST Request with JSON-RPC: " & e.msg, parent: e)
109-
110-
if res.status < 200 or res.status >= 300: # res.status is not 2xx (success)
111-
debug "Unsuccessful POST Request with JSON-RPC",
112-
status = res.status, reason = res.reason
113-
closeRefs(req, res)
114-
raise (ref ErrorResponse)(status: res.status, msg: res.reason)
115-
116-
let resBytes =
117-
try:
118-
await res.getBodyBytes(client.maxBodySize)
119-
except CancelledError as e:
120-
debug "Cancelled POST Response for JSON-RPC", e = e.msg
121-
closeRefs(req, res)
122-
raise e
123-
except CatchableError as e:
124-
debug "Failed to read POST Response for JSON-RPC", e = e.msg
125-
closeRefs(req, res)
126-
raise (ref FailedHttpResponse)(msg: "Failed to read POST Response for JSON-RPC: " & e.msg, parent: e)
127-
128-
result = string.fromBytes(resBytes)
129-
trace "Response", text = result
130-
closeRefs(req, res)
131-
132-
# ------------------------------------------------------------------------------
133-
# Public functions
134-
# ------------------------------------------------------------------------------
61+
let
62+
req = HttpClientRequestRef.post(
63+
client.httpSession, client.httpAddress, body = reqData, headers = headers
64+
)
65+
66+
res =
67+
try:
68+
await req.send()
69+
except HttpError as exc:
70+
raise (ref RpcPostError)(msg: exc.msg, parent: exc)
71+
finally:
72+
await req.closeWait()
73+
74+
try:
75+
if res.status < 200 or res.status >= 300: # res.status is not 2xx (success)
76+
raise (ref ErrorResponse)(status: res.status, msg: res.reason)
77+
78+
let
79+
resData = await res.getBodyBytes(client.maxMessageSize)
80+
# TODO remove this processMessage hook when subscriptions / pubsub is
81+
# properly supported
82+
fallback = client.callOnProcessMessage(resData).valueOr:
83+
raise (ref RequestDecodeError)(msg: error, payload: resData)
84+
85+
if not fallback:
86+
# TODO http channels are unidirectional, so it doesn't really make sense
87+
# to call onProcessMessage from http - this should be deprecated
88+
# as soon as bidirectionality is supported
89+
raise (ref InvalidResponse)(msg: "onProcessMessage handled response")
90+
91+
resData
92+
except HttpError as exc:
93+
raise (ref RpcTransportError)(msg: exc.msg, parent: exc)
94+
finally:
95+
await req.closeWait()
13596

13697
proc newRpcHttpClient*(
137-
maxBodySize = MaxMessageBodyBytes, secure = false,
98+
maxBodySize = defaultMaxMessageSize,
99+
secure = false,
138100
getHeaders: GetJsonRpcRequestHeaders = nil,
139-
flags: HttpClientFlags = {}): RpcHttpClient =
140-
RpcHttpClient.new(maxBodySize, secure, getHeaders, flags)
101+
flags: HttpClientFlags = {},
102+
): RpcHttpClient =
103+
RpcHttpClient.new(secure, getHeaders, flags, maxBodySize)
141104

142-
method call*(client: RpcHttpClient, name: string,
143-
params: RequestParamsTx): Future[JsonString]
144-
{.async.} =
145-
let
146-
id = client.getNextId()
147-
reqBody = requestTxEncode(name, params, id)
148-
149-
debug "Sending JSON-RPC request",
150-
address = $client.httpAddress, len = len(reqBody), name, id
151-
trace "Message", msg = reqBody
152-
153-
let resText = await client.callImpl(reqBody)
154-
155-
# completed by processMessage - the flow is quite weird here to accomodate
156-
# socket and ws clients, but could use a more thorough refactoring
157-
var newFut = newFuture[JsonString]()
158-
# add to awaiting responses
159-
client.awaiting[id] = newFut
160-
161-
# Might error for all kinds of reasons
162-
let msgRes = client.processMessage(resText)
163-
if msgRes.isErr:
164-
# Need to clean up in case the answer was invalid
165-
let exc = newException(JsonRpcError, msgRes.error)
166-
newFut.fail(exc)
167-
client.awaiting.del(id)
168-
raise exc
169-
170-
client.awaiting.del(id)
171-
172-
# processMessage should have completed this future - if it didn't, `read` will
173-
# raise, which is reasonable
174-
if newFut.finished:
175-
return newFut.read()
176-
else:
177-
# TODO: Provide more clarity regarding the failure here
178-
debug "Invalid POST Response for JSON-RPC"
179-
raise newException(InvalidResponse, "Invalid response")
180-
181-
method callBatch*(client: RpcHttpClient,
182-
calls: RequestBatchTx): Future[ResponseBatchRx]
183-
{.async.} =
184-
let reqBody = requestBatchEncode(calls)
185-
debug "Sending JSON-RPC batch",
186-
address = $client.httpAddress, len = len(reqBody)
187-
let resText = await client.callImpl(reqBody)
188-
189-
if client.batchFut.isNil or client.batchFut.finished():
190-
client.batchFut = newFuture[ResponseBatchRx]()
191-
192-
# Might error for all kinds of reasons
193-
let msgRes = client.processMessage(resText)
194-
if msgRes.isErr:
195-
# Need to clean up in case the answer was invalid
196-
debug "Failed to process POST Response for JSON-RPC", msg = msgRes.error
197-
let exc = newException(JsonRpcError, msgRes.error)
198-
client.batchFut.fail(exc)
199-
raise exc
200-
201-
# processMessage should have completed this future - if it didn't, `read` will
202-
# raise, which is reasonable
203-
if client.batchFut.finished:
204-
return client.batchFut.read()
205-
else:
206-
# TODO: Provide more clarity regarding the failure here
207-
debug "Invalid POST Response for JSON-RPC"
208-
raise newException(InvalidResponse, "Invalid response")
209-
210-
proc connect*(client: RpcHttpClient, url: string) {.async.} =
105+
proc connect*(
106+
client: RpcHttpClient, url: string
107+
) {.async: (raises: [CancelledError, JsonRpcError]).} =
211108
client.httpAddress = client.httpSession.getAddress(url).valueOr:
212109
raise newException(RpcAddressUnresolvableError, error)
110+
client.remote = client.httpAddress.id
213111

214-
proc connect*(client: RpcHttpClient, address: string, port: Port, secure: bool) {.async.} =
112+
proc connect*(
113+
client: RpcHttpClient, address: string, port: Port, secure: bool
114+
) {.async: (raises: [CancelledError, JsonRpcError]).} =
215115
let uri = Uri(
216116
scheme: if secure: "https" else: "http",
217117
hostname: address,
218118
port: $port)
219119

220120
client.httpAddress = getAddress(client.httpSession, uri).valueOr:
221121
raise newException(RpcAddressUnresolvableError, error)
122+
client.remote = client.httpAddress.id
222123

223124
method close*(client: RpcHttpClient) {.async: (raises: []).} =
224125
if not client.httpSession.isNil:

0 commit comments

Comments
 (0)