diff --git a/cluster/router/condition/dynamic_router.go b/cluster/router/condition/dynamic_router.go index 6d89771b37..c02d0a09de 100644 --- a/cluster/router/condition/dynamic_router.go +++ b/cluster/router/condition/dynamic_router.go @@ -127,8 +127,7 @@ func (s *ServiceRouter) Notify(invokers []protocol.Invoker) { logger.Warnf("config center does not start, please check if the configuration center has been properly configured in dubbogo.yml") return } - key := strings.Join([]string{strings.Join([]string{url.Service(), url.GetParam(constant.VersionKey, ""), url.GetParam(constant.GroupKey, "")}, ":"), - constant.ConditionRouterRuleSuffix}, "") + key := strings.Join([]string{url.ColonSeparatedKey(), constant.ConditionRouterRuleSuffix}, "") dynamicConfiguration.AddListener(key, s) value, err := dynamicConfiguration.GetRule(key) if err != nil { diff --git a/cluster/router/tag/match.go b/cluster/router/tag/match.go index 14633606af..0b4493dfc7 100644 --- a/cluster/router/tag/match.go +++ b/cluster/router/tag/match.go @@ -99,22 +99,38 @@ func requestTag(invokers []protocol.Invoker, url *common.URL, invocation protoco var ( addresses []string result []protocol.Invoker + match []*common.ParamMatch ) for _, tagCfg := range cfg.Tags { if tagCfg.Name == tag { addresses = tagCfg.Addresses + match = tagCfg.Match } } - if len(addresses) == 0 { - // filter tag does not match - result = filterInvokers(invokers, tag, func(invoker protocol.Invoker, tag interface{}) bool { - return invoker.GetURL().GetParam(constant.Tagkey, "") != tag + + // only one of 'match' and 'addresses' will take effect if both are specified. + if len(match) != 0 { + result = filterInvokers(invokers, match, func(invoker protocol.Invoker, match interface{}) bool { + matches := match.([]*common.ParamMatch) + for _, m := range matches { + if !m.IsMatch(invoker.GetURL()) { + return true + } + } + return false }) - logger.Debugf("[tag router] filter dynamic tag, tag=%s, invokers=%+v", tag, result) } else { - // filter address does not match - result = filterInvokers(invokers, addresses, getAddressPredicate(false)) - logger.Debugf("[tag router] filter dynamic tag address, invokers=%+v", result) + if len(addresses) == 0 { + // filter tag does not match + result = filterInvokers(invokers, tag, func(invoker protocol.Invoker, tag interface{}) bool { + return invoker.GetURL().GetParam(constant.Tagkey, "") != tag + }) + logger.Debugf("[tag router] filter dynamic tag, tag=%s, invokers=%+v", tag, result) + } else { + // filter address does not match + result = filterInvokers(invokers, addresses, getAddressPredicate(false)) + logger.Debugf("[tag router] filter dynamic tag address, invokers=%+v", result) + } } // returns the result directly if *cfg.Force || requestIsForce(url, invocation) { @@ -135,6 +151,7 @@ func requestTag(invokers []protocol.Invoker, url *common.URL, invocation protoco return result } +// filterInvokers remove invokers that match with predicate from the original input. func filterInvokers(invokers []protocol.Invoker, param interface{}, predicate predicate) []protocol.Invoker { result := make([]protocol.Invoker, len(invokers)) copy(result, invokers) diff --git a/common/constant/cluster.go b/common/constant/cluster.go index 4467b3d31b..a8b321b609 100644 --- a/common/constant/cluster.go +++ b/common/constant/cluster.go @@ -32,3 +32,8 @@ const ( const ( NonImportErrorMsgFormat = "Cluster for %s is not existing, make sure you have import the package." ) + +const ( + MatchCondition = "MATCH_CONDITION" + APIVersion = "v3.0" +) diff --git a/common/host_util.go b/common/host_util.go index eac0ac1450..a1bbc4fa8d 100644 --- a/common/host_util.go +++ b/common/host_util.go @@ -20,6 +20,7 @@ package common import ( "os" "strconv" + "strings" ) import ( @@ -80,3 +81,28 @@ func isValidPort(port string) bool { portInt, err := strconv.Atoi(port) return err == nil && portInt > 0 && portInt < 65536 } + +func IsMatchGlobPattern(pattern, value string) bool { + if constant.AnyValue == pattern { + return true + } + if pattern == "" && value == "" { + return true + } + if pattern == "" || value == "" { + return false + } + + i := strings.Index(pattern, constant.AnyValue) + if i == -1 { // doesn't find "*" + return value == pattern + } else if i == len(pattern)-1 { // "*" is at the end + return strings.HasPrefix(value, pattern[0:i]) + } else if i == 0 { // "*" is at the beginning + return strings.HasSuffix(value, pattern[i+1:]) + } else { // "*" is in the middle + prefix := pattern[0:i] + suffix := pattern[i+1:] + return strings.HasPrefix(value, prefix) && strings.HasSuffix(value, suffix) + } +} diff --git a/common/match.go b/common/match.go new file mode 100644 index 0000000000..08099e07f8 --- /dev/null +++ b/common/match.go @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import ( + "dubbo.apache.org/dubbo-go/v3/common/constant" + "fmt" + "net" + "regexp" + "strings" +) + +type ParamMatch struct { + Key string `yaml:"key" json:"key,omitempty" property:"key"` + Value StringMatch `yaml:"value" json:"value,omitempty" property:"value"` +} + +func (p *ParamMatch) IsMatch(url *URL) bool { + return p.Value.IsMatch(url.GetParam(p.Key, "")) +} + +type StringMatch struct { + Exact string `yaml:"exact" json:"exact,omitempty" property:"exact"` + Prefix string `yaml:"prefix" json:"prefix,omitempty" property:"prefix"` + Regex string `yaml:"regex" json:"regex,omitempty" property:"regex"` + Noempty string `yaml:"noempty" json:"noempty,omitempty" property:"noempty"` + Empty string `yaml:"empty" json:"empty,omitempty" property:"empty"` + Wildcard string `yaml:"wildcard" json:"wildcard,omitempty" property:"wildcard"` +} + +func (p *StringMatch) IsMatch(value string) bool { + if p.Exact != "" { + return p.Exact == value + } else if p.Prefix != "" { + return strings.HasPrefix(value, p.Prefix) + } else if p.Regex != "" { + match, _ := regexp.MatchString(p.Regex, value) + return match + } else if p.Wildcard != "" { + return value == p.Wildcard || constant.AnyValue == p.Wildcard + } else if p.Empty != "" { + return value == "" + } else if p.Noempty != "" { + return value != "" + } + return false +} + +type AddressMatch struct { + Wildcard string `yaml:"wildcard" json:"wildcard,omitempty" property:"wildcard"` + Cird string `yaml:"cird" json:"cird,omitempty" property:"cird"` + Exact string `yaml:"exact" json:"exact,omitempty" property:"exact"` +} + +func (p *AddressMatch) IsMatch(value string) bool { + if p.Cird != "" && value != "" { + _, ipnet, err := net.ParseCIDR(p.Cird) + if err != nil { + fmt.Println("Error", p.Cird, err) + return false + } + return ipnet.Contains(net.ParseIP(value)) + } + if p.Wildcard != "" && value != "" { + if constant.AnyValue == value || constant.AnyHostValue == value { + return true + } + return IsMatchGlobPattern(p.Wildcard, value) + } + if p.Exact != "" && value != "" { + return p.Exact == value + } + return false +} + +type ListStringMatch struct { + Oneof []StringMatch `yaml:"oneof" json:"oneof,omitempty" property:"oneof"` +} + +func (p *ListStringMatch) IsMatch(value string) bool { + for _, match := range p.Oneof { + if match.IsMatch(value) { + return true + } + } + return false +} diff --git a/common/url.go b/common/url.go index 6590ebfe9d..a07f87c3b4 100644 --- a/common/url.go +++ b/common/url.go @@ -115,7 +115,19 @@ type URL struct { Password string Methods []string // special for registry - SubURL *URL + SubURL *URL + attributes sync.Map +} + +func (c *URL) AddAttribute(key string, value interface{}) { + if value != nil { + c.attributes.Store(key, value) + } +} + +func (c *URL) GetAttribute(key string) interface{} { + v, _ := c.attributes.Load(key) + return v } // JavaClassName POJO for URL diff --git a/config/router_config.go b/config/router_config.go index 616e994ea2..a68339a4a4 100644 --- a/config/router_config.go +++ b/config/router_config.go @@ -18,6 +18,7 @@ package config import ( + "dubbo.apache.org/dubbo-go/v3/common" "github.com/creasty/defaults" ) @@ -41,8 +42,9 @@ type RouterConfig struct { } type Tag struct { - Name string `yaml:"name" json:"name,omitempty" property:"name"` - Addresses []string `yaml:"addresses" json:"addresses,omitempty" property:"addresses"` + Name string `yaml:"name" json:"name,omitempty" property:"name"` + Match []*common.ParamMatch `yaml:"match" json:"match,omitempty" property:"match"` + Addresses []string `yaml:"addresses" json:"addresses,omitempty" property:"addresses"` } // Prefix dubbo.router diff --git a/config_center/configurator/override.go b/config_center/configurator/override.go index b39322525b..f74d1c86e1 100644 --- a/config_center/configurator/override.go +++ b/config_center/configurator/override.go @@ -18,6 +18,7 @@ package configurator import ( + "dubbo.apache.org/dubbo-go/v3/config_center/parser" "strings" ) @@ -57,13 +58,19 @@ func (c *overrideConfigurator) Configure(url *common.URL) { // branch for version 2.7.x apiVersion := c.configuratorUrl.GetParam(constant.ConfigVersionKey, "") if len(apiVersion) != 0 { + var host string currentSide := url.GetParam(constant.SideKey, "") configuratorSide := c.configuratorUrl.GetParam(constant.SideKey, "") - if currentSide == configuratorSide && common.DubboRole[common.CONSUMER] == currentSide && c.configuratorUrl.Port == "0" { - localIP := common.GetLocalIp() - c.configureIfMatch(localIP, url) - } else if currentSide == configuratorSide && common.DubboRole[common.PROVIDER] == currentSide && c.configuratorUrl.Port == url.Port { - c.configureIfMatch(url.Ip, url) + if currentSide == configuratorSide && common.DubboRole[common.CONSUMER] == currentSide { + host = common.GetLocalIp() + } else if currentSide == configuratorSide && common.DubboRole[common.PROVIDER] == currentSide { + host = url.Ip + } + + if strings.HasPrefix(apiVersion, constant.APIVersion) { + c.configureIfMatchV3(host, url) + } else { + c.configureIfMatch(host, url) } } else { // branch for version 2.6.x and less @@ -71,20 +78,43 @@ func (c *overrideConfigurator) Configure(url *common.URL) { } } +// configureIfMatch +func (c *overrideConfigurator) configureIfMatchV3(host string, url *common.URL) { + conditionKeys := getConditionKeys() + matcher := c.configuratorUrl.GetAttribute(constant.MatchCondition) + if matcher != nil { + conditionMatcher := matcher.(*parser.ConditionMatch) + if conditionMatcher.IsMatch(host, url) { + configUrl := c.configuratorUrl.CloneExceptParams(conditionKeys) + url.SetParams(configUrl.GetParams()) + } + } +} + +func (c *overrideConfigurator) configureDeprecated(url *common.URL) { + // If override url has port, means it is a provider address. We want to control a specific provider with this override url, it may take effect on the specific provider instance or on consumers holding this provider instance. + if c.configuratorUrl.Port != "0" { + if url.Port == c.configuratorUrl.Port { + c.configureIfMatch(url.Ip, url) + } + } else { + // override url don't have a port, means the ip override url specify is a consumer address or 0.0.0.0 + // 1.If it is a consumer ip address, the intention is to control a specific consumer instance, it must takes effect at the consumer side, any provider received this override url should ignore; + // 2.If the ip is 0.0.0.0, this override url can be used on consumer, and also can be used on provider + if url.GetParam(constant.SideKey, "") == common.DubboRole[common.CONSUMER] { + localIP := common.GetLocalIp() + c.configureIfMatch(localIP, url) + } else { + c.configureIfMatch(constant.AnyHostValue, url) + } + } +} + func (c *overrideConfigurator) configureIfMatchInternal(url *common.URL) { configApp := c.configuratorUrl.GetParam(constant.ApplicationKey, c.configuratorUrl.Username) currentApp := url.GetParam(constant.ApplicationKey, url.Username) if len(configApp) == 0 || constant.AnyValue == configApp || configApp == currentApp { - conditionKeys := gxset.NewSet() - conditionKeys.Add(constant.CategoryKey) - conditionKeys.Add(constant.CheckKey) - conditionKeys.Add(constant.EnabledKey) - conditionKeys.Add(constant.GroupKey) - conditionKeys.Add(constant.VersionKey) - conditionKeys.Add(constant.ApplicationKey) - conditionKeys.Add(constant.SideKey) - conditionKeys.Add(constant.ConfigVersionKey) - conditionKeys.Add(constant.CompatibleConfigKey) + conditionKeys := getConditionKeys() returnUrl := false c.configuratorUrl.RangeParams(func(k, _ string) bool { value := c.configuratorUrl.GetParam(k, "") @@ -115,21 +145,16 @@ func (c *overrideConfigurator) configureIfMatch(host string, url *common.URL) { } } -func (c *overrideConfigurator) configureDeprecated(url *common.URL) { - // If override url has port, means it is a provider address. We want to control a specific provider with this override url, it may take effect on the specific provider instance or on consumers holding this provider instance. - if c.configuratorUrl.Port != "0" { - if url.Port == c.configuratorUrl.Port { - c.configureIfMatch(url.Ip, url) - } - } else { - // override url don't have a port, means the ip override url specify is a consumer address or 0.0.0.0 - // 1.If it is a consumer ip address, the intention is to control a specific consumer instance, it must takes effect at the consumer side, any provider received this override url should ignore; - // 2.If the ip is 0.0.0.0, this override url can be used on consumer, and also can be used on provider - if url.GetParam(constant.SideKey, "") == common.DubboRole[common.CONSUMER] { - localIP := common.GetLocalIp() - c.configureIfMatch(localIP, url) - } else { - c.configureIfMatch(constant.AnyHostValue, url) - } - } +func getConditionKeys() *gxset.HashSet { + conditionKeys := gxset.NewSet() + conditionKeys.Add(constant.CategoryKey) + conditionKeys.Add(constant.CheckKey) + conditionKeys.Add(constant.EnabledKey) + conditionKeys.Add(constant.GroupKey) + conditionKeys.Add(constant.VersionKey) + conditionKeys.Add(constant.ApplicationKey) + conditionKeys.Add(constant.SideKey) + conditionKeys.Add(constant.ConfigVersionKey) + conditionKeys.Add(constant.CompatibleConfigKey) + return conditionKeys } diff --git a/config_center/mock_dynamic_config.go b/config_center/mock_dynamic_config.go index 0abf808c39..9561abf41a 100644 --- a/config_center/mock_dynamic_config.go +++ b/config_center/mock_dynamic_config.go @@ -168,7 +168,7 @@ func (c *MockDynamicConfiguration) MockServiceConfigEvent() { }, } value, _ := yaml.Marshal(config) - key := "group*" + mockServiceName + ":1.0.0" + constant.ConfiguratorSuffix + key := mockServiceName + ":1.0.0:group" + constant.ConfiguratorSuffix c.listener[key].Process(&ConfigChangeEvent{Key: key, Value: string(value), ConfigType: remoting.EventTypeAdd}) } diff --git a/config_center/parser/configuration_parser.go b/config_center/parser/configuration_parser.go index 1f4b96083a..a262cc6055 100644 --- a/config_center/parser/configuration_parser.go +++ b/config_center/parser/configuration_parser.go @@ -70,6 +70,38 @@ type ConfigItem struct { Applications []string `yaml:"applications"` Parameters map[string]string `yaml:"parameters"` Side string `yaml:"side"` + Match *ConditionMatch `yaml:"match"` +} + +type ConditionMatch struct { + Address *common.AddressMatch `yaml:"address"` + ProviderAddress *common.AddressMatch `yaml:"providerAddress"` + Service *common.ListStringMatch `yaml:"service"` + App *common.ListStringMatch `yaml:"app"` + Param []*common.ParamMatch `yaml:"param"` +} + +func (c *ConditionMatch) IsMatch(host string, url *common.URL) bool { + if !c.Address.IsMatch(host) { + return false + } + if !c.ProviderAddress.IsMatch(url.Location) { + return false + } + if !c.Service.IsMatch(url.ServiceKey()) { + return false + } + if !c.App.IsMatch(url.GetParam(constant.ApplicationKey, "")) { + return false + } + if c.Param != nil { + for _, p := range c.Param { + if !p.IsMatch(url) { + return false + } + } + } + return true } // Parse load content @@ -145,6 +177,7 @@ func serviceItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.UR if err != nil { return nil, perrors.WithStack(err) } + url.AddAttribute(constant.MatchCondition, item.Match) urls = append(urls, url) } } else { @@ -152,6 +185,7 @@ func serviceItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.UR if err != nil { return nil, perrors.WithStack(err) } + url.AddAttribute(constant.MatchCondition, item.Match) urls = append(urls, url) } } @@ -193,6 +227,7 @@ func appItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.URL, e if err != nil { return nil, perrors.WithStack(err) } + url.AddAttribute(constant.MatchCondition, item.Match) urls = append(urls, url) } } diff --git a/registry/base_configuration_listener.go b/registry/base_configuration_listener.go index 88f1867645..bbca23a3c1 100644 --- a/registry/base_configuration_listener.go +++ b/registry/base_configuration_listener.go @@ -69,7 +69,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente // Process the notification event once there's any change happens on the config. func (bcl *BaseConfigurationListener) Process(event *config_center.ConfigChangeEvent) { - logger.Debugf("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value) + logger.Infof("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value) if event.ConfigType == remoting.EventTypeDel { bcl.configurators = nil } else { diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 903fbdb118..5fc2d783c2 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -493,7 +493,7 @@ type referenceConfigurationListener struct { func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL) *referenceConfigurationListener { listener := &referenceConfigurationListener{directory: dir, url: url} listener.InitWith( - url.EncodedServiceKey()+constant.ConfiguratorSuffix, + url.ColonSeparatedKey()+constant.ConfiguratorSuffix, listener, extension.GetDefaultConfiguratorFunc(), ) diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 822f622075..88988b5077 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -545,7 +545,7 @@ type serviceConfigurationListener struct { func newServiceConfigurationListener(overrideListener *overrideSubscribeListener, providerUrl *common.URL) *serviceConfigurationListener { listener := &serviceConfigurationListener{overrideListener: overrideListener, providerUrl: providerUrl} listener.InitWith( - providerUrl.EncodedServiceKey()+constant.ConfiguratorSuffix, + providerUrl.ColonSeparatedKey()+constant.ConfiguratorSuffix, listener, extension.GetDefaultConfiguratorFunc(), )