Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
1. fix the startup process of the configuration center(remove error log)
2. refactor zk registry listener
  • Loading branch information
dongjianhui03 committed Nov 4, 2021
1 parent 669ab0a commit 5fd6115
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 85 deletions.
25 changes: 10 additions & 15 deletions config/config_center_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,19 @@ func (c *CenterConfig) toURL() (*common.URL, error) {
// it will prepare the environment
func startConfigCenter(rc *RootConfig) error {
cc := rc.ConfigCenter
strConf, err := cc.prepareEnvironment()
dynamicConfig, err := cc.GetDynamicConfiguration()
if err != nil {
return errors.WithMessagef(err, "start config center error!")
logger.Errorf("Start dynamic configuration center error, error message is %v", err)
return err
}
envInstance := conf.GetEnvInstance()
envInstance.SetDynamicConfiguration(dynamicConfig)

strConf, err := dynamicConfig.GetProperties(cc.DataId, config_center.WithGroup(cc.Group))
if err != nil {
logger.Warnf("Dynamic onfig center has started, but config may not be initialized, because %s", err)
return nil
}
koan := koanf.New(".")
if err = koan.Load(rawbytes.Provider([]byte(strConf)), yaml.Parser()); err != nil {
return err
Expand Down Expand Up @@ -167,25 +175,12 @@ func (c *CenterConfig) GetDynamicConfiguration() (config_center.DynamicConfigura
}
dynamicConfig, err := c.CreateDynamicConfiguration()
if err != nil {
logger.Warnf("Create dynamic configuration error , error message is %v", err)
return nil, errors.WithStack(err)
}
c.DynamicConfiguration = dynamicConfig
return dynamicConfig, nil
}

func (c *CenterConfig) prepareEnvironment() (string, error) {
dynamicConfig, err := c.GetDynamicConfiguration()
if err != nil {
logger.Errorf("Create dynamic configuration error , error message is %v", err)
return "", errors.WithStack(err)
}
envInstance := conf.GetEnvInstance()
envInstance.SetDynamicConfiguration(dynamicConfig)

return dynamicConfig.GetProperties(c.DataId, config_center.WithGroup(c.Group))
}

func NewConfigCenterConfigBuilder() *ConfigCenterConfigBuilder {
return &ConfigCenterConfigBuilder{configCenterConfig: newEmptyConfigCenterConfig()}
}
Expand Down
2 changes: 1 addition & 1 deletion config_center/apollo/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func newApolloConfiguration(url *common.URL) (*apolloConfiguration, error) {
c.appConf = &config.AppConfig{
AppID: url.GetParam(constant.CONFIG_APP_ID_KEY, ""),
Cluster: url.GetParam(constant.CONFIG_CLUSTER_KEY, ""),
NamespaceName: url.GetParam(constant.CONFIG_NAMESPACE_KEY, cc.DEFAULT_GROUP),
NamespaceName: url.GetParam(constant.CONFIG_NAMESPACE_KEY, cc.DefaultGroup),
IP: c.getAddressWithProtocolPrefix(url),
Secret: url.GetParam(constant.CONFIG_SECRET_KEY, ""),
IsBackupConfig: url.GetParamBool(constant.CONFIG_BACKUP_CONFIG_KEY, true),
Expand Down
8 changes: 4 additions & 4 deletions config_center/dynamic_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import (
// DynamicConfiguration
// ////////////////////////////////////////
const (
// DEFAULT_GROUP: default group
DEFAULT_GROUP = "dubbo"
// DEFAULT_CONFIG_TIMEOUT: default config timeout
DEFAULT_CONFIG_TIMEOUT = "10s"
// DefaultGroup default group
DefaultGroup = "dubbo"
// DefaultConfigTimeout default config timeout
DefaultConfigTimeout = "10s"
)

// DynamicConfiguration for modify listener and get properties file
Expand Down
2 changes: 1 addition & 1 deletion config_center/file/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (fsdc *FileSystemDynamicConfiguration) GetPath(key string, group string) st
}

if len(group) == 0 {
group = config_center.DEFAULT_GROUP
group = config_center.DefaultGroup
}

return filepath.Join(fsdc.rootPath, group, adapterKey(key))
Expand Down
2 changes: 1 addition & 1 deletion config_center/nacos/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type nacosDynamicConfiguration struct {

func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, error) {
c := &nacosDynamicConfiguration{
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config",
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DefaultGroup) + "/config",
url: url,
done: make(chan struct{}),
}
Expand Down
6 changes: 4 additions & 2 deletions config_center/zookeeper/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type zookeeperDynamicConfiguration struct {
func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfiguration, error) {
c := &zookeeperDynamicConfiguration{
url: url,
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config",
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DefaultGroup) + "/config",
}
if v, ok := config.GetRootConfig().ConfigCenter.Params["base64"]; ok {
base64Enabled, err := strconv.ParseBool(v)
Expand Down Expand Up @@ -146,6 +146,8 @@ func (c *zookeeperDynamicConfiguration) PublishConfig(key string, group string,
if c.base64Enabled {
valueBytes = []byte(base64.StdEncoding.EncodeToString(valueBytes))
}
// FIXME this method need to be fixed, because it will recursively
// create every node in the path with given value which we may not expected.
err := c.client.CreateWithValue(path, valueBytes)
if err != nil {
return perrors.WithStack(err)
Expand Down Expand Up @@ -246,7 +248,7 @@ func (c *zookeeperDynamicConfiguration) getPath(key string, group string) string

func (c *zookeeperDynamicConfiguration) buildPath(group string) string {
if len(group) == 0 {
group = config_center.DEFAULT_GROUP
group = config_center.DefaultGroup
}
return c.rootPath + pathSeparator + group
}
12 changes: 7 additions & 5 deletions config_center/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/remoting"
)
Expand Down Expand Up @@ -65,10 +66,12 @@ func (l *CacheListener) DataChange(event remoting.Event) bool {
// meanings new node
return true
}
key := l.pathToKey(event.Path)
var key string
// TODO use common way
if strings.HasSuffix(key, constant.MeshRouteSuffix) {
key = key[:strings.Index(key, constant.MeshRouteSuffix)]
if strings.HasSuffix(event.Path, constant.MeshRouteSuffix) {
key = config.GetRootConfig().Application.Name
} else {
key = l.pathToKey(event.Path)
}
if key != "" {
if listeners, ok := l.keyListeners.Load(key); ok {
Expand All @@ -85,8 +88,7 @@ func (l *CacheListener) pathToKey(path string) string {
key := strings.Replace(strings.Replace(path, l.rootPath+"/", "", -1), "/", ".", -1)
if strings.HasSuffix(key, constant.ConfiguratorSuffix) ||
strings.HasSuffix(key, constant.TagRouterRuleSuffix) ||
strings.HasSuffix(key, constant.ConditionRouterRuleSuffix) ||
strings.HasSuffix(key, constant.MeshRouteSuffix) {
strings.HasSuffix(key, constant.ConditionRouterRuleSuffix) {
// governance config, so we remove the "dubbo." prefix
key = key[strings.Index(key, ".")+1:]
}
Expand Down
116 changes: 60 additions & 56 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,13 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
}
}

func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listener remoting.DataListener) {
func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
defer l.wg.Done()

var (
failTimes int
ttl time.Duration
event chan struct{}
zkEvent zk.Event
)
event = make(chan struct{}, 4)
ttl = defaultTTL
Expand All @@ -235,13 +234,13 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
defer close(event)
for {
// get current children for a zkPath
children, childEventCh, err := l.client.GetChildrenW(zkPath)
children, childEventCh, err := l.client.GetChildrenW(zkRootPath)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
logger.Debugf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
logger.Debugf("listenDirEvent(path{%s}) = error{%v}", zkRootPath, err)
// clear the event channel
CLEAR:
for {
Expand All @@ -251,52 +250,51 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
break CLEAR
}
}
l.client.RegisterEvent(zkPath, &event)
l.client.RegisterEvent(zkRootPath, &event)
if err == errNilNode {
logger.Warnf("listenDirEvent(path{%s}) got errNilNode,so exit listen", zkPath)
l.client.UnregisterEvent(zkPath, &event)
logger.Warnf("listenDirEvent(path{%s}) got errNilNode,so exit listen", zkRootPath)
l.client.UnregisterEvent(zkRootPath, &event)
return
}

after := time.After(timeSecondDuration(failTimes * ConnDelay))
select {
case <-after:
l.client.UnregisterEvent(zkPath, &event)
l.client.UnregisterEvent(zkRootPath, &event)
continue
case <-l.exit:
l.client.UnregisterEvent(zkPath, &event)
logger.Debugf("listen(path{%s}) goroutine exit now...", zkPath)
l.client.UnregisterEvent(zkRootPath, &event)
logger.Debugf("listen(path{%s}) goroutine exit now...", zkRootPath)
return
case <-event:
logger.Debugf("get zk.EventNodeDataChange notify event")
l.client.UnregisterEvent(zkPath, &event)
l.handleZkNodeEvent(zkPath, nil, listener)
l.client.UnregisterEvent(zkRootPath, &event)
l.handleZkNodeEvent(zkRootPath, nil, listener)
continue
}
}
failTimes = 0
for _, c := range children {

// Only need to compare Path when subscribing to provider
if strings.LastIndex(zkPath, constant.PROVIDER_CATEGORY) != -1 {
if strings.LastIndex(zkRootPath, constant.PROVIDER_CATEGORY) != -1 {
provider, _ := common.NewURL(c)
if provider.ServiceKey() != conf.ServiceKey() {
continue
}
}

// listen l service node
dubboPath := path.Join(zkPath, c)
zkNodePath := path.Join(zkRootPath, c)

// Save the path to avoid listen repeatedly
l.pathMapLock.Lock()
_, ok := l.pathMap[dubboPath]
_, ok := l.pathMap[zkNodePath]
if !ok {
l.pathMap[dubboPath] = uatomic.NewInt32(0)
l.pathMap[zkNodePath] = uatomic.NewInt32(0)
}
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", dubboPath)
logger.Warnf("@zkPath %s has already been listened.", zkNodePath)
continue
}

Expand All @@ -306,16 +304,17 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
l.client.RUnlock()
break
}
content, _, err := l.client.Conn.Get(dubboPath)
content, _, err := l.client.Conn.Get(zkNodePath)

l.client.RUnlock()
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", zkNodePath, perrors.WithStack(err))
}
logger.Debugf("Get children!{%s}", dubboPath)
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
logger.Debugf("Get children!{%s}", zkNodePath)
if !listener.DataChange(remoting.Event{Path: zkNodePath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
logger.Infof("listen dubbo service key{%s}", zkNodePath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
Expand All @@ -326,54 +325,59 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
l.pathMapLock.Unlock()
}
logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
}(zkNodePath, listener)

// listen sub path recursive
// if zkPath is end of "providers/ & consumers/" we do not listen children dir
if strings.LastIndex(zkPath, constant.PROVIDER_CATEGORY) == -1 &&
strings.LastIndex(zkPath, constant.CONSUMER_CATEGORY) == -1 {
if strings.LastIndex(zkRootPath, constant.PROVIDER_CATEGORY) == -1 &&
strings.LastIndex(zkRootPath, constant.CONSUMER_CATEGORY) == -1 {
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(conf, zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
}(zkNodePath, listener)
}
}
// Periodically update provider information
tickerTTL := ttl
if tickerTTL > 20e9 {
tickerTTL = 20e9
if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) {
return
}
ticker := time.NewTicker(tickerTTL)
WATCH:
for {
select {
case <-ticker.C:
l.handleZkNodeEvent(zkPath, children, listener)
if tickerTTL < ttl {
tickerTTL *= 2
if tickerTTL > ttl {
tickerTTL = ttl
}
ticker.Stop()
ticker = time.NewTicker(tickerTTL)
}
}

func (l *ZkEventListener) startScheduleWatchTask(
zkRootPath string, children []string, ttl time.Duration,
listener remoting.DataListener, childEventCh <-chan zk.Event) bool {
// Periodically update provider information
tickerTTL := ttl
if tickerTTL > 20e9 {
tickerTTL = 20e9
}
ticker := time.NewTicker(tickerTTL)
for {
select {
case <-ticker.C:
l.handleZkNodeEvent(zkRootPath, children, listener)
if tickerTTL < ttl {
tickerTTL *= 2
if tickerTTL > ttl {
tickerTTL = ttl
}
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type != zk.EventNodeChildrenChanged {
break WATCH
}
ticker = time.NewTicker(tickerTTL)
}
case zkEvent := <-childEventCh:
logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type == zk.EventNodeChildrenChanged {
l.handleZkNodeEvent(zkEvent.Path, children, listener)
break WATCH
case <-l.exit:
logger.Warnf("listen(path{%s}) goroutine exit now...", zkPath)
ticker.Stop()
return
}
return false
case <-l.exit:
logger.Warnf("listen(path{%s}) goroutine exit now...", zkRootPath)
ticker.Stop()
return true
}

}
}

Expand Down

0 comments on commit 5fd6115

Please sign in to comment.