Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #1166 for 3.0: Treat all zk child path url as new child #1249

Merged
merged 1 commit into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions registry/zookeeper/service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,15 @@ import (
"dubbo.apache.org/dubbo-go/v3/registry/event"
)

var testName = "test"

var tc *zk.TestCluster
const testName = "test"

func prepareData(t *testing.T) *zk.TestCluster {
var err error
tc, err = zk.StartTestCluster(1, nil, nil, zk.WithRetryTimes(20))
tc, err := zk.StartTestCluster(1, nil, nil)
assert.NoError(t, err)
assert.NotNil(t, tc.Servers[0])
address := "127.0.0.1:" + strconv.Itoa(tc.Servers[0].Port)
//address := "127.0.0.1:2181"

config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{
Protocol: "zookeeper",
Expand All @@ -71,6 +70,7 @@ func TestNewZookeeperServiceDiscovery(t *testing.T) {
_, err := newZookeeperServiceDiscovery(name)

// the ServiceDiscoveryConfig not found
// err: could not init the instance because the config is invalid
assert.NotNil(t, err)

sdc := &config.ServiceDiscoveryConfig{
Expand All @@ -81,10 +81,20 @@ func TestNewZookeeperServiceDiscovery(t *testing.T) {
_, err = newZookeeperServiceDiscovery(name)

// RemoteConfig not found
// err: could not find the remote config for name: mock
assert.NotNil(t, err)
}

func TestCURDZookeeperServiceDiscovery(t *testing.T) {
func TestZookeeperServiceDiscovery_CURDAndListener(t *testing.T) {
tc := prepareData(t)
defer func() {
_ = tc.Stop()
}()
t.Run("testCURDZookeeperServiceDiscovery", testCURDZookeeperServiceDiscovery)
t.Run("testAddListenerZookeeperServiceDiscovery", testAddListenerZookeeperServiceDiscovery)
}

func testCURDZookeeperServiceDiscovery(t *testing.T) {
prepareData(t)
extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
return dispatcher.NewMockEventDispatcher()
Expand Down Expand Up @@ -164,10 +174,7 @@ func TestCURDZookeeperServiceDiscovery(t *testing.T) {
assert.Nil(t, err)
}

func TestAddListenerZookeeperServiceDiscovery(t *testing.T) {
defer func() {
_ = tc.Stop()
}()
func testAddListenerZookeeperServiceDiscovery(t *testing.T) {
sd, err := newZookeeperServiceDiscovery(testName)
assert.Nil(t, err)
defer func() {
Expand Down
10 changes: 6 additions & 4 deletions remoting/getty/getty_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func testGetUser61(t *testing.T, c *Client) {

func testClient_AsyncCall(t *testing.T, client *Client) {
user := &User{}
lock := sync.Mutex{}
wg := sync.WaitGroup{}
request := remoting.NewRequest("2.0.2")
invocation := createInvocation("GetUser0", nil, nil, []interface{}{"4", nil, "username"},
[]reflect.Value{reflect.ValueOf("4"), reflect.ValueOf(nil), reflect.ValueOf("username")})
Expand All @@ -314,13 +314,13 @@ func testClient_AsyncCall(t *testing.T, client *Client) {
r := response.(remoting.AsyncCallbackResponse)
rst := *r.Reply.(*remoting.Response).Result.(*protocol.RPCResult)
assert.Equal(t, User{ID: "4", Name: "username"}, *(rst.Rest.(*User)))
lock.Unlock()
wg.Done()
}
lock.Lock()
wg.Add(1)
err := client.Request(request, 3*time.Second, rsp)
assert.NoError(t, err)
assert.Equal(t, User{}, *user)
time.Sleep(1 * time.Second)
wg.Done()
}

func InitTest(t *testing.T) (*Server, *common.URL) {
Expand Down Expand Up @@ -436,6 +436,8 @@ func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User
}

func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error) {
// fix testClient_AsyncCall assertion bug(#1233)
time.Sleep(1 * time.Second)
return User{ID: id, Name: name}, nil
}

Expand Down
43 changes: 33 additions & 10 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package zookeeper

import (
uatomic "go.uber.org/atomic"
"path"
"strings"
"sync"
Expand All @@ -38,13 +39,13 @@ import (
"dubbo.apache.org/dubbo-go/v3/remoting"
)

var defaultTTL = 15 * time.Minute
var defaultTTL = 10 * time.Minute

// nolint
type ZkEventListener struct {
client *gxzookeeper.ZookeeperClient
pathMapLock sync.Mutex
pathMap map[string]struct{}
pathMap map[string]*uatomic.Int32
wg sync.WaitGroup
exit chan struct{}
}
Expand All @@ -53,7 +54,7 @@ type ZkEventListener struct {
func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
client: client,
pathMap: make(map[string]struct{}),
pathMap: make(map[string]*uatomic.Int32),
exit: make(chan struct{}),
}
}
Expand Down Expand Up @@ -81,6 +82,17 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin
// nolint
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
defer l.wg.Done()

l.pathMapLock.Lock()
a, ok := l.pathMap[zkPath]
if !ok || a.Load() > 1 {
l.pathMapLock.Unlock()
return false
}
a.Inc()
l.pathMapLock.Unlock()
defer a.Dec()

var zkEvent zk.Event
for {
keyEventCh, err := l.client.ExistW(zkPath)
Expand Down Expand Up @@ -158,9 +170,6 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
newNode string
)
for _, n := range newChildren {
if contains(children, n) {
continue
}

newNode = path.Join(zkPath, n)
logger.Infof("add zkNode{%s}", newNode)
Expand All @@ -176,6 +185,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
// listen l service node
l.wg.Add(1)
go func(node string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
if l.listenServiceNodeEvent(node, listener) {
logger.Warnf("delete zkNode{%s}", node)
listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel})
Expand Down Expand Up @@ -276,15 +286,15 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
// Save the path to avoid listen repeatedly
l.pathMapLock.Lock()
_, ok := l.pathMap[dubboPath]
if !ok {
l.pathMap[dubboPath] = uatomic.NewInt32(0)
}
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", dubboPath)
continue
}

l.pathMapLock.Lock()
l.pathMap[dubboPath] = struct{}{}
l.pathMapLock.Unlock()
// When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn
l.client.RLock()
if l.client.Conn == nil {
Expand All @@ -303,6 +313,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
logger.Infof("listen dubbo service key{%s}", dubboPath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
Expand All @@ -324,12 +335,24 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
}
}
// Periodically update provider information
ticker := time.NewTicker(ttl)
tickerTTL := ttl
if tickerTTL > 20e9 {
tickerTTL = 20e9
}
ticker := time.NewTicker(tickerTTL)
WATCH:
for {
select {
case <-ticker.C:
l.handleZkNodeEvent(zkPath, children, listener)
if tickerTTL < ttl {
tickerTTL *= 2
if tickerTTL > ttl {
tickerTTL = ttl
}
ticker.Stop()
ticker = time.NewTicker(tickerTTL)
}
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
Expand Down