From ae16652f22fd9db6c8e1102b8cc59e617f90562d Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Thu, 20 Jun 2024 02:28:01 +0800 Subject: [PATCH] update opt --- pkg/vm/engine/tae/db/checkpoint/option.go | 6 ++++++ pkg/vm/engine/tae/db/checkpoint/runner.go | 9 +++++++-- pkg/vm/engine/tae/db/open.go | 3 ++- pkg/vm/engine/tae/options/cfg.go | 1 + pkg/vm/engine/tae/testutils/config/options.go | 4 ++++ 5 files changed, 20 insertions(+), 3 deletions(-) diff --git a/pkg/vm/engine/tae/db/checkpoint/option.go b/pkg/vm/engine/tae/db/checkpoint/option.go index b54e8936f06d5..7c57bce4103e9 100644 --- a/pkg/vm/engine/tae/db/checkpoint/option.go +++ b/pkg/vm/engine/tae/db/checkpoint/option.go @@ -89,3 +89,9 @@ func WithReserveWALEntryCount(count uint64) Option { r.options.reservedWALEntryCount = count } } + +func WithStartupLatancy(latency time.Duration) Option { + return func(r *runner) { + r.options.startupLatency = latency + } +} diff --git a/pkg/vm/engine/tae/db/checkpoint/runner.go b/pkg/vm/engine/tae/db/checkpoint/runner.go index 5c6ed23acbb2f..729c0c99973a2 100644 --- a/pkg/vm/engine/tae/db/checkpoint/runner.go +++ b/pkg/vm/engine/tae/db/checkpoint/runner.go @@ -193,6 +193,8 @@ type runner struct { checkpointBlockRows int checkpointSize int + startupLatency time.Duration + reservedWALEntryCount uint64 } @@ -768,8 +770,8 @@ func (r *runner) tryScheduleCheckpoint(endts types.TS) { logutil.Infof("Checkpoint is disable") return } - if time.Since(r.openTime) < time.Minute*5 { - logutil.Infof("Checkpoint is disable. TN has been running for %v", time.Since(r.openTime)) + if time.Since(r.openTime) < r.options.startupLatency { + logutil.Infof("Checkpoint is disable. TN has been running for %v, startup latency %v", time.Since(r.openTime), r.options.startupLatency) return } entry := r.MaxCheckpoint() @@ -859,6 +861,9 @@ func (r *runner) fillDefaults() { if r.options.checkpointSize <= 0 { r.options.checkpointSize = logtail.DefaultCheckpointSize } + if r.options.startupLatency <= 0 { + r.options.startupLatency = time.Minute * 5 + } } func (r *runner) onWaitWaitableItems(items ...any) { diff --git a/pkg/vm/engine/tae/db/open.go b/pkg/vm/engine/tae/db/open.go index 5e1fa3496c598..933b26f927de0 100644 --- a/pkg/vm/engine/tae/db/open.go +++ b/pkg/vm/engine/tae/db/open.go @@ -163,7 +163,8 @@ func Open(ctx context.Context, dirname string, opts *options.Options) (db *DB, e checkpoint.WithMinIncrementalInterval(opts.CheckpointCfg.IncrementalInterval), checkpoint.WithGlobalMinCount(int(opts.CheckpointCfg.GlobalMinCount)), checkpoint.WithGlobalVersionInterval(opts.CheckpointCfg.GlobalVersionInterval), - checkpoint.WithReserveWALEntryCount(opts.CheckpointCfg.ReservedWALEntryCount)) + checkpoint.WithReserveWALEntryCount(opts.CheckpointCfg.ReservedWALEntryCount), + checkpoint.WithStartupLatancy(opts.CheckpointCfg.StartupLatency)) now := time.Now() checkpointed, ckpLSN, valid, err := db.BGCheckpointRunner.Replay(dataFactory) diff --git a/pkg/vm/engine/tae/options/cfg.go b/pkg/vm/engine/tae/options/cfg.go index 6dd919179ccc8..1756c70d25471 100644 --- a/pkg/vm/engine/tae/options/cfg.go +++ b/pkg/vm/engine/tae/options/cfg.go @@ -47,6 +47,7 @@ type CheckpointCfg struct { GCCheckpointInterval time.Duration DisableGCCheckpoint bool ReservedWALEntryCount uint64 + StartupLatency time.Duration // only for test // it is used to control the block rows of the checkpoint diff --git a/pkg/vm/engine/tae/testutils/config/options.go b/pkg/vm/engine/tae/testutils/config/options.go index b0eeae588f3d2..a2ec0d8d7810c 100644 --- a/pkg/vm/engine/tae/testutils/config/options.go +++ b/pkg/vm/engine/tae/testutils/config/options.go @@ -35,6 +35,7 @@ func WithQuickScanAndCKPOpts2(in *options.Options, factor int) (opts *options.Op opts.CheckpointCfg.MinCount = int64(factor) opts.CheckpointCfg.IncrementalInterval *= time.Duration(factor) opts.CheckpointCfg.BlockRows = 10 + opts.CheckpointCfg.StartupLatency = time.Millisecond opts.Ctx = context.Background() return opts } @@ -54,6 +55,7 @@ func WithQuickScanAndCKPOpts(in *options.Options) (opts *options.Options) { opts.CheckpointCfg.GCCheckpointInterval = time.Millisecond * 10 opts.CheckpointCfg.BlockRows = 10 opts.CheckpointCfg.GlobalVersionInterval = time.Millisecond * 10 + opts.CheckpointCfg.StartupLatency = time.Millisecond opts.GCCfg = new(options.GCCfg) opts.GCCfg.ScanGCInterval = time.Millisecond * 10 opts.GCCfg.GCTTL = time.Millisecond * 1 @@ -76,6 +78,7 @@ func WithQuickScanAndCKPAndGCOpts(in *options.Options) (opts *options.Options) { opts.CheckpointCfg.IncrementalInterval = time.Millisecond * 20 opts.CheckpointCfg.GlobalMinCount = 1 opts.CheckpointCfg.BlockRows = 10 + opts.CheckpointCfg.StartupLatency = time.Millisecond opts.GCCfg = new(options.GCCfg) // ScanGCInterval does not need to be too fast, because manual gc will be performed in the case @@ -100,6 +103,7 @@ func WithOpts(in *options.Options, factor float64) (opts *options.Options) { opts.CheckpointCfg.IncrementalInterval = time.Second * 2 * time.Duration(factor) opts.CheckpointCfg.GlobalMinCount = 10 opts.CheckpointCfg.BlockRows = 10 + opts.CheckpointCfg.StartupLatency = time.Millisecond opts.Ctx = context.Background() return opts }