Skip to content

Commit

Permalink
Support SinceTs in iterators (#1653)
Browse files Browse the repository at this point in the history
SinceTs can be used to read data above a particular timestamp. All data
with version less than or equal to the sinceTs will be ignored.

SinceTs is always less than the readTs and when sinceTs is set, the
data between sinceTs and readTs will be read.

sinceTs < data to be read <= readTs

Fixes DGRAPH-2958
  • Loading branch information
Ibrahim Jarif authored Feb 5, 2021
1 parent a78c086 commit 31c061e
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 16 deletions.
7 changes: 4 additions & 3 deletions backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/dgraph-io/badger/v3/y"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
)

// flushThreshold determines when a buffer will be flushed. When performing a
Expand All @@ -48,6 +49,7 @@ const flushThreshold = 100 << 20
func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) {
	stream := db.NewStream()
	stream.LogPrefix = "DB.Backup"
	// Stream.SinceTs is an exclusive lower bound: the iterator only yields
	// versions strictly greater than SinceTs. Backup's since is inclusive
	// (Stream.Backup only rejects versions < since), so entries written at
	// exactly `since` must still be streamed. Translate the inclusive bound
	// into the exclusive one; since == 0 means "everything" and needs no
	// adjustment (and must not underflow).
	if since > 0 {
		stream.SinceTs = since - 1
	}
	return stream.Backup(w, since)
}

Expand All @@ -69,9 +71,8 @@ func (stream *Stream) Backup(w io.Writer, since uint64) (uint64, error) {
return list, nil
}
if item.Version() < since {
// Ignore versions less than given timestamp, or skip older
// versions of the given key.
return list, nil
return nil, errors.Errorf("Backup: Item Version: %d less than sinceTs: %d",
item.Version(), since)
}

var valCopy []byte
Expand Down
28 changes: 24 additions & 4 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ type IteratorOptions struct {
// prefix are picked based on their range of keys.
prefixIsKey bool // If set, use the prefix for bloom filter lookup.
Prefix []byte // Only iterate over this given prefix.
SinceTs uint64 // Only read data that has version > SinceTs.
}

func (opt *IteratorOptions) compareToPrefix(key []byte) int {
Expand All @@ -342,6 +343,10 @@ func (opt *IteratorOptions) compareToPrefix(key []byte) int {
}

func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
// Ignore this table if its max version is less than the sinceTs.
if t.MaxVersion() < opt.SinceTs {
return false
}
if len(opt.Prefix) == 0 {
return true
}
Expand All @@ -362,10 +367,24 @@ func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
// pickTables picks the necessary table for the iterator. This function also assumes
// that the tables are sorted in the right order.
func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
// filterTables removes tables whose max version is below opt.SinceTs.
// When SinceTs is unset (zero) the input is returned untouched. Otherwise
// the slice is filtered in place, reusing the backing array of the input.
filterTables := func(tables []*table.Table) []*table.Table {
	if opt.SinceTs == 0 {
		return tables
	}
	kept := tables[:0]
	for i := range tables {
		if tables[i].MaxVersion() >= opt.SinceTs {
			kept = append(kept, tables[i])
		}
	}
	return kept
}

if len(opt.Prefix) == 0 {
out := make([]*table.Table, len(all))
copy(out, all)
return out
return filterTables(out)
}
sIdx := sort.Search(len(all), func(i int) bool {
// table.Biggest >= opt.prefix
Expand All @@ -384,7 +403,7 @@ func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
})
out := make([]*table.Table, len(filtered[:eIdx]))
copy(out, filtered[:eIdx])
return out
return filterTables(out)
}

// opt.prefixIsKey == true. This code is optimizing for opt.prefixIsKey part.
Expand All @@ -405,7 +424,7 @@ func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
}
out = append(out, t)
}
return out
return filterTables(out)
}

// DefaultIteratorOptions contains default options when iterating over Badger key-value stores.
Expand Down Expand Up @@ -612,7 +631,8 @@ func (it *Iterator) parseItem() bool {

// Skip any versions which are beyond the readTs.
version := y.ParseTs(key)
if version > it.readTs {
// Ignore everything that is above the readTs and below or at the sinceTs.
if version > it.readTs || version <= it.opt.SinceTs {
mi.Next()
return false
}
Expand Down
38 changes: 38 additions & 0 deletions iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
Expand All @@ -39,6 +40,7 @@ type tableMock struct {
// Smallest returns the mock's left boundary key.
func (tm *tableMock) Smallest() []byte {
	return tm.left
}

// Biggest returns the mock's right boundary key.
func (tm *tableMock) Biggest() []byte {
	return tm.right
}

// DoesNotHave always reports false for any hash.
func (tm *tableMock) DoesNotHave(hash uint32) bool {
	return false
}

// MaxVersion always reports the largest possible version, so a
// "MaxVersion() < SinceTs" check can never exclude the mock.
func (tm *tableMock) MaxVersion() uint64 {
	return math.MaxUint64
}

func TestPickTables(t *testing.T) {
opt := DefaultIteratorOptions
Expand Down Expand Up @@ -122,6 +124,42 @@ func TestPickSortTables(t *testing.T) {
require.Equal(t, y.ParseKey(filtered[0].Biggest()), []byte("abc"))
}

// TestIterateSinceTs verifies that an iterator configured with SinceTs only
// yields items whose version is strictly greater than SinceTs.
func TestIterateSinceTs(t *testing.T) {
	bkey := func(i int) []byte {
		return []byte(fmt.Sprintf("%04d", i))
	}
	val := []byte("OK")
	n := 100000

	runBadgerTest(t, nil, func(t *testing.T, db *DB) {
		batch := db.NewWriteBatch()
		for i := 0; i < n; i++ {
			if (i % 10000) == 0 {
				t.Logf("Put i=%d\n", i)
			}
			require.NoError(t, batch.Set(bkey(i), val))
		}
		require.NoError(t, batch.Flush())

		// Place the cut-off at maxVs minus a tenth of maxVs (integer division).
		maxVs := db.MaxVersion()
		sinceTs := maxVs - maxVs/10
		iopt := DefaultIteratorOptions
		iopt.SinceTs = sinceTs

		// Surface any error from the read transaction instead of dropping it.
		require.NoError(t, db.View(func(txn *Txn) error {
			it := txn.NewIterator(iopt)
			defer it.Close()

			for it.Rewind(); it.Valid(); it.Next() {
				item := it.Item()
				// SinceTs is exclusive: versions equal to sinceTs must
				// have been skipped by the iterator.
				require.Greater(t, item.Version(), sinceTs)
			}
			return nil
		}))
	})
}

func TestIteratePrefix(t *testing.T) {
if !*manual {
t.Skip("Skipping test meant to be run manually.")
Expand Down
14 changes: 7 additions & 7 deletions levels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@ func TestCompaction(t *testing.T) {

t.Run("level 0 to level 1 with lower overlap", func(t *testing.T) {
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
l0 := []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}}
l01 := []keyValVersion{{"foo", "bar", 2, 0}}
l1 := []keyValVersion{{"foo", "bar", 1, 0}}
l2 := []keyValVersion{{"foo", "bar", 0, 0}}
l0 := []keyValVersion{{"foo", "bar", 4, 0}, {"fooz", "baz", 1, 0}}
l01 := []keyValVersion{{"foo", "bar", 3, 0}}
l1 := []keyValVersion{{"foo", "bar", 2, 0}}
l2 := []keyValVersion{{"foo", "bar", 1, 0}}
// Level 0 has table l0 and l01.
createAndOpen(db, l0, 0)
createAndOpen(db, l01, 0)
Expand All @@ -246,8 +246,8 @@ func TestCompaction(t *testing.T) {
db.SetDiscardTs(10)

getAllAndCheck(t, db, []keyValVersion{
{"foo", "bar", 3, 0}, {"foo", "bar", 2, 0}, {"foo", "bar", 1, 0},
{"foo", "bar", 0, 0}, {"fooz", "baz", 1, 0},
{"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, {"foo", "bar", 2, 0},
{"foo", "bar", 1, 0}, {"fooz", "baz", 1, 0},
})
cdef := compactDef{
thisLevel: db.lc.levels[0],
Expand All @@ -260,7 +260,7 @@ func TestCompaction(t *testing.T) {
require.NoError(t, db.lc.runCompactDef(-1, 0, cdef))
// foo version 3 and version 2 should be dropped after compaction.
getAllAndCheck(t, db, []keyValVersion{
{"foo", "bar", 3, 0}, {"foo", "bar", 0, 0}, {"fooz", "baz", 1, 0},
{"foo", "bar", 4, 0}, {"foo", "bar", 1, 0}, {"fooz", "baz", 1, 0},
})
})
})
Expand Down
4 changes: 2 additions & 2 deletions managed_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,12 +761,12 @@ func TestWriteBatchDuplicate(t *testing.T) {
wb := db.NewManagedWriteBatch()
defer wb.Cancel()

for i := uint64(0); i < uint64(N); i++ {
for i := uint64(1); i <= uint64(N); i++ {
// Multiple versions of the same key.
require.NoError(t, wb.SetEntryAt(&Entry{Key: k, Value: v}, i))
}
require.NoError(t, wb.Flush())
readVerify(t, db, N, []int{9, 8, 7, 6, 5, 4, 3, 2, 1, 0})
readVerify(t, db, N, []int{10, 9, 8, 7, 6, 5, 4, 3, 2, 1})
})
})
}
Expand Down
3 changes: 3 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type Stream struct {
// single goroutine, i.e. logic within Send method can expect single threaded execution.
Send func(buf *z.Buffer) error

// Read data above the sinceTs. All keys with version <= sinceTs will be ignored.
SinceTs uint64
readTs uint64
db *DB
rangeCh chan keyRange
Expand Down Expand Up @@ -193,6 +195,7 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
iterOpts.AllVersions = true
iterOpts.Prefix = st.Prefix
iterOpts.PrefetchValues = false
iterOpts.SinceTs = st.SinceTs
itr := txn.NewIterator(iterOpts)
itr.ThreadId = threadId
defer itr.Close()
Expand Down
1 change: 1 addition & 0 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type TableInterface interface {
Smallest() []byte
Biggest() []byte
DoesNotHave(hash uint32) bool
MaxVersion() uint64
}

// Table represents a loaded table file with the info we have about it.
Expand Down

0 comments on commit 31c061e

Please sign in to comment.