Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[protocol] Pre-session handshake #53

Merged
merged 13 commits into from
Feb 29, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Build and Test
on:
pull_request:
branches:
- main
- '**'
push:
branches:
- main
Expand Down
1 change: 0 additions & 1 deletion __tests__/fixtures/cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ export async function testFinishesCleanly({

// server sits on top of server transport so we clean it up first
if (server) {
await advanceFakeTimersByDisconnectGrace();
await ensureServerIsClean(server);
await server.close();
}
Expand Down
8 changes: 5 additions & 3 deletions __tests__/fixtures/transports.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Connection, Transport } from '../../transport';
import { ClientTransport, Connection, ServerTransport } from '../../transport';
import http from 'node:http';
import net from 'node:net';
import {
Expand All @@ -14,8 +14,10 @@ import { UnixDomainSocketServerTransport } from '../../transport/impls/uds/serve
export const transports: Array<{
name: string;
setup: () => Promise<{
// client, server
getTransports: () => [Transport<Connection>, Transport<Connection>];
getTransports: () => [
ClientTransport<Connection>,
ServerTransport<Connection>,
];
cleanup: () => Promise<void>;
}>;
}> = [
Expand Down
11 changes: 10 additions & 1 deletion __tests__/typescript-stress.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Procedure, ServiceBuilder, serializeService } from '../router/builder';
import { Type } from '@sinclair/typebox';
import { OpaqueTransportMessage } from '../transport/message';
import { createServer } from '../router/server';
import { Transport, Connection } from '../transport';
import { Transport, Connection, Session } from '../transport';
import { createClient } from '../router/client';
import { Ok } from '../router/result';
import { buildServiceDefs } from '../router/defs';
Expand Down Expand Up @@ -97,6 +97,13 @@ export const StupidlyLargeService = <Name extends string>(name: Name) =>

// mock transport
export class MockTransport extends Transport<Connection> {
receiveWithBootSequence(
_conn: Connection,
_sessionCb: (sess: Session<Connection>) => void,
): (data: Uint8Array) => void {
throw new Error('Method not implemented.');
}

constructor(clientId: string) {
super(clientId);
}
Expand All @@ -105,6 +112,8 @@ export class MockTransport extends Transport<Connection> {
return true;
}

protected handleConnection(_conn: Connection, _to: string): void {}

async createNewOutgoingConnection(): Promise<Connection> {
throw new Error('unimplemented');
}
Expand Down
4 changes: 1 addition & 3 deletions router/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
* }
* ```
*/
export interface ServiceContext {
state: object | unknown;
}
export interface ServiceContext {}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idk why this included state before, we have a separate type for this


/**
* The {@link ServiceContext} with state. This is what is passed to procedures.
Expand Down
14 changes: 3 additions & 11 deletions transport/impls/stdio/client.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { log } from '../../../logging';
import { TransportClientId } from '../../message';
import { Transport, TransportOptions } from '../../transport';
import { ClientTransport, TransportOptions } from '../../transport';
import { StreamConnection } from './connection';

/**
* A client-side transport implementation that uses standard input and output streams.
* Will eagerly connect as soon as it's initialized.
* @extends Transport
*/
export class StdioClientTransport extends Transport<StreamConnection> {
export class StdioClientTransport extends ClientTransport<StreamConnection> {
input: NodeJS.ReadableStream = process.stdin;
output: NodeJS.WritableStream = process.stdout;
serverId: TransportClientId;
Expand Down Expand Up @@ -36,15 +36,7 @@ export class StdioClientTransport extends Transport<StreamConnection> {
async createNewOutgoingConnection(to: TransportClientId) {
log?.info(`${this.clientId} -- establishing a new stream to ${to}`);
const conn = new StreamConnection(this.input, this.output);
conn.addDataListener((data) => this.handleMsg(this.parseMsg(data)));
const cleanup = () => {
this.onDisconnect(conn, to);
this.connect(to);
};

this.input.addListener('close', cleanup);
this.output.addListener('close', cleanup);
this.onConnect(conn, to);
this.handleConnection(conn, to);
return conn;
}
}
11 changes: 10 additions & 1 deletion transport/impls/stdio/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,20 @@ export class StreamConnection extends Connection {
this.input.on('data', cb);
}

addCloseListener(cb: () => void): void {
this.input.on('close', cb);
this.output.on('close', cb);
}

addErrorListener(cb: (err: Error) => void): void {
this.input.on('error', cb);
this.output.on('error', cb);
}

send(payload: Uint8Array) {
return this.output.write(MessageFramer.write(payload));
}

// doesn't touch the underlying connection
async close() {
this.output.end();
this.input.unpipe(this.framer);
Expand Down
49 changes: 3 additions & 46 deletions transport/impls/stdio/server.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import { log } from '../../../logging';
import { TransportClientId } from '../../message';
import { Session } from '../../session';
import { Transport, TransportOptions } from '../../transport';
import { ServerTransport, TransportOptions } from '../../transport';
import { StreamConnection } from './connection';

/**
* A server-side transport implementation that uses standard input and output streams.
* This will sit idle until a client connects.
* @extends Transport
*/
export class StdioServerTransport extends Transport<StreamConnection> {
export class StdioServerTransport extends ServerTransport<StreamConnection> {
input: NodeJS.ReadableStream = process.stdin;
output: NodeJS.WritableStream = process.stdout;

Expand All @@ -28,48 +26,7 @@ export class StdioServerTransport extends Transport<StreamConnection> {
super(clientId, providedOptions);
this.input = input;
this.output = output;

let session: Session<StreamConnection> | undefined = undefined;
const receiver = () => session?.connectedTo ?? 'unknown';

const conn = new StreamConnection(this.input, this.output);
conn.addDataListener((data) => {
const parsed = this.parseMsg(data);
if (!parsed) return;
if (!session && !this.connections.has(parsed.from)) {
session = this.onConnect(conn, parsed.from);
}

this.handleMsg(parsed);
});

const cleanup = (session: Session<StreamConnection>) =>
this.onDisconnect(conn, session.connectedTo);

this.input.on('close', () => {
if (!session) return;
log?.info(
`${this.clientId} -- stream conn (id: ${
conn.debugId
}) to ${receiver()} disconnected`,
);
cleanup(session);
});

this.input.on('error', (err) => {
if (!session) return;
log?.warn(
`${this.clientId} -- error in stream connection (id: ${
conn.debugId
}) to ${receiver()}: ${err}`,
);
cleanup(session);
});
}

async createNewOutgoingConnection(to: string): Promise<StreamConnection> {
const err = `${this.clientId} -- failed to send msg to ${to}, client probably dropped`;
log?.warn(err);
throw new Error(err);
this.handleConnection(conn);
}
}
23 changes: 5 additions & 18 deletions transport/impls/uds/client.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Transport, TransportClientId } from '../..';
import { TransportClientId } from '../..';
import { Socket } from 'node:net';
import { log } from '../../../logging';
import { UdsConnection } from './connection';
import { TransportOptions } from '../../transport';
import { ClientTransport, TransportOptions } from '../../transport';

export class UnixDomainSocketClientTransport extends Transport<UdsConnection> {
export class UnixDomainSocketClientTransport extends ClientTransport<UdsConnection> {
path: string;
serverId: TransportClientId;

Expand All @@ -20,7 +20,7 @@ export class UnixDomainSocketClientTransport extends Transport<UdsConnection> {
this.connect(serverId);
}

async createNewOutgoingConnection(to: string) {
async createNewOutgoingConnection(to: TransportClientId) {
const oldConnection = this.connections.get(to);
if (oldConnection) {
oldConnection.close();
Expand All @@ -35,20 +35,7 @@ export class UnixDomainSocketClientTransport extends Transport<UdsConnection> {
});

const conn = new UdsConnection(sock);
conn.addDataListener((data) => this.handleMsg(this.parseMsg(data)));
const cleanup = () => {
this.onDisconnect(conn, to);
this.connect(to);
};

sock.on('close', cleanup);
sock.on('error', (err) => {
log?.warn(
`${this.clientId} -- socket error in connection (id: ${conn.debugId}) to ${to}: ${err}`,
);
});

this.onConnect(conn, to);
this.handleConnection(conn, to);
return conn;
}
}
8 changes: 8 additions & 0 deletions transport/impls/uds/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ export class UdsConnection extends Connection {
this.input.on('data', cb);
}

addCloseListener(cb: () => void): void {
this.sock.on('close', cb);
}

addErrorListener(cb: (err: Error) => void): void {
this.sock.on('error', cb);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

https://nodejs.org/api/net.html#event-error

Maybe we need to call sock.close here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that's only for the actual net.Server itself, sock is net.Socket so it will call both close and error

}

send(payload: Uint8Array) {
return this.sock.write(MessageFramer.write(payload));
}
Expand Down
42 changes: 3 additions & 39 deletions transport/impls/uds/server.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { Session } from '../../session';
import { log } from '../../../logging';
import { type Server, type Socket } from 'node:net';
import { Transport, TransportOptions } from '../../transport';
import { ServerTransport, TransportOptions } from '../../transport';
import { TransportClientId } from '../../message';
import { UdsConnection } from './connection';

export class UnixDomainSocketServerTransport extends Transport<UdsConnection> {
export class UnixDomainSocketServerTransport extends ServerTransport<UdsConnection> {
server: Server;

constructor(
Expand All @@ -22,48 +21,13 @@ export class UnixDomainSocketServerTransport extends Transport<UdsConnection> {
}

connectionHandler = (sock: Socket) => {
let session: Session<UdsConnection> | undefined = undefined;
const conn = new UdsConnection(sock);
this.handleConnection(conn);
log?.info(
`${this.clientId} -- new incoming uds connection (id: ${conn.debugId})`,
);

const client = () => session?.connectedTo ?? 'unknown';
conn.addDataListener((data) => {
const parsed = this.parseMsg(data);
if (!parsed) return;
if (!session) {
session = this.onConnect(conn, parsed.from);
}

this.handleMsg(parsed);
});

sock.on('close', () => {
if (!session) return;
log?.info(
`${this.clientId} -- uds (id: ${
conn.debugId
}) to ${client()} disconnected`,
);
this.onDisconnect(conn, session?.connectedTo);
});

sock.on('error', (err) => {
log?.warn(
`${this.clientId} -- uds (id: ${
conn.debugId
}) to ${client()} got an error: ${err}`,
);
});
};

async createNewOutgoingConnection(to: string): Promise<UdsConnection> {
const err = `${this.clientId} -- failed to send msg to ${to}, client probably dropped`;
log?.warn(err);
throw new Error(err);
}

async close() {
super.close();
this.server.removeListener('connection', this.connectionHandler);
Expand Down
24 changes: 3 additions & 21 deletions transport/impls/ws/client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import WebSocket from 'isomorphic-ws';
import { Transport, TransportOptions } from '../../transport';
import { ClientTransport, TransportOptions } from '../../transport';
import { TransportClientId } from '../../message';
import { log } from '../../../logging';
import { WebSocketConnection } from './connection';
Expand All @@ -11,7 +11,7 @@ type WebSocketResult = { ws: WebSocket } | { err: string };
* @class
* @extends Transport
*/
export class WebSocketClientTransport extends Transport<WebSocketConnection> {
export class WebSocketClientTransport extends ClientTransport<WebSocketConnection> {
/**
* A function that returns a Promise that resolves to a WebSocket instance.
*/
Expand All @@ -34,7 +34,6 @@ export class WebSocketClientTransport extends Transport<WebSocketConnection> {
super(sessionId, providedOptions);
this.wsGetter = wsGetter;
this.serverId = serverId;
this.inflightConnectionPromises = new Map();

// eagerly connect as soon as we initialize
this.connect(this.serverId);
Expand Down Expand Up @@ -86,22 +85,7 @@ export class WebSocketClientTransport extends Transport<WebSocketConnection> {
log?.info(
`${this.clientId} -- websocket (id: ${conn.debugId}) to ${to} ok`,
);
this.onConnect(conn, to);
conn.addDataListener((data) => this.handleMsg(this.parseMsg(data)));
wsRes.ws.onclose = () => {
log?.info(
`${this.clientId} -- websocket (id: ${conn.debugId}) to ${to} disconnected`,
);
this.onDisconnect(conn, to);
this.connect(to);
};

wsRes.ws.onerror = (msg) => {
log?.warn(
`${this.clientId} -- websocket (id: ${conn.debugId}) to ${to} had an error: ${msg}`,
);
};

this.handleConnection(conn, to);
return conn;
} else {
throw new Error(wsRes.err);
Expand All @@ -110,11 +94,9 @@ export class WebSocketClientTransport extends Transport<WebSocketConnection> {

async close() {
super.close();
this.inflightConnectionPromises.clear();
}

async destroy() {
super.destroy();
this.inflightConnectionPromises.clear();
}
}
Loading
Loading