Skip to content

Commit 21634a9

Browse files
michaljasionowski
authored and serathius committed
lease,integration: add checkpoint scheduling after leader change
The current checkpointing mechanism is buggy: new checkpoints for any lease are scheduled only until the first leader change. This adds a fix for that, along with a test that verifies it.
1 parent eac7f98 commit 21634a9

File tree

3 files changed

+87
-47
lines changed

3 files changed

+87
-47
lines changed

server/lease/lessor.go

+1
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ func (le *lessor) Promote(extend time.Duration) {
446446
l.refresh(extend)
447447
item := &LeaseWithTime{id: l.ID, time: l.expiry}
448448
le.leaseExpiredNotifier.RegisterOrUpdate(item)
449+
le.scheduleCheckpointIfNeeded(l)
449450
}
450451

451452
if len(le.leaseMap) < leaseRevokeRate {

server/lease/lessor_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
531531
defer be.Close()
532532

533533
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
534+
defer le.Stop()
534535
le.minLeaseTTL = 1
535536
checkpointedC := make(chan struct{})
536537
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
@@ -543,13 +544,11 @@ func TestLessorCheckpointScheduling(t *testing.T) {
543544
t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
544545
}
545546
})
546-
defer le.Stop()
547-
le.Promote(0)
548-
549547
_, err := le.Grant(1, 2)
550548
if err != nil {
551549
t.Fatal(err)
552550
}
551+
le.Promote(0)
553552

554553
// TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds.
555554
select {

tests/integration/v3_lease_test.go

+84-44
Original file line numberDiff line numberDiff line change
@@ -229,56 +229,96 @@ func TestV3LeaseKeepAlive(t *testing.T) {
229229
// TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted
230230
// across leader elections.
231231
func TestV3LeaseCheckpoint(t *testing.T) {
232-
BeforeTest(t)
233-
234-
var ttl int64 = 300
235-
leaseInterval := 2 * time.Second
236-
clus := NewClusterV3(t, &ClusterConfig{
237-
Size: 3,
238-
EnableLeaseCheckpoint: true,
239-
LeaseCheckpointInterval: leaseInterval,
240-
UseBridge: true,
241-
})
242-
defer clus.Terminate(t)
243-
244-
// create lease
245-
ctx, cancel := context.WithCancel(context.Background())
246-
defer cancel()
247-
c := toGRPC(clus.RandClient())
248-
lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl})
249-
if err != nil {
250-
t.Fatal(err)
251-
}
232+
tcs := []struct {
233+
name string
234+
checkpointingEnabled bool
235+
ttl time.Duration
236+
checkpointingInterval time.Duration
237+
leaderChanges int
238+
expectTTLIsGT time.Duration
239+
expectTTLIsLT time.Duration
240+
}{
241+
{
242+
name: "Checkpointing disabled, lease TTL is reset",
243+
ttl: 300 * time.Second,
244+
leaderChanges: 1,
245+
expectTTLIsGT: 298 * time.Second,
246+
},
247+
{
248+
name: "Checkpointing enabled 10s, lease TTL is preserved after leader change",
249+
ttl: 300 * time.Second,
250+
checkpointingEnabled: true,
251+
checkpointingInterval: 10 * time.Second,
252+
leaderChanges: 1,
253+
expectTTLIsLT: 290 * time.Second,
254+
},
255+
{
256+
// Checking if checkpointing continues after the first leader change.
257+
name: "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes",
258+
ttl: 300 * time.Second,
259+
checkpointingEnabled: true,
260+
checkpointingInterval: 10 * time.Second,
261+
leaderChanges: 2,
262+
expectTTLIsLT: 280 * time.Second,
263+
},
264+
}
265+
for _, tc := range tcs {
266+
t.Run(tc.name, func(t *testing.T) {
267+
BeforeTest(t)
268+
config := &ClusterConfig{
269+
Size: 3,
270+
EnableLeaseCheckpoint: tc.checkpointingEnabled,
271+
LeaseCheckpointInterval: tc.checkpointingInterval,
272+
}
273+
clus := NewClusterV3(t, config)
274+
defer clus.Terminate(t)
275+
276+
// create lease
277+
ctx, cancel := context.WithCancel(context.Background())
278+
defer cancel()
279+
c := toGRPC(clus.RandClient())
280+
lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: int64(tc.ttl.Seconds())})
281+
if err != nil {
282+
t.Fatal(err)
283+
}
252284

253-
// wait for a checkpoint to occur
254-
time.Sleep(leaseInterval + 1*time.Second)
285+
for i := 0; i < tc.leaderChanges; i++ {
286+
// wait for a checkpoint to occur
287+
time.Sleep(tc.checkpointingInterval + 1*time.Second)
255288

256-
// Force a leader election
257-
leaderId := clus.WaitLeader(t)
258-
leader := clus.Members[leaderId]
259-
leader.Stop(t)
260-
time.Sleep(time.Duration(3*electionTicks) * tickDuration)
261-
leader.Restart(t)
262-
newLeaderId := clus.WaitLeader(t)
263-
c2 := toGRPC(clus.Client(newLeaderId))
289+
// Force a leader election
290+
leaderId := clus.WaitLeader(t)
291+
leader := clus.Members[leaderId]
292+
leader.Stop(t)
293+
time.Sleep(time.Duration(3*electionTicks) * tickDuration)
294+
leader.Restart(t)
295+
}
264296

265-
time.Sleep(250 * time.Millisecond)
297+
newLeaderId := clus.WaitLeader(t)
298+
c2 := toGRPC(clus.Client(newLeaderId))
299+
300+
time.Sleep(250 * time.Millisecond)
301+
302+
// Check the TTL of the new leader
303+
var ttlresp *pb.LeaseTimeToLiveResponse
304+
for i := 0; i < 10; i++ {
305+
if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
306+
if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
307+
time.Sleep(time.Millisecond * 250)
308+
} else {
309+
t.Fatal(err)
310+
}
311+
}
312+
}
266313

267-
// Check the TTL of the new leader
268-
var ttlresp *pb.LeaseTimeToLiveResponse
269-
for i := 0; i < 10; i++ {
270-
if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
271-
if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
272-
time.Sleep(time.Millisecond * 250)
273-
} else {
274-
t.Fatal(err)
314+
if tc.expectTTLIsGT != 0 && time.Duration(ttlresp.TTL)*time.Second <= tc.expectTTLIsGT {
315+
t.Errorf("Expected lease ttl (%v) to be greather than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsGT)
275316
}
276-
}
277-
}
278317

279-
expectedTTL := ttl - int64(leaseInterval.Seconds())
280-
if ttlresp.TTL < expectedTTL-1 || ttlresp.TTL > expectedTTL {
281-
t.Fatalf("expected lease to be checkpointed after restart such that %d < TTL <%d, but got TTL=%d", expectedTTL-1, expectedTTL, ttlresp.TTL)
318+
if tc.expectTTLIsLT != 0 && time.Duration(ttlresp.TTL)*time.Second > tc.expectTTLIsLT {
319+
t.Errorf("Expected lease ttl (%v) to be lower than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsLT)
320+
}
321+
})
282322
}
283323
}
284324

0 commit comments

Comments
 (0)