Skip to content

Commit

Permalink
fix(raftwal): Pass the encryption key instead of reading from WorkerC…
Browse files Browse the repository at this point in the history
…onfig (#7013)

Pass the encryption key to `raftwal.InitEncrypted()` instead of directly reading from the workerConfig.
Fix the debug tool and raftwal migrate, to allow using them with encryption enabled.
  • Loading branch information
ahsanbarkati authored Dec 7, 2020
1 parent 17df3f5 commit cf4c796
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 39 deletions.
3 changes: 2 additions & 1 deletion dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,8 @@ func run() {

// If this is a new format WAL, print and return.
if isWal && !opt.oldWalFormat {
store := raftwal.Init(dir)
store, err := raftwal.InitEncrypted(dir, opt.key)
x.Check(err)
fmt.Printf("RaftID: %+v\n", store.Uint(raftwal.RaftId))

// TODO: Fix the pending logic.
Expand Down
17 changes: 15 additions & 2 deletions dgraph/cmd/raft-migrate/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
"github.com/spf13/cobra"
Expand All @@ -32,6 +33,7 @@ var (
// RaftMigrate is the sub-command invoked when running "dgraph raft-migrate".
RaftMigrate x.SubCommand
quiet bool // enabling quiet mode would suppress the warning logs
encKey x.SensitiveByteSlice
)

func init() {
Expand All @@ -52,6 +54,7 @@ func init() {
"Node ID of the old node. This will be the node ID of the new node.")
flag.IntP("old-group-id", "", 0, "Group ID of the old node. This is used to open the old wal.")
flag.StringP("new-dir", "", "", "Path to the new (z)w directory.")
enc.RegisterFlags(flag)
}

func run(conf *viper.Viper) error {
Expand All @@ -67,10 +70,18 @@ func run(conf *viper.Viper) error {

nodeId := conf.GetInt("old-node-id")
groupId := conf.GetInt("old-group-id")

var err error
if encKey, err = enc.ReadKey(conf); err != nil {
log.Fatalf("Failed to read encryption file: %s", err)
}

// Copied over from zero/run.go
kvOpt := badger.LSMOnlyOptions(oldDir).
WithSyncWrites(false).
WithValueLogFileSize(64 << 20)
WithValueLogFileSize(64 << 20).
WithIndexCacheSize(100 << 20).
WithEncryptionKey(encKey)

kv, err := badger.OpenManaged(kvOpt)
x.Checkf(err, "Error while opening WAL store")
Expand Down Expand Up @@ -105,7 +116,9 @@ func run(conf *viper.Viper) error {
os.Mkdir(newDir, 0777)
}

newWal := raftwal.Init(newDir)
newWal, err := raftwal.InitEncrypted(newDir, encKey)
x.Check(err)

fmt.Printf("Setting raftID to: %+v\n", raftID)
// Set the raft ID
newWal.SetUint(raftwal.RaftId, raftID)
Expand Down
29 changes: 13 additions & 16 deletions raftwal/encryption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,47 +24,45 @@ import (
"os"
"testing"

"github.com/dgraph-io/dgraph/x"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/raftpb"
)

func TestEntryReadWrite(t *testing.T) {
x.WorkerConfig.EncryptionKey = []byte("badger16byteskey")
key := []byte("badger16byteskey")
dir, err := ioutil.TempDir("", "raftwal")
require.NoError(t, err)
el, err := openWal(dir)
ds, err := InitEncrypted(dir, key)
require.NoError(t, err)
defer os.RemoveAll(dir)

// generate some random data
data := make([]byte, rand.Intn(1000))
rand.Read(data)

require.NoError(t, el.AddEntries([]raftpb.Entry{{Index: 1, Term: 1, Data: data}}))
entries := el.allEntries(0, 100, 10000)
require.NoError(t, ds.wal.AddEntries([]raftpb.Entry{{Index: 1, Term: 1, Data: data}}))
entries := ds.wal.allEntries(0, 100, 10000)
require.Equal(t, 1, len(entries))
require.Equal(t, uint64(1), entries[0].Index)
require.Equal(t, uint64(1), entries[0].Term)
require.Equal(t, data, entries[0].Data)

// Open the wal file again.
el2, err := openWal(dir)
ds2, err := InitEncrypted(dir, key)
require.NoError(t, err)
entries = el2.allEntries(0, 100, 10000)
entries = ds2.wal.allEntries(0, 100, 10000)
require.Equal(t, 1, len(entries))
require.Equal(t, uint64(1), entries[0].Index)
require.Equal(t, uint64(1), entries[0].Term)
require.Equal(t, data, entries[0].Data)

// Opening it with a wrong key fails.
x.WorkerConfig.EncryptionKey = []byte("other16byteskeys")
_, err = openWal(dir)
wrongKey := []byte("other16byteskeys")
_, err = InitEncrypted(dir, wrongKey)
require.EqualError(t, err, "Encryption key mismatch")

// Opening it without encryption key fails.
x.WorkerConfig.EncryptionKey = nil
_, err = openWal(dir)
_, err = InitEncrypted(dir, nil)
require.EqualError(t, err, "Logfile is encrypted but encryption key is nil")
}

Expand Down Expand Up @@ -126,10 +124,9 @@ func TestLogRotate(t *testing.T) {
// TestLogGrow writes data of sufficient size to grow the log file.
func TestLogGrow(t *testing.T) {
test := func(t *testing.T, key []byte) {
x.WorkerConfig.EncryptionKey = key
dir, err := ioutil.TempDir("", "raftwal")
require.NoError(t, err)
el, err := openWal(dir)
ds, err := InitEncrypted(dir, key)
require.NoError(t, err)
defer os.RemoveAll(dir)

Expand All @@ -144,13 +141,13 @@ func TestLogGrow(t *testing.T) {
entry := raftpb.Entry{Index: uint64(i + 1), Term: 1, Data: data}
entries = append(entries, entry)
}
err = el.AddEntries(entries)
err = ds.wal.AddEntries(entries)
require.NoError(t, err)

// Reopen the file and retrieve all entries.
el, err = openWal(dir)
ds, err = InitEncrypted(dir, key)
require.NoError(t, err)
readEntries := el.allEntries(0, math.MaxInt64, math.MaxInt64)
readEntries := ds.wal.allEntries(0, math.MaxInt64, math.MaxInt64)
require.Equal(t, numEntries, len(readEntries))

for i, gotEntry := range readEntries {
Expand Down
10 changes: 5 additions & 5 deletions raftwal/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ const (
)

var (
emptyEntry = entry(make([]byte, entrySize))
emptyEntry = entry(make([]byte, entrySize))
encryptionKey x.SensitiveByteSlice
)

type entry []byte
Expand Down Expand Up @@ -103,15 +104,14 @@ func openLogFile(dir string, fid int64) (*logFile, error) {
fid: fid,
}
var err error
encKey := x.WorkerConfig.EncryptionKey
// Initialize the registry for logFile if encryption in enabled.
// NOTE: If encryption is enabled then there is no going back because if we disable it
// later then the older log files which were previously encrypted can't be opened.
if len(encKey) > 0 {
if len(encryptionKey) > 0 {
krOpt := badger.KeyRegistryOptions{
ReadOnly: false,
Dir: dir,
EncryptionKey: encKey,
EncryptionKey: encryptionKey,
EncryptionKeyRotationDuration: 10 * 24 * time.Hour,
InMemory: false,
}
Expand All @@ -138,7 +138,7 @@ func openLogFile(dir string, fid int64) (*logFile, error) {
// If keyID is non-zero, then the opened file is encrypted.
if keyID != 0 {
// Logfile is encrypted but encryption key is not provided.
if encKey == nil {
if encryptionKey == nil {
return nil, errors.New("Logfile is encrypted but encryption key is nil")
}
// retrieve datakey from the keyID of the logfile.
Expand Down
28 changes: 20 additions & 8 deletions raftwal/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,26 +85,38 @@ type indexRange struct {
from, until uint64 // index range for deletion, until index is not deleted.
}

// Init initializes returns a properly initialized instance of DiskStorage.
// To gracefully shutdown DiskStorage, store.Closer.SignalAndWait() should be called.
// Init initializes an instance of DiskStorage without encryption.
func Init(dir string) *DiskStorage {
ds, err := InitEncrypted(dir, nil)
x.Check(err)
return ds
}

// InitEncrypted initializes returns a properly initialized instance of DiskStorage.
// To gracefully shutdown DiskStorage, store.Closer.SignalAndWait() should be called.
func InitEncrypted(dir string, encKey x.SensitiveByteSlice) (*DiskStorage, error) {
w := &DiskStorage{
dir: dir,
}

var err error
w.meta, err = newMetaFile(dir)
x.Check(err)
if w.meta, err = newMetaFile(dir); err != nil {
return nil, err
}
// fmt.Printf("meta: %s\n", hex.Dump(w.meta.data[1024:2048]))
// fmt.Printf("found snapshot of size: %d\n", sliceSize(w.meta.data, snapshotOffset))

w.wal, err = openWal(dir)
x.Check(err)
encryptionKey = encKey
if w.wal, err = openWal(dir); err != nil {
return nil, err
}

w.elog = trace.NewEventLog("Badger", "RaftStorage")

snap, err := w.meta.snapshot()
x.Check(err)
if err != nil {
return nil, err
}

first, _ := w.FirstIndex()
if !raft.IsEmptySnap(snap) {
Expand All @@ -119,7 +131,7 @@ func Init(dir string) *DiskStorage {

glog.Infof("Init Raft Storage with snap: %d, first: %d, last: %d\n",
snap.Metadata.Index, first, last)
return w
return w, nil
}

func (w *DiskStorage) SetUint(info MetaInfo, id uint64) { w.meta.SetUint(info, id) }
Expand Down
11 changes: 6 additions & 5 deletions raftwal/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ func TestStorageCreateSnapshot(t *testing.T) {
defer os.RemoveAll(dir)

ds := Init(dir)

ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}
cs := &pb.ConfState{Nodes: []uint64{1, 2, 3}}
data := []byte("data")
Expand Down Expand Up @@ -353,7 +352,8 @@ func TestStorageOnlySnap(t *testing.T) {
x.WorkerConfig.EncryptionKey = key
dir, err := ioutil.TempDir("", "raftwal")
require.NoError(t, err)
ds := Init(dir)
ds, err := InitEncrypted(dir, key)
require.NoError(t, err)
t.Logf("Creating dir: %s\n", dir)

buf := make([]byte, 128)
Expand Down Expand Up @@ -385,10 +385,10 @@ func TestStorageOnlySnap(t *testing.T) {

func TestStorageBig(t *testing.T) {
test := func(t *testing.T, key []byte) {
x.WorkerConfig.EncryptionKey = key
dir, err := ioutil.TempDir("", "raftwal")
require.NoError(t, err)
ds := Init(dir)
ds, err := InitEncrypted(dir, key)
require.NoError(t, err)
defer os.RemoveAll(dir)

ent := raftpb.Entry{
Expand Down Expand Up @@ -494,7 +494,8 @@ func TestStorageBig(t *testing.T) {
}
require.NoError(t, ds.Sync())

ks := Init(dir)
ks, err := InitEncrypted(dir, key)
require.NoError(t, err)
ents = ks.wal.allEntries(start, math.MaxInt64, math.MaxInt64)
require.Equal(t, 51, len(ents))
for idx, ent := range ents {
Expand Down
4 changes: 2 additions & 2 deletions worker/server_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func (s *ServerState) initStorage() {
{
// Write Ahead Log directory
x.Checkf(os.MkdirAll(Config.WALDir, 0700), "Error while creating WAL dir.")
s.WALstore = raftwal.Init(Config.WALDir)
// TODO: Add encryption back to WALStore.
s.WALstore, err = raftwal.InitEncrypted(Config.WALDir, x.WorkerConfig.EncryptionKey)
x.Check(err)
}
{
// Postings directory
Expand Down

0 comments on commit cf4c796

Please sign in to comment.