From fd228f2c274be6c2864fd5b3525edb3406d18b02 Mon Sep 17 00:00:00 2001
From: Ivan Valdes
Date: Tue, 20 Feb 2024 09:17:52 -0800
Subject: [PATCH 1/3] test/e2e: fix WaitLeader backport

Fixes the incorrect backport of WaitLeader: spawnJsonCmd receives an
expected output string as an argument, which the first backport
implementation didn't provide.

Original backport of commit 371179e29208f690477c85fc552e7c824a66f87e
from PR #17398.

Signed-off-by: Ivan Valdes
---
 tests/e2e/etcdctl.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/e2e/etcdctl.go b/tests/e2e/etcdctl.go
index d2d356abbfa..cc5c7a31b6d 100644
--- a/tests/e2e/etcdctl.go
+++ b/tests/e2e/etcdctl.go
@@ -146,7 +146,7 @@ func (ctl *Etcdctl) Status() ([]*clientv3.StatusResponse, error) {
 		Endpoint string
 		Status   *clientv3.StatusResponse
 	}
-	err := ctl.spawnJsonCmd(&epStatus, "endpoint", "status")
+	err := ctl.spawnJsonCmd(&epStatus, "", "endpoint", "status")
 	if err != nil {
 		return nil, err
 	}

From 9a3450ae749e07267bb2b4f34fc4eadf7a4cf615 Mon Sep 17 00:00:00 2001
From: Ivan Valdes
Date: Tue, 20 Feb 2024 09:19:15 -0800
Subject: [PATCH 2/3] tests/e2e: backport e2e cluster setup

Finish backporting the remaining functions missed in the original
backport of PR #15620.

Backport of commit 65add8cec44b32682e59d2596a592038bac90dcd.

Co-authored-by: Marek Siarkowicz
Signed-off-by: Ivan Valdes
---
 tests/e2e/cluster_test.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go
index d176b45209c..982eb3a820b 100644
--- a/tests/e2e/cluster_test.go
+++ b/tests/e2e/cluster_test.go
@@ -438,6 +438,14 @@ func (epc *etcdProcessCluster) EndpointsV3() []string {
 	return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() })
 }

+func (epc *etcdProcessCluster) EndpointsGRPC() []string {
+	return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsGRPC() })
+}
+
+func (epc *etcdProcessCluster) EndpointsHTTP() []string {
+	return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsHTTP() })
+}
+
 func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) {
 	for _, p := range epc.procs {
 		ret = append(ret, f(p)...)

From bf04c6740846c9cd7dd60263c33eeda3ecaf6107 Mon Sep 17 00:00:00 2001
From: Ivan Valdes
Date: Tue, 20 Feb 2024 09:24:36 -0800
Subject: [PATCH 3/3] Backport: ignore old leader's lease revoking requests

Backport of PR #16822, commits f7e488dc9262685d6624755e0d3bb0a655863248,
67f17166bf2ba337dafb8e0ea8eea5f74a990767, and
f7ff898fd6c2d6dbb54278343073aa4fa5f46a03.

Co-authored-by: Benjamin Wang
Signed-off-by: Ivan Valdes
---
 etcdserver/server.go                |  61 ++++++++++-
 etcdserver/v3_server.go             |  11 ++
 integration/v3_lease_test.go        |   2 +-
 lease/lessor.go                     |  10 +-
 lease/lessor_test.go                |   6 +-
 tests/e2e/v3_lease_no_proxy_test.go | 156 ++++++++++++++++++++++++++++
 6 files changed, 239 insertions(+), 7 deletions(-)
 create mode 100644 tests/e2e/v3_lease_no_proxy_test.go

diff --git a/etcdserver/server.go b/etcdserver/server.go
index f5807243ee6..f9bd1acabcc 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -622,8 +622,19 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
 	if srv.Cfg.EnableLeaseCheckpoint {
 		// setting checkpointer enables lease checkpoint feature.
-		srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
+		srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
+			if !srv.ensureLeadership() {
+				if lg := srv.getLogger(); lg != nil {
+					lg.Warn("Ignore the checkpoint request because current member isn't a leader",
+						zap.Uint64("local-member-id", uint64(srv.ID())))
+				} else {
+					plog.Warningf("Ignore the checkpoint request because current member %d isn't a leader", uint64(srv.ID()))
+				}
+				return lease.ErrNotPrimary
+			}
+
 			srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
+			return nil
 		})
 	}
@@ -1098,7 +1109,23 @@ func (s *EtcdServer) run() {

 func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
 	s.goAttach(func() {
+		// We shouldn't revoke any leases if the current member isn't a leader,
+		// because the operation should only be performed by the leader. When
+		// the leader gets blocked on the raft loop, such as writing WAL entries,
+		// it can't process any events or messages from raft. It may think it
+		// is still the leader even if the leader has already changed.
+		// Refer to https://github.com/etcd-io/etcd/issues/15247
 		lg := s.Logger()
+		if !s.ensureLeadership() {
+			if lg != nil {
+				lg.Warn("Ignore the lease revoking request because current member isn't a leader",
+					zap.Uint64("local-member-id", uint64(s.ID())))
+			} else {
+				plog.Warningf("Ignore the lease revoking request because current member %d isn't a leader", uint64(s.ID()))
+			}
+			return
+		}
+
 		// Increases throughput of expired leases deletion process through parallelization
 		c := make(chan struct{}, maxPendingRevokes)
 		for _, curLease := range leases {
@@ -1135,6 +1162,38 @@ func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
 	})
 }

+// ensureLeadership checks whether the current member is still the leader.
+func (s *EtcdServer) ensureLeadership() bool {
+	lg := s.Logger()
+
+	ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
+	defer cancel()
+	if err := s.linearizableReadNotify(ctx); err != nil {
+		if lg != nil {
+			lg.Warn("Failed to check current member's leadership", zap.Error(err))
+		} else {
+			plog.Warningf("Failed to check current member's leadership: %s", err)
+		}
+
+		return false
+	}
+
+	newLeaderId := s.raftStatus().Lead
+	if newLeaderId != uint64(s.ID()) {
+		if lg != nil {
+			lg.Warn("Current member isn't a leader",
+				zap.Uint64("local-member-id", uint64(s.ID())),
+				zap.Uint64("new-lead", newLeaderId))
+		} else {
+			plog.Warningf("Current member %d isn't a leader (new-lead=%d)", uint64(s.ID()), newLeaderId)
+		}
+
+		return false
+	}
+
+	return true
+}
+
 func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
 	s.applySnapshot(ep, apply)
 	s.applyEntries(ep, apply)
diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go
index 955399fc8df..a1040aca63a 100644
--- a/etcdserver/v3_server.go
+++ b/etcdserver/v3_server.go
@@ -285,6 +285,17 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)

 func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
 	if s.isLeader() {
+		// If s.isLeader() returns true, but we fail to ensure the current
+		// member's leadership, there are a couple of possibilities:
+		//   1. current member gets stuck on writing WAL entries;
+		//   2. current member is in network isolation status;
+		//   3. current member isn't a leader anymore (possibly due to #1 above).
+		// In such cases, we just return an error to the client, so that the
+		// client can switch to another member to continue the lease keep-alive
+		// operation.
+		if !s.ensureLeadership() {
+			return -1, lease.ErrNotPrimary
+		}
+
 		if err := s.waitAppliedIndex(); err != nil {
 			return 0, err
 		}
diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go
index a157f57b68e..7f9742ccda6 100644
--- a/integration/v3_lease_test.go
+++ b/integration/v3_lease_test.go
@@ -730,7 +730,7 @@ func TestV3LeaseFailover(t *testing.T) {

 	// send keep alive to old leader until the old leader starts
 	// to drop lease request.
-	var expectedExp time.Time
+	expectedExp := time.Now().Add(5 * time.Second)
 	for {
 		if err = lac.Send(lreq); err != nil {
 			break
diff --git a/lease/lessor.go b/lease/lessor.go
index a60f4666ef4..02718b94e59 100644
--- a/lease/lessor.go
+++ b/lease/lessor.go
@@ -78,7 +78,7 @@ type RangeDeleter func() TxnDelete

 // Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to
 // avoid circular dependency with mvcc.
-type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest)
+type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error

 type LeaseID int64

@@ -424,7 +424,9 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
 		// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
 		// of RAFT entries written per lease to a max of 2 per checkpoint interval.
 		if clearRemainingTTL {
-			le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
+			if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}); err != nil {
+				return -1, err
+			}
 		}

 		le.mu.Lock()
@@ -660,7 +662,9 @@ func (le *lessor) checkpointScheduledLeases() {
 		le.mu.Unlock()

 		if len(cps) != 0 {
-			le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps})
+			if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}); err != nil {
+				return
+			}
 		}
 		if len(cps) < maxLeaseCheckpointBatchSize {
 			return
diff --git a/lease/lessor_test.go b/lease/lessor_test.go
index 264a1aa1819..5280be0751e 100644
--- a/lease/lessor_test.go
+++ b/lease/lessor_test.go
@@ -248,10 +248,11 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
 	defer os.RemoveAll(dir)
 	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})

-	fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
+	fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
 		for _, cp := range cp.GetCheckpoints() {
 			le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
 		}
+		return nil
 	}
 	defer le.Stop()
 	// Set checkpointer
@@ -538,7 +539,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
 	defer le.Stop()
 	le.minLeaseTTL = 1
 	checkpointedC := make(chan struct{})
-	le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
+	le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error {
 		close(checkpointedC)
 		if len(lc.Checkpoints) != 1 {
 			t.Errorf("expected 1 checkpoint but got %d", len(lc.Checkpoints))
@@ -547,6 +548,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
 		if c.Remaining_TTL != 1 {
 			t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
 		}
+		return nil
 	})
 	_, err := le.Grant(1, 2)
 	if err != nil {
diff --git a/tests/e2e/v3_lease_no_proxy_test.go b/tests/e2e/v3_lease_no_proxy_test.go
new file mode 100644
index 00000000000..776b63ce3d2
--- /dev/null
+++ b/tests/e2e/v3_lease_no_proxy_test.go
@@ -0,0 +1,156 @@
+// Copyright 2024 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !cluster_proxy
+
+package e2e
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"go.etcd.io/etcd/clientv3"
+	"go.etcd.io/etcd/pkg/testutil"
+)
+
+// TestLeaseRevoke_IgnoreOldLeader verifies that leases shouldn't be revoked
+// by the old leader.
+// See case 1 in https://github.com/etcd-io/etcd/issues/15247#issuecomment-1777862093.
+func TestLeaseRevoke_IgnoreOldLeader(t *testing.T) {
+	testLeaseRevokeIssue(t, true)
+}
+
+// TestLeaseRevoke_ClientSwitchToOtherMember verifies that leases shouldn't
+// be revoked by the new leader.
+// See case 2 in https://github.com/etcd-io/etcd/issues/15247#issuecomment-1777862093.
+func TestLeaseRevoke_ClientSwitchToOtherMember(t *testing.T) {
+	testLeaseRevokeIssue(t, false)
+}
+
+func testLeaseRevokeIssue(t *testing.T, connectToOneFollower bool) {
+	defer testutil.AfterTest(t)
+
+	ctx := context.Background()
+
+	t.Log("Starting a new etcd cluster")
+	epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
+		clusterSize:         3,
+		goFailEnabled:       true,
+		goFailClientTimeout: 40 * time.Second,
+	})
+	require.NoError(t, err)
+	defer func() {
+		if errC := epc.Close(); errC != nil {
+			t.Fatalf("error closing etcd processes (%v)", errC)
+		}
+	}()
+
+	leaderIdx := epc.WaitLeader(t)
+	t.Logf("Leader index: %d", leaderIdx)
+
+	epsForNormalOperations := epc.procs[(leaderIdx+2)%3].EndpointsGRPC()
+	t.Logf("Creating a client for normal operations: %v", epsForNormalOperations)
+	client, err := clientv3.New(clientv3.Config{Endpoints: epsForNormalOperations, DialTimeout: 3 * time.Second})
+	require.NoError(t, err)
+	defer client.Close()
+
+	var epsForLeaseKeepAlive []string
+	if connectToOneFollower {
+		epsForLeaseKeepAlive = epc.procs[(leaderIdx+1)%3].EndpointsGRPC()
+	} else {
+		epsForLeaseKeepAlive = epc.EndpointsGRPC()
+	}
+	t.Logf("Creating a client for the leaseKeepAlive operation: %v", epsForLeaseKeepAlive)
+	clientForKeepAlive, err := clientv3.New(clientv3.Config{Endpoints: epsForLeaseKeepAlive, DialTimeout: 3 * time.Second})
+	require.NoError(t, err)
+	defer clientForKeepAlive.Close()
+
+	resp, err := client.Status(ctx, epsForNormalOperations[0])
+	require.NoError(t, err)
+	oldLeaderId := resp.Leader
+
+	t.Log("Creating a new lease")
+	leaseRsp, err := client.Grant(ctx, 20)
+	require.NoError(t, err)
+
+	t.Log("Starting a goroutine to keep alive the lease")
+	doneC := make(chan struct{})
+	stopC := make(chan struct{})
+	startC := make(chan struct{}, 1)
+	go func() {
+		defer close(doneC)
+
+		respC, kerr := clientForKeepAlive.KeepAlive(ctx, leaseRsp.ID)
+		require.NoError(t, kerr)
+		// ensure we have received the first response from the server
+		<-respC
+		startC <- struct{}{}
+
+		for {
+			select {
+			case <-stopC:
+				return
+			case <-respC:
+			}
+		}
+	}()
+
t.Log("Wait for the keepAlive goroutine to get started") + <-startC + + t.Log("Trigger the failpoint to simulate stalled writing") + err = epc.procs[leaderIdx].Failpoints().SetupHTTP(ctx, "raftBeforeSave", `sleep("30s")`) + require.NoError(t, err) + + cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Logf("Waiting for a new leader to be elected, old leader index: %d, old leader ID: %d", leaderIdx, oldLeaderId) + executeUntil(cctx, t, func() { + for { + resp, err = client.Status(ctx, epsForNormalOperations[0]) + if err == nil && resp.Leader != oldLeaderId { + t.Logf("A new leader has already been elected, new leader index: %d", resp.Leader) + return + } + time.Sleep(100 * time.Millisecond) + } + }) + cancel() + + t.Log("Writing a key/value pair") + _, err = client.Put(ctx, "foo", "bar") + require.NoError(t, err) + + t.Log("Sleeping 30 seconds") + time.Sleep(30 * time.Second) + + t.Log("Remove the failpoint 'raftBeforeSave'") + err = epc.procs[leaderIdx].Failpoints().DeactivateHTTP(ctx, "raftBeforeSave") + require.NoError(t, err) + + // By default, etcd tries to revoke leases every 7 seconds. + t.Log("Sleeping 10 seconds") + time.Sleep(10 * time.Second) + + t.Log("Confirming the lease isn't revoked") + leases, err := client.Leases(ctx) + require.NoError(t, err) + require.Equal(t, 1, len(leases.Leases)) + + t.Log("Waiting for the keepAlive goroutine to exit") + close(stopC) + <-doneC +}