Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add bolt compaction sleep interval #13018

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type ServerConfig struct {
AutoCompactionRetention time.Duration
AutoCompactionMode string
CompactionBatchLimit int
CompactionSleepInterval time.Duration
QuotaBackendBytes int64
MaxTxnOps uint

Expand Down
6 changes: 4 additions & 2 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,10 @@ type Config struct {
// TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913)
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
// ExperimentalCompactionSleepInterval is the sleep interval between every etcd compaction loop.
ExperimentalCompactionSleepInterval time.Duration `json:"experimental-compaction-sleep-interval"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
// takes more time than this value.
Expand Down
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
Expand Down
1 change: 1 addition & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func newConfig() *config {

fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalCompactionSleepInterval, "experimental-compaction-sleep-interval", cfg.ec.ExperimentalCompactionSleepInterval, "Sets the sleep interval between each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
Expand Down
8 changes: 7 additions & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,10 +608,16 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
return nil, err
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})

mvccStoreConfig := mvcc.StoreConfig{
CompactionBatchLimit: cfg.CompactionBatchLimit,
CompactionSleepInterval: cfg.CompactionSleepInterval,
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)

kvindex := ci.ConsistentIndex()
srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))

if beExist {
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
// etcd from pre-3.0 release.
Expand Down
7 changes: 6 additions & 1 deletion server/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ const (

var restoreChunkKeys = 10000 // non-const for testing
var defaultCompactBatchLimit = 1000
var minimumBatchInterval = 10 * time.Millisecond
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should rename to defaultBatchInterval.

By 'minimum,' we mean that the interval should be adjusted to 10ms if users set a smaller value, but clearly, the current implementation does not reflect this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in pull request #18495: #18495 😃


type StoreConfig struct {
CompactionBatchLimit int
CompactionBatchLimit int
CompactionSleepInterval time.Duration
}

type store struct {
Expand Down Expand Up @@ -96,6 +98,9 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
if cfg.CompactionBatchLimit == 0 {
cfg.CompactionBatchLimit = defaultCompactBatchLimit
}
if cfg.CompactionSleepInterval == 0 {
cfg.CompactionSleepInterval = minimumBatchInterval
}
s := &store{
cfg: cfg,
b: b,
Expand Down
9 changes: 6 additions & 3 deletions server/mvcc/kvstore_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))

batchNum := s.cfg.CompactionBatchLimit
batchInterval := s.cfg.CompactionSleepInterval

last := make([]byte, 8+1+8)
for {
var rev revision
Expand All @@ -40,7 +43,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc

tx := s.b.BatchTx()
tx.Lock()
keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(s.cfg.CompactionBatchLimit))
keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(batchNum))
for _, key := range keys {
rev = bytesToRev(key)
if _, ok := keep[rev]; !ok {
Expand All @@ -49,7 +52,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
}
}

if len(keys) < s.cfg.CompactionBatchLimit {
if len(keys) < batchNum {
rbytes := make([]byte, 8+1+8)
revToBytes(revision{main: compactMainRev}, rbytes)
tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes)
Expand All @@ -70,7 +73,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))

select {
case <-time.After(10 * time.Millisecond):
case <-time.After(batchInterval):
case <-s.stopc:
return false
}
Expand Down