Skip to content

Commit

Permalink
db: Add IngestShared test to exercise shared ingestion
Browse files Browse the repository at this point in the history
This is a test-oriented change that adds more end-to-end testing for
ingesting shared sstables and also reading keys from them.
  • Loading branch information
itsbilal committed Jun 19, 2023
1 parent 9f82737 commit 591bd91
Show file tree
Hide file tree
Showing 2 changed files with 767 additions and 0 deletions.
268 changes: 268 additions & 0 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,274 @@ func TestExcise(t *testing.T) {
}

// TestIngestShared runs the datadriven script testdata/ingest_shared against
// two DBs (d1 and d2). Each DB has its own in-memory filesystem, but both are
// configured with the same in-memory shared storage instance, so sstables
// created by one DB can be ingested into the other via the "replicate"
// command.
func TestIngestShared(t *testing.T) {
	// d is the DB that commands currently operate on; it is set to d1 by
	// reset and changed with the "switch" command.
	var d, d1, d2 *DB
	defer func() {
		if d1 != nil {
			require.NoError(t, d1.Close())
		}
		if d2 != nil {
			require.NoError(t, d2.Close())
		}
	}()
	creatorIDCounter := uint64(1)
	replicateCounter := 1

	// reset closes any open DBs and opens a fresh pair on new in-memory
	// filesystems and a new shared storage instance.
	reset := func() {
		// Clear each handle before asserting on the Close result. If anything
		// later in reset fails, the deferred cleanup above then only sees
		// handles that are still open and never calls Close a second time on
		// the same DB.
		if d1 != nil {
			err := d1.Close()
			d1 = nil
			require.NoError(t, err)
		}
		if d2 != nil {
			err := d2.Close()
			d2 = nil
			require.NoError(t, err)
		}

		sstorage := shared.NewInMem()
		mem1 := vfs.NewMem()
		mem2 := vfs.NewMem()
		require.NoError(t, mem1.MkdirAll("ext", 0755))
		require.NoError(t, mem2.MkdirAll("ext", 0755))
		opts1 := &Options{
			Comparer:              testkeys.Comparer,
			LBaseMaxBytes:         1,
			FS:                    mem1,
			L0CompactionThreshold: 100,
			L0StopWritesThreshold: 100,
			DebugCheck:            DebugCheckLevels,
			FormatMajorVersion:    ExperimentalFormatVirtualSSTables,
		}
		opts1.Experimental.SharedStorage = shared.MakeSimpleFactory(map[shared.Locator]shared.Storage{
			"": sstorage,
		})
		opts1.Experimental.CreateOnShared = true
		opts1.Experimental.CreateOnSharedLocator = ""
		// Disable automatic compactions because otherwise we'll race with
		// delete-only compactions triggered by ingesting range tombstones.
		opts1.DisableAutomaticCompactions = true

		// opts2 starts as a copy of opts1 and differs only in its filesystem;
		// the shared storage factory is rebuilt around the same sstorage.
		opts2 := &Options{}
		*opts2 = *opts1
		opts2.Experimental.SharedStorage = shared.MakeSimpleFactory(map[shared.Locator]shared.Storage{
			"": sstorage,
		})
		opts2.Experimental.CreateOnShared = true
		opts2.Experimental.CreateOnSharedLocator = ""
		opts2.FS = mem2

		var err error
		d1, err = Open("", opts1)
		require.NoError(t, err)
		require.NoError(t, d1.SetCreatorID(creatorIDCounter))
		creatorIDCounter++
		d2, err = Open("", opts2)
		require.NoError(t, err)
		require.NoError(t, d2.SetCreatorID(creatorIDCounter))
		creatorIDCounter++
		d = d1
	}
	reset()

	// dbByNum maps the textual DB number used in the test script to the
	// corresponding handle. ok is false for anything other than "1" or "2".
	dbByNum := func(num string) (db *DB, ok bool) {
		switch num {
		case "1":
			return d1, true
		case "2":
			return d2, true
		}
		return nil, false
	}

	// waitForFlush blocks until the currently selected DB reports no flush in
	// progress.
	waitForFlush := func() {
		d.mu.Lock()
		for d.mu.compact.flushing {
			d.mu.compact.cond.Wait()
		}
		d.mu.Unlock()
	}

	const replicateUsage = "usage: replicate <from-db-num> <to-db-num> <start-key> <end-key>"

	datadriven.RunTest(t, "testdata/ingest_shared", func(t *testing.T, td *datadriven.TestData) string {
		switch td.Cmd {
		case "reset":
			reset()
			return ""

		case "switch":
			if len(td.CmdArgs) != 1 {
				return "usage: switch <1 or 2>"
			}
			target, ok := dbByNum(td.CmdArgs[0].Key)
			if !ok {
				return "usage: switch <1 or 2>"
			}
			d = target
			return "ok"

		case "batch":
			b := d.NewIndexedBatch()
			if err := runBatchDefineCmd(td, b); err != nil {
				return err.Error()
			}
			if err := b.Commit(nil); err != nil {
				return err.Error()
			}
			return ""

		case "build":
			if err := runBuildCmd(td, d, d.opts.FS); err != nil {
				return err.Error()
			}
			return ""

		case "flush":
			if err := d.Flush(); err != nil {
				return err.Error()
			}
			return ""

		case "ingest":
			if err := runIngestCmd(td, d, d.opts.FS); err != nil {
				return err.Error()
			}
			// Wait for a possible flush.
			waitForFlush()
			return ""

		case "ingest-and-excise":
			if err := runIngestAndExciseCmd(td, d, d.opts.FS); err != nil {
				return err.Error()
			}
			// Wait for a possible flush.
			waitForFlush()
			return ""

		case "replicate":
			if len(td.CmdArgs) != 4 {
				return replicateUsage
			}
			from, ok := dbByNum(td.CmdArgs[0].Key)
			if !ok {
				return replicateUsage
			}
			to, ok := dbByNum(td.CmdArgs[1].Key)
			if !ok {
				return replicateUsage
			}
			startKey := []byte(td.CmdArgs[2].Key)
			endKey := []byte(td.CmdArgs[3].Key)

			// The sstable is created on the destination's filesystem and
			// ingested into the destination, so take the writer options from
			// the destination as well rather than from whichever DB happens
			// to be selected.
			writeOpts := to.opts.MakeWriterOptions(0 /* level */, to.opts.FormatMajorVersion.MaxTableFormat())
			sstPath := fmt.Sprintf("ext/replicate%d.sst", replicateCounter)
			f, err := to.opts.FS.Create(sstPath)
			require.NoError(t, err)
			replicateCounter++
			w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts)

			// Scan [startKey, endKey] in the source DB. Point keys, range
			// deletions and range keys handed to the callbacks are written
			// into the new sstable (point and range keys with sequence number
			// 0); shared sstable metadata is collected separately.
			var sharedSSTs []SharedSSTMeta
			err = from.ScanInternal(context.TODO(), startKey, endKey,
				func(key *InternalKey, value LazyValue) error {
					val, _, err := value.Value(nil)
					require.NoError(t, err)
					require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val))
					return nil
				},
				func(start, end []byte, seqNum uint64) error {
					require.NoError(t, w.DeleteRange(start, end))
					return nil
				},
				func(start, end []byte, keys []keyspan.Key) error {
					s := keyspan.Span{
						Start:     start,
						End:       end,
						Keys:      keys,
						KeysOrder: 0,
					}
					require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
						return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
					}))
					return nil
				},
				func(sst *SharedSSTMeta) error {
					sharedSSTs = append(sharedSSTs, *sst)
					return nil
				},
			)
			require.NoError(t, err)
			require.NoError(t, w.Close())

			// Ingest the locally written sstable together with the collected
			// shared sstables into the destination, excising the same span.
			_, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, KeyRange{Start: startKey, End: endKey})
			require.NoError(t, err)
			return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs))

		case "get":
			return runGetCmd(t, td, d)

		case "iter":
			o := &IterOptions{KeyTypes: IterKeyTypePointsAndRanges}
			for _, arg := range td.CmdArgs {
				switch arg.Key {
				case "mask-suffix":
					o.RangeKeyMasking.Suffix = []byte(arg.Vals[0])
				case "mask-filter":
					o.RangeKeyMasking.Filter = func() BlockPropertyFilterMask {
						return sstable.NewTestKeysMaskingFilter()
					}
				}
			}
			iter := d.NewIter(o)
			return runIterCmd(td, iter, true)

		case "lsm":
			return runLSMCmd(td, d)

		case "metrics":
			// The asynchronous loading of table stats can change metrics, so
			// wait for all the tables' stats to be loaded.
			d.mu.Lock()
			d.waitTableStats()
			d.mu.Unlock()

			return d.Metrics().String()

		case "wait-pending-table-stats":
			return runTableStatsCmd(td, d)

		case "excise":
			if len(td.CmdArgs) != 2 {
				panic("excise command requires exactly 2 args")
			}
			exciseSpan := KeyRange{
				Start: []byte(td.CmdArgs[0].Key),
				End:   []byte(td.CmdArgs[1].Key),
			}
			ve := &versionEdit{
				DeletedFiles: map[deletedFileEntry]*fileMetadata{},
			}

			// Hold the version set's log lock while walking the current
			// version; it is acquired and released with d.mu held.
			d.mu.Lock()
			d.mu.versions.logLock()
			d.mu.Unlock()
			releaseLogLock := func() {
				d.mu.Lock()
				d.mu.versions.logUnlock()
				d.mu.Unlock()
			}

			current := d.mu.versions.currentVersion()
			for level := range current.Levels {
				iter := current.Levels[level].Iter()
				for m := iter.SeekGE(d.cmp, exciseSpan.Start); m != nil && d.cmp(m.Smallest.UserKey, exciseSpan.End) < 0; m = iter.Next() {
					if _, err := d.excise(exciseSpan, m, ve, level); err != nil {
						releaseLogLock()
						return fmt.Sprintf("error when excising %s: %s", m.FileNum, err.Error())
					}
				}
			}
			releaseLogLock()
			// The version edit is only printed here, not applied.
			return fmt.Sprintf("would excise %d files, use ingest-and-excise to excise.\n%s", len(ve.DeletedFiles), ve.String())

		case "compact":
			if err := runCompactCmd(td, d); err != nil {
				return err.Error()
			}
			return "ok"

		default:
			return fmt.Sprintf("unknown command: %s", td.Cmd)
		}
	})
}

func TestSimpleIngestShared(t *testing.T) {
mem := vfs.NewMem()
var d *DB
var provider2 objstorage.Provider
Expand Down
Loading

0 comments on commit 591bd91

Please sign in to comment.