From 578bd6264f9701a3d077da38c6573f5d1baf0282 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B8rge=20N=C3=A6ss?= Date: Thu, 25 Jul 2024 12:12:45 +0200 Subject: [PATCH] fixup! feat(preview): add `unstable_observeDocument(s)` to preview store (#7176) --- .../src/core/preview/createGlobalListener.ts | 59 +++++++++++++------ .../src/core/preview/documentPreviewStore.ts | 12 +++- 2 files changed, 51 insertions(+), 20 deletions(-) diff --git a/packages/sanity/src/core/preview/createGlobalListener.ts b/packages/sanity/src/core/preview/createGlobalListener.ts index f1b68448144..a67bf779533 100644 --- a/packages/sanity/src/core/preview/createGlobalListener.ts +++ b/packages/sanity/src/core/preview/createGlobalListener.ts @@ -1,5 +1,10 @@ -import {type MutationEvent, type SanityClient, type WelcomeEvent} from '@sanity/client' -import {defer, merge, timer} from 'rxjs' +import { + type MutationEvent, + type ReconnectEvent, + type SanityClient, + type WelcomeEvent, +} from '@sanity/client' +import {merge, timer} from 'rxjs' import {filter, share, shareReplay} from 'rxjs/operators' /** @@ -7,12 +12,12 @@ import {filter, share, shareReplay} from 'rxjs/operators' * Creates a listener that will emit 'welcome' for all new subscribers immediately, and thereafter emit at every mutation event */ export function createGlobalListener(client: SanityClient) { - const allEvents$ = defer(() => - client.listen( + const allEvents$ = client + .listen( '*[!(_id in path("_.**"))]', {}, { - events: ['welcome', 'mutation'], + events: ['welcome', 'mutation', 'reconnect'], includeResult: false, includePreviousRevision: false, includeMutations: false, @@ -20,19 +25,39 @@ export function createGlobalListener(client: SanityClient) { effectFormat: 'mendoza', tag: 'preview.global', }, - ), - ).pipe( - filter( - (event): event is WelcomeEvent | MutationEvent => - event.type === 'welcome' || event.type === 'mutation', - ), - share({resetOnRefCountZero: () => timer(2000), resetOnComplete: true}), + ) + .pipe( + share({ + resetOnRefCountZero: () => timer(2000), + }), + ) + + // Reconnect events emitted in case the connection is lost + const reconnect = allEvents$.pipe( + filter((event): event is ReconnectEvent => event.type === 'reconnect'), + ) + + // Welcome events are emitted when the listener is (re)connected + const welcome = allEvents$.pipe( + filter((event): event is WelcomeEvent => event.type === 'welcome'), + ) + + // Mutation events coming from the listener + const mutations = allEvents$.pipe( + filter((event): event is MutationEvent => event.type === 'mutation'), + ) + + // Replay the latest connection event that was emitted either when the connection was disconnected ('reconnect'), established or re-established ('welcome') + const connectionEvent = merge(welcome, reconnect).pipe( + shareReplay({bufferSize: 1, refCount: true}), ) - const welcome$ = allEvents$.pipe( - filter((event) => event.type === 'welcome'), - shareReplay({refCount: true, bufferSize: 1}), + // Emit the welcome event if the latest connection event was the 'welcome' event. + // Downstream subscribers will typically map the welcome event to an initial fetch + const replayWelcome = connectionEvent.pipe( + filter((latestConnectionEvent) => latestConnectionEvent.type === 'welcome'), ) - const mutations$ = allEvents$.pipe(filter((event) => event.type === 'mutation')).pipe(share()) - return merge(welcome$, mutations$) + + // Combine into a single stream + return merge(replayWelcome, mutations, reconnect) } diff --git a/packages/sanity/src/core/preview/documentPreviewStore.ts b/packages/sanity/src/core/preview/documentPreviewStore.ts index be921b68aab..82bff355ad9 100644 --- a/packages/sanity/src/core/preview/documentPreviewStore.ts +++ b/packages/sanity/src/core/preview/documentPreviewStore.ts @@ -1,8 +1,8 @@ -import {type SanityClient} from '@sanity/client' +import {type MutationEvent, type SanityClient, type WelcomeEvent} from '@sanity/client' import {type PrepareViewOptions, type SanityDocument} from '@sanity/types' import {pick} from 'lodash' import {combineLatest, type Observable} from 'rxjs' -import {distinctUntilChanged, map} from 'rxjs/operators' +import {distinctUntilChanged, filter, map} from 'rxjs/operators' import {isRecord} from '../util' import {createPreviewAvailabilityObserver} from './availability' @@ -91,7 +91,13 @@ export function createDocumentPreviewStore({ client, }: DocumentPreviewStoreOptions): DocumentPreviewStore { const versionedClient = client.withConfig({apiVersion: '1'}) - const globalListener = createGlobalListener(versionedClient) + const globalListener = createGlobalListener(versionedClient).pipe( + filter( + (event): event is MutationEvent | WelcomeEvent => + // ignore reconnect events for now + event.type === 'mutation' || event.type === 'welcome', + ), + ) const invalidationChannel = globalListener.pipe( map((event) => (event.type === 'welcome' ? {type: 'connected' as const} : event)), )