Skip to content

Commit

Permalink
fix endless loop caused by ErrCompacted (#3774)
Browse files Browse the repository at this point in the history
Co-authored-by: lidengfu <lidengfu@excean.com>
  • Loading branch information
LeeDF and lidengfu authored Dec 17, 2023
1 parent 83a776a commit 48625fa
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions core/discov/internal/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"context"
"errors"
"fmt"
"io"
"sort"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/threading"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
)

Expand Down Expand Up @@ -288,13 +290,18 @@ func (c *cluster) reload(cli EtcdClient) {

func (c *cluster) watch(cli EtcdClient, key string, rev int64) {
for {
if c.watchStream(cli, key, rev) {
err := c.watchStream(cli, key, rev)
if err == nil {
return
}
if rev != 0 && errors.Is(err, v3rpc.ErrCompacted) {
logx.Errorf("etcd watch stream has been compacted, try to reload, rev %v", rev)
rev = c.load(cli, key)
}
}
}

func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) bool {
func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) error {
var rch clientv3.WatchChan
if rev != 0 {
rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix(),
Expand All @@ -308,20 +315,20 @@ func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) bool {
case wresp, ok := <-rch:
if !ok {
logx.Error("etcd monitor chan has been closed")
return false
return errors.New("etcd monitor chan has been closed")
}
if wresp.Canceled {
logx.Errorf("etcd monitor chan has been canceled, error: %v", wresp.Err())
return false
return wresp.Err()
}
if wresp.Err() != nil {
logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
return false
return wresp.Err()
}

c.handleWatchEvents(key, wresp.Events)
case <-c.done:
return true
return nil
}
}
}
Expand Down

0 comments on commit 48625fa

Please sign in to comment.