Skip to content

fix: handle websocket messages with a priority queue #1427

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,22 @@ MAINNET_SEND_MANY_CONTRACT_ID=SP3FBR2AGK5H9QBDH3EEN6DF8EK8JY7RX8QJ5SVTE.send-man
# IMGIX_DOMAIN=https://<your domain>.imgix.net
# IMGIX_TOKEN=<your token>

# Web Socket ping interval to determine client availability, in seconds.
# STACKS_API_WS_PING_INTERVAL=5

# Web Socket ping timeout, in seconds. Clients will be dropped if they do not respond with a pong
# after this time has elapsed.
# STACKS_API_WS_PING_TIMEOUT=5

# Web Socket message timeout, in seconds. Clients will be dropped if they do not acknowledge a
# message after this time has elapsed.
# STACKS_API_WS_MESSAGE_TIMEOUT=5

# Web Socket update queue timeout, in seconds. When an update is scheduled (new block, tx update,
# etc.), we will allow this number of seconds to elapse to allow all subscribed clients to receive
# new data.
# STACKS_API_WS_UPDATE_QUEUE_TIMEOUT=5

# Specify max number of STX address to store in an in-memory LRU cache (CPU optimization).
# Defaults to 50,000, which should result in around 25 megabytes of additional memory usage.
# STACKS_ADDRESS_CACHE_SIZE=10000
Expand Down
34 changes: 19 additions & 15 deletions docs/socket-io/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,38 @@ export type Topic =
| NftAssetEventTopic
| NftCollectionEventTopic;

// Allows timeout callbacks for messages. See
// https://socket.io/docs/v4/typescript/#emitting-with-a-timeout
type WithTimeoutAck<isSender extends boolean, args extends any[]> = isSender extends true ? [Error, ...args] : args;

export interface ClientToServerMessages {
subscribe: (topic: Topic | Topic[], callback: (error: string | null) => void) => void;
unsubscribe: (...topic: Topic[]) => void;
}

export interface ServerToClientMessages {
block: (block: Block) => void;
microblock: (microblock: Microblock) => void;
mempool: (transaction: MempoolTransaction) => void;
transaction: (transaction: Transaction | MempoolTransaction) => void;
export interface ServerToClientMessages<isSender extends boolean = false> {
block: (block: Block, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
microblock: (microblock: Microblock, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
mempool: (transaction: MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
transaction: (transaction: Transaction | MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;

// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: 'nft-event']: (event: NftEvent) => void;
'nft-event': (event: NftEvent) => void;
[key: 'nft-event']: (event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'nft-event': (event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;

// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent) => void;
'nft-asset-event': (assetIdentifier: string, value: string, event: NftEvent) => void;
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'nft-asset-event': (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;

// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent) => void;
'nft-collection-event': (assetIdentifier: string, event: NftEvent) => void;
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'nft-collection-event': (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;

// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers) => void;
'address-transaction': (address: string, tx: AddressTransactionWithTransfers) => void;
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'address-transaction': (address: string, tx: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;

// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse) => void;
'address-stx-balance': (address: string, stxBalance: AddressStxBalanceResponse) => void;
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'address-stx-balance': (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
}
102 changes: 87 additions & 15 deletions src/api/routes/ws/channels/socket-io-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@ import {
WebSocketPayload,
WebSocketTopics,
} from '../web-socket-channel';
import {
getWsMessageTimeoutMs,
getWsPingIntervalMs,
getWsPingTimeoutMs,
} from '../web-socket-transmitter';

/**
* SocketIO channel for sending real time API updates.
*/
export class SocketIOChannel extends WebSocketChannel {
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages>;
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages<true>>;
private adapter?: Adapter;

constructor(server: http.Server) {
Expand All @@ -33,9 +38,14 @@ export class SocketIOChannel extends WebSocketChannel {
}

connect(): void {
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages>(this.server, {
cors: { origin: '*' },
});
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages<true>>(
this.server,
{
cors: { origin: '*' },
pingInterval: getWsPingIntervalMs(),
pingTimeout: getWsPingTimeoutMs(),
}
);
this.io = io;

io.on('connection', async socket => {
Expand Down Expand Up @@ -153,74 +163,136 @@ export class SocketIOChannel extends WebSocketChannel {
return false;
}

private async getTopicSockets(room: Topic) {
if (!this.io) {
return;
}
const sockets = [];
const socketIds = await this.io.to(room).allSockets();
for (const id of socketIds) {
const socket = this.io.sockets.sockets.get(id);
if (socket) {
sockets.push(socket);
}
}
return sockets;
}

send<P extends keyof WebSocketPayload>(
payload: P,
...args: ListenerType<WebSocketPayload[P]>
): void {
if (!this.io) {
return;
}
// If a client takes more than this number of ms to respond to an event `emit`, it will be
// disconnected.
const timeout = getWsMessageTimeoutMs();
switch (payload) {
case 'block': {
const [block] = args as ListenerType<WebSocketPayload['block']>;
this.prometheus?.sendEvent('block');
this.io.to('block').emit('block', block);
void this.getTopicSockets('block').then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('block', block, _ => socket.disconnect(true))
)
);
break;
}
case 'microblock': {
const [microblock] = args as ListenerType<WebSocketPayload['microblock']>;
this.prometheus?.sendEvent('microblock');
this.io.to('microblock').emit('microblock', microblock);
void this.getTopicSockets('microblock').then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('microblock', microblock, _ => socket.disconnect(true))
)
);
break;
}
case 'mempoolTransaction': {
const [tx] = args as ListenerType<WebSocketPayload['mempoolTransaction']>;
this.prometheus?.sendEvent('mempool');
this.io.to('mempool').emit('mempool', tx);
void this.getTopicSockets('mempool').then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('mempool', tx, _ => socket.disconnect(true))
)
);
break;
}
case 'transaction': {
const [tx] = args as ListenerType<WebSocketPayload['transaction']>;
this.prometheus?.sendEvent('transaction');
this.io.to(`transaction:${tx.tx_id}`).emit('transaction', tx);
void this.getTopicSockets(`transaction:${tx.tx_id}`).then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('transaction', tx, _ => socket.disconnect(true))
)
);
break;
}
case 'nftEvent': {
const [event] = args as ListenerType<WebSocketPayload['nftEvent']>;
this.prometheus?.sendEvent('nft-event');
this.io.to('nft-event').emit('nft-event', event);
void this.getTopicSockets(`nft-event`).then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('nft-event', event, _ => socket.disconnect(true))
)
);
break;
}
case 'nftAssetEvent': {
const [assetIdentifier, value, event] = args as ListenerType<
WebSocketPayload['nftAssetEvent']
>;
this.prometheus?.sendEvent('nft-asset-event');
this.io.to('nft-event').emit('nft-asset-event', assetIdentifier, value, event);
void this.getTopicSockets(`nft-event`).then(sockets =>
sockets?.forEach(socket =>
socket
.timeout(timeout)
.emit('nft-asset-event', assetIdentifier, value, event, _ => socket.disconnect(true))
)
);
break;
}
case 'nftCollectionEvent': {
const [assetIdentifier, event] = args as ListenerType<
WebSocketPayload['nftCollectionEvent']
>;
this.prometheus?.sendEvent('nft-collection-event');
this.io.to('nft-event').emit('nft-collection-event', assetIdentifier, event);
void this.getTopicSockets(`nft-event`).then(sockets =>
sockets?.forEach(socket =>
socket
.timeout(timeout)
.emit('nft-collection-event', assetIdentifier, event, _ => socket.disconnect(true))
)
);
break;
}
case 'principalTransaction': {
const [principal, tx] = args as ListenerType<WebSocketPayload['principalTransaction']>;
const topic: AddressTransactionTopic = `address-transaction:${principal}`;
this.prometheus?.sendEvent('address-transaction');
this.io.to(topic).emit('address-transaction', principal, tx);
this.io.to(topic).emit(topic, principal, tx);
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => {
socket
.timeout(timeout)
.emit('address-transaction', principal, tx, _ => socket.disconnect(true));
socket.timeout(timeout).emit(topic, principal, tx, _ => socket.disconnect(true));
})
);
break;
}
case 'principalStxBalance': {
const [principal, balance] = args as ListenerType<WebSocketPayload['principalStxBalance']>;
const topic: AddressStxBalanceTopic = `address-stx-balance:${principal}`;
this.prometheus?.sendEvent('address-stx-balance');
this.io.to(topic).emit('address-stx-balance', principal, balance);
this.io.to(topic).emit(topic, principal, balance);
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => {
socket
.timeout(timeout)
.emit('address-stx-balance', principal, balance, _ => socket.disconnect(true));
socket.timeout(timeout).emit(topic, principal, balance, _ => socket.disconnect(true));
})
);
break;
}
}
Expand Down
Loading