Skip to content

Commit

Permalink
refactor zk dynamic configuration listener
Browse files Browse the repository at this point in the history
  • Loading branch information
dongjianhui03 committed Jan 6, 2022
1 parent c4fa4a0 commit 77f3730
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 88 deletions.
34 changes: 23 additions & 11 deletions cluster/router/v3router/router_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/remoting"
)

// RouterChain contains all uniform router logic
Expand All @@ -46,31 +47,36 @@ type RouterChain struct {

// nolint
func NewUniformRouterChain() (router.PriorityRouter, error) {
// 1. add mesh route listener
// 1. Add mesh route listener
r := &RouterChain{}
rootConfig := config.GetRootConfig()
dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration()
if dynamicConfiguration == nil {
logger.Infof("[Mesh Router] Config center does not start, please check if the configuration center has been properly configured in dubbogo.yml")
logger.Infof("[NewUniformRouterChain] Config center does not start, please check if the configuration center has been properly configured in dubbogo.yml")
return nil, nil
}
dynamicConfiguration.AddListener(rootConfig.Application.Name, r)

// 2. try to get mesh route configuration, default key is "dubbo.io.MESHAPPRULE" with group "dubbo"
// 2. Try to get mesh rules configuration, default key is "dubbo.io.MESHAPPRULE" with group "dubbo"
key := rootConfig.Application.Name + constant.MeshRouteSuffix
group := rootConfig.ConfigCenter.Group
if group == "" {
group = constant.Dubbo
}
dynamicConfiguration.AddListener(group+constant.PathSeparator+key, r)
meshRouteValue, err := dynamicConfiguration.GetProperties(key, config_center.WithGroup(rootConfig.ConfigCenter.Group))
if err != nil {
// the mesh route may not be initialized now
logger.Warnf("Can not get mesh route for key=%s, error=%v", key, err)
// The mesh rules may not be initialized now
logger.Warnf("[NewUniformRouterChain]Can not get mesh rules for group=%s, key=%s, error=%+v", rootConfig.ConfigCenter.Group, key, err)
return r, nil
}
logger.Debugf("Successfully get mesh route:%s", meshRouteValue)
logger.Debugf("[NewUniformRouterChain]Successfully get mesh rules:%s", meshRouteValue)
routes, err := parseRoute(meshRouteValue)
if err != nil {
logger.Warnf("Parse mesh route failed, error=%v", err)
logger.Warnf("[NewUniformRouterChain]Parse mesh rules failed, error=%+v", err)
return nil, err
}
r.routers = routes
logger.Infof("[NewUniformRouterChain]Successfully init mesh rules with:\n%s", meshRouteValue)
return r, nil
}

Expand All @@ -84,13 +90,19 @@ func (r *RouterChain) Route(invokers []protocol.Invoker, url *common.URL, invoca

// Process process route config change event
func (r *RouterChain) Process(event *config_center.ConfigChangeEvent) {
logger.Debugf("RouteChain process event:\n%+v", event)
logger.Infof("[RouteChain]Process config change event:%+v", event)
if event.ConfigType == remoting.EventTypeDel {
r.routers = nil
return
}
routers, err := parseRoute(event.Value.(string))
if err != nil {
logger.Warnf("[RouteChain]Parse new mesh route config error, %+v "+
"and we will use the original mesh rule configuration.", err)
return
}
r.routers = routers
// todo delete router
logger.Infof("[RouteChain]Parse Mesh Rule Success.")
}

// Name get name of ConnCheckerRouter
Expand All @@ -108,7 +120,7 @@ func (r *RouterChain) URL() *common.URL {
return nil
}

// parseFromConfigToRouters parse virtualService and destinationRule yaml file bytes to target router list
// Deprecated parseFromConfigToRouters parse virtualService and destinationRule yaml file bytes to target router list
func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte) ([]*UniformRouter, error) {
var virtualServiceConfigList []*config.VirtualServiceConfig
destRuleConfigsMap := make(map[string]map[string]map[string]string)
Expand Down
7 changes: 2 additions & 5 deletions common/constant/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,8 @@ const (
)

const (
DefaultWeight = 100 //
DefaultWarmup = 10 * 60 // in java here is 10*60*1000 because of System.currentTimeMillis() is measured in milliseconds & in go time.Unix() is second
)

const (
DefaultWeight = 100 //
DefaultWarmup = 10 * 60 // in java here is 10*60*1000 because of System.currentTimeMillis() is measured in milliseconds & in go time.Unix() is second
DefaultLoadBalance = "random"
DefaultRetries = "2"
DefaultRetriesInt = 2
Expand Down
1 change: 1 addition & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
PortKey = "port"
ProtocolKey = "protocol"
PathSeparator = "/"
DotSeparator = "."
CommaSeparator = ","
SslEnabledKey = "ssl-enabled"
// ParamsTypeKey key used in pass through invoker factory, to define param type
Expand Down
2 changes: 1 addition & 1 deletion config/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (cc *ConsumerConfig) Init(rc *RootConfig) error {
// try to use interface name defined by pb
triplePBService, ok := reference.(common.TriplePBService)
if !ok {
logger.Errorf("Dubbogo cannot get interface name with reference = %s."+
logger.Errorf("Dubbo-go cannot get interface name with reference = %s."+
"Please run the command 'go install github.com/dubbogo/tools/cmd/protoc-gen-go-triple@latest' to get the latest "+
"protoc-gen-go-triple, and then re-generate your pb file again by this tool."+
"If you are not using pb serialization, please set 'interfaceName' field in reference config to let dubbogo get the interface name.", key)
Expand Down
29 changes: 22 additions & 7 deletions config_center/zookeeper/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package zookeeper
import (
"encoding/base64"
"strconv"
"strings"
"sync"
)

Expand Down Expand Up @@ -65,8 +66,9 @@ type zookeeperDynamicConfiguration struct {

func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfiguration, error) {
c := &zookeeperDynamicConfiguration{
url: url,
rootPath: "/" + url.GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup) + "/config",
url: url,
// TODO adapt config center config
rootPath: "/dubbo/config",
}
logger.Infof("[Zookeeper ConfigCenter] New Zookeeper ConfigCenter with Configuration: %+v, url = %+v", c, c.GetURL())
if v, ok := config.GetRootConfig().ConfigCenter.Params["base64"]; ok {
Expand All @@ -93,13 +95,26 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu

// Start listener
c.listener = zookeeper.NewZkEventListener(c.client)
c.cacheListener = NewCacheListener(c.rootPath)
c.listener.ListenServiceEvent(url, c.rootPath, c.cacheListener)
c.cacheListener = NewCacheListener(c.rootPath, c.listener)
c.listener.ListenConfigurationEvent(c.rootPath, c.cacheListener)
return c, nil
}

func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
c.cacheListener.AddListener(key, listener)
// AddListener add listener for key
// TODO this method should has a parameter 'group', and it does not now, so we should concat group and key with '/' manually
func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, options ...config_center.Option) {
qualifiedKey := buildPath(c.rootPath, key)
c.cacheListener.AddListener(qualifiedKey, listener)
}

// buildPath build path and format
func buildPath(rootPath, subPath string) string {
path := strings.TrimRight(rootPath+pathSeparator+subPath, pathSeparator)
if !strings.HasPrefix(path, pathSeparator) {
path = pathSeparator + path
}
path = strings.ReplaceAll(path, "//", "/")
return path
}

func (c *zookeeperDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
Expand All @@ -118,7 +133,7 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config
if len(tmpOpts.Group) != 0 {
key = tmpOpts.Group + "/" + key
} else {
key = c.GetURL().GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup)
key = c.GetURL().GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup) + "/" + key
}
content, _, err := c.client.GetContent(c.rootPath + "/" + key)
if err != nil {
Expand Down
56 changes: 27 additions & 29 deletions config_center/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,33 @@ import (

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/remoting"
"dubbo.apache.org/dubbo-go/v3/remoting/zookeeper"
)

// CacheListener defines keyListeners and rootPath
type CacheListener struct {
keyListeners sync.Map
rootPath string
// key is zkNode Path and value is set of listeners
keyListeners sync.Map
zkEventListener *zookeeper.ZkEventListener
rootPath string
}

// NewCacheListener creates a new CacheListener
func NewCacheListener(rootPath string) *CacheListener {
return &CacheListener{rootPath: rootPath}
func NewCacheListener(rootPath string, listener *zookeeper.ZkEventListener) *CacheListener {
return &CacheListener{zkEventListener: listener, rootPath: rootPath}
}

// AddListener will add a listener if loaded
func (l *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) {
// FIXME do not use Client.ExistW, cause it has a bug(can not watch zk node that do not exist)
_, _, _, err := l.zkEventListener.Client.Conn.ExistsW(key)
// reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure
// make a map[your type]struct{} like set in java
if err != nil {
return
}
listeners, loaded := l.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{listener: {}})
if loaded {
listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{}
Expand All @@ -62,36 +68,28 @@ func (l *CacheListener) RemoveListener(key string, listener config_center.Config

// DataChange changes all listeners' event
func (l *CacheListener) DataChange(event remoting.Event) bool {
changeType := event.Action
if event.Content == "" {
// meanings new node
return true
}
var key string
// TODO use common way
if strings.HasSuffix(event.Path, constant.MeshRouteSuffix) {
key = config.GetRootConfig().Application.Name
} else {
key = l.pathToKey(event.Path)
changeType = remoting.EventTypeDel
}
if key != "" {
if listeners, ok := l.keyListeners.Load(key); ok {
for listener := range listeners.(map[config_center.ConfigurationListener]struct{}) {
listener.Process(&config_center.ConfigChangeEvent{Key: key, Value: event.Content, ConfigType: event.Action})
}
return true

if listeners, ok := l.keyListeners.Load(event.Path); ok {
for listener := range listeners.(map[config_center.ConfigurationListener]struct{}) {
listener.Process(&config_center.ConfigChangeEvent{
Key: l.pathToKey(event.Path),
Value: event.Content,
ConfigType: changeType,
})
}
return true
}
return false
}

func (l *CacheListener) pathToKey(path string) string {
key := strings.Replace(strings.Replace(path, l.rootPath+"/", "", -1), "/", ".", -1)
if strings.HasSuffix(key, constant.ConfiguratorSuffix) ||
strings.HasSuffix(key, constant.TagRouterRuleSuffix) ||
strings.HasSuffix(key, constant.ConditionRouterRuleSuffix) {
// governance config, so we remove the "dubbo." prefix
key = key[strings.Index(key, ".")+1:]
if len(path) == 0 {
return path
}
logger.Debugf("pathToKey path:%s, key:%s\n", path, key)
return key
groupKey := strings.Replace(strings.Replace(path, l.rootPath+constant.PathSeparator, "", -1), constant.PathSeparator, constant.DotSeparator, -1)
return groupKey[strings.Index(groupKey, constant.DotSeparator)+1:]
}
2 changes: 1 addition & 1 deletion remoting/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

// DataListener defines common data listener interface
type DataListener interface {
DataChange(eventType Event) bool // bool is return for interface implement is interesting
DataChange(event Event) bool // bool is return for interface implement is interesting
}

//////////////////////////////////////////
Expand Down
2 changes: 1 addition & 1 deletion remoting/zookeeper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func ValidateZookeeperClient(container ZkClientFacade, zkName string) error {
timeout := url.GetParamDuration(constant.ConfigTimeoutKey, constant.DefaultRegTimeout)

zkAddresses := strings.Split(url.Location, ",")
logger.Infof("[Zookeeper Client] New zookeeper client with name = %s, zkAddress = %s, timeout = %d", zkName, url.Location, timeout.String())
logger.Infof("[Zookeeper Client] New zookeeper client with name = %s, zkAddress = %s, timeout = %s", zkName, url.Location, timeout.String())
newClient, cltErr := gxzookeeper.NewZookeeperClient(zkName, zkAddresses, true, gxzookeeper.WithZkTimeOut(timeout))
if cltErr != nil {
logger.Warnf("newZookeeperClient(name{%s}, zk address{%v}, timeout{%d}) = error{%v}",
Expand Down
Loading

0 comments on commit 77f3730

Please sign in to comment.