Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update RPC transports to use new types #3148

Merged
merged 1 commit into from
Sep 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/stupid-tomatoes-reflect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@solana/rpc-transport-http': patch
'@solana/rpc-spec': patch
'@solana/rpc-api': patch
'@solana/rpc': patch
---

Make `RpcTransport` return new `RpcReponse` type instead of parsed JSON data
11 changes: 9 additions & 2 deletions packages/rpc-api/src/__tests__/get-health-test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import { SOLANA_ERROR__JSON_RPC__SERVER_ERROR_NODE_UNHEALTHY, SolanaError } from '@solana/errors';
import { createRpc, type Rpc } from '@solana/rpc-spec';
import { createRpc, type Rpc, type RpcResponse } from '@solana/rpc-spec';

import { createSolanaRpcApi, GetHealthApi } from '../index';
import { createLocalhostSolanaRpc } from './__setup__';

function createMockResponse<T>(jsonResponse: T): RpcResponse<T> {
return {
json: () => Promise.resolve(jsonResponse),
text: () => Promise.resolve(JSON.stringify(jsonResponse)),
};
}

describe('getHealth', () => {
describe('when the node is healthy', () => {
let rpc: Rpc<GetHealthApi>;
Expand All @@ -29,7 +36,7 @@ describe('getHealth', () => {
beforeEach(() => {
rpc = createRpc({
api: createSolanaRpcApi(),
transport: jest.fn().mockResolvedValue({ error: errorObject }),
transport: jest.fn().mockResolvedValue(createMockResponse({ error: errorObject })),
});
});
it('returns an error message', async () => {
Expand Down
14 changes: 11 additions & 3 deletions packages/rpc-spec/src/__tests__/rpc-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,20 @@ import { createRpcMessage } from '@solana/rpc-spec-types';
import { createRpc, Rpc } from '../rpc';
import { RpcApi } from '../rpc-api';
import { RpcApiRequestPlan } from '../rpc-request';
import { RpcResponse } from '../rpc-shared';
import { RpcTransport } from '../rpc-transport';

interface TestRpcMethods {
someMethod(...args: unknown[]): unknown;
}

function createMockResponse<T>(jsonResponse: T): RpcResponse<T> {
return {
json: () => Promise.resolve(jsonResponse),
text: () => Promise.resolve(JSON.stringify(jsonResponse)),
};
}

describe('JSON-RPC 2.0', () => {
let makeHttpRequest: RpcTransport;
let rpc: Rpc<TestRpcMethods>;
Expand All @@ -34,7 +42,7 @@ describe('JSON-RPC 2.0', () => {
});
it('returns results from the transport', async () => {
expect.assertions(1);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(123);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(createMockResponse(123));
const result = await rpc.someMethod().send();
expect(result).toBe(123);
});
Expand Down Expand Up @@ -90,13 +98,13 @@ describe('JSON-RPC 2.0', () => {
});
it('calls the response transformer with the response from the JSON-RPC 2.0 endpoint', async () => {
expect.assertions(1);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(123);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(createMockResponse(123));
await rpc.someMethod().send();
expect(responseTransformer).toHaveBeenCalledWith(123, 'someMethod');
});
it('returns the processed response', async () => {
expect.assertions(1);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(123);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(createMockResponse(123));
const result = await rpc.someMethod().send();
expect(result).toBe('123 processed response');
});
Expand Down
10 changes: 6 additions & 4 deletions packages/rpc-spec/src/rpc-transport.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
type RpcTransportConfig = Readonly<{
import { RpcResponse } from './rpc-shared';

type RpcTransportRequest = Readonly<{
payload: unknown;
signal?: AbortSignal;
}>;

export interface RpcTransport {
<TResponse>(config: RpcTransportConfig): Promise<TResponse>;
}
export type RpcTransport = {
<TResponse>(request: RpcTransportRequest): Promise<RpcResponse<TResponse>>;
};
9 changes: 4 additions & 5 deletions packages/rpc-spec/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
createRpcMessage,
Flatten,
OverloadImplementations,
RpcResponseData,
UnionToIntersection,
} from '@solana/rpc-spec-types';

Expand Down Expand Up @@ -68,12 +67,12 @@ function createPendingRpcRequest<TRpcMethods, TRpcTransport extends RpcTransport
return {
async send(options?: RpcSendOptions): Promise<TResponse> {
const { methodName, params, responseTransformer } = pendingRequest;
const payload = createRpcMessage(methodName, params);
const response = await rpcConfig.transport<RpcResponseData<unknown>>({
payload,
const response = await rpcConfig.transport<TResponse>({
payload: createRpcMessage(methodName, params),
signal: options?.abortSignal,
});
return (responseTransformer ? responseTransformer(response, methodName) : response) as TResponse;
const responseData = await response.json();
return responseTransformer ? responseTransformer(responseData, methodName) : responseData;
lorisleiva marked this conversation as resolved.
Show resolved Hide resolved
},
};
}
26 changes: 14 additions & 12 deletions packages/rpc-transport-http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const transport = createHttpTransport({ url: 'https://api.mainnet-beta.solana.co
const response = await transport({
payload: { id: 1, jsonrpc: '2.0', method: 'getSlot' },
});
const data = await response.json();
```

#### Config
Expand Down Expand Up @@ -67,16 +68,17 @@ const transport = createHttpTransport({
});
let id = 0;
const balances = await Promise.allSettled(
accounts.map(account =>
transport({
accounts.map(async account => {
const response = await transport({
payload: {
id: ++id,
jsonrpc: '2.0',
method: 'getBalance',
params: [account],
},
}),
),
});
return await response.json();
}),
);
```

Expand Down Expand Up @@ -109,7 +111,7 @@ Using this core transport, you can implement specialized functionality for lever
Here’s an example of how someone might implement a “round robin” approach to distribute requests to multiple transports:

```ts
import { RpcTransport } from '@solana/rpc-spec';
import { RpcResponse, RpcTransport } from '@solana/rpc-spec';
import { createHttpTransport } from '@solana/rpc-transport-http';

// Create a transport for each RPC server
Expand All @@ -121,7 +123,7 @@ const transports = [

// Create a wrapper transport that distributes requests to them
let nextTransport = 0;
async function roundRobinTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<TResponse> {
async function roundRobinTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<RpcResponse<TResponse>> {
const transport = transports[nextTransport];
nextTransport = (nextTransport + 1) % transports.length;
return await transport(...args);
Expand All @@ -135,7 +137,7 @@ Another example of a possible customization for a transport is to shard requests
Perhaps your application needs to make a large number of requests, or needs to fan request for different methods out to different servers. Here’s an example of an implementation that does the latter:

```ts
import { RpcTransport } from '@solana/rpc-spec';
import { RpcResponse, RpcTransport } from '@solana/rpc-spec';
import { createHttpTransport } from '@solana/rpc-transport-http';

// Create multiple transports
Expand All @@ -160,7 +162,7 @@ function selectShard(method: string): RpcTransport {
}
}

async function shardingTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<TResponse> {
async function shardingTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<RpcResponse<TResponse>> {
const payload = args[0].payload as { method: string };
const selectedTransport = selectShard(payload.method);
return await selectedTransport(...args);
Expand All @@ -172,7 +174,7 @@ async function shardingTransport<TResponse>(...args: Parameters<RpcTransport>):
The transport library can also be used to implement custom retry logic on any request:

```ts
import { RpcTransport } from '@solana/rpc-spec';
import { RpcResponse, RpcTransport } from '@solana/rpc-spec';
import { createHttpTransport } from '@solana/rpc-transport-http';

// Set the maximum number of attempts to retry a request
Expand All @@ -193,7 +195,7 @@ function calculateRetryDelay(attempt: number): number {
}

// A retrying transport that will retry up to `MAX_ATTEMPTS` times before failing
async function retryingTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<TResponse> {
async function retryingTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<RpcResponse<TResponse>> {
let requestError;
for (let attempts = 0; attempts < MAX_ATTEMPTS; attempts++) {
try {
Expand All @@ -216,7 +218,7 @@ async function retryingTransport<TResponse>(...args: Parameters<RpcTransport>):
Here’s an example of some failover logic integrated into a transport:

```ts
import { RpcTransport } from '@solana/rpc-spec';
import { RpcResponse, RpcTransport } from '@solana/rpc-spec';
import { createHttpTransport } from '@solana/rpc-transport-http';

// Create a transport for each RPC server
Expand All @@ -227,7 +229,7 @@ const transports = [
];

// A failover transport that will try each transport in order until one succeeds before failing
async function failoverTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<TResponse> {
async function failoverTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<RpcResponse<TResponse>> {
let requestError;

for (const transport of transports) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ describe('createHttpTransport and `AbortSignal`', () => {
it('resolves with the response', async () => {
expect.assertions(1);
jest.mocked(fetchSpy).mockResolvedValueOnce({
json: () => ({ ok: true }),
json: () => Promise.resolve({ ok: true }),
ok: true,
} as unknown as Response);
const sendPromise = makeHttpRequest({ payload: 123, signal: abortSignal });
abortController.abort('I got bored waiting');
await expect(sendPromise).resolves.toMatchObject({
const response = await sendPromise;
await expect(response.json()).resolves.toMatchObject({
ok: true,
});
});
Expand Down
9 changes: 6 additions & 3 deletions packages/rpc-transport-http/src/http-transport.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { SOLANA_ERROR__RPC__TRANSPORT_HTTP_ERROR, SolanaError } from '@solana/errors';
import { RpcTransport } from '@solana/rpc-spec';
import { RpcResponse, RpcTransport } from '@solana/rpc-spec';
import type Dispatcher from 'undici-types/dispatcher';

import {
Expand Down Expand Up @@ -44,7 +44,7 @@ export function createHttpTransport(config: Config): RpcTransport {
return async function makeHttpRequest<TResponse>({
payload,
signal,
}: Parameters<RpcTransport>[0]): Promise<TResponse> {
}: Parameters<RpcTransport>[0]): Promise<RpcResponse<TResponse>> {
const body = JSON.stringify(payload);
const requestInfo = {
...dispatcherConfig,
Expand All @@ -66,6 +66,9 @@ export function createHttpTransport(config: Config): RpcTransport {
statusCode: response.status,
});
}
return (await response.json()) as TResponse;
return Object.freeze({
json: () => response.json(),
text: () => response.text(),
});
};
}
23 changes: 15 additions & 8 deletions packages/rpc/src/__tests__/rpc-request-coalescer-test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import type { RpcTransport } from '@solana/rpc-spec';
import type { RpcResponse, RpcTransport } from '@solana/rpc-spec';

import { getRpcTransportWithRequestCoalescing } from '../rpc-request-coalescer';

function createMockResponse<T>(jsonResponse: T): RpcResponse<T> {
return {
json: () => Promise.resolve(jsonResponse),
text: () => Promise.resolve(JSON.stringify(jsonResponse)),
};
}

describe('RPC request coalescer', () => {
let coalescedTransport: RpcTransport;
let hashFn: jest.MockedFunction<() => string | undefined>;
Expand Down Expand Up @@ -30,7 +37,7 @@ describe('RPC request coalescer', () => {
});
it('multiple requests in the same tick receive the same response', async () => {
expect.assertions(2);
const mockResponse = { response: 'ok' };
const mockResponse = createMockResponse({ response: 'ok' });
mockTransport.mockResolvedValueOnce(mockResponse);
const responsePromiseA = coalescedTransport({ payload: null });
const responsePromiseB = coalescedTransport({ payload: null });
Expand All @@ -41,8 +48,8 @@ describe('RPC request coalescer', () => {
});
it('multiple requests in different ticks receive different responses', async () => {
expect.assertions(2);
const mockResponseA = { response: 'okA' };
const mockResponseB = { response: 'okB' };
const mockResponseA = createMockResponse({ response: 'okA' });
const mockResponseB = createMockResponse({ response: 'okB' });
mockTransport.mockResolvedValueOnce(mockResponseA);
mockTransport.mockResolvedValueOnce(mockResponseB);
const responsePromiseA = coalescedTransport({ payload: null });
Expand Down Expand Up @@ -100,7 +107,7 @@ describe('RPC request coalescer', () => {
let abortControllerB: AbortController;
let responsePromiseA: ReturnType<typeof mockTransport>;
let responsePromiseB: ReturnType<typeof mockTransport>;
let transportResponsePromise: (value: unknown) => void;
let transportResponsePromise: (value: RpcResponse<unknown>) => void;
beforeEach(() => {
abortControllerA = new AbortController();
abortControllerB = new AbortController();
Expand Down Expand Up @@ -157,7 +164,7 @@ describe('RPC request coalescer', () => {
it('delivers responses to all but the aborted requests', async () => {
expect.assertions(2);
abortControllerA.abort('o no A');
const mockResponse = { response: 'ok' };
const mockResponse = createMockResponse({ response: 'ok' });
transportResponsePromise(mockResponse);
await Promise.all([
expect(responsePromiseA).rejects.toBe('o no A'),
Expand Down Expand Up @@ -192,8 +199,8 @@ describe('RPC request coalescer', () => {
});
it('multiple requests in the same tick receive different responses', async () => {
expect.assertions(2);
const mockResponseA = { response: 'okA' };
const mockResponseB = { response: 'okB' };
const mockResponseA = createMockResponse({ response: 'okA' });
const mockResponseB = createMockResponse({ response: 'okB' });
mockTransport.mockResolvedValueOnce(mockResponseA);
mockTransport.mockResolvedValueOnce(mockResponseB);
const responsePromiseA = coalescedTransport({ payload: null });
Expand Down
20 changes: 11 additions & 9 deletions packages/rpc/src/rpc-request-coalescer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { RpcTransport } from '@solana/rpc-spec';
import type { RpcResponse, RpcTransport } from '@solana/rpc-spec';

type CoalescedRequest = {
readonly abortController: AbortController;
numConsumers: number;
readonly responsePromise: Promise<unknown>;
readonly responsePromise: Promise<RpcResponse | undefined>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be | undefined?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's already the current type definition of the coalescer. It just didn't show before because unknown | undefined is equivalent to unknown.

};

type GetDeduplicationKeyFn = (payload: unknown) => string | undefined;
Expand All @@ -30,11 +30,13 @@ export function getRpcTransportWithRequestCoalescing<TTransport extends RpcTrans
getDeduplicationKey: GetDeduplicationKeyFn,
): TTransport {
let coalescedRequestsByDeduplicationKey: Record<string, CoalescedRequest> | undefined;
return async function makeCoalescedHttpRequest<TResponse>(config: Parameters<RpcTransport>[0]): Promise<TResponse> {
const { payload, signal } = config;
return async function makeCoalescedHttpRequest<TResponse>(
request: Parameters<RpcTransport>[0],
): Promise<RpcResponse<TResponse>> {
const { payload, signal } = request;
const deduplicationKey = getDeduplicationKey(payload);
if (deduplicationKey === undefined) {
return await transport(config);
return await transport(request);
}
if (!coalescedRequestsByDeduplicationKey) {
Promise.resolve().then(() => {
Expand All @@ -47,7 +49,7 @@ export function getRpcTransportWithRequestCoalescing<TTransport extends RpcTrans
const responsePromise = (async () => {
try {
return await transport<TResponse>({
...config,
...request,
signal: abortController.signal,
});
} catch (e) {
Expand All @@ -69,8 +71,8 @@ export function getRpcTransportWithRequestCoalescing<TTransport extends RpcTrans
const coalescedRequest = coalescedRequestsByDeduplicationKey[deduplicationKey];
coalescedRequest.numConsumers++;
if (signal) {
const responsePromise = coalescedRequest.responsePromise as Promise<TResponse>;
return await new Promise<TResponse>((resolve, reject) => {
const responsePromise = coalescedRequest.responsePromise as Promise<RpcResponse<TResponse>>;
return await new Promise<RpcResponse<TResponse>>((resolve, reject) => {
const handleAbort = (e: AbortSignalEventMap['abort']) => {
signal.removeEventListener('abort', handleAbort);
coalescedRequest.numConsumers -= 1;
Expand All @@ -91,7 +93,7 @@ export function getRpcTransportWithRequestCoalescing<TTransport extends RpcTrans
});
});
} else {
return (await coalescedRequest.responsePromise) as TResponse;
return (await coalescedRequest.responsePromise) as RpcResponse<TResponse>;
}
} as TTransport;
}