Skip to content

Commit

Permalink
fixup! feat(preview): add unstable_observeDocument(s) to preview st…
Browse files Browse the repository at this point in the history
…ore (#7176)
  • Loading branch information
bjoerge authored and juice49 committed Aug 4, 2024
1 parent 164269e commit 578bd62
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 20 deletions.
59 changes: 42 additions & 17 deletions packages/sanity/src/core/preview/createGlobalListener.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,63 @@
import {type MutationEvent, type SanityClient, type WelcomeEvent} from '@sanity/client'
import {defer, merge, timer} from 'rxjs'
import {
type MutationEvent,
type ReconnectEvent,
type SanityClient,
type WelcomeEvent,
} from '@sanity/client'
import {merge, timer} from 'rxjs'
import {filter, share, shareReplay} from 'rxjs/operators'

/**
* @internal
* Creates a listener that will emit 'welcome' for all new subscribers immediately, and thereafter emit at every mutation event
*/
export function createGlobalListener(client: SanityClient) {
const allEvents$ = defer(() =>
client.listen(
const allEvents$ = client
.listen(
'*[!(_id in path("_.**"))]',
{},
{
events: ['welcome', 'mutation'],
events: ['welcome', 'mutation', 'reconnect'],
includeResult: false,
includePreviousRevision: false,
includeMutations: false,
visibility: 'query',
effectFormat: 'mendoza',
tag: 'preview.global',
},
),
).pipe(
filter(
(event): event is WelcomeEvent | MutationEvent =>
event.type === 'welcome' || event.type === 'mutation',
),
share({resetOnRefCountZero: () => timer(2000), resetOnComplete: true}),
)
.pipe(
share({
resetOnRefCountZero: () => timer(2000),
}),
)

// Reconnect events emitted in case the connection is lost
const reconnect = allEvents$.pipe(
filter((event): event is ReconnectEvent => event.type === 'reconnect'),
)

// Welcome events are emitted when the listener is (re)connected
const welcome = allEvents$.pipe(
filter((event): event is WelcomeEvent => event.type === 'welcome'),
)

// Mutation events coming from the listener
const mutations = allEvents$.pipe(
filter((event): event is MutationEvent => event.type === 'mutation'),
)

// Replay the latest connection event that was emitted either when the connection was disconnected ('reconnect'), established or re-established ('welcome')
const connectionEvent = merge(welcome, reconnect).pipe(
shareReplay({bufferSize: 1, refCount: true}),
)

const welcome$ = allEvents$.pipe(
filter((event) => event.type === 'welcome'),
shareReplay({refCount: true, bufferSize: 1}),
// Emit the welcome event if the latest connection event was the 'welcome' event.
// Downstream subscribers will typically map the welcome event to an initial fetch
const replayWelcome = connectionEvent.pipe(
filter((latestConnectionEvent) => latestConnectionEvent.type === 'welcome'),
)
const mutations$ = allEvents$.pipe(filter((event) => event.type === 'mutation')).pipe(share())
return merge(welcome$, mutations$)

// Combine into a single stream
return merge(replayWelcome, mutations, reconnect)
}
12 changes: 9 additions & 3 deletions packages/sanity/src/core/preview/documentPreviewStore.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {type SanityClient} from '@sanity/client'
import {type MutationEvent, type SanityClient, type WelcomeEvent} from '@sanity/client'
import {type PrepareViewOptions, type SanityDocument} from '@sanity/types'
import {pick} from 'lodash'
import {combineLatest, type Observable} from 'rxjs'
import {distinctUntilChanged, map} from 'rxjs/operators'
import {distinctUntilChanged, filter, map} from 'rxjs/operators'

import {isRecord} from '../util'
import {createPreviewAvailabilityObserver} from './availability'
Expand Down Expand Up @@ -91,7 +91,13 @@ export function createDocumentPreviewStore({
client,
}: DocumentPreviewStoreOptions): DocumentPreviewStore {
const versionedClient = client.withConfig({apiVersion: '1'})
const globalListener = createGlobalListener(versionedClient)
const globalListener = createGlobalListener(versionedClient).pipe(
filter(
(event): event is MutationEvent | WelcomeEvent =>
// ignore reconnect events for now
event.type === 'mutation' || event.type === 'welcome',
),
)
const invalidationChannel = globalListener.pipe(
map((event) => (event.type === 'welcome' ? {type: 'connected' as const} : event)),
)
Expand Down

0 comments on commit 578bd62

Please sign in to comment.