Skip to content

Commit

Permalink
feat: Implement the Solid WebSocket protocol.
Browse files Browse the repository at this point in the history
  • Loading branch information
RubenVerborgh committed Nov 20, 2020
1 parent 97d7e46 commit 0099d1d
Show file tree
Hide file tree
Showing 3 changed files with 317 additions and 0 deletions.
1 change: 1 addition & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ module.exports = {
'class-methods-use-this': 'off', // conflicts with functions from interfaces that sometimes don't require `this`
'comma-dangle': ['error', 'always-multiline'],
'dot-location': ['error', 'property'],
'lines-around-comment': 'off', // conflicts with padded-blocks
'lines-between-class-members': ['error', 'always', { exceptAfterSingleLine: true }],
'max-len': ['error', { code: 120, ignoreUrls: true }],
'new-cap': 'off', // used for RDF constants
Expand Down
145 changes: 145 additions & 0 deletions src/ldp/UnsecureWebSocketsProtocol.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import { EventEmitter } from 'events';
import type WebSocket from 'ws';
import { getLoggerFor } from '../logging/LogUtil';
import type { HttpRequest } from '../server/HttpRequest';
import { WebSocketHandler } from '../server/WebSocketHandler';
import type { ResourceIdentifier } from './representation/ResourceIdentifier';

const VERSION = 'solid/0.1.0-alpha';

/**
* Implementation of Solid WebSockets API Spec solid/0.1.0-alpha
* at https://github.com/solid/solid-spec/blob/master/api-websockets.md
*/
class WebSocketListener extends EventEmitter {
private host = '';
private protocol = '';
private readonly socket: WebSocket;
private readonly subscribedPaths = new Set<string>();
private readonly logger = getLoggerFor(this);

public constructor(socket: WebSocket) {
super();
this.socket = socket;
socket.addListener('error', (): void => this.stop());
socket.addListener('close', (): void => this.stop());
socket.addListener('message', (message: string): void => this.onMessage(message));
}

public start(upgradeRequest: HttpRequest): void {
// Greet the client
this.sendMessage('protocol', VERSION);
this.sendMessage('warning', 'Unstandardized protocol version, proceed with care');

// Verify the WebSocket protocol version
const protocolHeader = upgradeRequest.headers['sec-websocket-protocol'];
if (!protocolHeader) {
this.sendMessage('warning', `Missing Sec-WebSocket-Protocol header, expected value '${VERSION}'`);
} else {
const supportedProtocols = protocolHeader.split(/\s*,\s*/u);
if (!supportedProtocols.includes(VERSION)) {
this.sendMessage('error', `Client does not support protocol ${VERSION}`);
this.stop();
}
}

// Store the HTTP host and protocol
this.host = upgradeRequest.headers.host ?? '';
this.protocol = (upgradeRequest.socket as any).secure ? 'https:' : 'http:';
}

private stop(): void {
try {
this.socket.close();
} catch {
// Ignore
}
this.subscribedPaths.clear();
this.socket.removeAllListeners();
this.emit('closed');
}

public onResourceChanged({ path }: ResourceIdentifier): void {
if (this.subscribedPaths.has(path)) {
this.sendMessage('pub', path);
}
}

private onMessage(message: string): void {
// Parse the message
const match = /^(\w+)\s+(.+)$/u.exec(message);
if (!match) {
this.sendMessage('warning', `Unrecognized message format: ${message}`);
return;
}

// Process the message
const [ , type, value ] = match;
switch (type) {
case 'sub':
this.subscribe(value);
break;
default:
this.sendMessage('warning', `Unrecognized message type: ${type}`);
}
}

private subscribe(path: string): void {
try {
// Resolve and verify the URL
const resolved = new URL(path, `${this.protocol}${this.host}`);
if (resolved.host !== this.host) {
throw new Error(`Mismatched host: ${resolved.host} instead of ${this.host}`);
}
if (resolved.protocol !== this.protocol) {
throw new Error(`Mismatched protocol: ${resolved.protocol} instead of ${this.protocol}`);
}
// Subscribe to the URL
const url = resolved.toString();
this.subscribedPaths.add(url);
this.sendMessage('ack', url);
this.logger.debug(`WebSocket subscribed to changes on ${url}`);
} catch (error: unknown) {
// Report errors to the socket
const errorText: string = (error as any).message;
this.sendMessage('error', errorText);
this.logger.warn(`WebSocket could not subscribe to ${path}: ${errorText}`);
}
}

private sendMessage(type: string, value: string): void {
this.socket.send(`${type} ${value}`);
}
}

/**
* Provides live update functionality following
* the Solid WebSockets API Spec solid/0.1.0-alpha
*/
export class UnsecureWebSocketsProtocol extends WebSocketHandler {
private readonly logger = getLoggerFor(this);
private readonly listeners = new Set<WebSocketListener>();

public constructor(source: EventEmitter) {
super();
source.on('changed', (changed: ResourceIdentifier): void => this.onResourceChanged(changed));
}

public async handle(input: { webSocket: WebSocket; upgradeRequest: HttpRequest }): Promise<void> {
const listener = new WebSocketListener(input.webSocket);
this.listeners.add(listener);
this.logger.info(`New WebSocket added, ${this.listeners.size} in total`);

listener.on('closed', (): void => {
this.listeners.delete(listener);
this.logger.info(`WebSocket closed, ${this.listeners.size} remaining`);
});
listener.start(input.upgradeRequest);
}

private onResourceChanged(changed: ResourceIdentifier): void {
for (const listener of this.listeners) {
listener.onResourceChanged(changed);
}
}
}
171 changes: 171 additions & 0 deletions test/unit/ldp/UnsecureWebSocketsProtocol.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import { EventEmitter } from 'events';
import { UnsecureWebSocketsProtocol } from '../../../src/ldp/UnsecureWebSocketsProtocol';
import type { HttpRequest } from '../../../src/server/HttpRequest';

class DummySocket extends EventEmitter {
public readonly messages = new Array<string>();
public readonly close = jest.fn();

public send(message: string): void {
this.messages.push(message);
}
}

describe('An UnsecureWebSocketsProtocol', (): void => {
const source = new EventEmitter();
const protocol = new UnsecureWebSocketsProtocol(source);

describe('after registering a socket', (): void => {
const webSocket = new DummySocket();

beforeAll(async(): Promise<void> => {
const upgradeRequest = {
headers: {
host: 'mypod.example',
'sec-websocket-protocol': 'solid/0.1.0-alpha, other/1.0.0',
},
socket: {
secure: true,
},
} as any as HttpRequest;
await protocol.handle({ webSocket, upgradeRequest } as any);
});

afterEach((): void => {
webSocket.messages.length = 0;
});

it('sends a protocol message.', (): void => {
expect(webSocket.messages).toHaveLength(2);
expect(webSocket.messages.shift()).toBe('protocol solid/0.1.0-alpha');
expect(webSocket.messages.shift()).toBe('warning Unstandardized protocol version, proceed with care');
});

it('warns when receiving an unexpected message.', (): void => {
webSocket.emit('message', 'unexpected');
expect(webSocket.messages).toHaveLength(1);
expect(webSocket.messages.shift()).toBe('warning Unrecognized message format: unexpected');
});

it('warns when receiving an unexpected message type.', (): void => {
webSocket.emit('message', 'unknown 1 2 3');
expect(webSocket.messages).toHaveLength(1);
expect(webSocket.messages.shift()).toBe('warning Unrecognized message type: unknown');
});

describe('before subscribing to resources', (): void => {
it('does not emit pub messages.', (): void => {
source.emit('changed', { path: 'https://mypod.example/foo/bar' });
expect(webSocket.messages).toHaveLength(0);
});
});

describe('after subscribing to a resource', (): void => {
beforeAll((): void => {
webSocket.emit('message', 'sub https://mypod.example/foo/bar');
});

it('sends an ack message.', (): void => {
expect(webSocket.messages).toHaveLength(1);
expect(webSocket.messages.shift()).toBe('ack https://mypod.example/foo/bar');
});

it('emits pub messages for that resource.', (): void => {
source.emit('changed', { path: 'https://mypod.example/foo/bar' });
expect(webSocket.messages).toHaveLength(1);
expect(webSocket.messages.shift()).toBe('pub https://mypod.example/foo/bar');
});
});

describe('after subscribing to a resource via a relative URL', (): void => {
beforeAll((): void => {
webSocket.emit('message', 'sub /relative/foo');
});

it('sends an ack message.', (): void => {
expect(webSocket.messages).toHaveLength(1);
expect(webSocket.messages.shift()).toBe('ack https://mypod.example/relative/foo');
});

it('emits pub messages for that resource.', (): void => {
source.emit('changed', { path: 'https://mypod.example/relative/foo' });
expect(webSocket.messages).toHaveLength(1);
expect(webSocket.messages.shift()).toBe('pub https://mypod.example/relative/foo');
});
});

describe('after subscribing to a resource with the wrong host name', (): void => {
beforeAll((): void => {
webSocket.emit('message', 'sub https://wrong.example/host/foo');
});

it('send an error message.', (): void => {
expect(webSocket.messages).toHaveLength(1);
expect(webSocket.messages.shift())
.toBe('error Mismatched host: wrong.example instead of mypod.example');
});
});

describe('after subscribing to a resource with the wrong protocol', (): void => {
beforeAll((): void => {
webSocket.emit('message', 'sub http://mypod.example/protocol/foo');
});

it('send an error message.', (): void => {
expect(webSocket.messages).toHaveLength(1);
expect(webSocket.messages.shift())
.toBe('error Mismatched protocol: http: instead of https:');
});
});
});

it('unsubscribes when a socket closes.', async(): Promise<void> => {
const newSocket = new DummySocket();
await protocol.handle({ webSocket: newSocket, upgradeRequest: { headers: {}, socket: {}}} as any);
expect(newSocket.listenerCount('message')).toBe(1);
newSocket.emit('close');
expect(newSocket.listenerCount('message')).toBe(0);
expect(newSocket.listenerCount('close')).toBe(0);
expect(newSocket.listenerCount('error')).toBe(0);
});

it('unsubscribes when a socket errors.', async(): Promise<void> => {
const newSocket = new DummySocket();
await protocol.handle({ webSocket: newSocket, upgradeRequest: { headers: {}, socket: {}}} as any);
expect(newSocket.listenerCount('message')).toBe(1);
newSocket.emit('error');
expect(newSocket.listenerCount('message')).toBe(0);
expect(newSocket.listenerCount('close')).toBe(0);
expect(newSocket.listenerCount('error')).toBe(0);
});

it('emits a warning when no Sec-WebSocket-Protocol is supplied.', async(): Promise<void> => {
const newSocket = new DummySocket();
const upgradeRequest = {
headers: {},
socket: {},
} as any as HttpRequest;
await protocol.handle({ webSocket: newSocket, upgradeRequest } as any);
expect(newSocket.messages).toHaveLength(3);
expect(newSocket.messages.pop())
.toBe('warning Missing Sec-WebSocket-Protocol header, expected value \'solid/0.1.0-alpha\'');
expect(newSocket.close).toHaveBeenCalledTimes(0);
});

it('emits an error and closes the connection with the wrong Sec-WebSocket-Protocol.', async(): Promise<void> => {
const newSocket = new DummySocket();
const upgradeRequest = {
headers: {
'sec-websocket-protocol': 'solid/1.0.0, other',
},
socket: {},
} as any as HttpRequest;
await protocol.handle({ webSocket: newSocket, upgradeRequest } as any);
expect(newSocket.messages).toHaveLength(3);
expect(newSocket.messages.pop()).toBe('error Client does not support protocol solid/0.1.0-alpha');
expect(newSocket.close).toHaveBeenCalledTimes(1);
expect(newSocket.listenerCount('message')).toBe(0);
expect(newSocket.listenerCount('close')).toBe(0);
expect(newSocket.listenerCount('error')).toBe(0);
});
});

0 comments on commit 0099d1d

Please sign in to comment.