Skip to content

Commit

Permalink
Expose subscriberCount in WebSocket server (#13498)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarred-Sumner committed Aug 24, 2024
1 parent 1a9307d commit 0a37423
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 5 deletions.
10 changes: 10 additions & 0 deletions packages/bun-types/bun.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2767,6 +2767,16 @@ declare module "bun" {
compress?: boolean,
): ServerWebSocketSendStatus;

/**
* A count of connections subscribed to a given topic
*
* This operation will loop through each topic internally to get the count.
*
* @param topic the websocket topic to check how many subscribers are connected to
* @returns the number of subscribers
*/
subscriberCount(topic: string): number;

/**
* Returns the client IP address and port of the given Request. If the request was closed or is a unix socket, returns null.
*
Expand Down
4 changes: 4 additions & 0 deletions src/bun.js/api/server.classes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ function generate(name) {
fn: "doPublish",
length: 3,
},
subscriberCount: {
fn: "doSubscriberCount",
length: 1,
},
reload: {
fn: "doReload",
length: 2,
Expand Down
26 changes: 25 additions & 1 deletion src/bun.js/api/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4339,7 +4339,6 @@ pub const ServerWebSocket = struct {
callframe: *JSC.CallFrame,
) JSValue {
const args = callframe.arguments(4);

if (args.len < 1) {
log("publish()", .{});
globalThis.throw("publish requires at least 1 argument", .{});
Expand Down Expand Up @@ -5354,6 +5353,31 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
pub const doFetch = onFetch;
pub const doRequestIP = JSC.wrapInstanceMethod(ThisServer, "requestIP", false);

pub fn doSubscriberCount(this: *ThisServer, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) JSC.JSValue {
const arguments = callframe.arguments(1);
if (arguments.len < 1) {
globalThis.throwNotEnoughArguments("subscriberCount", 1, 0);
return .zero;
}

if (arguments.ptr[0].isEmptyOrUndefinedOrNull()) {
globalThis.throwInvalidArguments("subscriberCount requires a topic name as a string", .{});
return .zero;
}

var topic = arguments.ptr[0].toSlice(globalThis, bun.default_allocator);
defer topic.deinit();
if (globalThis.hasException()) {
return .zero;
}

if (topic.len == 0) {
return JSValue.jsNumber(0);
}

return JSValue.jsNumber((this.app.num_subscribers(topic.slice())));
}

pub usingnamespace NamespaceType;
pub usingnamespace bun.New(@This());

Expand Down
1 change: 1 addition & 0 deletions test/js/bun/websocket/websocket-server-fixture.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 17 additions & 4 deletions test/js/bun/websocket/websocket-server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -492,9 +492,11 @@ describe("ServerWebSocket", () => {
}
}
};
test(label, (done, connect) => ({
test(label, (done, connect, options) => ({
async open(ws) {
const initial = options.server.subscriberCount(topic);
ws.subscribe(topic);
expect(options.server.subscriberCount(topic)).toBe(initial + 1);
if (ws.data.id === 0) {
await connect();
} else if (ws.data.id === 1) {
Expand Down Expand Up @@ -525,10 +527,12 @@ describe("ServerWebSocket", () => {
}
}
};
test(label, done => ({
test(label, (done, _, options) => ({
publishToSelf: true,
async open(ws) {
const initial = options.server.subscriberCount(topic);
ws.subscribe(topic);
expect(options.server.subscriberCount(topic)).toBe(initial + 1);
send(ws);
},
drain(ws) {
Expand Down Expand Up @@ -690,7 +694,11 @@ describe("ServerWebSocket", () => {

function test(
label: string,
fn: (done: (err?: unknown) => void, connect: () => Promise<void>) => Partial<WebSocketHandler<{ id: number }>>,
fn: (
done: (err?: unknown) => void,
connect: () => Promise<void>,
options: { server: Server },
) => Partial<WebSocketHandler<{ id: number }>>,
timeout?: number,
) {
it(
Expand All @@ -705,6 +713,9 @@ function test(
}
};
let id = 0;
var options = {
server: undefined,
};
const server: Server = serve({
port: 0,
fetch(request, server) {
Expand All @@ -717,9 +728,11 @@ function test(
websocket: {
sendPings: false,
message() {},
...fn(done, () => connect(server)),
...fn(done, () => connect(server), options as any),
},
});
options.server = server;
expect(server.subscriberCount("empty topic")).toBe(0);
await connect(server);
},
{ timeout: timeout ?? 1000 },
Expand Down

0 comments on commit 0a37423

Please sign in to comment.