Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions packages/db/src/collection/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ export class CollectionSyncManager<
!isTruncateTransaction
) {
const existingValue = this.state.syncedData.get(key)
const utils = this.config.utils as
| Partial<LiveQueryCollectionUtils>
| undefined
const internal = utils?.[LIVE_QUERY_INTERNAL]

if (
existingValue !== undefined &&
deepEquals(existingValue, messageWithOptionalKey.value)
Expand All @@ -142,10 +147,12 @@ export class CollectionSyncManager<
// Treat it as an update so we preserve optimistic intent without
// throwing a duplicate-key error during reconciliation.
messageType = `update`
} else if (internal && !internal.hasCustomGetKey) {
// For live queries without custom getKey (like groupBy), the D2 pipeline
// might emit an insert for an updated aggregate without a corresponding
// delete in certain edge cases. Convert to update to avoid duplicate key errors.
messageType = `update`
} else {
const utils = this.config
.utils as Partial<LiveQueryCollectionUtils>
const internal = utils[LIVE_QUERY_INTERNAL]
throw new DuplicateKeySyncError(key, this.id, {
hasCustomGetKey: internal?.hasCustomGetKey ?? false,
hasJoins: internal?.hasJoins ?? false,
Expand Down
27 changes: 23 additions & 4 deletions packages/db/src/query/live/collection-config-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -722,10 +722,29 @@ export class CollectionConfigBuilder<

// Simple singular insert.
if (inserts && deletes === 0) {
write({
value,
type: `insert`,
})
// Defensive check: if the key already exists and we're NOT using a custom getKey,
// treat as update instead. This handles edge cases in groupBy where the D2 pipeline
// might emit an insert for an updated aggregate without a corresponding delete
// (e.g., due to timing or state issues in incremental updates).
//
// We only apply this for queries WITHOUT custom getKey (like groupBy) because:
// - For groupBy: the key is derived from the group expression, and duplicate inserts
// without deletes can happen due to D2 pipeline state issues
// - For custom getKey + joins: duplicates indicate a user error (e.g., using a non-unique
// key for a 1:N relationship), and we should throw an error to alert the user
const hasCustomGetKey = !!this.config.getKey
const keyExists = collection.has(collection.getKeyFromItem(value))
if (!hasCustomGetKey && keyExists) {
write({
value,
type: `update`,
})
} else {
write({
value,
type: `insert`,
})
}
} else if (
// Insert & update(s) (updates are a delete & insert)
inserts > deletes ||
Expand Down
Loading
Loading