diff --git a/server/config/config.go b/server/config/config.go index b6e2109c228..de59994098f 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -114,6 +114,7 @@ type ServerConfig struct { AutoCompactionRetention time.Duration AutoCompactionMode string CompactionBatchLimit int + CompactionSleepInterval time.Duration QuotaBackendBytes int64 MaxTxnOps uint diff --git a/server/embed/config.go b/server/embed/config.go index eec169a856d..fb4f9aee1ed 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -315,8 +315,10 @@ type Config struct { // TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913) ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"` // ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases. - ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` - ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` + ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` + ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` + // ExperimentalCompactionSleepInterval is the sleep interval between every etcd compaction loop. + ExperimentalCompactionSleepInterval time.Duration `json:"experimental-compaction-sleep-interval"` ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` // ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request // takes more time than this value. diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 001302f991b..8e5ac151ad2 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -217,6 +217,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { UnsafeNoFsync: cfg.UnsafeNoFsync, EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint, CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit, + CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval, WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval, DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime, WarningApplyDuration: cfg.ExperimentalWarningApplyDuration, diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 1e969112115..5dadbf6c897 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -282,6 +282,7 @@ func newConfig() *config { fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.") fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.") + fs.DurationVar(&cfg.ec.ExperimentalCompactionSleepInterval, "experimental-compaction-sleep-interval", cfg.ec.ExperimentalCompactionSleepInterval, "Sets the sleep interval between each compaction batch.") fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.") fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.") fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.") diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 56e288cc5f9..89fd9918512 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -608,10 +608,16 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { cfg.Logger.Warn("failed to create token provider", zap.Error(err)) return nil, err } - srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) + + mvccStoreConfig := mvcc.StoreConfig{ + CompactionBatchLimit: cfg.CompactionBatchLimit, + CompactionSleepInterval: cfg.CompactionSleepInterval, + } + srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig) kvindex := ci.ConsistentIndex() srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex)) + if beExist { // TODO: remove kvindex != 0 checking when we do not expect users to upgrade // etcd from pre-3.0 release. diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index 54055ed0552..ce5df4974de 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -52,9 +52,11 @@ const ( var restoreChunkKeys = 10000 // non-const for testing var defaultCompactBatchLimit = 1000 +var minimumBatchInterval = 10 * time.Millisecond type StoreConfig struct { - CompactionBatchLimit int + CompactionBatchLimit int + CompactionSleepInterval time.Duration } type store struct { @@ -96,6 +98,9 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi if cfg.CompactionBatchLimit == 0 { cfg.CompactionBatchLimit = defaultCompactBatchLimit } + if cfg.CompactionSleepInterval == 0 { + cfg.CompactionSleepInterval = minimumBatchInterval + } s := &store{ cfg: cfg, b: b, diff --git a/server/mvcc/kvstore_compaction.go b/server/mvcc/kvstore_compaction.go index 71bd4b7369c..1a71accdabd 100644 --- a/server/mvcc/kvstore_compaction.go +++ b/server/mvcc/kvstore_compaction.go @@ -32,6 +32,9 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(compactMainRev+1)) + batchNum := s.cfg.CompactionBatchLimit + batchInterval := s.cfg.CompactionSleepInterval + last := make([]byte, 8+1+8) for { var rev revision @@ -40,7 +43,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc tx := s.b.BatchTx() tx.Lock() - keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(s.cfg.CompactionBatchLimit)) + keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(batchNum)) for _, key := range keys { rev = bytesToRev(key) if _, ok := keep[rev]; !ok { @@ -49,7 +52,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc } } - if len(keys) < s.cfg.CompactionBatchLimit { + if len(keys) < batchNum { rbytes := make([]byte, 8+1+8) revToBytes(revision{main: compactMainRev}, rbytes) tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes) @@ -70,7 +73,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond)) select { - case <-time.After(10 * time.Millisecond): + case <-time.After(batchInterval): case <-s.stopc: return false }