Skip to content

Commit

Permalink
@web5/agent DWN + Web5 RPC Clients (#433)
Browse files Browse the repository at this point in the history
- organize rpc clients into `agent/prototyping/clients` until they have a permanent home
- copy `JsonRpcSocket` class from `@web5/dwn-server`, using `isomorphic-ws` to allow for isomorphic socket client.
- create `WebSocketDwnRpcClient` and `WebSocketWeb5RpcClient` to support web5 requests over sockets.
  • Loading branch information
LiranCohen authored May 1, 2024
1 parent 393d483 commit ac1e6f1
Show file tree
Hide file tree
Showing 15 changed files with 1,467 additions and 134 deletions.
14 changes: 14 additions & 0 deletions .changeset/proud-bottles-develop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
"@web5/agent": patch
"@web5/identity-agent": patch
"@web5/proxy-agent": patch
"@web5/user-agent": patch
---

Extend and Test RPC DWN/Web5 Clients to support `http` and `ws`
- move `HttpDwnRpcClient` to `/prototyping` folder
- move `JSON RPC` related files to `/prototyping` folder
- create `WebSocketDwnRpcClient` in `/prototyping` folder
- create `WebSocketWeb5RpcClient` wrapper in `rpc-client`
- does not support `sendDidRequest` via sockets

1 change: 1 addition & 0 deletions packages/agent/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
"@web5/dids": "1.0.1",
"abstract-level": "1.0.4",
"ed25519-keygen": "0.4.11",
"isomorphic-ws": "^5.0.0",
"level": "8.0.0",
"ms": "2.1.3",
"readable-web-to-node-stream": "3.0.2",
Expand Down
3 changes: 1 addition & 2 deletions packages/agent/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ export * from './did-api.js';
export * from './dwn-api.js';
export * from './hd-identity-vault.js';
export * from './identity-api.js';
export * from './json-rpc.js';
export * from './local-key-manager.js';
export * from './rpc-client.js';
export * from './store-data.js';
Expand All @@ -22,4 +21,4 @@ export * from './store-key.js';
export * from './sync-api.js';
export * from './sync-engine-level.js';
export * from './test-harness.js';
export * from './utils.js';
export * from './utils.js';
55 changes: 55 additions & 0 deletions packages/agent/src/prototyping/clients/dwn-rpc-types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import type { MessageEvent, RecordsReadReply, UnionMessageReply } from '@tbd54566975/dwn-sdk-js';

export interface SerializableDwnMessage {
toJSON(): string;
}

export type DwnEventSubscriptionHandler = (event: MessageEvent) => void;

/**
* Interface for communicating with {@link https://github.com/TBD54566975/dwn-server | DWN Servers}
* via JSON-RPC, supporting operations like sending DWN requests.
*/
export interface DwnRpc {
/**
* Lists the transport protocols supported by the DWN RPC client, such as HTTP or HTTPS.
* @returns An array of strings representing the supported transport protocols.
*/
get transportProtocols(): string[]

/**
* Sends a request to a DWN Server using the specified DWN RPC request parameters.
*
* @param request - The DWN RPC request containing the URL, target DID, message, and optional data.
* @returns A promise that resolves to the response from the DWN server.
*/
sendDwnRequest(request: DwnRpcRequest): Promise<DwnRpcResponse>
}


/**
* Represents a JSON RPC request to a DWN server, including the URL, target DID, the message to be
* processed, and optional data.
*/
export type DwnRpcRequest = {
/** Optional data to be sent with the request. */
data?: any;

/** The URL of the DWN server to which the request is sent. */
dwnUrl: string;

/** The message to be processed by the DWN server, which can be a serializable DWN message. */
message: SerializableDwnMessage | any;

/** The DID of the target to which the message is addressed. */
targetDid: string;

/** Optional subscription handler for DWN events. */
subscriptionHandler?: DwnEventSubscriptionHandler;
}

/**
* Represents the JSON RPC response from a DWN server to a request, combining the results of various
* DWN operations.
*/
export type DwnRpcResponse = UnionMessageReply & RecordsReadReply;
68 changes: 68 additions & 0 deletions packages/agent/src/prototyping/clients/http-dwn-rpc-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import type { JsonRpcResponse } from './json-rpc.js';
import type { DwnRpc, DwnRpcRequest, DwnRpcResponse } from './dwn-rpc-types.js';

import { createJsonRpcRequest, parseJson } from './json-rpc.js';
import { utils as cryptoUtils } from '@web5/crypto';

/**
* HTTP client that can be used to communicate with Dwn Servers
*/
export class HttpDwnRpcClient implements DwnRpc {
get transportProtocols() { return ['http:', 'https:']; }

async sendDwnRequest(request: DwnRpcRequest): Promise<DwnRpcResponse> {
const requestId = cryptoUtils.randomUuid();
const jsonRpcRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', {
target : request.targetDid,
message : request.message
});

const fetchOpts = {
method : 'POST',
headers : {
'dwn-request': JSON.stringify(jsonRpcRequest)
}
};

if (request.data) {
// @ts-expect-error TODO: REMOVE
fetchOpts.headers['content-type'] = 'application/octet-stream';
// @ts-expect-error TODO: REMOVE
fetchOpts['body'] = request.data;
}

const resp = await fetch(request.dwnUrl, fetchOpts);
let dwnRpcResponse: JsonRpcResponse;

// check to see if response is in header first. if it is, that means the response is a ReadableStream
let dataStream;
const { headers } = resp;
if (headers.has('dwn-response')) {
// @ts-expect-error TODO: REMOVE
const jsonRpcResponse = parseJson(headers.get('dwn-response')) as JsonRpcResponse;

if (jsonRpcResponse == null) {
throw new Error(`failed to parse json rpc response. dwn url: ${request.dwnUrl}`);
}

dataStream = resp.body;
dwnRpcResponse = jsonRpcResponse;
} else {
// TODO: wonder if i need to try/catch this?
const responseBody = await resp.text();
dwnRpcResponse = JSON.parse(responseBody);
}

if (dwnRpcResponse.error) {
const { code, message } = dwnRpcResponse.error;
throw new Error(`(${code}) - ${message}`);
}

const { reply } = dwnRpcResponse.result;
if (dataStream) {
reply['record']['data'] = dataStream;
}

return reply as DwnRpcResponse;
}
}
169 changes: 169 additions & 0 deletions packages/agent/src/prototyping/clients/json-rpc-socket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import { utils as cryptoUtils } from '@web5/crypto';
import IsomorphicWebSocket from 'isomorphic-ws';
import { JsonRpcId, JsonRpcRequest, JsonRpcResponse, createJsonRpcSubscriptionRequest, parseJson } from './json-rpc.js';

// These were arbitrarily chosen, but can be modified via connect options
const CONNECT_TIMEOUT = 3_000;
const RESPONSE_TIMEOUT = 30_000;

export interface JsonRpcSocketOptions {
/** socket connection timeout in milliseconds */
connectTimeout?: number;
/** response timeout for rpc requests in milliseconds */
responseTimeout?: number;
/** optional connection close handler */
onclose?: () => void;
/** optional socket error handler */
onerror?: (error?: any) => void;
}

/**
* JSON RPC Socket Client for WebSocket request/response and long-running subscriptions.
*
* NOTE: This is temporarily copied over from https://github.com/TBD54566975/dwn-server/blob/main/src/json-rpc-socket.ts
* This was done in order to avoid taking a dependency on the `dwn-server`, until a future time when there will be a `clients` package.
*/
export class JsonRpcSocket {
private messageHandlers: Map<JsonRpcId, (event: { data: any }) => void> = new Map();

private constructor(private socket: IsomorphicWebSocket, private responseTimeout: number) {}

static async connect(url: string, options: JsonRpcSocketOptions = {}): Promise<JsonRpcSocket> {
const { connectTimeout = CONNECT_TIMEOUT, responseTimeout = RESPONSE_TIMEOUT, onclose, onerror } = options;

const socket = new IsomorphicWebSocket(url);

if (!onclose) {
socket.onclose = ():void => {
console.info(`JSON RPC Socket close ${url}`);
};
} else {
socket.onclose = onclose;
}

if (!onerror) {
socket.onerror = (error?: any):void => {
console.error(`JSON RPC Socket error ${url}`, error);
};
} else {
socket.onerror = onerror;
}

return new Promise<JsonRpcSocket>((resolve, reject) => {
socket.addEventListener('open', () => {
const jsonRpcSocket = new JsonRpcSocket(socket, responseTimeout);

socket.addEventListener('message', (event: { data: any }) => {
const jsonRpcResponse = parseJson(event.data) as JsonRpcResponse;
const handler = jsonRpcSocket.messageHandlers.get(jsonRpcResponse.id);
if (handler) {
handler(event);
}
});

resolve(jsonRpcSocket);
});

socket.addEventListener('error', (error: any) => {
reject(error);
});

setTimeout(() => reject, connectTimeout);
});
}

close(): void {
this.socket.close();
}

/**
* Sends a JSON-RPC request through the socket and waits for a single response.
*/
async request(request: JsonRpcRequest): Promise<JsonRpcResponse> {
return new Promise((resolve, reject) => {
request.id ??= cryptoUtils.randomUuid();

const handleResponse = (event: { data: any }):void => {
const jsonRpsResponse = parseJson(event.data) as JsonRpcResponse;
if (jsonRpsResponse.id === request.id) {
// if the incoming response id matches the request id, we will remove the listener and resolve the response
this.messageHandlers.delete(request.id);
return resolve(jsonRpsResponse);
}
};

// add the listener to the map of message handlers
this.messageHandlers.set(request.id, handleResponse);
this.send(request);

// reject this promise if we don't receive any response back within the timeout period
setTimeout(() => {
this.messageHandlers.delete(request.id!);
reject(new Error('request timed out'));
}, this.responseTimeout);
});
}

/**
* Sends a JSON-RPC request through the socket and keeps a listener open to read associated responses as they arrive.
* Returns a close method to clean up the listener.
*/
async subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): Promise<{
response: JsonRpcResponse;
close?: () => Promise<void>;
}> {

if (!request.method.startsWith('rpc.subscribe.')) {
throw new Error('subscribe rpc requests must include the `rpc.subscribe` prefix');
}

if (!request.subscription) {
throw new Error('subscribe rpc requests must include subscribe options');
}

const subscriptionId = request.subscription.id;
const socketEventListener = (event: { data: any }):void => {
const jsonRpcResponse = parseJson(event.data.toString()) as JsonRpcResponse;
if (jsonRpcResponse.id === subscriptionId) {
if (jsonRpcResponse.error !== undefined) {
// remove the event listener upon receipt of a JSON RPC Error.
this.messageHandlers.delete(subscriptionId);
this.closeSubscription(subscriptionId);
}
listener(jsonRpcResponse);
}
};

this.messageHandlers.set(subscriptionId, socketEventListener);

const response = await this.request(request);
if (response.error) {
this.messageHandlers.delete(subscriptionId);
return { response };
}

// clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription
const close = async (): Promise<void> => {
this.messageHandlers.delete(subscriptionId);
await this.closeSubscription(subscriptionId);
};

return {
response,
close
};
}

private closeSubscription(id: JsonRpcId): Promise<JsonRpcResponse> {
const requestId = cryptoUtils.randomUuid();
const request = createJsonRpcSubscriptionRequest(requestId, 'close', id, {});
return this.request(request);
}

/**
* Sends a JSON-RPC request through the socket. You must subscribe to a message listener separately to capture the response.
*/
send(request: JsonRpcRequest):void {
this.socket.send(JSON.stringify(request));
}
}
Loading

0 comments on commit ac1e6f1

Please sign in to comment.