Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription store fixes #1593

Merged
merged 3 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
"d3-shape": "^3.2.0",
"d3-time": "^3.1.0",
"fastest-levenshtein": "^1.0.16",
"graphql-ws": "^5.14.0",
"graphql-ws": "^5.16.2",
"json-source-map": "^0.6.1",
"jszip": "^3.10.1",
"jwt-decode": "^4.0.0",
Expand Down
60 changes: 39 additions & 21 deletions src/stores/subscribable.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { browser } from '$app/environment';
import { env } from '$env/dynamic/public';
import { createClient, type Client, type ClientOptions } from 'graphql-ws';
import { isEqual } from 'lodash-es';
import { debounce, isEqual } from 'lodash-es';
import type { Readable, Subscriber, Unsubscriber, Updater } from 'svelte/store';
import type { BaseUser, User } from '../types/app';
import type { GqlSubscribable, NextValue, QueryVariables, Subscription } from '../types/subscribable';
Expand All @@ -19,23 +19,28 @@ export function gqlSubscribable<T>(
transformer: (v: any) => T = v => v,
): GqlSubscribable<T> {
const subscribers: Set<Subscription<T>> = new Set();

let client: Client | null;
let unsubscribe: Unsubscriber = () => undefined;
let value: T | null = initialValue;
let variableUnsubscribers: Unsubscriber[] = [];
let variables: QueryVariables | null = initialVariables;
// Debounce clientSubscribe calls within the same call stack so that the last subscribe call is the
// only one within the stack that actually executes, otherwise we end up with duplicative subscriptions
// with potentially stale data that the underyling graphql-ws library does not immediately cancel.
const debouncedClientSubscribe = debounce(clientSubscribe, 0, { trailing: true });

function clientSubscribe(): Unsubscriber {
let unsubscribe: Unsubscriber = () => undefined;

/**
* Creates a subscription to the query within the web socket
*/
function clientSubscribe() {
if (browser && client) {
unsubscribe = client.subscribe<NextValue<T>>(
{
query,
variables,
},
{
complete: () => ({}),
complete: () => {},
error: async (error: Error | CloseEvent) => {
console.log('subscribe error');
console.log(error);
Expand All @@ -62,9 +67,9 @@ export function gqlSubscribable<T>(
},
},
);
} else {
unsubscribe = () => undefined;
}

return unsubscribe;
}

function filterValueById(id: number): void {
Expand Down Expand Up @@ -123,11 +128,8 @@ export function gqlSubscribable<T>(
}

function resubscribe() {
subscribers.forEach(subscriber => {
subscriber.unsubscribe();
const newUnsubscribe = clientSubscribe();
subscriber.unsubscribe = newUnsubscribe;
});
unsubscribe();
debouncedClientSubscribe();
}

function setVariables(newVariables: QueryVariables): void {
Expand All @@ -140,16 +142,23 @@ export function gqlSubscribable<T>(
}
}

/**
* Subscribe to the variables passed into the store.
* These variables could be stores themselves or plain values.
*/
function subscribeToVariables(initialVariables: QueryVariables | null): void {
variableUnsubscribers.forEach(unsubscribe => unsubscribe());
variableUnsubscribers.forEach(variableUnsubscribe => variableUnsubscribe());
variableUnsubscribers = [];

if (initialVariables !== null) {
for (const [name, variable] of Object.entries(initialVariables)) {
if (typeof variable === 'object' && variable?.subscribe !== undefined) {
// If this variable is a store, subscribe to the store and when the store
// updates, update our local cache of all of the variables from all of the stores
// and resubscribe to the main query with those new variables
const store = variable as Readable<any>;
const unsubscriber = store.subscribe(value => {
variables = { ...variables, [name]: value };
const unsubscriber = store.subscribe(storeValue => {
variables = { ...variables, [name]: storeValue };
resubscribe();
});
variableUnsubscribers.push(unsubscriber);
Expand All @@ -159,6 +168,8 @@ export function gqlSubscribable<T>(
}

function subscribe(next: Subscriber<T>): Unsubscriber {
// If we are in the browser and do not yet have a web socket client
// we will create one and subscribe to variables
if (browser && !client) {
const token = user?.token ?? getTokenFromUserCookie();
const clientOptions: ClientOptions = {
Expand All @@ -171,21 +182,28 @@ export function gqlSubscribable<T>(
url: env.PUBLIC_HASURA_WEB_SOCKET_URL,
};

client = createClient(clientOptions);
client = createClient(clientOptions); // WS subscription
subscribeToVariables(initialVariables);

// Subscribe within the WS to the GQL query
// Note that subscribeToVariables may immediately result in a resubscription if
// any of the variables are stores since the stores will call next(value) on
// initial subscription. This call below covers the case where no stores are passed
// in as variables. If resubscribe is called by subscribeToVariables then the debounce
// should take care of the duplication.
debouncedClientSubscribe();
}

const unsubscribe = clientSubscribe();
const subscriber: Subscription<T> = { next, unsubscribe };
const subscriber: Subscription<T> = { next };
subscribers.add(subscriber);
next(value as T);

return () => {
subscriber.unsubscribe();
subscribers.delete(subscriber);

if (subscribers.size === 0 && client) {
variableUnsubscribers.forEach(unsubscribe => unsubscribe());
unsubscribe();
variableUnsubscribers.forEach(variableUnsubscribe => variableUnsubscribe());
variableUnsubscribers = [];
client.dispose();
client = null;
Expand Down
1 change: 0 additions & 1 deletion src/types/subscribable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@ export type QueryVariables = Record<string, any>;

export type Subscription<T> = {
next: Subscriber<T>;
unsubscribe: Unsubscriber;
};
Loading