From 1fdfb4292c2c31dc333b57b6a8e7b8c029efa597 Mon Sep 17 00:00:00 2001
From: Benjamin Wang
Date: Thu, 15 Dec 2022 02:12:49 +0800
Subject: [PATCH] clientv3: revert the client side change in 14547

In order to fix https://github.com/etcd-io/etcd/issues/12385, PR
https://github.com/etcd-io/etcd/pull/14322 introduced a change in which
the client side may retry based on the error message returned from the
server side.

This is not good: it's too fragile, and it also changes the protocol
between client and server. Please see the discussion in
https://github.com/kubernetes/kubernetes/pull/114403

Note: the issue https://github.com/etcd-io/etcd/issues/12385 only happens
when auth is enabled and the client side reuses the same client to watch.

So we decided to roll back the change on 3.5, for the following reasons:
1. K8s doesn't enable auth at all, so this has no impact on K8s.
2. It's easy for client applications to work around the issue: the client
   just needs to create a new client each time before watching.

Signed-off-by: Benjamin Wang
---
 client/v3/watch.go                | 26 ------------------------
 tests/integration/v3_auth_test.go | 33 -------------------------------
 2 files changed, 59 deletions(-)

diff --git a/client/v3/watch.go b/client/v3/watch.go
index 90f125ac466..bc886936c86 100644
--- a/client/v3/watch.go
+++ b/client/v3/watch.go
@@ -18,7 +18,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"strings"
 	"sync"
 	"time"
 
@@ -589,26 +588,6 @@ func (w *watchGrpcStream) run() {
 
 			switch {
 			case pbresp.Created:
-				cancelReasonError := v3rpc.Error(errors.New(pbresp.CancelReason))
-				if shouldRetryWatch(cancelReasonError) {
-					var newErr error
-					if wc, newErr = w.newWatchClient(); newErr != nil {
-						w.lg.Error("failed to create a new watch client", zap.Error(newErr))
-						return
-					}
-
-					if len(w.resuming) != 0 {
-						if ws := w.resuming[0]; ws != nil {
-							if err := wc.Send(ws.initReq.toPB()); err != nil {
-								w.lg.Debug("error when sending request", zap.Error(err))
-							}
-						}
-					}
-
-					cur = nil
-					continue
-				}
-
 				// response to head of queue creation
 				if len(w.resuming) != 0 {
 					if ws := w.resuming[0]; ws != nil {
@@ -718,11 +697,6 @@ func (w *watchGrpcStream) run() {
 	}
 }
 
-func shouldRetryWatch(cancelReasonError error) bool {
-	return (strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCInvalidAuthToken.Error()) == 0) ||
-		(strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCAuthOldRevision.Error()) == 0)
-}
-
 // nextResume chooses the next resuming to register with the grpc stream. Abandoned
 // streams are marked as nil in the queue since the head must wait for its inflight registration.
 func (w *watchGrpcStream) nextResume() *watcherStream {
diff --git a/tests/integration/v3_auth_test.go b/tests/integration/v3_auth_test.go
index bc042ded3ef..10c5dadb4ef 100644
--- a/tests/integration/v3_auth_test.go
+++ b/tests/integration/v3_auth_test.go
@@ -499,39 +499,6 @@ func TestV3AuthRestartMember(t *testing.T) {
 	testutil.AssertNil(t, err)
 }
 
-func TestV3AuthWatchAndTokenExpire(t *testing.T) {
-	BeforeTest(t)
-	clus := NewClusterV3(t, &ClusterConfig{Size: 1, AuthTokenTTL: 3})
-	defer clus.Terminate(t)
-
-	ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
-	defer cancel()
-
-	authSetupRoot(t, toGRPC(clus.Client(0)).Auth)
-
-	c, cerr := NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "root", Password: "123"})
-	if cerr != nil {
-		t.Fatal(cerr)
-	}
-	defer c.Close()
-
-	_, err := c.Put(ctx, "key", "val")
-	if err != nil {
-		t.Fatalf("Unexpected error from Put: %v", err)
-	}
-
-	// The first watch gets a valid auth token through watcher.newWatcherGrpcStream()
-	// We should discard the first one by waiting TTL after the first watch.
-	wChan := c.Watch(ctx, "key", clientv3.WithRev(1))
-	watchResponse := <-wChan
-
-	time.Sleep(5 * time.Second)
-
-	wChan = c.Watch(ctx, "key", clientv3.WithRev(1))
-	watchResponse = <-wChan
-	testutil.AssertNil(t, watchResponse.Err())
-}
-
 func TestV3AuthWatchErrorAndWatchId0(t *testing.T) {
 	BeforeTest(t)
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
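
Editor's note: the commit message above says a client application can work around issue 12385 by creating a new client each time before watching, instead of reusing one long-lived authenticated client. The sketch below illustrates that workaround; it is not part of the patch, and the endpoint, credentials, and the watchWithFreshClient helper name are placeholders chosen for the example.

// Sketch of the workaround: create a fresh clientv3.Client (and therefore a
// fresh auth token) right before each watch, rather than reusing a client
// whose token may have expired. Endpoints and credentials are placeholders.
package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func watchWithFreshClient(ctx context.Context, key string) error {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"127.0.0.1:2379"},
		Username:  "root",
		Password:  "123",
	})
	if err != nil {
		return err
	}
	defer cli.Close()

	// The new client authenticates on creation, so this watch starts with a
	// valid token even if a previously used client's token has expired.
	for resp := range cli.Watch(ctx, key, clientv3.WithRev(1)) {
		if err := resp.Err(); err != nil {
			return err
		}
		for _, ev := range resp.Events {
			log.Printf("event: %s %q -> %q", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := watchWithFreshClient(ctx, "key"); err != nil {
		log.Fatal(err)
	}
}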