Skip to content

Commit

Permalink
feat: broadcast and expect multiple acks
Browse files Browse the repository at this point in the history
This feature was added in `socket.io@4.5.0`:

```js
io.timeout(1000).emit("some-event", (err, responses) => {
  // ...
});
```

Thanks to this change, it will now work with multiple Socket.IO
servers.

Related: socketio/socket.io#4163
  • Loading branch information
darrachequesne committed Apr 28, 2022
1 parent 6397c1b commit 055b784
Show file tree
Hide file tree
Showing 5 changed files with 3,175 additions and 50 deletions.
101 changes: 97 additions & 4 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ enum EventType {
FETCH_SOCKETS_RESPONSE,
SERVER_SIDE_EMIT,
SERVER_SIDE_EMIT_RESPONSE,
BROADCAST_CLIENT_COUNT,
BROADCAST_ACK,
}

interface Request {
Expand All @@ -35,6 +37,12 @@ interface Request {
responses: any[];
}

interface AckRequest {
type: EventType.BROADCAST;
clientCountCallback: (clientCount: number) => void;
ack: (...args: any[]) => void;
}

export interface ClusterAdapterOptions {
/**
* after this timeout the adapter will stop waiting from responses to request
Expand All @@ -61,6 +69,7 @@ export class ClusterAdapter extends Adapter {

private workerIds: Set<number> = new Set();
private requests: Map<string, Request> = new Map();
private ackRequests: Map<string, AckRequest> = new Map();

/**
* Adapter constructor.
Expand Down Expand Up @@ -114,12 +123,54 @@ export class ClusterAdapter extends Adapter {
break;
case EventType.BROADCAST: {
debug("broadcast with opts %j", message.data.opts);
super.broadcast(
message.data.packet,
ClusterAdapter.deserializeOptions(message.data.opts)
);

const withAck = message.data.requestId !== undefined;
if (withAck) {
super.broadcastWithAck(
message.data.packet,
ClusterAdapter.deserializeOptions(message.data.opts),
(clientCount) => {
debug("waiting for %d client acknowledgements", clientCount);
this.publish({
type: EventType.BROADCAST_CLIENT_COUNT,
data: {
requestId: message.data.requestId,
clientCount,
},
});
},
(arg) => {
debug("received acknowledgement with value %j", arg);
this.publish({
type: EventType.BROADCAST_ACK,
data: {
requestId: message.data.requestId,
packet: arg,
},
});
}
);
} else {
super.broadcast(
message.data.packet,
ClusterAdapter.deserializeOptions(message.data.opts)
);
}
break;
}

case EventType.BROADCAST_CLIENT_COUNT: {
const request = this.ackRequests.get(message.data.requestId);
request?.clientCountCallback(message.data.clientCount);
break;
}

case EventType.BROADCAST_ACK: {
const request = this.ackRequests.get(message.data.requestId);
request?.ack(message.data.packet);
break;
}

case EventType.SOCKETS_JOIN: {
debug("calling addSockets with opts %j", message.data.opts);
super.addSockets(
Expand Down Expand Up @@ -287,6 +338,48 @@ export class ClusterAdapter extends Adapter {
});
}

public broadcastWithAck(
packet: any,
opts: BroadcastOptions,
clientCountCallback: (clientCount: number) => void,
ack: (...args: any[]) => void
) {
const onlyLocal = opts?.flags?.local;
if (!onlyLocal) {
const requestId = randomId();

this.publish({
type: EventType.BROADCAST,
data: {
packet,
requestId,
opts: ClusterAdapter.serializeOptions(opts),
},
});

this.ackRequests.set(requestId, {
type: EventType.BROADCAST,
clientCountCallback,
ack,
});

// we have no way to know at this level whether the server has received an acknowledgement from each client, so we
// will simply clean up the ackRequests map after the given delay
setTimeout(() => {
this.ackRequests.delete(requestId);
}, opts.flags!.timeout);
}

// packets with binary contents are modified by the broadcast method, hence the nextTick()
process.nextTick(() => {
super.broadcastWithAck(packet, opts, clientCountCallback, ack);
});
}

public serverCount(): Promise<number> {
return Promise.resolve(1 + this.workerIds.size);
}

addSockets(opts: BroadcastOptions, rooms: Room[]) {
super.addSockets(opts, rooms);

Expand Down
Loading

0 comments on commit 055b784

Please sign in to comment.