Skip to content

Commit

Permalink
Merge pull request #578 from zouyx/feature/addRegistryUnpub
Browse files Browse the repository at this point in the history
Rft : Optimize lock for zookeeper registry
  • Loading branch information
AlexStocks authored Jun 2, 2020
2 parents 9578fc0 + 9ae184f commit ff9eec7
Showing 1 changed file with 25 additions and 30 deletions.
55 changes: 25 additions & 30 deletions remoting/zookeeper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
type ZookeeperClient struct {
name string
ZkAddrs []string
sync.Mutex // for conn
sync.RWMutex // for conn
Conn *zk.Conn
Timeout time.Duration
exit chan struct{}
Expand Down Expand Up @@ -278,7 +278,7 @@ LOOP:
break LOOP
case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged):
logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path)
z.Lock()
z.RLock()
for p, a := range z.eventRegistry {
if strings.HasPrefix(p, event.Path) {
logger.Infof("send event{state:zk.EventNodeDataChange, Path:%s} notify event to path{%s} related listener",
Expand All @@ -288,7 +288,7 @@ LOOP:
}
}
}
z.Unlock()
z.RUnlock()
case (int)(zk.StateConnecting), (int)(zk.StateConnected), (int)(zk.StateHasSession):
if state == (int)(zk.StateHasSession) {
continue
Expand Down Expand Up @@ -371,11 +371,11 @@ func (z *ZookeeperClient) ZkConnValid() bool {
}

valid := true
z.Lock()
z.RLock()
if z.Conn == nil {
valid = false
}
z.Unlock()
z.RUnlock()

return valid
}
Expand Down Expand Up @@ -416,15 +416,15 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error {
)

logger.Debugf("zookeeperClient.Create(basePath{%s})", basePath)
conn := z.getConn()
err = errNilZkClientConn
if conn == nil {
return perrors.WithMessagef(err, "zk.Create(path:%s)", basePath)
}

for _, str := range strings.Split(basePath, "/")[1:] {
tmpPath = path.Join(tmpPath, "/", str)
err = errNilZkClientConn
z.Lock()
conn := z.Conn
z.Unlock()
if conn != nil {
_, err = conn.Create(tmpPath, value, 0, zk.WorldACL(zk.PermAll))
}
_, err = conn.Create(tmpPath, value, 0, zk.WorldACL(zk.PermAll))

if err != nil {
if err == zk.ErrNodeExists {
Expand All @@ -446,9 +446,7 @@ func (z *ZookeeperClient) Delete(basePath string) error {
)

err = errNilZkClientConn
z.Lock()
conn := z.Conn
z.Unlock()
conn := z.getConn()
if conn != nil {
err = conn.Delete(basePath, -1)
}
Expand All @@ -468,9 +466,7 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er
err = errNilZkClientConn
data = []byte("")
zkPath = path.Join(basePath) + "/" + node
z.Lock()
conn := z.Conn
z.Unlock()
conn := z.getConn()
if conn != nil {
tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
}
Expand All @@ -493,9 +489,7 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
)

err = errNilZkClientConn
z.Lock()
conn := z.Conn
z.Unlock()
conn := z.getConn()
if conn != nil {
tmpPath, err = conn.Create(
path.Join(basePath)+"/",
Expand Down Expand Up @@ -526,9 +520,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event,
)

err = errNilZkClientConn
z.Lock()
conn := z.Conn
z.Unlock()
conn := z.getConn()
if conn != nil {
children, stat, watcher, err = conn.ChildrenW(path)
}
Expand Down Expand Up @@ -562,9 +554,7 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
)

err = errNilZkClientConn
z.Lock()
conn := z.Conn
z.Unlock()
conn := z.getConn()
if conn != nil {
children, stat, err = conn.Children(path)
}
Expand Down Expand Up @@ -595,9 +585,7 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
)

err = errNilZkClientConn
z.Lock()
conn := z.Conn
z.Unlock()
conn := z.getConn()
if conn != nil {
exist, _, watcher, err = conn.ExistsW(zkPath)
}
Expand All @@ -618,3 +606,10 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) {
return z.Conn.Get(zkPath)
}

// getConn gets zookeeper connection safely
func (z *ZookeeperClient) getConn() *zk.Conn {
z.RLock()
defer z.RUnlock()
return z.Conn
}

0 comments on commit ff9eec7

Please sign in to comment.