Skip to content

Commit

Permalink
postgres watch: checkpoints should move the high watermark revision
Browse files Browse the repository at this point in the history
In #2139 we introduced
the emission of checkpoint RevisionChanges when the postgres
snapshot moves up out of band (outside of application changes).

The fix missed to also move the locally-cached high watermark revision.
As a consequence, on a PG-backed SpiceDB were the pg snapshot does not
have an associated SpiceDB transaction, the Watch API will emit checkpoints
at the same revision over and over, because its own internal high watermark
hasn't moved.

This changes the code so that `currentTxn`, which keeps track of the last
checked revision, moves as well with an out-of-band snapshot revision.
  • Loading branch information
vroldanbet committed Nov 25, 2024
1 parent 5dc47f5 commit c5298e6
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
22 changes: 13 additions & 9 deletions internal/datastore/postgres/postgres_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1636,6 +1636,8 @@ func CheckpointsOnOutOfBandChangesTest(t *testing.T, ds datastore.Datastore) {
// Run the watch API.
changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{
Content: datastore.WatchRelationships | datastore.WatchSchema | datastore.WatchCheckpoints,
// loop quickly over the changes to make sure we don't re-issue checkpoints
CheckpointInterval: 1 * time.Nanosecond,
})
require.Zero(len(errchan))

Expand All @@ -1651,27 +1653,29 @@ func CheckpointsOnOutOfBandChangesTest(t *testing.T, ds datastore.Datastore) {
require.NoError(err)
require.True(newRevision.GreaterThan(lowestRevision))

var checkedCheckpoint bool
awaitManyCheckpoints := time.NewTimer(1 * time.Second)
var checkpointCount int
for {
if checkedCheckpoint {
break
}

changeWait := time.NewTimer(waitForChangesTimeout)
select {
case change, ok := <-changes:
if !ok {
require.True(checkedCheckpoint, "expected checkpoint to be emitted")
require.Equal(1, checkpointCount, "expected checkpoint to be emitted")
return
}

if change.IsCheckpoint {
checkedCheckpoint = change.Revision.GreaterThan(lowestRevision)
if change.Revision.Equal(newRevision) {
checkpointCount++
}
}

time.Sleep(10 * time.Millisecond)
case <-changeWait.C:
require.Fail("timed out waiting for checkpoint for out of band change")
// we want to make sure checkpoints are not reissued when moving out-of-band, so with a short poll interval
// we wait a bit before checking if we received checkpoints,
case <-awaitManyCheckpoints.C:
require.Equal(1, checkpointCount, "expected checkpoint to be emitted")
return
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions internal/datastore/postgres/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ func (pgd *pgDatastore) Watch(
}) {
return
}

currentTxn = *optionalHeadRevision
}
}

Expand Down

0 comments on commit c5298e6

Please sign in to comment.