Skip to content

Commit

Permalink
feat: JSON-RPC responses async iterable iterator (#1937)
Browse files Browse the repository at this point in the history
* feat: JSON-RPC responses iterable iterator

* chore: update demos to use JSON-RPC responses async iterator

* Update wasm-node/CHANGELOG.md

---------

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
  • Loading branch information
tien and tomaka authored Aug 14, 2024
1 parent 290ac55 commit bba4b37
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 24 deletions.
4 changes: 4 additions & 0 deletions wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Added

- Add `jsonRpcResponses` async iterable iterator to `Chain`, as a more convenient alternative to the `nextJsonRpcResponse` function. ([#1937](https://github.com/smol-dot/smoldot/pull/1937))

### Fixed

- Fix potential panic in parachain syncing code. ([#1912](https://github.com/smol-dot/smoldot/pull/1912))
Expand Down
3 changes: 1 addition & 2 deletions wasm-node/javascript/demo/demo-deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ while(true) {

(async () => {
try {
while(true) {
const response = await chain.nextJsonRpcResponse();
for await (const response of chain.jsonRpcResponses) {
socket.send(response);
}
} catch(_error) {}
Expand Down
6 changes: 2 additions & 4 deletions wasm-node/javascript/demo/demo.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ wsServer.on('connection', function (connection, request) {

(async () => {
try {
while(true) {
const response = await para.nextJsonRpcResponse();
for await (const response of para.jsonRpcResponses) {
connection.send(response);
}
} catch(_error) {}
Expand All @@ -177,8 +176,7 @@ wsServer.on('connection', function (connection, request) {

(async () => {
try {
while(true) {
const response = await relay.nextJsonRpcResponse();
for await (const response of relay.jsonRpcResponses) {
connection.send(response);
}
} catch(_error) {}
Expand Down
50 changes: 32 additions & 18 deletions wasm-node/javascript/src/internals/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -552,27 +552,41 @@ export function start(options: ClientOptions, wasmModule: SmoldotBytecode | Prom
default: throw new Error("Internal error: unknown json_rpc_send error code: " + retVal)
}
},
jsonRpcResponses: {
next: async () => {
while (true) {
if (!state.chains.has(chainId))
return { done: true, value: undefined };
if (options.disableJsonRpc)
throw new JsonRpcDisabledError();
if (state.instance.status === "destroyed")
throw state.instance.error;
if (state.instance.status !== "ready")
throw new Error(); // Internal error. Never supposed to happen.

// Try to pop a message from the queue.
const message = state.instance.instance.peekJsonRpcResponse(chainId);
if (message)
return { done: false, value: message };

// If no message is available, wait for one to be.
await new Promise<void>((resolve) => {
state.chains.get(chainId)!.jsonRpcResponsesPromises.push(resolve)
});
}
},
[Symbol.asyncIterator]() {
return this
}
},
nextJsonRpcResponse: async () => {
while (true) {
if (!state.chains.has(chainId))
throw new AlreadyDestroyedError();
if (options.disableJsonRpc)
return Promise.reject(new JsonRpcDisabledError());
if (state.instance.status === "destroyed")
throw state.instance.error;
if (state.instance.status !== "ready")
throw new Error(); // Internal error. Never supposed to happen.
const result = await newChain.jsonRpcResponses.next();

// Try to pop a message from the queue.
const message = state.instance.instance.peekJsonRpcResponse(chainId);
if (message)
return message;

// If no message is available, wait for one to be.
await new Promise<void>((resolve) => {
state.chains.get(chainId)!.jsonRpcResponsesPromises.push(resolve)
});
if (result.done) {
throw new AlreadyDestroyedError();
}

return result.value;
},
remove: () => {
if (state.instance.status === "destroyed")
Expand Down
14 changes: 14 additions & 0 deletions wasm-node/javascript/src/public-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,20 @@ export interface Chain {
*/
nextJsonRpcResponse(): Promise<string>;

/**
* JSON-RPC responses or notifications async iterable.
*
* Each chain contains a buffer of the responses waiting to be sent out. Iterating over this
* pulls one element from the buffer. If the iteration happen at a slower rate than
* responses are generated, then the buffer will eventually become full, at which point calling
* {@link Chain.sendJsonRpc} will throw an exception. The size of this buffer can be configured
* through {@link AddChainOptions.jsonRpcMaxPendingRequests}.
*
* @throws {@link JsonRpcDisabledError} If the JSON-RPC system was disabled in the options of the chain.
* @throws {@link CrashError} If the background client has crashed.
*/
readonly jsonRpcResponses: AsyncIterableIterator<string>

/**
* Disconnects from the blockchain.
*
Expand Down

0 comments on commit bba4b37

Please sign in to comment.