From 4664e04294cfe0470fa81f68cd2e962bd027778c Mon Sep 17 00:00:00 2001
From: Wilson Wang
Date: Tue, 2 Nov 2021 21:01:09 -0700
Subject: [PATCH] server: remove KeyValue unmarshal failure panic and send corrupt alarm request instead

---
 server/config/config.go                |  1 +
 server/embed/config.go                 |  3 +++
 server/embed/etcd.go                   |  1 +
 server/etcdmain/config.go              |  1 +
 server/etcdserver/corrupt.go           | 22 +++++++++++-------
 server/etcdserver/server.go            |  1 +
 server/storage/mvcc/kv.go              |  3 +++
 server/storage/mvcc/kvstore.go         |  1 +
 server/storage/mvcc/watchable_store.go | 31 ++++++++++++++++++++++----
 9 files changed, 52 insertions(+), 12 deletions(-)

diff --git a/server/config/config.go b/server/config/config.go
index c9e7d3aa3f01..b50a60b83b14 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -115,6 +115,7 @@ type ServerConfig struct {
 	AutoCompactionMode      string
 	CompactionBatchLimit    int
 	CompactionSleepInterval time.Duration
+	ReportKVDecodeError     bool
 
 	QuotaBackendBytes int64
 	MaxTxnOps         uint
diff --git a/server/embed/config.go b/server/embed/config.go
index abcdead5a8ba..abb899cfd2ef 100644
--- a/server/embed/config.go
+++ b/server/embed/config.go
@@ -329,6 +329,9 @@ type Config struct {
 	// ExperimentalWarningUnaryRequestDuration is the time duration after which a warning is generated if applying
 	// unary request takes more time than this value.
 	ExperimentalWarningUnaryRequestDuration time.Duration `json:"experimental-warning-unary-request-duration"`
+	// ExperimentalReportKVDecodeError enables WatchableStore to report a KeyValue decode failure
+	// instead of panicking, which was the previous behavior.
+	ExperimentalReportKVDecodeError bool `json:"experimental-report-kv-decode-error"`
 
 	// ForceNewCluster starts a new cluster even if previously started; unsafe.
 	ForceNewCluster bool `json:"force-new-cluster"`
diff --git a/server/embed/etcd.go b/server/embed/etcd.go
index 418199037cd0..512d2a72e5ff 100644
--- a/server/embed/etcd.go
+++ b/server/embed/etcd.go
@@ -212,6 +212,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		EnableLeaseCheckpoint:       cfg.ExperimentalEnableLeaseCheckpoint,
 		CompactionBatchLimit:        cfg.ExperimentalCompactionBatchLimit,
 		CompactionSleepInterval:     cfg.ExperimentalCompactionSleepInterval,
+		ReportKVDecodeError:         cfg.ExperimentalReportKVDecodeError,
 		WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
 		DowngradeCheckTime:          cfg.ExperimentalDowngradeCheckTime,
 		WarningApplyDuration:        cfg.ExperimentalWarningApplyDuration,
diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go
index 2c3f135d88bc..7d96ad451e3c 100644
--- a/server/etcdmain/config.go
+++ b/server/etcdmain/config.go
@@ -291,6 +291,7 @@ func newConfig() *config {
 	fs.BoolVar(&cfg.ec.ExperimentalMemoryMlock, "experimental-memory-mlock", cfg.ec.ExperimentalMemoryMlock, "Enable to enforce etcd pages (in particular bbolt) to stay in RAM.")
 	fs.BoolVar(&cfg.ec.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.")
 	fs.UintVar(&cfg.ec.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.")
+	fs.BoolVar(&cfg.ec.ExperimentalReportKVDecodeError, "experimental-report-kv-decode-error", false, "Enable WatchableStore to report an error instead of panicking when it fails to decode a KeyValue.")
 
 	// unsafe
 	fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go
index 81288d5cbaff..3a4f2134346e 100644
--- a/server/etcdserver/corrupt.go
+++ b/server/etcdserver/corrupt.go
@@ -137,6 +137,8 @@ func (s *EtcdServer) monitorKVHash() {
 		select {
 		case <-s.stopping:
 			return
+		case <-s.kv.ErrorC():
+			s.sendCorruptionAlarmRequest(uint64(s.ID()))
 		case <-time.After(t):
 		}
 		if !s.isLeader() {
@@ -148,6 +150,17 @@
 	}
 }
 
+func (s *EtcdServer) sendCorruptionAlarmRequest(id uint64) {
+	a := &pb.AlarmRequest{
+		MemberID: id,
+		Action:   pb.AlarmRequest_ACTIVATE,
+		Alarm:    pb.AlarmType_CORRUPT,
+	}
+	s.GoAttach(func() {
+		s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
+	})
+}
+
 func (s *EtcdServer) checkHashKV() error {
 	lg := s.Logger()
 
@@ -175,14 +188,7 @@
 			return
 		}
 		alarmed = true
-		a := &pb.AlarmRequest{
-			MemberID: id,
-			Action:   pb.AlarmRequest_ACTIVATE,
-			Alarm:    pb.AlarmType_CORRUPT,
-		}
-		s.GoAttach(func() {
-			s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
-		})
+		s.sendCorruptionAlarmRequest(id)
 	}
 
 	if h2 != h && rev2 == rev && crev == crev2 {
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 2de2477ec7f6..7d9edd1a0713 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -364,6 +364,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 	mvccStoreConfig := mvcc.StoreConfig{
 		CompactionBatchLimit:    cfg.CompactionBatchLimit,
 		CompactionSleepInterval: cfg.CompactionSleepInterval,
+		ReportKVDecodeError:     cfg.ReportKVDecodeError,
 	}
 	srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
 
diff --git a/server/storage/mvcc/kv.go b/server/storage/mvcc/kv.go
index 10c4821b1463..9210f02931d1 100644
--- a/server/storage/mvcc/kv.go
+++ b/server/storage/mvcc/kv.go
@@ -147,4 +147,7 @@ type Watchable interface {
 	// NewWatchStream returns a WatchStream that can be used to
 	// watch events happened or happening on the KV.
 	NewWatchStream() WatchStream
+	// ErrorC returns an error channel that reports errors encountered
+	// during WatchableStore execution.
+	ErrorC() <-chan error
 }
diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go
index 3c0a60bef680..0f37de8c2a12 100644
--- a/server/storage/mvcc/kvstore.go
+++ b/server/storage/mvcc/kvstore.go
@@ -54,6 +54,7 @@ var minimumBatchInterval = 10 * time.Millisecond
 type StoreConfig struct {
 	CompactionBatchLimit    int
 	CompactionSleepInterval time.Duration
+	ReportKVDecodeError     bool
 }
 
 type store struct {
diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go
index c2a8832db1d0..803daa7faeca 100644
--- a/server/storage/mvcc/watchable_store.go
+++ b/server/storage/mvcc/watchable_store.go
@@ -15,6 +15,7 @@
 package mvcc
 
 import (
+	"fmt"
 	"sync"
 	"time"
 
@@ -64,6 +65,8 @@ type watchableStore struct {
 
 	stopc chan struct{}
 	wg    sync.WaitGroup
+
+	errorc chan error
 }
 
 // cancelFunc updates unsynced and synced maps when running
@@ -85,6 +88,9 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg S
 		synced:   newWatcherGroup(),
 		stopc:    make(chan struct{}),
 	}
+	if cfg.ReportKVDecodeError {
+		s.errorc = make(chan error)
+	}
 	s.store.ReadView = &readView{s}
 	s.store.WriteView = &writeView{s}
 	if s.le != nil {
@@ -99,6 +105,9 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg S
 
 func (s *watchableStore) Close() error {
 	close(s.stopc)
+	if s.errorc != nil {
+		close(s.errorc)
+	}
 	s.wg.Wait()
 	return s.store.Close()
 }
@@ -113,6 +122,11 @@ func (s *watchableStore) NewWatchStream() WatchStream {
 	}
 }
 
+// ErrorC returns a nil channel when ReportKVDecodeError is not enabled.
+func (s *watchableStore) ErrorC() <-chan error {
+	return s.errorc
+}
+
 func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
 	wa := &watcher{
 		key:    key,
@@ -356,7 +370,15 @@ func (s *watchableStore) syncWatchers() int {
 	tx.RLock()
 	revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
 	tx.RUnlock()
-	evs := kvsToEvents(s.store.lg, wg, revs, vs)
+	evs, err := s.kvsToEvents(wg, revs, vs)
+	if err != nil {
+		if s.errorc != nil {
+			s.lg.Warn("kvsToEvents failed", zap.Error(err))
+			s.errorc <- err
+			return 0
+		}
+		s.lg.Panic("failed to run kvsToEvents", zap.Error(err))
+	}
 
 	victims := make(watcherBatch)
 	wb := newWatcherBatch(wg, evs)
@@ -404,11 +426,12 @@
 }
 
 // kvsToEvents gets all events for the watchers from all key-value pairs
-func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
+func (s *watchableStore) kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event, err error) {
 	for i, v := range vals {
 		var kv mvccpb.KeyValue
 		if err := kv.Unmarshal(v); err != nil {
-			lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
+			s.store.lg.Warn("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
+			return nil, fmt.Errorf("failed to unmarshal mvccpb.KeyValue (rev: %d, value: %#v): %w", revs[i], v, err)
 		}
 
 		if !wg.contains(string(kv.Key)) {
@@ -423,7 +446,7 @@ func (s *watchableStore) kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs
 		}
 		evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
 	}
-	return evs
+	return evs, nil
 }
 
 // notify notifies the fact that given event at the given rev just happened to
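
Note (illustration only, not part of the patch): the standalone sketch below uses hypothetical names (store, newStore, reportOrPanic) to show the control flow this change introduces. With --experimental-report-kv-decode-error enabled, a KeyValue decode failure is handed to an error channel and the receiver reacts the way monitorKVHash now does, by activating a CORRUPT alarm for the local member; with the flag off, the old panic behavior remains the fallback. ErrorC deliberately returns a nil channel when the feature is off, so a select case on it simply never fires in that configuration.

// Illustrative sketch only; none of these identifiers are etcd API.
package main

import (
	"errors"
	"fmt"
)

// store stands in for mvcc.watchableStore: errorc stays nil unless decode
// error reporting is enabled, mirroring newWatchableStore in this patch.
type store struct {
	errorc chan error
}

func newStore(reportDecodeError bool) *store {
	s := &store{}
	if reportDecodeError {
		s.errorc = make(chan error)
	}
	return s
}

// ErrorC returns a nil channel when reporting is disabled; receiving from a
// nil channel blocks forever, so a select case on it never fires.
func (s *store) ErrorC() <-chan error { return s.errorc }

// reportOrPanic mirrors the error path added to kvsToEvents/syncWatchers:
// hand the error to the monitor when reporting is on, otherwise keep the
// old panic behavior.
func (s *store) reportOrPanic(err error) {
	if s.errorc != nil {
		s.errorc <- err
		return
	}
	panic(err)
}

func main() {
	s := newStore(true)
	go s.reportOrPanic(errors.New("failed to unmarshal mvccpb.KeyValue"))

	// This receive plays the role of the new case in monitorKVHash, which
	// would activate a pb.AlarmType_CORRUPT alarm for the local member.
	err := <-s.ErrorC()
	fmt.Println("would activate CORRUPT alarm:", err)
}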