From b4cf39e7afe95cd97073df2f1ce3e3b389bf681d Mon Sep 17 00:00:00 2001
From: Marek Siarkowicz
Date: Wed, 1 Dec 2021 15:24:25 +0100
Subject: [PATCH] server: Require either cluster version v3.6 or
 --experimental-enable-lease-checkpoint-persist to persist lease remainingTTL

To avoid inconsistent behavior during cluster upgrades, we are feature
gating persistence behind the cluster version. This should ensure that
all cluster members are upgraded to v3.6 before changing behavior.

To allow backporting this fix to v3.5, we are also introducing the flag
--experimental-enable-lease-checkpoint-persist that will allow for a
smooth upgrade in v3.5 clusters with this feature enabled.
---
 server/config/config.go                |   4 +-
 server/embed/config.go                 |   8 +-
 server/embed/etcd.go                   |   1 +
 server/etcdmain/config.go              |   3 +-
 server/etcdserver/server.go            |   3 +-
 server/lease/leasehttp/http_test.go    |   6 +-
 server/lease/lessor.go                 |  30 +++++-
 server/lease/lessor_bench_test.go      |   2 +-
 server/lease/lessor_test.go            | 121 ++++++++++++++++++++++---
 tests/framework/integration/cluster.go |  68 +++++++-------
 10 files changed, 186 insertions(+), 60 deletions(-)

diff --git a/server/config/config.go b/server/config/config.go
index 74587efd6a04..43ecab7ecd5f 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -149,10 +149,12 @@ type ServerConfig struct {
 
 	ForceNewCluster bool
 
-	// EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
+	// EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
 	EnableLeaseCheckpoint bool
 	// LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
 	LeaseCheckpointInterval time.Duration
+	// LeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
+	LeaseCheckpointPersist bool
 
 	EnableGRPCGateway bool
 
diff --git a/server/embed/config.go b/server/embed/config.go
index ecec546a0e0b..c97a229b05e5 100644
--- a/server/embed/config.go
+++ b/server/embed/config.go
@@ -315,9 +315,13 @@ type Config struct {
 	// Deprecated in v3.5.
 	// TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913)
 	ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
-	// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
+	// ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
 	ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
-	ExperimentalCompactionBatchLimit  int  `json:"experimental-compaction-batch-limit"`
+	// ExperimentalLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
+	// Deprecated in v3.6.
+	// TODO: Delete in v3.7
+	ExperimentalLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
+	ExperimentalCompactionBatchLimit   int  `json:"experimental-compaction-batch-limit"`
 	// ExperimentalCompactionSleepInterval is the sleep interval between every etcd compaction loop.
 	ExperimentalCompactionSleepInterval time.Duration `json:"experimental-compaction-sleep-interval"`
 
 	ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
diff --git a/server/embed/etcd.go b/server/embed/etcd.go
index 5970437f0f07..0b4e88e35d82 100644
--- a/server/embed/etcd.go
+++ b/server/embed/etcd.go
@@ -210,6 +210,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
 		UnsafeNoFsync:                        cfg.UnsafeNoFsync,
 		EnableLeaseCheckpoint:                cfg.ExperimentalEnableLeaseCheckpoint,
+		LeaseCheckpointPersist:               cfg.ExperimentalLeaseCheckpointPersist,
 		CompactionBatchLimit:                 cfg.ExperimentalCompactionBatchLimit,
 		CompactionSleepInterval:              cfg.ExperimentalCompactionSleepInterval,
 		WatchProgressNotifyInterval:          cfg.ExperimentalWatchProgressNotifyInterval,
diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go
index 52c99eb6b7e1..ec3541a8ffb2 100644
--- a/server/etcdmain/config.go
+++ b/server/etcdmain/config.go
@@ -282,7 +282,8 @@ func newConfig() *config {
 	fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
 	fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
-	fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
+	fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
+	fs.BoolVar(&cfg.ec.ExperimentalLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.")
 	fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
 	fs.DurationVar(&cfg.ec.ExperimentalCompactionSleepInterval, "experimental-compaction-sleep-interval", cfg.ec.ExperimentalCompactionSleepInterval, "Sets the sleep interval between each compaction batch.")
 	fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 2de2477ec7f6..884caf411aad 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -344,9 +344,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 
 	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
 	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
-	srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{
+	srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
 		MinLeaseTTL:                int64(math.Ceil(minTTL.Seconds())),
 		CheckpointInterval:         cfg.LeaseCheckpointInterval,
+		CheckpointPersist:          cfg.LeaseCheckpointPersist,
 		ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
 	})
diff --git a/server/lease/leasehttp/http_test.go b/server/lease/leasehttp/http_test.go
index 4fb8fd7eb799..e614b4033a37 100644
--- a/server/lease/leasehttp/http_test.go
+++ b/server/lease/leasehttp/http_test.go
@@ -31,7 +31,7 @@ func TestRenewHTTP(t *testing.T) {
 	be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
 	defer betesting.Close(t, be)
 
-	le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+	le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
 	le.Promote(time.Second)
 	l, err := le.Grant(1, int64(5))
 	if err != nil {
@@ -55,7 +55,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
 	be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
 	defer betesting.Close(t, be)
 
-	le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+	le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
 	le.Promote(time.Second)
 	l, err := le.Grant(1, int64(5))
 	if err != nil {
@@ -96,7 +96,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
 	be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
 	defer betesting.Close(t, be)
 
-	le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+	le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
 	le.Promote(time.Second)
 	l, err := le.Grant(1, int64(5))
 	if err != nil {
diff --git a/server/lease/lessor.go b/server/lease/lessor.go
index f82fe5cc1335..bae138b3b9f2 100644
--- a/server/lease/lessor.go
+++ b/server/lease/lessor.go
@@ -24,6 +24,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/coreos/go-semver/semver"
 	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
 	"go.etcd.io/etcd/server/v3/lease/leasepb"
 	"go.etcd.io/etcd/server/v3/storage/backend"
@@ -37,6 +38,8 @@ const NoLease = LeaseID(0)
 // MaxLeaseTTL is the maximum lease TTL value
 const MaxLeaseTTL = 9000000000
 
+var v3_6 = semver.Version{Major: 3, Minor: 6}
+
 var (
 	forever = time.Time{}
 
@@ -180,19 +183,29 @@ type lessor struct {
 	checkpointInterval time.Duration
 	// the interval to check if the expired lease is revoked
 	expiredLeaseRetryInterval time.Duration
+	// whether lessor should always persist remaining TTL (always enabled in v3.6).
+	checkpointPersist bool
+	// cluster is used to adapt lessor logic based on cluster version
+	cluster cluster
+}
+
+type cluster interface {
+	// Version is the cluster-wide minimum major.minor version.
+	Version() *semver.Version
 }
 
 type LessorConfig struct {
 	MinLeaseTTL                int64
 	CheckpointInterval         time.Duration
 	ExpiredLeasesRetryInterval time.Duration
+	CheckpointPersist          bool
 }
 
-func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
-	return newLessor(lg, b, cfg)
+func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
+	return newLessor(lg, b, cluster, cfg)
 }
 
-func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
+func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
 	checkpointInterval := cfg.CheckpointInterval
 	expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
 	if checkpointInterval == 0 {
@@ -210,11 +223,13 @@
 		minLeaseTTL:               cfg.MinLeaseTTL,
 		checkpointInterval:        checkpointInterval,
 		expiredLeaseRetryInterval: expiredLeaseRetryInterval,
+		checkpointPersist:         cfg.CheckpointPersist,
 		// expiredC is a small buffered chan to avoid unnecessary blocking.
 		expiredC: make(chan []*Lease, 16),
 		stopC:    make(chan struct{}),
 		doneC:    make(chan struct{}),
 		lg:       lg,
+		cluster:  cluster,
 	}
 	l.initAndRecover()
 
@@ -351,7 +366,10 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
 	if l, ok := le.leaseMap[id]; ok {
 		// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
 		l.remainingTTL = remainingTTL
-		l.persistTo(le.b)
+		cv := le.cluster.Version()
+		if le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6)) {
+			l.persistTo(le.b)
+		}
 		if le.isPrimary() {
 			// schedule the next checkpoint as needed
 			le.scheduleCheckpointIfNeeded(l)
@@ -360,6 +378,10 @@
 	return nil
 }
 
+func greaterOrEqual(first, second semver.Version) bool {
+	return first.Equal(second) || second.LessThan(first)
+}
+
 // Renew renews an existing lease. If the given lease does not exist or
 // has expired, an error will be returned.
 func (le *lessor) Renew(id LeaseID) (int64, error) {
diff --git a/server/lease/lessor_bench_test.go b/server/lease/lessor_bench_test.go
index 570663deb2de..86f9d1bf5146 100644
--- a/server/lease/lessor_bench_test.go
+++ b/server/lease/lessor_bench_test.go
@@ -68,7 +68,7 @@ func setUp(t testing.TB) (le *lessor, tearDown func()) {
 	be, _ := betesting.NewDefaultTmpBackend(t)
 	// MinLeaseTTL is negative, so we can grant expired lease in benchmark.
 	// ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease.
-	le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
+	le = newLessor(lg, be, nil, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
 	le.SetRangeDeleter(func() TxnDelete {
 		ftd := &FakeTxnDelete{be.BatchTx()}
 		ftd.Lock()
diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go
index 9d67bedf5603..23f5b0099edd 100644
--- a/server/lease/lessor_test.go
+++ b/server/lease/lessor_test.go
@@ -25,7 +25,9 @@ import (
 	"testing"
 	"time"
 
+	"github.com/coreos/go-semver/semver"
 	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
+	"go.etcd.io/etcd/api/v3/version"
 	"go.etcd.io/etcd/server/v3/storage/backend"
 	"go.etcd.io/etcd/server/v3/storage/schema"
 	"go.uber.org/zap"
@@ -45,7 +47,7 @@ func TestLessorGrant(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()
 
-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	le.Promote(0)
 
@@ -108,7 +110,7 @@ func TestLeaseConcurrentKeys(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()
 
-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
 
@@ -157,7 +159,7 @@ func TestLessorRevoke(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()
 
-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	var fd *fakeDeleter
 	le.SetRangeDeleter(func() TxnDelete {
@@ -211,7 +213,7 @@ func TestLessorRenew(t *testing.T) {
 	defer be.Close()
 	defer os.RemoveAll(dir)
 
-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	le.Promote(0)
 
@@ -244,7 +246,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
 	defer be.Close()
 	defer os.RemoveAll(dir)
 
-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
 		for _, cp := range cp.GetCheckpoints() {
 			le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
@@ -293,7 +295,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
 	dir, be := NewTestBackend(t)
 	defer os.RemoveAll(dir)
 
-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	ttl := int64(10)
 	for i := 1; i <= leaseRevokeRate*10; i++ {
 		if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
@@ -312,7 +314,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
 	bcfg.Path = filepath.Join(dir, "be")
 	be = backend.New(bcfg)
 	defer be.Close()
-	le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le = newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 
 	// extend after recovery should extend expiration on lease pile-up
@@ -342,7 +344,7 @@ func TestLessorDetach(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()
 
-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
 
@@ -383,7 +385,7 @@ func TestLessorRecover(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()
 
-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	l1, err1 := le.Grant(1, 10)
 	l2, err2 := le.Grant(2, 20)
@@ -392,7 +394,7 @@ func TestLessorRecover(t *testing.T) {
 	}
 
 	// Create a new lessor with the same backend
-	nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	nle := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer nle.Stop()
 	nl1 := nle.Lookup(l1.ID)
 	if nl1 == nil || nl1.ttl != l1.ttl {
@@ -413,7 +415,7 @@ func TestLessorExpire(t *testing.T) {
 
 	testMinTTL := int64(1)
 
-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
+	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: testMinTTL})
 	defer le.Stop()
 
 	le.Promote(1 * time.Second)
@@ -466,7 +468,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
 
 	testMinTTL := int64(1)
 
-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
+	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: testMinTTL})
 	defer le.Stop()
 
 	le.Promote(1 * time.Second)
@@ -515,7 +517,7 @@ func TestLessorMaxTTL(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()
 
-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 
 	_, err := le.Grant(1, MaxLeaseTTL+1)
@@ -531,7 +533,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()
 
-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
+	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
 	defer le.Stop()
 	le.minLeaseTTL = 1
 	checkpointedC := make(chan struct{})
@@ -565,7 +567,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()
 
-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	l, err := le.Grant(1, 10)
 	if err != nil {
@@ -579,6 +581,75 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
 	}
 }
 
+func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) {
+	var ttl int64 = 10
+	var checkpointTTL int64 = 5
+
+	tcs := []struct {
+		name               string
+		cluster            cluster
+		checkpointPersist  bool
+		expectRemainingTTL int64
+	}{
+		{
+			name:               "Etcd v3.6 and newer persist remainingTTL on checkpoint",
+			cluster:            clusterLatest(),
+			expectRemainingTTL: checkpointTTL,
+		},
+		{
+			name:               "Etcd v3.5 and older persist remainingTTL if CheckpointPersist is set",
+			cluster:            clusterV3_5(),
+			checkpointPersist:  true,
+			expectRemainingTTL: checkpointTTL,
+		},
+		{
+			name:               "Etcd with version unknown persists remainingTTL if CheckpointPersist is set",
+			cluster:            clusterNil(),
+			checkpointPersist:  true,
+			expectRemainingTTL: checkpointTTL,
+		},
+		{
+			name:               "Etcd v3.5 and older reset remainingTTL on checkpoint",
+			cluster:            clusterV3_5(),
+			expectRemainingTTL: ttl,
+		},
+		{
+			name:               "Etcd with version unknown falls back to v3.5 behavior",
+			cluster:            clusterNil(),
+			expectRemainingTTL: ttl,
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			lg := zap.NewNop()
+			dir, be := NewTestBackend(t)
+			defer os.RemoveAll(dir)
+			defer be.Close()
+
+			cfg := LessorConfig{MinLeaseTTL: minLeaseTTL}
+			cfg.CheckpointPersist = tc.checkpointPersist
+			le := newLessor(lg, be, tc.cluster, cfg)
+			l, err := le.Grant(2, ttl)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if l.RemainingTTL() != ttl {
+				t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), ttl)
+			}
+			le.Checkpoint(2, checkpointTTL)
+			if l.RemainingTTL() != checkpointTTL {
+				t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), checkpointTTL)
+			}
+			le.Stop()
+			le2 := newLessor(lg, be, clusterLatest(), cfg)
+			l = le2.Lookup(2)
+			if l.RemainingTTL() != tc.expectRemainingTTL {
+				t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), tc.expectRemainingTTL)
+			}
+		})
+	}
+}
+
 type fakeDeleter struct {
 	deleted []string
 	tx      backend.BatchTx
@@ -606,3 +677,23 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
 	bcfg.Path = filepath.Join(tmpPath, "be")
 	return tmpPath, backend.New(bcfg)
 }
+
+func clusterLatest() cluster {
+	return fakeCluster{semver.New(version.Cluster(version.Version) + ".0")}
+}
+
+func clusterV3_5() cluster {
+	return fakeCluster{semver.New("3.5.0")}
+}
+
+func clusterNil() cluster {
+	return fakeCluster{}
+}
+
+type fakeCluster struct {
+	version *semver.Version
+}
+
+func (c fakeCluster) Version() *semver.Version {
+	return c.version
+}
diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go
index 631e00689bcb..d7500700915b 100644
--- a/tests/framework/integration/cluster.go
+++ b/tests/framework/integration/cluster.go
@@ -166,8 +166,9 @@ type ClusterConfig struct {
 	// UseTCP configures server listen on tcp socket. If disabled unix socket is used.
 	UseTCP bool
 
-	EnableLeaseCheckpoint bool
+	EnableLeaseCheckpoint   bool
 	LeaseCheckpointInterval time.Duration
+	LeaseCheckpointPersist  bool
 
 	WatchProgressNotifyInterval time.Duration
 	ExperimentalMaxLearners     int
@@ -311,16 +312,16 @@ func (c *Cluster) HTTPMembers() []client.Member {
 
 func (c *Cluster) mustNewMember(t testutil.TB, memberNumber int64) *Member {
 	m := MustNewMember(t, MemberConfig{
-		Name:                   c.generateMemberName(),
-		MemberNumber:           memberNumber,
-		AuthToken:              c.Cfg.AuthToken,
-		PeerTLS:                c.Cfg.PeerTLS,
-		ClientTLS:              c.Cfg.ClientTLS,
-		QuotaBackendBytes:      c.Cfg.QuotaBackendBytes,
-		MaxTxnOps:              c.Cfg.MaxTxnOps,
-		MaxRequestBytes:        c.Cfg.MaxRequestBytes,
-		SnapshotCount:          c.Cfg.SnapshotCount,
-		SnapshotCatchUpEntries: c.Cfg.SnapshotCatchUpEntries,
+		Name:                    c.generateMemberName(),
+		MemberNumber:            memberNumber,
+		AuthToken:               c.Cfg.AuthToken,
+		PeerTLS:                 c.Cfg.PeerTLS,
+		ClientTLS:               c.Cfg.ClientTLS,
+		QuotaBackendBytes:       c.Cfg.QuotaBackendBytes,
+		MaxTxnOps:               c.Cfg.MaxTxnOps,
+		MaxRequestBytes:         c.Cfg.MaxRequestBytes,
+		SnapshotCount:           c.Cfg.SnapshotCount,
+		SnapshotCatchUpEntries:  c.Cfg.SnapshotCatchUpEntries,
 		GrpcKeepAliveMinTime:    c.Cfg.GRPCKeepAliveMinTime,
 		GrpcKeepAliveInterval:   c.Cfg.GRPCKeepAliveInterval,
 		GrpcKeepAliveTimeout:    c.Cfg.GRPCKeepAliveTimeout,
@@ -331,6 +332,7 @@ func (c *Cluster) mustNewMember(t testutil.TB, memberNumber int64) *Member {
 		UseTCP:                      c.Cfg.UseTCP,
 		EnableLeaseCheckpoint:       c.Cfg.EnableLeaseCheckpoint,
 		LeaseCheckpointInterval:     c.Cfg.LeaseCheckpointInterval,
+		LeaseCheckpointPersist:      c.Cfg.LeaseCheckpointPersist,
 		WatchProgressNotifyInterval: c.Cfg.WatchProgressNotifyInterval,
 		ExperimentalMaxLearners:     c.Cfg.ExperimentalMaxLearners,
 	})
@@ -613,29 +615,30 @@ type Member struct {
 
 func (m *Member) GRPCURL() string { return m.GrpcURL }
 
 type MemberConfig struct {
-	Name                   string
-	UniqNumber             int64
-	MemberNumber           int64
-	PeerTLS                *transport.TLSInfo
-	ClientTLS              *transport.TLSInfo
-	AuthToken              string
-	QuotaBackendBytes      int64
-	MaxTxnOps              uint
-	MaxRequestBytes        uint
-	SnapshotCount          uint64
-	SnapshotCatchUpEntries uint64
-	GrpcKeepAliveMinTime     time.Duration
-	GrpcKeepAliveInterval    time.Duration
-	GrpcKeepAliveTimeout     time.Duration
-	ClientMaxCallSendMsgSize int
-	ClientMaxCallRecvMsgSize int
-	UseIP                    bool
-	UseBridge                bool
-	UseTCP                   bool
-	EnableLeaseCheckpoint    bool
+	Name                     string
+	UniqNumber               int64
+	MemberNumber             int64
+	PeerTLS                  *transport.TLSInfo
+	ClientTLS                *transport.TLSInfo
+	AuthToken                string
+	QuotaBackendBytes        int64
+	MaxTxnOps                uint
+	MaxRequestBytes          uint
+	SnapshotCount            uint64
+	SnapshotCatchUpEntries   uint64
+	GrpcKeepAliveMinTime     time.Duration
+	GrpcKeepAliveInterval    time.Duration
+	GrpcKeepAliveTimeout     time.Duration
+	ClientMaxCallSendMsgSize int
+	ClientMaxCallRecvMsgSize int
+	UseIP                    bool
+	UseBridge                bool
+	UseTCP                   bool
+	EnableLeaseCheckpoint    bool
 	LeaseCheckpointInterval time.Duration
+	LeaseCheckpointPersist  bool
 	WatchProgressNotifyInterval time.Duration
-	ExperimentalMaxLearners int
+	ExperimentalMaxLearners     int
 }
 
 // MustNewMember return an inited member with the given name. If peerTLS is
@@ -733,6 +736,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
 	m.UseTCP = mcfg.UseTCP
 	m.EnableLeaseCheckpoint = mcfg.EnableLeaseCheckpoint
 	m.LeaseCheckpointInterval = mcfg.LeaseCheckpointInterval
+	m.LeaseCheckpointPersist = mcfg.LeaseCheckpointPersist
 	m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval
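
Note: the following is a minimal standalone sketch of the decision rule the Checkpoint() change above implements, not part of the patch. The cluster interface, fakeCluster test double, and greaterOrEqual helper mirror what the patch adds; shouldPersist and main are hypothetical helpers invented here purely to illustrate the gate: remainingTTL is persisted when either the --experimental-enable-lease-checkpoint-persist override is set or the cluster-wide minimum version is known to be at least v3.6.

package main

import (
	"fmt"

	"github.com/coreos/go-semver/semver"
)

// cluster mirrors the minimal interface the lessor now depends on.
type cluster interface {
	Version() *semver.Version
}

// fakeCluster mirrors the test double added in lessor_test.go.
type fakeCluster struct{ version *semver.Version }

func (c fakeCluster) Version() *semver.Version { return c.version }

// greaterOrEqual is the comparison helper the patch adds to lessor.go.
func greaterOrEqual(first, second semver.Version) bool {
	return first.Equal(second) || second.LessThan(first)
}

// shouldPersist reproduces the gate inside (*lessor).Checkpoint: persist the
// remaining TTL if the override flag is set, or if the cluster-wide minimum
// version is already at least v3.6.
func shouldPersist(checkpointPersist bool, c cluster) bool {
	v3_6 := semver.Version{Major: 3, Minor: 6}
	cv := c.Version()
	return checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6))
}

func main() {
	v35 := fakeCluster{semver.New("3.5.0")}
	v36 := fakeCluster{semver.New("3.6.0")}
	unknown := fakeCluster{} // version not yet agreed, e.g. during a rolling upgrade

	fmt.Println(shouldPersist(false, v35))     // false: v3.5 keeps resetting remainingTTL on leader change
	fmt.Println(shouldPersist(true, v35))      // true: the new flag opts in early for a smooth upgrade
	fmt.Println(shouldPersist(false, v36))     // true: always enabled once the whole cluster runs v3.6
	fmt.Println(shouldPersist(false, unknown)) // false: unknown version falls back to v3.5 behavior
}

The last case is the reason the backport needs the explicit flag: until the cluster-wide version reaches v3.6, members without --experimental-enable-lease-checkpoint-persist fall back to the old reset-on-leader-change behavior.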