From a4a4c5aa8fc101d9583d2cd907c4e49937ed1ce9 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 30 Nov 2021 17:09:26 +0800 Subject: [PATCH] encryption: refine key manager watcher loop (#4111) (#4154) * This is an automated cherry-pick of #4111 Signed-off-by: ti-chi-bot --- server/encryptionkm/key_manager.go | 79 +++++++++++++++---------- server/encryptionkm/key_manager_test.go | 2 +- 2 files changed, 49 insertions(+), 32 deletions(-) diff --git a/server/encryptionkm/key_manager.go b/server/encryptionkm/key_manager.go index 97782d7b603..2337c3cb5d2 100644 --- a/server/encryptionkm/key_manager.go +++ b/server/encryptionkm/key_manager.go @@ -206,8 +206,6 @@ func (m *KeyManager) keysRevision() int64 { func (m *KeyManager) StartBackgroundLoop(ctx context.Context) { // Setup key dictionary watcher watcher := clientv3.NewWatcher(m.etcdClient) - watchChan := watcher.Watch(ctx, EncryptionKeysPath, clientv3.WithRev(m.keysRevision())) - watcherEnabled := true defer watcher.Close() // Check data key rotation every min(dataKeyRotationPeriod, keyRotationCheckPeriod). checkPeriod := m.dataKeyRotationPeriod @@ -216,40 +214,66 @@ func (m *KeyManager) StartBackgroundLoop(ctx context.Context) { } ticker := time.NewTicker(checkPeriod) defer ticker.Stop() - // Loop + for { - select { - // Reload encryption keys updated by PD leader (could be ourselves). - case resp := <-watchChan: - if resp.Canceled { - // If the watcher failed, we fallback to reload every 10 minutes. - log.Warn("encryption key watcher canceled") - watcherEnabled = false - continue - } - for _, event := range resp.Events { - if event.Type != mvccpb.PUT { - log.Warn("encryption keys is deleted unexpectedly") - continue + var ( + resp clientv3.WatchResponse + ok bool + ) + rch := watcher.Watch(ctx, EncryptionKeysPath, clientv3.WithRev(m.keysRevision())) + + keyWatchLoop: + for { + select { + case resp, ok = <-rch: + if !ok || resp.Err() != nil { + // If chan is closed or canceled, exit watch loop + // Ref https://etcd.io/docs/v3.4/learning/api/#watch-streams + break keyWatchLoop } - _, err := m.loadKeysFromKV(event.Kv) - if err != nil { - log.Warn("fail to get encryption keys from watcher result", zap.Error(err)) + for _, event := range resp.Events { + if event.Type != mvccpb.PUT { + log.Warn("encryption keys is deleted unexpectedly") + continue + } + _, err := m.loadKeysFromKV(event.Kv) + if err != nil { + log.Warn("fail to get encryption keys from watcher result", errs.ZapError(err)) + } } + m.helper.eventAfterReloadByWatcher() + case <-m.helper.tick(ticker): + m.checkOnTick() + m.helper.eventAfterTicker() } - m.helper.eventAfterReloadByWatcher() - case <-m.helper.tick(ticker): - m.checkOnTick(watcherEnabled) - m.helper.eventAfterTicker() + } + + select { case <-ctx.Done(): // Server shutdown. return + default: + } + + if resp.CompactRevision != 0 { + // meet compacted error + log.Warn("revision has been compacted, the watcher will watch again", + zap.Int64("revision", m.keysRevision()), + zap.Int64("compact-revision", resp.CompactRevision)) + } else { + // other error + log.Error("encryption key watcher canceled, the watcher will watch again", + errs.ZapError(errs.ErrEncryptionKeysWatcher, resp.Err())) + } + + if _, err := m.loadKeys(); err != nil { + log.Error("encryption key reload failed", errs.ZapError(err)) } } } // checkOnTick perform key rotation and key reload on timer tick, if necessary. -func (m *KeyManager) checkOnTick(watcherEnabled bool) { +func (m *KeyManager) checkOnTick() { m.mu.Lock() defer m.mu.Unlock() // Check data key rotation in case we are the PD leader. @@ -257,13 +281,6 @@ func (m *KeyManager) checkOnTick(watcherEnabled bool) { if err != nil { log.Warn("fail to rotate data encryption key", zap.Error(err)) } - // Fallback mechanism to reload keys if watcher failed. - if !watcherEnabled { - _, err = m.loadKeysImpl() - if err != nil { - log.Warn("fail to reload keys after watcher failed", zap.Error(err)) - } - } } // loadKeysFromKVImpl reload keys from etcd result. diff --git a/server/encryptionkm/key_manager_test.go b/server/encryptionkm/key_manager_test.go index 9425220f141..0eb5548da36 100644 --- a/server/encryptionkm/key_manager_test.go +++ b/server/encryptionkm/key_manager_test.go @@ -43,7 +43,7 @@ func TestKeyManager(t *testing.T) { type testKeyManagerSuite struct{} -var _ = Suite(&testKeyManagerSuite{}) +var _ = SerialSuites(&testKeyManagerSuite{}) const ( testMasterKey = "8fd7e3e917c170d92f3e51a981dd7bc8fba11f3df7d8df994842f6e86f69b530"