Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

encryption: refine key manager watcher loop (#4111) #4154

Merged
merged 5 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 48 additions & 31 deletions server/encryptionkm/key_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,6 @@ func (m *KeyManager) keysRevision() int64 {
func (m *KeyManager) StartBackgroundLoop(ctx context.Context) {
// Setup key dictionary watcher
watcher := clientv3.NewWatcher(m.etcdClient)
watchChan := watcher.Watch(ctx, EncryptionKeysPath, clientv3.WithRev(m.keysRevision()))
watcherEnabled := true
defer watcher.Close()
// Check data key rotation every min(dataKeyRotationPeriod, keyRotationCheckPeriod).
checkPeriod := m.dataKeyRotationPeriod
Expand All @@ -216,54 +214,73 @@ func (m *KeyManager) StartBackgroundLoop(ctx context.Context) {
}
ticker := time.NewTicker(checkPeriod)
defer ticker.Stop()
// Loop

for {
select {
// Reload encryption keys updated by PD leader (could be ourselves).
case resp := <-watchChan:
if resp.Canceled {
// If the watcher failed, we fallback to reload every 10 minutes.
log.Warn("encryption key watcher canceled")
watcherEnabled = false
continue
}
for _, event := range resp.Events {
if event.Type != mvccpb.PUT {
log.Warn("encryption keys is deleted unexpectedly")
continue
var (
resp clientv3.WatchResponse
ok bool
)
rch := watcher.Watch(ctx, EncryptionKeysPath, clientv3.WithRev(m.keysRevision()))

keyWatchLoop:
for {
select {
case resp, ok = <-rch:
if !ok || resp.Err() != nil {
// If chan is closed or canceled, exit watch loop
// Ref https://etcd.io/docs/v3.4/learning/api/#watch-streams
break keyWatchLoop
}
_, err := m.loadKeysFromKV(event.Kv)
if err != nil {
log.Warn("fail to get encryption keys from watcher result", zap.Error(err))
for _, event := range resp.Events {
if event.Type != mvccpb.PUT {
log.Warn("encryption keys is deleted unexpectedly")
continue
}
_, err := m.loadKeysFromKV(event.Kv)
if err != nil {
log.Warn("fail to get encryption keys from watcher result", errs.ZapError(err))
}
}
m.helper.eventAfterReloadByWatcher()
case <-m.helper.tick(ticker):
m.checkOnTick()
m.helper.eventAfterTicker()
}
m.helper.eventAfterReloadByWatcher()
case <-m.helper.tick(ticker):
m.checkOnTick(watcherEnabled)
m.helper.eventAfterTicker()
}

select {
case <-ctx.Done():
// Server shutdown.
return
default:
}

if resp.CompactRevision != 0 {
// meet compacted error
log.Warn("revision has been compacted, the watcher will watch again",
zap.Int64("revision", m.keysRevision()),
zap.Int64("compact-revision", resp.CompactRevision))
} else {
// other error
log.Error("encryption key watcher canceled, the watcher will watch again",
errs.ZapError(errs.ErrEncryptionKeysWatcher, resp.Err()))
}

if _, err := m.loadKeys(); err != nil {
log.Error("encryption key reload failed", errs.ZapError(err))
}
}
}

// checkOnTick perform key rotation and key reload on timer tick, if necessary.
func (m *KeyManager) checkOnTick(watcherEnabled bool) {
func (m *KeyManager) checkOnTick() {
m.mu.Lock()
defer m.mu.Unlock()
// Check data key rotation in case we are the PD leader.
err := m.rotateKeyIfNeeded(false /*forceUpdate*/)
if err != nil {
log.Warn("fail to rotate data encryption key", zap.Error(err))
}
// Fallback mechanism to reload keys if watcher failed.
if !watcherEnabled {
_, err = m.loadKeysImpl()
if err != nil {
log.Warn("fail to reload keys after watcher failed", zap.Error(err))
}
}
}

// loadKeysFromKVImpl reload keys from etcd result.
Expand Down
2 changes: 1 addition & 1 deletion server/encryptionkm/key_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestKeyManager(t *testing.T) {

type testKeyManagerSuite struct{}

var _ = Suite(&testKeyManagerSuite{})
var _ = SerialSuites(&testKeyManagerSuite{})

const (
testMasterKey = "8fd7e3e917c170d92f3e51a981dd7bc8fba11f3df7d8df994842f6e86f69b530"
Expand Down