Skip to content

Commit

Permalink
Merge pull request #1039 from go-kivik/changesDuplicates
Browse files Browse the repository at this point in the history
x/sqlite: Only report most recent rev for each doc in changes feed
  • Loading branch information
flimzy authored Jul 26, 2024
2 parents 6825b68 + 3b1d51b commit 8f5c345
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 38 deletions.
33 changes: 25 additions & 8 deletions x/sqlite/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,34 @@ func (d *db) newNormalChanges(ctx context.Context, opts optsMap, since, lastSeq
}

query := fmt.Sprintf(d.query(leavesCTE+`,
results AS (
winning AS (
SELECT
id,
seq,
deleted,
rev,
rev_id,
IIF($3 OR $6 != '', doc, NULL) AS doc
FROM {{ .Docs }}
WHERE ($1 IS NULL OR seq > $1)
ORDER BY seq
rev_id
FROM (
SELECT
id,
rev,
rev_id,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY rev DESC, rev_id DESC) AS rank
FROM {{ .Revs }}
)
WHERE rank = 1
),
results AS (
SELECT
doc.id,
doc.seq,
doc.deleted,
doc.rev,
doc.rev_id,
IIF($3 OR $6 != '', doc.doc, NULL) AS doc
FROM {{ .Docs }} AS doc
LEFT JOIN winning ON winning.id = doc.id AND winning.rev = doc.rev AND winning.rev_id = doc.rev_id
WHERE ($1 IS NULL OR doc.seq > $1)
AND winning.id IS NOT NULL
ORDER BY doc.seq
)
SELECT
COUNT(*) AS id,
Expand Down
39 changes: 9 additions & 30 deletions x/sqlite/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,6 @@ func TestDBChanges(t *testing.T) {
return test{
db: d,
wantChanges: []driver.Change{
{
ID: "doc1",
Seq: "1",
Changes: driver.ChangedRevs{rev},
},
{
ID: "doc1",
Seq: "2",
Expand All @@ -94,7 +89,6 @@ func TestDBChanges(t *testing.T) {
tests.Add("longpoll", func(t *testing.T) interface{} {
d := newDB(t)
rev := d.tPut("doc1", map[string]string{"foo": "bar"})
rev2 := d.tDelete("doc1", kivik.Rev(rev))

return test{
db: d,
Expand All @@ -105,14 +99,8 @@ func TestDBChanges(t *testing.T) {
Seq: "1",
Changes: driver.ChangedRevs{rev},
},
{
ID: "doc1",
Seq: "2",
Deleted: true,
Changes: driver.ChangedRevs{rev2},
},
},
wantLastSeq: &[]string{"2"}[0],
wantLastSeq: &[]string{"1"}[0],
wantETag: &[]string{""}[0],
}
})
Expand Down Expand Up @@ -256,7 +244,7 @@ func TestDBChanges(t *testing.T) {
tests.Add("limit=0 acts the same as limit=1", func(t *testing.T) interface{} {
d := newDB(t)
rev := d.tPut("doc1", map[string]string{"foo": "bar"})
_ = d.tDelete("doc1", kivik.Rev(rev))
_ = d.tPut("doc2", map[string]string{"foo": "bar"})

return test{
db: d,
Expand All @@ -276,7 +264,7 @@ func TestDBChanges(t *testing.T) {
tests.Add("limit=1", func(t *testing.T) interface{} {
d := newDB(t)
rev := d.tPut("doc1", map[string]string{"foo": "bar"})
_ = d.tDelete("doc1", kivik.Rev(rev))
_ = d.tPut("doc2", map[string]string{"foo": "bar"})

return test{
db: d,
Expand All @@ -296,7 +284,7 @@ func TestDBChanges(t *testing.T) {
tests.Add("limit=1 as int", func(t *testing.T) interface{} {
d := newDB(t)
rev := d.tPut("doc1", map[string]string{"foo": "bar"})
_ = d.tDelete("doc1", kivik.Rev(rev))
_ = d.tPut("doc2", map[string]string{"foo": "bar"})

return test{
db: d,
Expand All @@ -316,7 +304,7 @@ func TestDBChanges(t *testing.T) {
tests.Add("feed=longpoll, limit=1, pending is set", func(t *testing.T) interface{} {
d := newDB(t)
rev := d.tPut("doc1", map[string]string{"foo": "bar"})
_ = d.tDelete("doc1", kivik.Rev(rev))
_ = d.tPut("doc2", map[string]string{"foo": "bar"})

return test{
db: d,
Expand All @@ -339,16 +327,15 @@ func TestDBChanges(t *testing.T) {
tests.Add("Descending order", func(t *testing.T) interface{} {
d := newDB(t)
rev := d.tPut("doc1", map[string]string{"foo": "bar"})
rev2 := d.tDelete("doc1", kivik.Rev(rev))
rev2 := d.tPut("doc2", map[string]string{"foo": "bar"})

return test{
db: d,
options: kivik.Param("descending", true),
wantChanges: []driver.Change{
{
ID: "doc1",
ID: "doc2",
Seq: "2",
Deleted: true,
Changes: driver.ChangedRevs{rev2},
},
{
Expand All @@ -364,7 +351,6 @@ func TestDBChanges(t *testing.T) {
tests.Add("include docs, normal feed", func(t *testing.T) interface{} {
d := newDB(t)
rev := d.tPut("doc1", map[string]string{"foo": "bar"})
rev2 := d.tDelete("doc1", kivik.Rev(rev))

return test{
db: d,
Expand All @@ -376,16 +362,9 @@ func TestDBChanges(t *testing.T) {
Changes: driver.ChangedRevs{rev},
Doc: []byte(`{"_id":"doc1","_rev":"` + rev + `","foo":"bar"}`),
},
{
ID: "doc1",
Seq: "2",
Deleted: true,
Changes: driver.ChangedRevs{rev2},
Doc: []byte(`{"_id":"doc1","_rev":"` + rev2 + `","_deleted":true}`),
},
},
wantLastSeq: &[]string{"2"}[0],
wantETag: &[]string{"c81e728d9d4c2f636f067f89cc14862c"}[0],
wantLastSeq: &[]string{"1"}[0],
wantETag: &[]string{"c4ca4238a0b923820dcc509a6f75849b"}[0],
}
})
tests.Add("include docs, attachment stubs, normal feed", func(t *testing.T) interface{} {
Expand Down

0 comments on commit 8f5c345

Please sign in to comment.