Skip to content

Commit

Permalink
storage/engine: Teeing engine fixes
Browse files Browse the repository at this point in the history
This change introduces some misc teeing engine fixes, such as proper
cache initialization, copying of SSTs before ingestions (now that
pebble also deletes SSTs after an Ingest), and better null msg
handling in GetProto. The cache part fixes a segfault in
GetStats.

It also unifies file handling between in-memory and on-disk
teeing engines, by ensuring we write to files in each engine's
aux directory if we're writing to one engine's aux directory.

Fixes #42654 .

Release note: None.
  • Loading branch information
itsbilal committed Mar 3, 2020
1 parent ada086e commit f38087b
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 55 deletions.
9 changes: 9 additions & 0 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2998,6 +2998,15 @@ func TestDecommission(t *testing.T) {
t.Skip("skipping under testrace: #39807 and #37811")
}

// This test relies on concurrently waiting for a value to change in the
// underlying engine(s). Since the teeing engine does not respond well to
// value mismatches, whether transient or permanent, skip this test if the
// teeing engine is being used. See
// https://github.com/cockroachdb/cockroach/issues/42656 for more context.
if engine.DefaultStorageEngine == enginepb.EngineTypeTeePebbleRocksDB {
t.Skip("disabled on teeing engine")
}

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{
ReplicationMode: base.ReplicationAuto,
Expand Down
19 changes: 19 additions & 0 deletions pkg/storage/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -1694,6 +1695,15 @@ func TestSystemZoneConfigs(t *testing.T) {
t.Skip()
}

// This test relies on concurrently waiting for a value to change in the
// underlying engine(s). Since the teeing engine does not respond well to
// value mismatches, whether transient or permanent, skip this test if the
// teeing engine is being used. See
// https://github.com/cockroachdb/cockroach/issues/42656 for more context.
if engine.DefaultStorageEngine == enginepb.EngineTypeTeePebbleRocksDB {
t.Skip("disabled on teeing engine")
}

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 7, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Expand Down Expand Up @@ -2159,6 +2169,15 @@ func TestRandomConcurrentAdminChangeReplicasRequests(t *testing.T) {
func TestReplicaTombstone(t *testing.T) {
defer leaktest.AfterTest(t)()

// This test relies on concurrently waiting for a value to change in the
// underlying engine(s). Since the teeing engine does not respond well to
// value mismatches, whether transient or permanent, skip this test if the
// teeing engine is being used. See
// https://github.com/cockroachdb/cockroach/issues/42656 for more context.
if engine.DefaultStorageEngine == enginepb.EngineTypeTeePebbleRocksDB {
t.Skip("disabled on teeing engine")
}

t.Run("(1) ChangeReplicasTrigger", func(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand Down
1 change: 0 additions & 1 deletion pkg/storage/engine/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,6 @@ func newTeeInMem(ctx context.Context, attrs roachpb.Attributes, cacheSize int64)
pebbleInMem := newPebbleInMem(ctx, attrs, cacheSize)
rocksDBInMem := newRocksDBInMem(attrs, cacheSize)
tee := NewTee(ctx, rocksDBInMem, pebbleInMem)
tee.inMem = true
return tee
}

Expand Down
159 changes: 105 additions & 54 deletions pkg/storage/engine/tee.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"context"
"encoding/hex"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
Expand All @@ -32,10 +34,9 @@ import (
// This engine is only meant to be used in testing. No performance or
// stability guarantees are made about this engine in production.
type TeeEngine struct {
ctx context.Context
eng1 Engine
eng2 Engine
inMem bool
ctx context.Context
eng1 Engine
eng2 Engine
}

var _ Engine = &TeeEngine{}
Expand Down Expand Up @@ -136,7 +137,9 @@ func (t *TeeEngine) GetProto(
return false, 0, 0, err
}

err = protoutil.Unmarshal(val, msg)
if msg != nil {
err = protoutil.Unmarshal(val, msg)
}
keyBytes = int64(key.Len())
valBytes = int64(len(val))
return true, keyBytes, valBytes, err
Expand Down Expand Up @@ -256,9 +259,7 @@ func (t *TeeEngine) GetCompactionStats() string {

// GetStats implements the Engine interface.
func (t *TeeEngine) GetStats() (*Stats, error) {
// TODO(itsbilal): Test why getting stats from eng1 segfaults when
// eng1 = RocksDB.
return t.eng2.GetStats()
return t.eng1.GetStats()
}

// GetEncryptionRegistries implements the Engine interface.
Expand Down Expand Up @@ -330,26 +331,54 @@ func (t *TeeEngine) Type() enginepb.EngineType {
return enginepb.EngineTypeTeePebbleRocksDB
}

// Helper to remap a path in the first engine's aux dir, into the same path in
// the second engine's aux dir. Returns ok = true only if the provided path
// is in the first engine's aux dir.
func (t *TeeEngine) remapPath(path1 string) (path2 string, ok bool) {
auxDir1 := t.eng1.GetAuxiliaryDir()
if !strings.HasPrefix(path1, auxDir1) {
// This dir isn't in the first engine's aux dir.
return path1, false
}
ok = true
path2 = filepath.Join(t.eng2.GetAuxiliaryDir(), strings.TrimPrefix(path1, auxDir1))
return
}

// IngestExternalFiles implements the Engine interface.
func (t *TeeEngine) IngestExternalFiles(ctx context.Context, paths []string) error {
var err, err2 error
// Special case: If either engine is RocksDB, run that last, since RocksDB
// IngestExternalFiles deletes the specified files.
if rocksDBEng, ok := t.eng1.(*RocksDB); ok {
err2 = t.eng2.IngestExternalFiles(ctx, paths)
err = rocksDBEng.IngestExternalFiles(ctx, paths)
} else {
err = t.eng1.IngestExternalFiles(ctx, paths)
err2 = t.eng2.IngestExternalFiles(ctx, paths)

// The paths should be in eng1's aux directory. Map them to eng2's aux
// directory.
paths2 := make([]string, len(paths))
for i, path := range paths {
var ok bool
paths2[i], ok = t.remapPath(path)
if !ok {
paths2[i] = filepath.Join(t.eng2.GetAuxiliaryDir(), "temp-ingest", filepath.Base(path))
data, err := t.eng1.ReadFile(path)
if err != nil {
return err
}
f, err := t.eng2.CreateFile(paths2[i])
if err != nil {
return err
}
_, _ = f.Write(data)
_ = f.Sync()
_ = f.Close()
}
}

err = t.eng1.IngestExternalFiles(ctx, paths)
err2 = t.eng2.IngestExternalFiles(ctx, paths2)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// PreIngestDelay implements the Engine interface.
func (t *TeeEngine) PreIngestDelay(ctx context.Context) {
// TODO(itsbilal): Test why PreIngestDelay on eng1 segfaults when
// eng1 = RocksDB in tests like TestDBAddSSTable.
t.eng2.PreIngestDelay(ctx)
t.eng1.PreIngestDelay(ctx)
}

// ApproximateDiskBytes implements the Engine interface.
Expand All @@ -372,17 +401,19 @@ func (t *TeeEngine) CompactRange(start, end roachpb.Key, forceBottommost bool) e

// InMem implements the Engine interface.
func (t *TeeEngine) InMem() bool {
return t.inMem
return t.eng1.InMem()
}

// CreateFile implements the FS interface.
func (t *TeeEngine) CreateFile(filename string) (fs.File, error) {
_ = os.MkdirAll(filepath.Dir(filename), 0755)
file1, err := t.eng1.CreateFile(filename)
if !t.inMem {
// No need to write twice if the two engines share the same file system.
filename2, ok := t.remapPath(filename)
if !ok {
return file1, err
}
file2, err2 := t.eng2.CreateFile(filename)
_ = os.MkdirAll(filepath.Dir(filename2), 0755)
file2, err2 := t.eng2.CreateFile(filename2)
if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil {
return nil, err
}
Expand All @@ -395,12 +426,14 @@ func (t *TeeEngine) CreateFile(filename string) (fs.File, error) {

// CreateFileWithSync implements the FS interface.
func (t *TeeEngine) CreateFileWithSync(filename string, bytesPerSync int) (fs.File, error) {
_ = os.MkdirAll(filepath.Dir(filename), 0755)
file1, err := t.eng1.CreateFileWithSync(filename, bytesPerSync)
if !t.inMem {
// No need to write twice if the two engines share the same file system.
filename2, ok := t.remapPath(filename)
if !ok {
return file1, err
}
file2, err2 := t.eng2.CreateFileWithSync(filename, bytesPerSync)
_ = os.MkdirAll(filepath.Dir(filename2), 0755)
file2, err2 := t.eng2.CreateFileWithSync(filename2, bytesPerSync)
if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil {
return nil, err
}
Expand All @@ -414,7 +447,11 @@ func (t *TeeEngine) CreateFileWithSync(filename string, bytesPerSync int) (fs.Fi
// OpenFile implements the FS interface.
func (t *TeeEngine) OpenFile(filename string) (fs.File, error) {
file1, err := t.eng1.OpenFile(filename)
file2, err2 := t.eng2.OpenFile(filename)
filename2, ok := t.remapPath(filename)
if !ok {
return file1, err
}
file2, err2 := t.eng2.OpenFile(filename2)
if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil {
return nil, err
}
Expand All @@ -428,7 +465,11 @@ func (t *TeeEngine) OpenFile(filename string) (fs.File, error) {
// OpenDir implements the FS interface.
func (t *TeeEngine) OpenDir(name string) (fs.File, error) {
file1, err := t.eng1.OpenDir(name)
file2, err2 := t.eng2.OpenDir(name)
name2, ok := t.remapPath(name)
if !ok {
return file1, err
}
file2, err2 := t.eng2.OpenDir(name2)
if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil {
return nil, err
}
Expand All @@ -447,85 +488,91 @@ func (t *TeeEngine) ReadFile(filename string) ([]byte, error) {
// WriteFile implements the Engine interface.
func (t *TeeEngine) WriteFile(filename string, data []byte) error {
err := t.eng1.WriteFile(filename, data)
if !t.inMem {
// No need to write twice if the two engines share the same file system.
filename2, ok := t.remapPath(filename)
if !ok {
return err
}
err2 := t.eng2.WriteFile(filename, data)
_ = os.MkdirAll(filepath.Dir(filename2), 0755)
err2 := t.eng2.WriteFile(filename2, data)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// DeleteFile implements the FS interface.
func (t *TeeEngine) DeleteFile(filename string) error {
err := t.eng1.DeleteFile(filename)
if !t.inMem {
// No need to write twice if the two engines share the same file system.
filename2, ok := t.remapPath(filename)
if !ok {
return err
}
err2 := t.eng2.DeleteFile(filename)
err2 := t.eng2.DeleteFile(filename2)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// DeleteDirAndFiles implements the Engine interface.
func (t *TeeEngine) DeleteDirAndFiles(dir string) error {
err := t.eng1.DeleteDirAndFiles(dir)
if !t.inMem {
// No need to write twice if the two engines share the same file system.
dir2, ok := t.remapPath(dir)
if !ok {
return err
}
err2 := t.eng2.DeleteDirAndFiles(dir)
err2 := t.eng2.DeleteDirAndFiles(dir2)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// LinkFile implements the FS interface.
func (t *TeeEngine) LinkFile(oldname, newname string) error {
err := t.eng1.LinkFile(oldname, newname)
if !t.inMem {
// No need to write twice if the two engines share the same file system.
oldname2, ok := t.remapPath(oldname)
if !ok {
return err
}
err2 := t.eng2.LinkFile(oldname, newname)
newname2, _ := t.remapPath(newname)
err2 := t.eng2.LinkFile(oldname2, newname2)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// RenameFile implements the FS interface.
func (t *TeeEngine) RenameFile(oldname, newname string) error {
err := t.eng1.RenameFile(oldname, newname)
if !t.inMem {
// No need to write twice if the two engines share the same file system.
oldname2, ok := t.remapPath(oldname)
if !ok {
return err
}
err2 := t.eng2.RenameFile(oldname, newname)
newname2, _ := t.remapPath(newname)
err2 := t.eng2.RenameFile(oldname2, newname2)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// CreateDir implements the FS interface.
func (t *TeeEngine) CreateDir(name string) error {
err := t.eng1.CreateDir(name)
if !t.inMem {
// No need to create twice if the two engines share the same file system.
name2, ok := t.remapPath(name)
if !ok {
return err
}
err2 := t.eng2.CreateDir(name)
err2 := t.eng2.CreateDir(name2)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// DeleteDir implements the FS interface.
func (t *TeeEngine) DeleteDir(name string) error {
err := t.eng1.DeleteDir(name)
if !t.inMem {
// No need to delete twice if the two engines share the same file system.
name2, ok := t.remapPath(name)
if !ok {
return err
}
err2 := t.eng2.DeleteDir(name)
err2 := t.eng2.DeleteDir(name2)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// ListDir implements the FS interface.
func (t *TeeEngine) ListDir(name string) ([]string, error) {
list1, err := t.eng1.ListDir(name)
_, err2 := t.eng2.ListDir(name)

name2, ok := t.remapPath(name)
if !ok {
return list1, err
}
_, err2 := t.eng2.ListDir(name2)
if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil {
return nil, err
}
Expand Down Expand Up @@ -769,7 +816,9 @@ func (t *TeeEngineReader) GetProto(
return false, 0, 0, err
}

err = protoutil.Unmarshal(val, msg)
if msg != nil {
err = protoutil.Unmarshal(val, msg)
}
keyBytes = int64(key.Len())
valBytes = int64(len(val))
return true, keyBytes, valBytes, err
Expand Down Expand Up @@ -869,7 +918,9 @@ func (t *TeeEngineBatch) GetProto(
return false, 0, 0, err
}

err = protoutil.Unmarshal(val, msg)
if msg != nil {
err = protoutil.Unmarshal(val, msg)
}
keyBytes = int64(key.Len())
valBytes = int64(len(val))
return true, keyBytes, valBytes, err
Expand Down Expand Up @@ -1127,7 +1178,7 @@ func (t *TeeEngineIter) ComputeStats(
if !stats1.Equal(stats2) {
log.Fatalf(t.ctx, "mismatching stats between engines: %v != %v", stats1, stats2)
}
return stats1, nil
return stats1, err
}

// FindSplitKey implements the Iterator interface.
Expand Down

0 comments on commit f38087b

Please sign in to comment.