Skip to content

Commit

Permalink
Dedupe gql subscriptions within a single web socket
Browse files Browse the repository at this point in the history
  • Loading branch information
AaronPlave committed Jan 17, 2025
1 parent 9f43227 commit 1985a19
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 23 deletions.
62 changes: 40 additions & 22 deletions src/stores/subscribable.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { browser } from '$app/environment';
import { env } from '$env/dynamic/public';
import { createClient, type Client, type ClientOptions } from 'graphql-ws';
import { isEqual } from 'lodash-es';
import type { Readable, Subscriber, Unsubscriber, Updater } from 'svelte/store';
import { debounce, isEqual } from 'lodash-es';
import { type Readable, type Subscriber, type Unsubscriber, type Updater } from 'svelte/store';
import type { BaseUser, User } from '../types/app';
import type { GqlSubscribable, NextValue, QueryVariables, Subscription } from '../types/subscribable';
import { logout } from '../utilities/login';
Expand All @@ -19,23 +19,28 @@ export function gqlSubscribable<T>(
transformer: (v: any) => T = v => v,
): GqlSubscribable<T> {
const subscribers: Set<Subscription<T>> = new Set();

let client: Client | null;
let unsubscribe: Unsubscriber = () => undefined;
let value: T | null = initialValue;
let variableUnsubscribers: Unsubscriber[] = [];
let variables: QueryVariables | null = initialVariables;
// Debounce clientSubscribe calls within the same call stack so that the last subscribe call is the
// only one within the stack that actually executes, otherwise we end up with duplicative subscriptions
// with potentially stale data that the underyling graphql-ws library does not immediately cancel.
const debouncedClientSubscribe = debounce(clientSubscribe, 0, { trailing: true });

function clientSubscribe(): Unsubscriber {
let unsubscribe: Unsubscriber = () => undefined;

/**
* Creates a subscription to the query within the web socket
*/
function clientSubscribe() {
if (browser && client) {
unsubscribe = client.subscribe<NextValue<T>>(
{
query,
variables,
},
{
complete: () => ({}),
complete: () => {},
error: async (error: Error | CloseEvent) => {
console.log('subscribe error');
console.log(error);
Expand All @@ -62,9 +67,9 @@ export function gqlSubscribable<T>(
},
},
);
} else {
unsubscribe = () => undefined;
}

return unsubscribe;
}

function filterValueById(id: number): void {
Expand Down Expand Up @@ -123,11 +128,8 @@ export function gqlSubscribable<T>(
}

function resubscribe() {
subscribers.forEach(subscriber => {
subscriber.unsubscribe();
const newUnsubscribe = clientSubscribe();
subscriber.unsubscribe = newUnsubscribe;
});
unsubscribe();
debouncedClientSubscribe();
}

function setVariables(newVariables: QueryVariables): void {
Expand All @@ -140,16 +142,23 @@ export function gqlSubscribable<T>(
}
}

/**
* Subscribe to the variables passed into the store.
* These variables could be stores themselves or plain values.
*/
function subscribeToVariables(initialVariables: QueryVariables | null): void {
variableUnsubscribers.forEach(unsubscribe => unsubscribe());
variableUnsubscribers.forEach(variableUnsubscribe => variableUnsubscribe());
variableUnsubscribers = [];

if (initialVariables !== null) {
for (const [name, variable] of Object.entries(initialVariables)) {
if (typeof variable === 'object' && variable?.subscribe !== undefined) {
// If this variable is a store, subscribe to the store and when the store
// updates, update our local cache of all of the variables from all of the stores
// and resubscribe to the main query with those new variables
const store = variable as Readable<any>;
const unsubscriber = store.subscribe(value => {
variables = { ...variables, [name]: value };
const unsubscriber = store.subscribe(storeValue => {
variables = { ...variables, [name]: storeValue };
resubscribe();
});
variableUnsubscribers.push(unsubscriber);
Expand All @@ -159,6 +168,8 @@ export function gqlSubscribable<T>(
}

function subscribe(next: Subscriber<T>): Unsubscriber {
// If we are in the browser and do not yet have a web socket client
// we will create one and subscribe to variables
if (browser && !client) {
const token = user?.token ?? getTokenFromUserCookie();
const clientOptions: ClientOptions = {
Expand All @@ -171,21 +182,28 @@ export function gqlSubscribable<T>(
url: env.PUBLIC_HASURA_WEB_SOCKET_URL,
};

client = createClient(clientOptions);
client = createClient(clientOptions); // WS subscription
subscribeToVariables(initialVariables);

// Subscribe within the WS to the GQL query
// Note that subscribeToVariables may immediately result in a resubscription if
// any of the variables are stores since the stores will call next(value) on
// initial subscription. This call below covers the case where no stores are passed
// in as variables. If resubscribe is called by subscribeToVariables then the debounce
// should take care of the duplication.
debouncedClientSubscribe();
}

const unsubscribe = clientSubscribe();
const subscriber: Subscription<T> = { next, unsubscribe };
const subscriber: Subscription<T> = { next };
subscribers.add(subscriber);
next(value as T);

return () => {
subscriber.unsubscribe();
subscribers.delete(subscriber);

if (subscribers.size === 0 && client) {
variableUnsubscribers.forEach(unsubscribe => unsubscribe());
unsubscribe();
variableUnsubscribers.forEach(variableUnsubscribe => variableUnsubscribe());
variableUnsubscribers = [];
client.dispose();
client = null;
Expand Down
1 change: 0 additions & 1 deletion src/types/subscribable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@ export type QueryVariables = Record<string, any>;

export type Subscription<T> = {
next: Subscriber<T>;
unsubscribe: Unsubscriber;
};

0 comments on commit 1985a19

Please sign in to comment.