From 77f3730cb9d83b14d80a7e994214a85194549d88 Mon Sep 17 00:00:00 2001 From: dongjianhui03 Date: Wed, 1 Dec 2021 23:25:44 +0800 Subject: [PATCH] refactor zk dynamic configuration listener --- cluster/router/v3router/router_chain.go | 34 +++++--- common/constant/default.go | 7 +- common/constant/key.go | 1 + config/consumer_config.go | 2 +- config_center/zookeeper/impl.go | 29 +++++-- config_center/zookeeper/listener.go | 56 +++++++------ remoting/listener.go | 2 +- remoting/zookeeper/client.go | 2 +- remoting/zookeeper/listener.go | 101 ++++++++++++++++-------- 9 files changed, 146 insertions(+), 88 deletions(-) diff --git a/cluster/router/v3router/router_chain.go b/cluster/router/v3router/router_chain.go index 783e94ae32..6d593cf45a 100644 --- a/cluster/router/v3router/router_chain.go +++ b/cluster/router/v3router/router_chain.go @@ -35,6 +35,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/config_center" "dubbo.apache.org/dubbo-go/v3/protocol" + "dubbo.apache.org/dubbo-go/v3/remoting" ) // RouterChain contains all uniform router logic @@ -46,31 +47,36 @@ type RouterChain struct { // nolint func NewUniformRouterChain() (router.PriorityRouter, error) { - // 1. add mesh route listener + // 1. Add mesh route listener r := &RouterChain{} rootConfig := config.GetRootConfig() dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration() if dynamicConfiguration == nil { - logger.Infof("[Mesh Router] Config center does not start, please check if the configuration center has been properly configured in dubbogo.yml") + logger.Infof("[NewUniformRouterChain] Config center does not start, please check if the configuration center has been properly configured in dubbogo.yml") return nil, nil } - dynamicConfiguration.AddListener(rootConfig.Application.Name, r) - // 2. try to get mesh route configuration, default key is "dubbo.io.MESHAPPRULE" with group "dubbo" + // 2. Try to get mesh rules configuration, default key is "dubbo.io.MESHAPPRULE" with group "dubbo" key := rootConfig.Application.Name + constant.MeshRouteSuffix + group := rootConfig.ConfigCenter.Group + if group == "" { + group = constant.Dubbo + } + dynamicConfiguration.AddListener(group+constant.PathSeparator+key, r) meshRouteValue, err := dynamicConfiguration.GetProperties(key, config_center.WithGroup(rootConfig.ConfigCenter.Group)) if err != nil { - // the mesh route may not be initialized now - logger.Warnf("Can not get mesh route for key=%s, error=%v", key, err) + // The mesh rules may not be initialized now + logger.Warnf("[NewUniformRouterChain]Can not get mesh rules for group=%s, key=%s, error=%+v", rootConfig.ConfigCenter.Group, key, err) return r, nil } - logger.Debugf("Successfully get mesh route:%s", meshRouteValue) + logger.Debugf("[NewUniformRouterChain]Successfully get mesh rules:%s", meshRouteValue) routes, err := parseRoute(meshRouteValue) if err != nil { - logger.Warnf("Parse mesh route failed, error=%v", err) + logger.Warnf("[NewUniformRouterChain]Parse mesh rules failed, error=%+v", err) return nil, err } r.routers = routes + logger.Infof("[NewUniformRouterChain]Successfully init mesh rules with:\n%s", meshRouteValue) return r, nil } @@ -84,13 +90,19 @@ func (r *RouterChain) Route(invokers []protocol.Invoker, url *common.URL, invoca // Process process route config change event func (r *RouterChain) Process(event *config_center.ConfigChangeEvent) { - logger.Debugf("RouteChain process event:\n%+v", event) + logger.Infof("[RouteChain]Process config change event:%+v", event) + if event.ConfigType == remoting.EventTypeDel { + r.routers = nil + return + } routers, err := parseRoute(event.Value.(string)) if err != nil { + logger.Warnf("[RouteChain]Parse new mesh route config error, %+v "+ + "and we will use the original mesh rule configuration.", err) return } r.routers = routers - // todo delete router + logger.Infof("[RouteChain]Parse Mesh Rule Success.") } // Name get name of ConnCheckerRouter @@ -108,7 +120,7 @@ func (r *RouterChain) URL() *common.URL { return nil } -// parseFromConfigToRouters parse virtualService and destinationRule yaml file bytes to target router list +// Deprecated parseFromConfigToRouters parse virtualService and destinationRule yaml file bytes to target router list func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte) ([]*UniformRouter, error) { var virtualServiceConfigList []*config.VirtualServiceConfig destRuleConfigsMap := make(map[string]map[string]map[string]string) diff --git a/common/constant/default.go b/common/constant/default.go index 70a7eba50d..5b40ccef40 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -27,11 +27,8 @@ const ( ) const ( - DefaultWeight = 100 // - DefaultWarmup = 10 * 60 // in java here is 10*60*1000 because of System.currentTimeMillis() is measured in milliseconds & in go time.Unix() is second -) - -const ( + DefaultWeight = 100 // + DefaultWarmup = 10 * 60 // in java here is 10*60*1000 because of System.currentTimeMillis() is measured in milliseconds & in go time.Unix() is second DefaultLoadBalance = "random" DefaultRetries = "2" DefaultRetriesInt = 2 diff --git a/common/constant/key.go b/common/constant/key.go index 97d2851f36..60c544ceeb 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -49,6 +49,7 @@ const ( PortKey = "port" ProtocolKey = "protocol" PathSeparator = "/" + DotSeparator = "." CommaSeparator = "," SslEnabledKey = "ssl-enabled" // ParamsTypeKey key used in pass through invoker factory, to define param type diff --git a/config/consumer_config.go b/config/consumer_config.go index f9344f32cb..5e641b692c 100644 --- a/config/consumer_config.go +++ b/config/consumer_config.go @@ -83,7 +83,7 @@ func (cc *ConsumerConfig) Init(rc *RootConfig) error { // try to use interface name defined by pb triplePBService, ok := reference.(common.TriplePBService) if !ok { - logger.Errorf("Dubbogo cannot get interface name with reference = %s."+ + logger.Errorf("Dubbo-go cannot get interface name with reference = %s."+ "Please run the command 'go install github.com/dubbogo/tools/cmd/protoc-gen-go-triple@latest' to get the latest "+ "protoc-gen-go-triple, and then re-generate your pb file again by this tool."+ "If you are not using pb serialization, please set 'interfaceName' field in reference config to let dubbogo get the interface name.", key) diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go index d6b6da1888..730b882177 100644 --- a/config_center/zookeeper/impl.go +++ b/config_center/zookeeper/impl.go @@ -20,6 +20,7 @@ package zookeeper import ( "encoding/base64" "strconv" + "strings" "sync" ) @@ -65,8 +66,9 @@ type zookeeperDynamicConfiguration struct { func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfiguration, error) { c := &zookeeperDynamicConfiguration{ - url: url, - rootPath: "/" + url.GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup) + "/config", + url: url, + // TODO adapt config center config + rootPath: "/dubbo/config", } logger.Infof("[Zookeeper ConfigCenter] New Zookeeper ConfigCenter with Configuration: %+v, url = %+v", c, c.GetURL()) if v, ok := config.GetRootConfig().ConfigCenter.Params["base64"]; ok { @@ -93,13 +95,26 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu // Start listener c.listener = zookeeper.NewZkEventListener(c.client) - c.cacheListener = NewCacheListener(c.rootPath) - c.listener.ListenServiceEvent(url, c.rootPath, c.cacheListener) + c.cacheListener = NewCacheListener(c.rootPath, c.listener) + c.listener.ListenConfigurationEvent(c.rootPath, c.cacheListener) return c, nil } -func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { - c.cacheListener.AddListener(key, listener) +// AddListener add listener for key +// TODO this method should has a parameter 'group', and it does not now, so we should concat group and key with '/' manually +func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, options ...config_center.Option) { + qualifiedKey := buildPath(c.rootPath, key) + c.cacheListener.AddListener(qualifiedKey, listener) +} + +// buildPath build path and format +func buildPath(rootPath, subPath string) string { + path := strings.TrimRight(rootPath+pathSeparator+subPath, pathSeparator) + if !strings.HasPrefix(path, pathSeparator) { + path = pathSeparator + path + } + path = strings.ReplaceAll(path, "//", "/") + return path } func (c *zookeeperDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { @@ -118,7 +133,7 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config if len(tmpOpts.Group) != 0 { key = tmpOpts.Group + "/" + key } else { - key = c.GetURL().GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup) + key = c.GetURL().GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup) + "/" + key } content, _, err := c.client.GetContent(c.rootPath + "/" + key) if err != nil { diff --git a/config_center/zookeeper/listener.go b/config_center/zookeeper/listener.go index 5dd5457ee0..3d311799cd 100644 --- a/config_center/zookeeper/listener.go +++ b/config_center/zookeeper/listener.go @@ -24,27 +24,33 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/common/constant" - "dubbo.apache.org/dubbo-go/v3/common/logger" - "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/config_center" "dubbo.apache.org/dubbo-go/v3/remoting" + "dubbo.apache.org/dubbo-go/v3/remoting/zookeeper" ) // CacheListener defines keyListeners and rootPath type CacheListener struct { - keyListeners sync.Map - rootPath string + // key is zkNode Path and value is set of listeners + keyListeners sync.Map + zkEventListener *zookeeper.ZkEventListener + rootPath string } // NewCacheListener creates a new CacheListener -func NewCacheListener(rootPath string) *CacheListener { - return &CacheListener{rootPath: rootPath} +func NewCacheListener(rootPath string, listener *zookeeper.ZkEventListener) *CacheListener { + return &CacheListener{zkEventListener: listener, rootPath: rootPath} } // AddListener will add a listener if loaded func (l *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) { + // FIXME do not use Client.ExistW, cause it has a bug(can not watch zk node that do not exist) + _, _, _, err := l.zkEventListener.Client.Conn.ExistsW(key) // reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure // make a map[your type]struct{} like set in java + if err != nil { + return + } listeners, loaded := l.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{listener: {}}) if loaded { listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{} @@ -62,36 +68,28 @@ func (l *CacheListener) RemoveListener(key string, listener config_center.Config // DataChange changes all listeners' event func (l *CacheListener) DataChange(event remoting.Event) bool { + changeType := event.Action if event.Content == "" { - // meanings new node - return true - } - var key string - // TODO use common way - if strings.HasSuffix(event.Path, constant.MeshRouteSuffix) { - key = config.GetRootConfig().Application.Name - } else { - key = l.pathToKey(event.Path) + changeType = remoting.EventTypeDel } - if key != "" { - if listeners, ok := l.keyListeners.Load(key); ok { - for listener := range listeners.(map[config_center.ConfigurationListener]struct{}) { - listener.Process(&config_center.ConfigChangeEvent{Key: key, Value: event.Content, ConfigType: event.Action}) - } - return true + + if listeners, ok := l.keyListeners.Load(event.Path); ok { + for listener := range listeners.(map[config_center.ConfigurationListener]struct{}) { + listener.Process(&config_center.ConfigChangeEvent{ + Key: l.pathToKey(event.Path), + Value: event.Content, + ConfigType: changeType, + }) } + return true } return false } func (l *CacheListener) pathToKey(path string) string { - key := strings.Replace(strings.Replace(path, l.rootPath+"/", "", -1), "/", ".", -1) - if strings.HasSuffix(key, constant.ConfiguratorSuffix) || - strings.HasSuffix(key, constant.TagRouterRuleSuffix) || - strings.HasSuffix(key, constant.ConditionRouterRuleSuffix) { - // governance config, so we remove the "dubbo." prefix - key = key[strings.Index(key, ".")+1:] + if len(path) == 0 { + return path } - logger.Debugf("pathToKey path:%s, key:%s\n", path, key) - return key + groupKey := strings.Replace(strings.Replace(path, l.rootPath+constant.PathSeparator, "", -1), constant.PathSeparator, constant.DotSeparator, -1) + return groupKey[strings.Index(groupKey, constant.DotSeparator)+1:] } diff --git a/remoting/listener.go b/remoting/listener.go index a87e502446..ea2300fddb 100644 --- a/remoting/listener.go +++ b/remoting/listener.go @@ -23,7 +23,7 @@ import ( // DataListener defines common data listener interface type DataListener interface { - DataChange(eventType Event) bool // bool is return for interface implement is interesting + DataChange(event Event) bool // bool is return for interface implement is interesting } ////////////////////////////////////////// diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index 5c955dac0f..f05863abb8 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -52,7 +52,7 @@ func ValidateZookeeperClient(container ZkClientFacade, zkName string) error { timeout := url.GetParamDuration(constant.ConfigTimeoutKey, constant.DefaultRegTimeout) zkAddresses := strings.Split(url.Location, ",") - logger.Infof("[Zookeeper Client] New zookeeper client with name = %s, zkAddress = %s, timeout = %d", zkName, url.Location, timeout.String()) + logger.Infof("[Zookeeper Client] New zookeeper client with name = %s, zkAddress = %s, timeout = %s", zkName, url.Location, timeout.String()) newClient, cltErr := gxzookeeper.NewZookeeperClient(zkName, zkAddresses, true, gxzookeeper.WithZkTimeOut(timeout)) if cltErr != nil { logger.Warnf("newZookeeperClient(name{%s}, zk address{%v}, timeout{%d}) = error{%v}", diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index a7f7be8644..7bf4382276 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -45,7 +45,7 @@ var defaultTTL = 10 * time.Minute // nolint type ZkEventListener struct { - client *gxzookeeper.ZookeeperClient + Client *gxzookeeper.ZookeeperClient pathMapLock sync.Mutex pathMap map[string]*uatomic.Int32 wg sync.WaitGroup @@ -55,17 +55,12 @@ type ZkEventListener struct { // NewZkEventListener returns a EventListener instance func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener { return &ZkEventListener{ - client: client, + Client: client, pathMap: make(map[string]*uatomic.Int32), exit: make(chan struct{}), } } -// nolint -func (l *ZkEventListener) SetClient(client *gxzookeeper.ZookeeperClient) { - l.client = client -} - // ListenServiceNodeEvent listen a path node event func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remoting.DataListener) { // listen l service node @@ -81,6 +76,57 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin }(zkPath, listener) } +// ListenConfigurationEvent listen a path node event +func (l *ZkEventListener) ListenConfigurationEvent(zkPath string, listener remoting.DataListener) { + l.wg.Add(1) + go func(zkPath string, listener remoting.DataListener) { + var eventChan = make(chan zk.Event, 16) + l.Client.RegisterEvent(zkPath, eventChan) + for { + select { + case event := <-eventChan: + logger.Infof("[ZkEventListener]Receive configuration change event:%#v", event) + if event.Type == zk.EventNodeChildrenChanged || event.Type == zk.EventNotWatching { + continue + } + // 1. Re-set watcher for the zk node + _, _, _, err := l.Client.Conn.ExistsW(event.Path) + if err != nil { + logger.Warnf("[ZkEventListener]Re-set watcher error, the reason is %+v", err) + continue + } + + action := remoting.EventTypeAdd + var content string + if event.Type == zk.EventNodeDeleted { + action = remoting.EventTypeDel + } else { + // 2. Try to get new configuration value of the zk node + // Notice: The order of step 1 and step 2 cannot be swapped, if you get value(with timestamp t1) + // before re-set the watcher(with timestamp t2), and some one change the data of the zk node after + // t2 but before t1, you may get the old value, and the new value will not trigger the event. + contentBytes, _, err := l.Client.Conn.Get(event.Path) + if err != nil { + logger.Warnf("[ListenConfigurationEvent]Get config value error, the reason is %+v", err) + continue + } + content = string(contentBytes) + logger.Debugf("[ZkEventListener]Successfully get new config value: %s", string(content)) + } + + listener.DataChange(remoting.Event{ + Path: event.Path, + Action: remoting.EventType(action), + Content: content, + }) + case <-l.exit: + return + } + } + + }(zkPath, listener) +} + // nolint func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool { defer l.wg.Done() @@ -97,7 +143,7 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo var zkEvent zk.Event for { - keyEventCh, err := l.client.ExistW(zkPath) + keyEventCh, err := l.Client.ExistW(zkPath) if err != nil { logger.Warnf("existW{key:%s} = error{%v}", zkPath, err) return false @@ -111,7 +157,7 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo case zk.EventNodeDataChanged: logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath) if len(listener) > 0 { - content, _, err := l.client.Conn.Get(zkEvent.Path) + content, _, err := l.Client.Conn.Get(zkEvent.Path) if err != nil { logger.Warnf("zk.Conn.Get{key:%s} = error{%v}", zkPath, err) return false @@ -119,9 +165,9 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EventTypeUpdate, Content: string(content)}) } case zk.EventNodeCreated: - logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath) + logger.Warnf("[ZkEventListener][listenServiceNodeEvent]Get a EventNodeCreated event for path {%s}", zkPath) if len(listener) > 0 { - content, _, err := l.client.Conn.Get(zkEvent.Path) + content, _, err := l.Client.Conn.Get(zkEvent.Path) if err != nil { logger.Warnf("zk.Conn.Get{key:%s} = error{%v}", zkPath, err) return false @@ -129,9 +175,9 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EventTypeAdd, Content: string(content)}) } case zk.EventNotWatching: - logger.Warnf("zk.ExistW(key{%s}) = event{EventNotWatching}", zkPath) + logger.Infof("[ZkEventListener][listenServiceNodeEvent]Get a EventNotWatching event for path {%s}", zkPath) case zk.EventNodeDeleted: - logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDeleted}", zkPath) + logger.Infof("[ZkEventListener][listenServiceNodeEvent]Get a EventNodeDeleted event for path {%s}", zkPath) return true } case <-l.exit: @@ -150,11 +196,11 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li return false } - newChildren, err := l.client.GetChildren(zkPath) + newChildren, err := l.Client.GetChildren(zkPath) if err != nil { // TODO need to ignore this error in gost if err == gxzookeeper.ErrNilChildren { - content, _, connErr := l.client.Conn.Get(zkPath) + content, _, connErr := l.Client.Conn.Get(zkPath) if connErr != nil { logger.Errorf("Get new node path {%v} 's content error,message is {%v}", zkPath, perrors.WithStack(connErr)) @@ -176,7 +222,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li newNode = path.Join(zkPath, n) logger.Debugf("[Zookeeper Listener] add zkNode{%s}", newNode) - content, _, connErr := l.client.Conn.Get(newNode) + content, _, connErr := l.Client.Conn.Get(newNode) if connErr != nil { logger.Errorf("Get new node path {%v} 's content error,message is {%v}", newNode, perrors.WithStack(connErr)) @@ -231,7 +277,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li } for { // Get current children with watcher for the zkRootPath - children, childEventCh, err := l.client.GetChildrenW(zkRootPath) + children, childEventCh, err := l.Client.GetChildrenW(zkRootPath) if err != nil { failTimes++ if MaxFailTimes <= failTimes { @@ -277,14 +323,14 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li } // When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn - l.client.RLock() - if l.client.Conn == nil { - l.client.RUnlock() + l.Client.RLock() + if l.Client.Conn == nil { + l.Client.RUnlock() break } - content, _, err := l.client.Conn.Get(zkNodePath) + content, _, err := l.Client.Conn.Get(zkNodePath) - l.client.RUnlock() + l.Client.RUnlock() if err != nil { logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get content of the child node {%v} failed, the error is %+v", zkNodePath, perrors.WithStack(err)) } @@ -304,17 +350,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li } logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath) }(zkNodePath, listener) - - // listen sub path recursive - // if zkPath is end of "providers/ & consumers/" we do not listen children dir - if strings.LastIndex(zkRootPath, constant.ProviderCategory) == -1 && - strings.LastIndex(zkRootPath, constant.ConsumerCategory) == -1 { - l.wg.Add(1) - go func(zkPath string, listener remoting.DataListener) { - l.listenDirEvent(conf, zkPath, listener) - logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) - }(zkNodePath, listener) - } } if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) { return