Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/swift-brooms-double.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/client': minor
---

feat: denote end of injected subset snapshot with an additional message
10 changes: 2 additions & 8 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
GetExtensions,
ChangeMessage,
SnapshotMetadata,
SubsetParams,
} from './types'
import { MessageParser, Parser, TransformFunction } from './parser'
import { ColumnMapper, encodeWhereClause } from './column-mapper'
Expand Down Expand Up @@ -128,14 +129,6 @@ export type ExternalParamsRecord<T extends Row<unknown> = Row> = {
[K in string]: ParamValue | undefined
} & Partial<PostgresParams<T>> & { [K in ReservedParamKeys]?: never }

export type SubsetParams = {
where?: string
params?: Record<string, string>
limit?: number
offset?: number
orderBy?: string
}

type ReservedParamKeys =
| typeof LIVE_CACHE_BUSTER_QUERY_PARAM
| typeof SHAPE_HANDLE_QUERY_PARAM
Expand Down Expand Up @@ -1457,6 +1450,7 @@ export class ShapeStream<T extends Row<unknown> = Row>

const dataWithEndBoundary = (data as Array<Message<T>>).concat([
{ headers: { control: `snapshot-end`, ...metadata } },
{ headers: { control: `subset-end`, ...opts } },
])

this.#snapshotTracker.addSnapshot(
Expand Down
6 changes: 2 additions & 4 deletions packages/typescript-client/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,9 @@ export function isUpToDateMessage<T extends Row<unknown> = Row>(
* If we are not in SSE mode this function will return undefined.
*/
export function getOffset(message: ControlMessage): Offset | undefined {
if (message.headers.control != `up-to-date`) return
const lsn = message.headers.global_last_seen_lsn
if (!lsn) {
return
}
return `${lsn}_0` as Offset
return lsn ? (`${lsn}_0` as Offset) : undefined
}

/**
Expand Down
9 changes: 9 additions & 0 deletions packages/typescript-client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,22 @@ export type MoveTag = string
*/
export type MoveOutPattern = { pos: number; value: string }

export type SubsetParams = {
where?: string
params?: Record<string, string>
limit?: number
offset?: number
orderBy?: string
}

export type ControlMessage = {
headers:
| (Header & {
control: `up-to-date` | `must-refetch`
global_last_seen_lsn?: string
})
| (Header & { control: `snapshot-end` } & PostgresSnapshot)
| (Header & { control: `subset-end` } & SubsetParams)
}

export type EventMessage = {
Expand Down
Loading