From 2e01ed5bee64bac3a09d859278768be3dfa386cc Mon Sep 17 00:00:00 2001 From: watermelo <80680489@qq.com> Date: Sun, 19 Jul 2020 22:38:24 +0800 Subject: [PATCH 1/7] Ftr: add dynamic tag router --- cluster/router/condition/listenable_router.go | 2 +- cluster/router/tag/file.go | 2 +- cluster/router/tag/router_rule.go | 45 ++++++++ cluster/router/tag/tag.go | 39 +++++++ cluster/router/tag/tag_router.go | 105 +++++++++++++++++- 5 files changed, 187 insertions(+), 6 deletions(-) create mode 100644 cluster/router/tag/tag.go diff --git a/cluster/router/condition/listenable_router.go b/cluster/router/condition/listenable_router.go index 4ccc19e955..7f4f14a8e4 100644 --- a/cluster/router/condition/listenable_router.go +++ b/cluster/router/condition/listenable_router.go @@ -85,7 +85,7 @@ func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) { return l, nil } -// Process Process config change event , generate routers and set them to the listenableRouter instance +// Process Process config change event, generate routers and set them to the listenableRouter instance func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) { logger.Infof("Notification of condition rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value) if remoting.EventTypeDel == event.ConfigType { diff --git a/cluster/router/tag/file.go b/cluster/router/tag/file.go index 8144c83203..433abcb72e 100644 --- a/cluster/router/tag/file.go +++ b/cluster/router/tag/file.go @@ -42,7 +42,7 @@ type FileTagRouter struct { force bool } -// NewFileTagRouter Create file tag router instance with content ( from config file) +// NewFileTagRouter Create file tag router instance with content (from config file) func NewFileTagRouter(content []byte) (*FileTagRouter, error) { fileRouter := &FileTagRouter{} rule, err := getRule(string(content)) diff --git a/cluster/router/tag/router_rule.go b/cluster/router/tag/router_rule.go index 926446dcb2..b2b6659054 100644 --- a/cluster/router/tag/router_rule.go +++ b/cluster/router/tag/router_rule.go @@ -22,9 +22,27 @@ import ( "github.com/apache/dubbo-go/common/yaml" ) +/** + * %YAML1.2 + * --- + * force: true + * runtime: false + * enabled: true + * priority: 1 + * key: demo-provider + * tags: + * - name: tag1 + * addresses: [ip1, ip2] + * - name: tag2 + * addresses: [ip3, ip4] + * ... + */ // RouterRule RouterRule config read from config file or config center type RouterRule struct { router.BaseRouterRule `yaml:",inline""` + tags []tag + addressToTagNames map[string][]string + tagNameToAddresses map[string][]string } func getRule(rawRule string) (*RouterRule, error) { @@ -34,5 +52,32 @@ func getRule(rawRule string) (*RouterRule, error) { return r, err } r.RawRule = rawRule + // TODO init tags return r, nil } + +func (t *RouterRule) getAddresses() []string { + // TODO get all tag addresses + return nil +} + +func (t *RouterRule) getTagNames() []string { + // TODO get all tag names + return nil +} + +func (t *RouterRule) getAddressToTagNames() map[string][]string { + return t.addressToTagNames +} + +func (t *RouterRule) getTagNameToAddresses() map[string][]string { + return t.tagNameToAddresses +} + +func (t *RouterRule) getTags() []tag { + return t.tags +} + +func (t *RouterRule) setTags(tags []tag) { + t.tags = tags +} diff --git a/cluster/router/tag/tag.go b/cluster/router/tag/tag.go new file mode 100644 index 0000000000..07d719af86 --- /dev/null +++ b/cluster/router/tag/tag.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tag + +type tag struct { + name string + addresses []string +} + +func (t *tag) getName() string { + return t.name +} + +func (t *tag) setName(name string) { + t.name = name +} + +func (t *tag) getAddresses() []string { + return t.addresses +} + +func (t *tag) setAddresses(addresses []string) { + t.addresses = addresses +} diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index 87da418943..ff1209dd45 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -18,6 +18,7 @@ package tag import ( + "fmt" "strconv" ) @@ -28,13 +29,17 @@ import ( import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/remoting" ) type tagRouter struct { - url *common.URL - enabled bool - priority int64 + url *common.URL + tagRouterRule *RouterRule + enabled bool + priority int64 } func NewTagRouter(url *common.URL) (*tagRouter, error) { @@ -59,7 +64,81 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati if len(invokers) == 0 { return invokers } - return filterUsingStaticTag(invokers, url, invocation) + // since the rule can be changed by config center, we should copy one to use. + tagRouterRuleCopy := c.tagRouterRule + if tagRouterRuleCopy == nil || !tagRouterRuleCopy.Valid || !tagRouterRuleCopy.Enabled { + return filterUsingStaticTag(invokers, url, invocation) + } + tag, ok := invocation.Attachments()[constant.Tagkey] + if !ok { + tag = url.GetParam(constant.Tagkey, "") + } + var ( + result []protocol.Invoker + addresses []string + ) + if tag != "" { + addresses, _ = tagRouterRuleCopy.getTagNameToAddresses()[tag] + // filter by dynamic tag group first + if len(addresses) > 0 { + // TODO filter invokers + result = nil + if len(result) > 0 || tagRouterRuleCopy.Force { + return result + } + } else { + // dynamic tag group doesn't have any item about the requested app OR it's null after filtered by + // dynamic tag group but force=false. check static tag + // TODO filter invokers + return result + } + // If there's no tagged providers that can match the current tagged request. force.tag is set by default + // to false, which means it will invoke any providers without a tag unless it's explicitly disallowed. + if len(result) > 0 || isForceUseTag(url, invocation) { + return result + } else { + // FAILOVER: return all Providers without any tags. + // TODO filter invokers + return result + } + } else { + // return all addresses in dynamic tag group. + addresses = tagRouterRuleCopy.getAddresses() + if len(addresses) > 0 { + // TODO filter invokers + // 1. all addresses are in dynamic tag group, return empty list. + if len(result) == 0 { + return result + } + // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the + // static tag group. + } + // TODO filter invokers + return result + } +} + +func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) { + logger.Infof("Notification of dynamic tag rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value) + if remoting.EventTypeDel == event.ConfigType { + c.tagRouterRule = nil + return + } else { + content, ok := event.Value.(string) + if !ok { + msg := fmt.Sprintf("Convert event content fail,raw content:[%s] ", event.Value) + logger.Error(msg) + return + } + + routerRule, err := getRule(content) + if err != nil { + logger.Errorf("Parse dynamic tag router rule fail,error:[%s] ", err) + return + } + c.tagRouterRule = routerRule + return + } } func (c *tagRouter) URL() common.URL { @@ -92,3 +171,21 @@ func isForceUseTag(url *common.URL, invocation protocol.Invocation) bool { } return false } + +func addressMatches(url *common.URL, addresses []string) bool { + return len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port) +} + +func addressNotMatches(url *common.URL, addresses []string) bool { + return len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port) +} + +func checkAddressMatch(addresses []string, host, port string) bool { + for _, address := range addresses { + // TODO address parse + if address == (host + port) { + return true + } + } + return false +} From 71095a35998513b9a8632e59e85c81cc1b45556e Mon Sep 17 00:00:00 2001 From: watermelo <80680489@qq.com> Date: Mon, 27 Jul 2020 01:13:24 +0800 Subject: [PATCH 2/7] Add: add unit tests for tag router --- cluster/router/tag/router_rule.go | 46 ++++++++++++---- cluster/router/tag/router_rule_test.go | 55 ++++++++++++++++--- cluster/router/tag/tag.go | 22 ++++---- cluster/router/tag/tag_router.go | 76 +++++++++++++++++++++----- cluster/router/tag/tag_router_test.go | 23 ++++++++ 5 files changed, 177 insertions(+), 45 deletions(-) diff --git a/cluster/router/tag/router_rule.go b/cluster/router/tag/router_rule.go index b2b6659054..dc7a446783 100644 --- a/cluster/router/tag/router_rule.go +++ b/cluster/router/tag/router_rule.go @@ -40,7 +40,7 @@ import ( // RouterRule RouterRule config read from config file or config center type RouterRule struct { router.BaseRouterRule `yaml:",inline""` - tags []tag + Tags []Tag addressToTagNames map[string][]string tagNameToAddresses map[string][]string } @@ -52,18 +52,44 @@ func getRule(rawRule string) (*RouterRule, error) { return r, err } r.RawRule = rawRule - // TODO init tags + r.init() return r, nil } +func (t *RouterRule) init() { + t.addressToTagNames = make(map[string][]string) + t.tagNameToAddresses = make(map[string][]string) + for _, tag := range t.Tags { + for _, address := range tag.Addresses { + t.addressToTagNames[address] = append(t.addressToTagNames[address], tag.Name) + } + t.tagNameToAddresses[tag.Name] = tag.Addresses + } +} + func (t *RouterRule) getAddresses() []string { - // TODO get all tag addresses - return nil + var result []string + for _, tag := range t.Tags { + result = append(result, tag.Addresses...) + } + return result } func (t *RouterRule) getTagNames() []string { - // TODO get all tag names - return nil + var result []string + for _, tag := range t.Tags { + result = append(result, tag.Name) + } + return result +} + +func (t *RouterRule) hasTag(tag string) bool { + for _, t := range t.Tags { + if tag == t.Name { + return true + } + } + return false } func (t *RouterRule) getAddressToTagNames() map[string][]string { @@ -74,10 +100,10 @@ func (t *RouterRule) getTagNameToAddresses() map[string][]string { return t.tagNameToAddresses } -func (t *RouterRule) getTags() []tag { - return t.tags +func (t *RouterRule) getTags() []Tag { + return t.Tags } -func (t *RouterRule) setTags(tags []tag) { - t.tags = tags +func (t *RouterRule) setTags(tags []Tag) { + t.Tags = tags } diff --git a/cluster/router/tag/router_rule_test.go b/cluster/router/tag/router_rule_test.go index 2df65193f9..4e0f5b729e 100644 --- a/cluster/router/tag/router_rule_test.go +++ b/cluster/router/tag/router_rule_test.go @@ -22,19 +22,56 @@ import ( ) import ( - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" ) -func TestGetRule(t *testing.T) { +type RuleTestSuite struct { + suite.Suite + rule *RouterRule +} + +func (suite *RuleTestSuite) SetupTest() { + var err error yml := ` scope: application -runtime: true force: true +runtime: false +enabled: true +priority: 1 +key: demo-provider +tags: + - name: tag1 + addresses: [ip1, ip2] + - name: tag2 + addresses: [ip3, ip4] ` - rule, e := getRule(yml) - assert.Nil(t, e) - assert.NotNil(t, rule) - assert.Equal(t, true, rule.Force) - assert.Equal(t, true, rule.Runtime) - assert.Equal(t, "application", rule.Scope) + suite.rule, err = getRule(yml) + suite.Nil(err) +} + +func (suite *RuleTestSuite) TestGetRule() { + var err error + suite.Equal(true, suite.rule.Force) + suite.Equal(false, suite.rule.Runtime) + suite.Equal("application", suite.rule.Scope) + suite.Equal(1, suite.rule.Priority) + suite.Equal("demo-provider", suite.rule.Key) + suite.Nil(err) +} + +func (suite *RuleTestSuite) TestGetTagNames() { + suite.Equal([]string{"tag1", "tag2"}, suite.rule.getTagNames()) +} + +func (suite *RuleTestSuite) TestGetAddresses() { + suite.Equal([]string{"ip1", "ip2", "ip3", "ip4"}, suite.rule.getAddresses()) +} + +func (suite *RuleTestSuite) TestHasTag() { + suite.Equal(true, suite.rule.hasTag("tag1")) + suite.Equal(false, suite.rule.hasTag("tag404")) +} + +func TestRuleTestSuite(t *testing.T) { + suite.Run(t, new(RuleTestSuite)) } diff --git a/cluster/router/tag/tag.go b/cluster/router/tag/tag.go index 07d719af86..73d10b5db4 100644 --- a/cluster/router/tag/tag.go +++ b/cluster/router/tag/tag.go @@ -17,23 +17,23 @@ package tag -type tag struct { - name string - addresses []string +type Tag struct { + Name string + Addresses []string } -func (t *tag) getName() string { - return t.name +func (t *Tag) getName() string { + return t.Name } -func (t *tag) setName(name string) { - t.name = name +func (t *Tag) setName(name string) { + t.Name = name } -func (t *tag) getAddresses() []string { - return t.addresses +func (t *Tag) getAddresses() []string { + return t.Addresses } -func (t *tag) setAddresses(addresses []string) { - t.addresses = addresses +func (t *Tag) setAddresses(addresses []string) { + t.Addresses = addresses } diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index ff1209dd45..7417efcf4d 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -81,16 +81,20 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati addresses, _ = tagRouterRuleCopy.getTagNameToAddresses()[tag] // filter by dynamic tag group first if len(addresses) > 0 { - // TODO filter invokers - result = nil + result = filterAddressMatches(invokers, addresses) if len(result) > 0 || tagRouterRuleCopy.Force { return result } } else { // dynamic tag group doesn't have any item about the requested app OR it's null after filtered by // dynamic tag group but force=false. check static tag - // TODO filter invokers - return result + cond := func(invoker protocol.Invoker) bool { + if invoker.GetUrl().GetParam(constant.Tagkey, "") == tag { + return true + } + return false + } + result = filterCondition(invokers, cond) } // If there's no tagged providers that can match the current tagged request. force.tag is set by default // to false, which means it will invoke any providers without a tag unless it's explicitly disallowed. @@ -98,14 +102,20 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati return result } else { // FAILOVER: return all Providers without any tags. - // TODO filter invokers - return result + result = filterAddressNotMatches(invokers, tagRouterRuleCopy.getAddresses()) + cond := func(invoker protocol.Invoker) bool { + if invoker.GetUrl().GetParam(constant.Tagkey, "") == "" { + return true + } + return false + } + return filterCondition(result, cond) } } else { // return all addresses in dynamic tag group. addresses = tagRouterRuleCopy.getAddresses() if len(addresses) > 0 { - // TODO filter invokers + result = filterAddressNotMatches(invokers, addresses) // 1. all addresses are in dynamic tag group, return empty list. if len(result) == 0 { return result @@ -113,8 +123,11 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the // static tag group. } - // TODO filter invokers - return result + cond := func(invoker protocol.Invoker) bool { + localTag := invoker.GetUrl().GetParam(constant.Tagkey, "") + return localTag == "" || !(tagRouterRuleCopy.hasTag(localTag)) + } + return filterCondition(result, cond) } } @@ -172,18 +185,51 @@ func isForceUseTag(url *common.URL, invocation protocol.Invocation) bool { return false } -func addressMatches(url *common.URL, addresses []string) bool { - return len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port) +func filterAddressMatches(invokers []protocol.Invoker, addresses []string) []protocol.Invoker { + var idx int + for _, invoker := range invokers { + url := invoker.GetUrl() + if !(len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port)) { + continue + } + invokers[idx] = invoker + idx++ + } + return invokers[:idx] +} + +func filterAddressNotMatches(invokers []protocol.Invoker, addresses []string) []protocol.Invoker { + var idx int + for _, invoker := range invokers { + url := invoker.GetUrl() + if !(len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port)) { + continue + } + invokers[idx] = invoker + idx++ + } + return invokers[:idx] } -func addressNotMatches(url *common.URL, addresses []string) bool { - return len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port) +func filterCondition(invokers []protocol.Invoker, condition func(protocol.Invoker) bool) []protocol.Invoker { + var idx int + for _, invoker := range invokers { + if !condition(invoker) { + continue + } + invokers[idx] = invoker + idx++ + } + return invokers[:idx] } func checkAddressMatch(addresses []string, host, port string) bool { for _, address := range addresses { - // TODO address parse - if address == (host + port) { + // TODO ip match + if address == host+":"+port { + return true + } + if address == constant.ANYHOST_VALUE+":"+port { return true } } diff --git a/cluster/router/tag/tag_router_test.go b/cluster/router/tag/tag_router_test.go index 000b3ec672..3a053de4b0 100644 --- a/cluster/router/tag/tag_router_test.go +++ b/cluster/router/tag/tag_router_test.go @@ -19,6 +19,7 @@ package tag import ( "context" + "github.com/apache/dubbo-go/common/constant" "testing" ) @@ -160,3 +161,25 @@ func TestTagRouterRouteNoForce(t *testing.T) { invRst2 := tagRouter.Route(invokers, &u1, inv) assert.Equal(t, 3, len(invRst2)) } + +func TestFilterCondition(t *testing.T) { + u2, e2 := common.NewURL(tagRouterTestHangZhouUrl) + u3, e3 := common.NewURL(tagRouterTestShangHaiUrl) + u4, e4 := common.NewURL(tagRouterTestBeijingUrl) + assert.Nil(t, e2) + assert.Nil(t, e3) + assert.Nil(t, e4) + inv2 := NewMockInvoker(u2) + inv3 := NewMockInvoker(u3) + inv4 := NewMockInvoker(u4) + var invokers []protocol.Invoker + invokers = append(invokers, inv2, inv3, inv4) + cond := func(invoker protocol.Invoker) bool { + if invoker.GetUrl().GetParam(constant.Tagkey, "") == "beijing" { + return true + } + return false + } + res := filterCondition(invokers, cond) + assert.Equal(t, []protocol.Invoker{inv4}, res) +} From 4ef21cbbd82241d0c02700a64aeb517b4887e125 Mon Sep 17 00:00:00 2001 From: watermelo <80680489@qq.com> Date: Thu, 30 Jul 2020 00:23:56 +0800 Subject: [PATCH 3/7] Add: add ip address match function --- cluster/router/tag/tag_router.go | 215 +++++++++++++++++++++----- cluster/router/tag/tag_router_test.go | 35 +++-- 2 files changed, 198 insertions(+), 52 deletions(-) diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index 7417efcf4d..0ef7387f25 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -18,8 +18,11 @@ package tag import ( + "errors" "fmt" + "net" "strconv" + "strings" ) import ( @@ -81,20 +84,27 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati addresses, _ = tagRouterRuleCopy.getTagNameToAddresses()[tag] // filter by dynamic tag group first if len(addresses) > 0 { - result = filterAddressMatches(invokers, addresses) + filterAddressMatches := func(invoker protocol.Invoker) bool { + url := invoker.GetUrl() + if len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port) { + return true + } + return false + } + result = filterInvoker(invokers, filterAddressMatches) if len(result) > 0 || tagRouterRuleCopy.Force { return result } } else { // dynamic tag group doesn't have any item about the requested app OR it's null after filtered by // dynamic tag group but force=false. check static tag - cond := func(invoker protocol.Invoker) bool { + filter := func(invoker protocol.Invoker) bool { if invoker.GetUrl().GetParam(constant.Tagkey, "") == tag { return true } return false } - result = filterCondition(invokers, cond) + result = filterInvoker(invokers, filter) } // If there's no tagged providers that can match the current tagged request. force.tag is set by default // to false, which means it will invoke any providers without a tag unless it's explicitly disallowed. @@ -102,20 +112,33 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati return result } else { // FAILOVER: return all Providers without any tags. - result = filterAddressNotMatches(invokers, tagRouterRuleCopy.getAddresses()) - cond := func(invoker protocol.Invoker) bool { + filterAddressNotMatches := func(invoker protocol.Invoker) bool { + url := invoker.GetUrl() + if len(addresses) == 0 || !checkAddressMatch(tagRouterRuleCopy.getAddresses(), url.Ip, url.Port) { + return true + } + return false + } + filterTagIsEmpty := func(invoker protocol.Invoker) bool { if invoker.GetUrl().GetParam(constant.Tagkey, "") == "" { return true } return false } - return filterCondition(result, cond) + return filterInvoker(invokers, filterAddressNotMatches, filterTagIsEmpty) } } else { // return all addresses in dynamic tag group. addresses = tagRouterRuleCopy.getAddresses() if len(addresses) > 0 { - result = filterAddressNotMatches(invokers, addresses) + filterAddressNotMatches := func(invoker protocol.Invoker) bool { + url := invoker.GetUrl() + if len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port) { + return true + } + return false + } + result = filterInvoker(invokers, filterAddressNotMatches) // 1. all addresses are in dynamic tag group, return empty list. if len(result) == 0 { return result @@ -123,11 +146,11 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the // static tag group. } - cond := func(invoker protocol.Invoker) bool { + filter := func(invoker protocol.Invoker) bool { localTag := invoker.GetUrl().GetParam(constant.Tagkey, "") return localTag == "" || !(tagRouterRuleCopy.hasTag(localTag)) } - return filterCondition(result, cond) + return filterInvoker(result, filter) } } @@ -185,53 +208,163 @@ func isForceUseTag(url *common.URL, invocation protocol.Invocation) bool { return false } -func filterAddressMatches(invokers []protocol.Invoker, addresses []string) []protocol.Invoker { - var idx int +type filter func(protocol.Invoker) bool + +func filterInvoker(invokers []protocol.Invoker, filters ...filter) []protocol.Invoker { + var res []protocol.Invoker +OUTER: for _, invoker := range invokers { - url := invoker.GetUrl() - if !(len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port)) { - continue + for _, filter := range filters { + if !filter(invoker) { + continue OUTER + } } - invokers[idx] = invoker - idx++ + res = append(res, invoker) } - return invokers[:idx] + return res } -func filterAddressNotMatches(invokers []protocol.Invoker, addresses []string) []protocol.Invoker { - var idx int - for _, invoker := range invokers { - url := invoker.GetUrl() - if !(len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port)) { - continue +// TODO 需要搬到 dubbogo/gost, 可以先 review +func checkAddressMatch(addresses []string, host, port string) bool { + for _, address := range addresses { + if matchIp(address, host, port) { + return true + } + if address == constant.ANYHOST_VALUE+":"+port { + return true } - invokers[idx] = invoker - idx++ } - return invokers[:idx] + return false } -func filterCondition(invokers []protocol.Invoker, condition func(protocol.Invoker) bool) []protocol.Invoker { - var idx int - for _, invoker := range invokers { - if !condition(invoker) { +func matchIp(pattern, host, port string) bool { + // if the pattern is subnet format, it will not be allowed to config port param in pattern. + if strings.Contains(pattern, "/") { + _, subnet, _ := net.ParseCIDR(pattern) + if subnet != nil && subnet.Contains(net.ParseIP(host)) { + return true + } + return false + } + return matchIpRange(pattern, host, port) +} + +func matchIpRange(pattern, host, port string) bool { + if pattern == "" || host == "" { + logger.Error("Illegal Argument pattern or hostName. Pattern:" + pattern + ", Host:" + host) + return false + } + + pattern = strings.TrimSpace(pattern) + if "*.*.*.*" == pattern || "*" == pattern { + return true + } + + isIpv4 := true + ip4 := net.ParseIP(host).To4() + + if ip4 == nil { + isIpv4 = false + } + + hostAndPort := getPatternHostAndPort(pattern, isIpv4) + if hostAndPort[1] != "" && hostAndPort[1] != port { + return false + } + + pattern = hostAndPort[0] + // TODO 常量化 + splitCharacter := "\\." + if !isIpv4 { + splitCharacter = ":" + } + + mask := strings.Split(pattern, splitCharacter) + // check format of pattern + if err := checkHostPattern(pattern, mask, isIpv4); err != nil { + logger.Error(err) + return false + } + + if pattern == host { + return true + } + + // short name condition + if !ipPatternContains(pattern) { + return pattern == host + } + + // ip 段 + ipAddress := strings.Split(host, splitCharacter) + for i := 0; i < len(mask); i++ { + if "*" == mask[i] || mask[i] == ipAddress[i] { + continue + } else if strings.Contains(mask[i], "-") { + rangeNumStrs := strings.Split(mask[i], "-") + if len(rangeNumStrs) != 2 { + logger.Error("There is wrong format of ip Address: " + mask[i]) + return false + } + min := getNumOfIpSegment(rangeNumStrs[0], isIpv4) + max := getNumOfIpSegment(rangeNumStrs[1], isIpv4) + ip := getNumOfIpSegment(ipAddress[i], isIpv4) + if ip < min || ip > max { + return false + } + } else if "0" == ipAddress[i] && "0" == mask[i] || "00" == mask[i] || "000" == mask[i] || "0000" == mask[i] { continue + } else if mask[i] != ipAddress[i] { + return false } - invokers[idx] = invoker - idx++ } - return invokers[:idx] + return true } -func checkAddressMatch(addresses []string, host, port string) bool { - for _, address := range addresses { - // TODO ip match - if address == host+":"+port { - return true +func ipPatternContains(pattern string) bool { + return strings.Contains(pattern, "*") || strings.Contains(pattern, "-") +} + +func checkHostPattern(pattern string, mask []string, isIpv4 bool) error { + if !isIpv4 { + if len(mask) != 8 && ipPatternContains(pattern) { + return errors.New("If you config ip expression that contains '*' or '-', please fill qualified ip pattern like 234e:0:4567:0:0:0:3d:*. ") } - if address == constant.ANYHOST_VALUE+":"+port { - return true + if len(mask) != 8 && !strings.Contains(pattern, "::") { + return errors.New("The host is ipv6, but the pattern is not ipv6 pattern : " + pattern) + } + } else { + if len(mask) != 4 { + return errors.New("The host is ipv4, but the pattern is not ipv4 pattern : " + pattern) } } - return false + return nil +} + +func getPatternHostAndPort(pattern string, isIpv4 bool) []string { + result := make([]string, 2) + if strings.HasPrefix(pattern, "[") && strings.Contains(pattern, "]:") { + end := strings.Index(pattern, "]:") + result[0] = pattern[1:end] + result[1] = pattern[end+2:] + } else if strings.HasPrefix(pattern, "[") && strings.HasSuffix(pattern, "]") { + result[0] = pattern[1 : len(pattern)-1] + result[1] = "" + } else if isIpv4 && strings.Contains(pattern, ":") { + end := strings.Index(pattern, ":") + result[0] = pattern[:end] + result[1] = pattern[end+1:] + } else { + result[0] = pattern + } + return result +} + +func getNumOfIpSegment(ipSegment string, isIpv4 bool) int { + if isIpv4 { + ipSeg, _ := strconv.Atoi(ipSegment) + return ipSeg + } + ipSeg, _ := strconv.ParseInt(ipSegment, 0, 16) + return int(ipSeg) } diff --git a/cluster/router/tag/tag_router_test.go b/cluster/router/tag/tag_router_test.go index 3a053de4b0..4f2a8a9fca 100644 --- a/cluster/router/tag/tag_router_test.go +++ b/cluster/router/tag/tag_router_test.go @@ -19,7 +19,6 @@ package tag import ( "context" - "github.com/apache/dubbo-go/common/constant" "testing" ) @@ -29,16 +28,18 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) const ( - tagRouterTestHangZhouUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=hangzhou" - tagRouterTestShangHaiUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=shanghai" - tagRouterTestBeijingUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=beijing" - tagRouterTestUserConsumer = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true" - tagRouterTestUserConsumerTag = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&dubbo.force.tag=true" + tagRouterTestHangZhouUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=hangzhou" + tagRouterTestShangHaiUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=shanghai" + tagRouterTestBeijingUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=beijing" + tagRouterTestEnabledBeijingUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=false&dubbo.tag=beijing" + tagRouterTestUserConsumer = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true" + tagRouterTestUserConsumerTag = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&dubbo.force.tag=true" tagRouterTestDubboTag = "dubbo.tag" tagRouterTestDubboForceTag = "dubbo.force.tag" @@ -162,24 +163,36 @@ func TestTagRouterRouteNoForce(t *testing.T) { assert.Equal(t, 3, len(invRst2)) } -func TestFilterCondition(t *testing.T) { +func TestFilterInvoker(t *testing.T) { u2, e2 := common.NewURL(tagRouterTestHangZhouUrl) u3, e3 := common.NewURL(tagRouterTestShangHaiUrl) u4, e4 := common.NewURL(tagRouterTestBeijingUrl) + u5, e5 := common.NewURL(tagRouterTestEnabledBeijingUrl) assert.Nil(t, e2) assert.Nil(t, e3) assert.Nil(t, e4) + assert.Nil(t, e5) inv2 := NewMockInvoker(u2) inv3 := NewMockInvoker(u3) inv4 := NewMockInvoker(u4) + inv5 := NewMockInvoker(u5) var invokers []protocol.Invoker - invokers = append(invokers, inv2, inv3, inv4) - cond := func(invoker protocol.Invoker) bool { + invokers = append(invokers, inv2, inv3, inv4, inv5) + filterTag := func(invoker protocol.Invoker) bool { if invoker.GetUrl().GetParam(constant.Tagkey, "") == "beijing" { return true } return false } - res := filterCondition(invokers, cond) - assert.Equal(t, []protocol.Invoker{inv4}, res) + res := filterInvoker(invokers, filterTag) + assert.Equal(t, []protocol.Invoker{inv4, inv5}, res) + flag := true + filterEnabled := func(invoker protocol.Invoker) bool { + if invoker.GetUrl().GetParamBool(constant.RouterEnabled, false) == flag { + return true + } + return false + } + res2 := filterInvoker(invokers, filterTag, filterEnabled) + assert.Equal(t, []protocol.Invoker{inv4}, res2) } From 891ec6cbb7ed0bda93d9faebcde718fc6a97c56c Mon Sep 17 00:00:00 2001 From: watermelo <80680489@qq.com> Date: Thu, 30 Jul 2020 01:28:18 +0800 Subject: [PATCH 4/7] Add: add listener for tag router --- cluster/router/tag/tag_router.go | 57 +++++++++++++++++++++++++++++--- common/constant/key.go | 3 ++ 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index 0ef7387f25..8389fc26ad 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -20,6 +20,7 @@ package tag import ( "errors" "fmt" + "github.com/apache/dubbo-go/common/config" "net" "strconv" "strings" @@ -43,6 +44,7 @@ type tagRouter struct { tagRouterRule *RouterRule enabled bool priority int64 + application string } func NewTagRouter(url *common.URL) (*tagRouter, error) { @@ -60,6 +62,15 @@ func (c *tagRouter) isEnabled() bool { return c.enabled } +func (c *tagRouter) SetApplication(app string) { + c.application = app +} + +func (c *tagRouter) tagRouterRuleCopy() RouterRule { + routerRule := *c.tagRouterRule + return routerRule +} + func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { if !c.isEnabled() { return invokers @@ -68,8 +79,8 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati return invokers } // since the rule can be changed by config center, we should copy one to use. - tagRouterRuleCopy := c.tagRouterRule - if tagRouterRuleCopy == nil || !tagRouterRuleCopy.Valid || !tagRouterRuleCopy.Enabled { + tagRouterRuleCopy := c.tagRouterRuleCopy() + if !tagRouterRuleCopy.Valid || !tagRouterRuleCopy.Enabled { return filterUsingStaticTag(invokers, url, invocation) } tag, ok := invocation.Attachments()[constant.Tagkey] @@ -155,7 +166,7 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati } func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) { - logger.Infof("Notification of dynamic tag rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value) + logger.Infof("Notification of tag rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value) if remoting.EventTypeDel == event.ConfigType { c.tagRouterRule = nil return @@ -177,6 +188,44 @@ func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) { } } +func (c *tagRouter) Notify(invokers []protocol.Invoker) { + if len(invokers) == 0 { + return + } + invoker := invokers[0] + url := invoker.GetUrl() + providerApplication := url.GetParam(constant.RemoteApplicationKey, "") + if providerApplication == "" { + logger.Error("TagRouter must getConfig from or subscribe to a specific application, but the application " + + "in this TagRouter is not specified.") + return + } + dynamicConfiguration := config.GetEnvInstance().GetDynamicConfiguration() + if dynamicConfiguration == nil { + logger.Error("Get dynamicConfiguration fail, dynamicConfiguration is nil, init config center plugin please") + return + } + + if providerApplication != c.application { + dynamicConfiguration.RemoveListener(c.application+constant.TagRouterRuleSuffix, c) + } + + routerKey := providerApplication + constant.TagRouterRuleSuffix + dynamicConfiguration.AddListener(routerKey, c) + //get rule + rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP)) + if len(rule) == 0 || err != nil { + logger.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err) + return + } + if rule != "" { + c.Process(&config_center.ConfigChangeEvent{ + Key: routerKey, + Value: rule, + ConfigType: remoting.EventTypeUpdate}) + } +} + func (c *tagRouter) URL() common.URL { return *c.url } @@ -185,6 +234,7 @@ func (c *tagRouter) Priority() int64 { return c.priority } +// If there's no dynamic tag rule being set, use static tag in URL func filterUsingStaticTag(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { if tag, ok := invocation.Attachments()[constant.Tagkey]; ok { result := make([]protocol.Invoker, 0, 8) @@ -295,7 +345,6 @@ func matchIpRange(pattern, host, port string) bool { return pattern == host } - // ip 段 ipAddress := strings.Split(host, splitCharacter) for i := 0; i < len(mask); i++ { if "*" == mask[i] || mask[i] == ipAddress[i] { diff --git a/common/constant/key.go b/common/constant/key.go index cd23dd0f1a..a86e797ae5 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -187,6 +187,9 @@ const ( HealthCheckRouterName = "health_check" // TagRouterName Specify the name of TagRouter TagRouterName = "tag" + // TagRouterRuleSuffix Specify tag router suffix + TagRouterRuleSuffix = ".tag-router" + RemoteApplicationKey = "remote.application" // ConditionRouterRuleSuffix Specify condition router suffix ConditionRouterRuleSuffix = ".condition-router" From 8580ad0798097de164d9b855584757faeee6eb50 Mon Sep 17 00:00:00 2001 From: watermelo <80680489@qq.com> Date: Thu, 30 Jul 2020 11:54:02 +0800 Subject: [PATCH 5/7] Mod: update tag router unit test --- cluster/router/tag/tag_router.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index 24170c101d..093a31ef36 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -20,7 +20,6 @@ package tag import ( "errors" "fmt" - "github.com/apache/dubbo-go/common/config" "net" "strconv" "strings" @@ -32,6 +31,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/config" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config_center" @@ -70,6 +70,7 @@ func (c *tagRouter) SetApplication(app string) { } func (c *tagRouter) tagRouterRuleCopy() RouterRule { + fmt.Println(c.tagRouterRule, "fuck") routerRule := *c.tagRouterRule return routerRule } @@ -82,11 +83,11 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati if len(invokers) == 0 { return invokers } - // since the rule can be changed by config center, we should copy one to use. - tagRouterRuleCopy := c.tagRouterRuleCopy() - if !tagRouterRuleCopy.Valid || !tagRouterRuleCopy.Enabled { + if c.tagRouterRule == nil || !c.tagRouterRule.Valid || !c.tagRouterRule.Enabled { return filterUsingStaticTag(invokers, url, invocation) } + // since the rule can be changed by config center, we should copy one to use. + tagRouterRuleCopy := c.tagRouterRuleCopy() tag, ok := invocation.Attachments()[constant.Tagkey] if !ok { tag = url.GetParam(constant.Tagkey, "") From 70c01a5b31c572b50a6139e0c48192345f818592 Mon Sep 17 00:00:00 2001 From: watermelo <80680489@qq.com> Date: Mon, 3 Aug 2020 01:46:22 +0800 Subject: [PATCH 6/7] Add: add unit test for dynamic tag --- cluster/router/condition/app_router_test.go | 2 +- cluster/router/tag/router_rule.go | 8 +- cluster/router/tag/tag_router.go | 18 +- cluster/router/tag/tag_router_test.go | 211 +++++++++++++++++++- 4 files changed, 218 insertions(+), 21 deletions(-) diff --git a/cluster/router/condition/app_router_test.go b/cluster/router/condition/app_router_test.go index 8b38f2dd61..ea18604964 100644 --- a/cluster/router/condition/app_router_test.go +++ b/cluster/router/condition/app_router_test.go @@ -24,7 +24,6 @@ import ( ) import ( - _ "github.com/apache/dubbo-go/config_center/zookeeper" "github.com/stretchr/testify/assert" ) @@ -34,6 +33,7 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/config_center" + _ "github.com/apache/dubbo-go/config_center/zookeeper" "github.com/apache/dubbo-go/remoting" "github.com/apache/dubbo-go/remoting/zookeeper" ) diff --git a/cluster/router/tag/router_rule.go b/cluster/router/tag/router_rule.go index dc7a446783..5fb7ab151c 100644 --- a/cluster/router/tag/router_rule.go +++ b/cluster/router/tag/router_rule.go @@ -57,8 +57,8 @@ func getRule(rawRule string) (*RouterRule, error) { } func (t *RouterRule) init() { - t.addressToTagNames = make(map[string][]string) - t.tagNameToAddresses = make(map[string][]string) + t.addressToTagNames = make(map[string][]string, 8) + t.tagNameToAddresses = make(map[string][]string, 8) for _, tag := range t.Tags { for _, address := range tag.Addresses { t.addressToTagNames[address] = append(t.addressToTagNames[address], tag.Name) @@ -68,7 +68,7 @@ func (t *RouterRule) init() { } func (t *RouterRule) getAddresses() []string { - var result []string + var result = make([]string, 0, 8*len(t.Tags)) for _, tag := range t.Tags { result = append(result, tag.Addresses...) } @@ -76,7 +76,7 @@ func (t *RouterRule) getAddresses() []string { } func (t *RouterRule) getTagNames() []string { - var result []string + var result = make([]string, 0, len(t.Tags)) for _, tag := range t.Tags { result = append(result, tag.Name) } diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index 093a31ef36..ece950ebc0 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -19,7 +19,6 @@ package tag import ( "errors" - "fmt" "net" "strconv" "strings" @@ -70,7 +69,6 @@ func (c *tagRouter) SetApplication(app string) { } func (c *tagRouter) tagRouterRuleCopy() RouterRule { - fmt.Println(c.tagRouterRule, "fuck") routerRule := *c.tagRouterRule return routerRule } @@ -96,7 +94,8 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati result []protocol.Invoker addresses []string ) - if tag != "" { + // if we are requesting for a Provider with a specific tag + if len(tag) > 0 { addresses, _ = tagRouterRuleCopy.getTagNameToAddresses()[tag] // filter by dynamic tag group first if len(addresses) > 0 { @@ -159,9 +158,9 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati if len(result) == 0 { return result } - // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the - // static tag group. } + // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the + // static tag group. filter := func(invoker protocol.Invoker) bool { localTag := invoker.GetUrl().GetParam(constant.Tagkey, "") return localTag == "" || !(tagRouterRuleCopy.hasTag(localTag)) @@ -178,8 +177,7 @@ func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) { } else { content, ok := event.Value.(string) if !ok { - msg := fmt.Sprintf("Convert event content fail,raw content:[%s] ", event.Value) - logger.Error(msg) + logger.Errorf("Convert event content fail,raw content:[%s] ", event.Value) return } @@ -282,13 +280,13 @@ OUTER: return res } -// TODO 需要搬到 dubbogo/gost, 可以先 review +// TODO: need move to dubbogo/gost func checkAddressMatch(addresses []string, host, port string) bool { for _, address := range addresses { if matchIp(address, host, port) { return true } - if address == constant.ANYHOST_VALUE+":"+port { + if address == net.JoinHostPort(constant.ANYHOST_VALUE, port) { return true } } @@ -332,7 +330,7 @@ func matchIpRange(pattern, host, port string) bool { pattern = hostAndPort[0] // TODO 常量化 - splitCharacter := "\\." + splitCharacter := "." if !isIpv4 { splitCharacter = ":" } diff --git a/cluster/router/tag/tag_router_test.go b/cluster/router/tag/tag_router_test.go index 4f2a8a9fca..e5ddc2890c 100644 --- a/cluster/router/tag/tag_router_test.go +++ b/cluster/router/tag/tag_router_test.go @@ -19,27 +19,43 @@ package tag import ( "context" + "fmt" + "github.com/stretchr/testify/suite" "testing" + "time" ) import ( + "github.com/dubbogo/go-zookeeper/zk" "github.com/stretchr/testify/assert" ) import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/config" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config_center" + _ "github.com/apache/dubbo-go/config_center/zookeeper" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/remoting" + "github.com/apache/dubbo-go/remoting/zookeeper" ) const ( - tagRouterTestHangZhouUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=hangzhou" - tagRouterTestShangHaiUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=shanghai" - tagRouterTestBeijingUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=beijing" - tagRouterTestEnabledBeijingUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=false&dubbo.tag=beijing" - tagRouterTestUserConsumer = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true" - tagRouterTestUserConsumerTag = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&dubbo.force.tag=true" + tagRouterTestHangZhouUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=hangzhou&remote.application=test-tag" + tagRouterTestShangHaiUrl = "dubbo://127.0.0.1:20002/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=shanghai&remote.application=test-tag" + tagRouterTestBeijingUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=beijing&remote.application=test-tag" + tagRouterTestEnabledBeijingUrl = "dubbo://127.0.0.1:20004/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=false&dubbo.tag=beijing&remote.application=test-tag" + tagRouterTestUserConsumer = "dubbo://127.0.0.1:20005/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag" + tagRouterTestUserConsumerTag = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&dubbo.force.tag=true&remote.application=test-tag" + + tagRouterTestDynamicIpv4Provider1 = "dubbo://127.0.0.1:20001/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag" + tagRouterTestDynamicIpv4Provider2 = "dubbo://127.0.0.1:20002/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag" + tagRouterTestDynamicIpv4Provider3 = "dubbo://127.0.0.1:20003/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag" + tagRouterTestDynamicIpv4Provider4 = "dubbo://127.0.0.1:20004/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag&dubbo.tag=tag4" + tagRouterTestDynamicIpv4Provider5 = "dubbo://127.0.0.1:20005/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag&dubbo.tag=tag5" tagRouterTestDubboTag = "dubbo.tag" tagRouterTestDubboForceTag = "dubbo.force.tag" @@ -47,6 +63,15 @@ const ( tagRouterTestGuangZhou = "guangzhou" tagRouterTestFalse = "false" tagRouterTestTrue = "true" + + routerPath = "/dubbo/config/dubbo/test-tag.tag-router" + routerLocalIP = "127.0.0.1" + routerZk = "zookeeper" +) + +var ( + zkFormat = "zookeeper://%s:%d" + conditionFormat = "condition://%s/com.foo.BarService" ) // MockInvoker is only mock the Invoker to support test tagRouter @@ -196,3 +221,177 @@ func TestFilterInvoker(t *testing.T) { res2 := filterInvoker(invokers, filterTag, filterEnabled) assert.Equal(t, []protocol.Invoker{inv4}, res2) } + +type DynamicTagRouter struct { + suite.Suite + rule *RouterRule + + route *tagRouter + zkClient *zookeeper.ZookeeperClient + testCluster *zk.TestCluster + invokers []protocol.Invoker + url *common.URL +} + +func TestDynamicTagRouter(t *testing.T) { + dtg := &DynamicTagRouter{} + u1, _ := common.NewURL(tagRouterTestDynamicIpv4Provider1) + u2, _ := common.NewURL(tagRouterTestDynamicIpv4Provider2) + u3, _ := common.NewURL(tagRouterTestDynamicIpv4Provider3) + u4, _ := common.NewURL(tagRouterTestDynamicIpv4Provider4) + u5, _ := common.NewURL(tagRouterTestDynamicIpv4Provider5) + inv1 := NewMockInvoker(u1) + inv2 := NewMockInvoker(u2) + inv3 := NewMockInvoker(u3) + inv4 := NewMockInvoker(u4) + inv5 := NewMockInvoker(u5) + dtg.invokers = append(dtg.invokers, inv1, inv2, inv3, inv4, inv5) + suite.Run(t, dtg) +} + +func (suite *DynamicTagRouter) SetupTest() { + var err error + testYML := `enabled: true +scope: application +force: true +runtime: false +valid: true +priority: 1 +key: demo-provider +tags: + - name: tag1 + addresses: ["127.0.0.1:20001"] + - name: tag2 + addresses: ["127.0.0.1:20002"] + - name: tag3 + addresses: ["127.0.0.1:20003", "127.0.0.1:20004"] +` + ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) + suite.NoError(err) + err = z.Create(routerPath) + suite.NoError(err) + + suite.zkClient = z + suite.testCluster = ts + + _, err = z.Conn.Set(routerPath, []byte(testYML), 0) + suite.NoError(err) + + zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, suite.testCluster.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl) + config.GetEnvInstance().SetDynamicConfiguration(configuration) + + suite.Nil(err) + suite.NotNil(configuration) + + url, e1 := common.NewURL(tagRouterTestUserConsumerTag) + suite.Nil(e1) + + tagRouter, err := NewTagRouter(&url) + suite.Nil(err) + suite.NotNil(tagRouter) + suite.route = tagRouter + suite.url = &url +} + +func (suite *DynamicTagRouter) TearDownTest() { + suite.zkClient.Close() + suite.testCluster.Stop() +} + +func (suite *DynamicTagRouter) TestDynamicTagRouterSetByIPv4() { + invokers := suite.invokers + suite.route.Notify(invokers) + suite.NotNil(suite.route.tagRouterRule) + + consumer := &invocation.RPCInvocation{} + consumer.SetAttachments(tagRouterTestDubboTag, "tag1") + targetInvokers := suite.route.Route(invokers, suite.url, consumer) + suite.Equal(1, len(targetInvokers)) + suite.Equal(targetInvokers[0], suite.invokers[0]) + + consumer.SetAttachments(tagRouterTestDubboTag, "tag3") + targetInvokers = suite.route.Route(invokers, suite.url, consumer) + suite.Equal(2, len(targetInvokers)) + suite.Equal(targetInvokers, []protocol.Invoker{suite.invokers[2], suite.invokers[3]}) +} + +func (suite *DynamicTagRouter) TestDynamicTagRouterStaticTag() { + invokers := suite.invokers + consumer := &invocation.RPCInvocation{} + consumer.SetAttachments(tagRouterTestDubboTag, "tag4") + targetInvokers := suite.route.Route(invokers, suite.url, consumer) + suite.Equal(1, len(targetInvokers)) + suite.Equal(targetInvokers[0], suite.invokers[3]) +} + +// Teas no tag and return a address are not in dynamic tag group +func (suite *DynamicTagRouter) TestDynamicTagRouterByNoTagAndAddressMatch() { + invokers := suite.invokers + suite.route.Notify(invokers) + suite.NotNil(suite.route.tagRouterRule) + consumer := &invocation.RPCInvocation{} + targetInvokers := suite.route.Route(invokers, suite.url, consumer) + suite.Equal(1, len(targetInvokers)) + suite.Equal(targetInvokers[0], suite.invokers[4]) + // test if there are some addresses that are not in any dynamic tag group, continue to filter using the static tag group. + consumer.SetAttachments(tagRouterTestDubboTag, "tag5") + targetInvokers = suite.route.Route(invokers, suite.url, consumer) + suite.Equal(1, len(targetInvokers)) + suite.Equal(targetInvokers[0], suite.invokers[4]) +} + +func (suite *DynamicTagRouter) TestTODO() { + testYML := `enabled: true +scope: application +force: true +runtime: false +valid: true +priority: 1 +key: demo-provider +tags: + - name: tag1 + addresses: ["127.0.0.1:20001"] + - name: tag2 + addresses: ["127.0.0.1:20002"] + - name: tag3 + addresses: ["127.0.0.1:20003", "127.0.0.1:20004"] +` + _, err := suite.zkClient.Conn.Set(routerPath, []byte(testYML), 1) + suite.NoError(err) + + zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, suite.testCluster.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl) + config.GetEnvInstance().SetDynamicConfiguration(configuration) +} + +func TestProcess(t *testing.T) { + u1, err := common.NewURL(tagRouterTestUserConsumerTag) + assert.Nil(t, err) + tagRouter, e := NewTagRouter(&u1) + assert.Nil(t, e) + assert.NotNil(t, tagRouter) + + testYML := ` +scope: application +force: true +runtime: false +enabled: true +valid: true +priority: 1 +key: demo-provider +tags: + - name: beijing + addresses: [192.168.1.1, 192.168.1.2] + - name: hangzhou + addresses: [192.168.1.3, 192.168.1.4] +` + tagRouter.Process(&config_center.ConfigChangeEvent{Value: testYML, ConfigType: remoting.EventTypeAdd}) + assert.NotNil(t, tagRouter.tagRouterRule) + assert.Equal(t, []string{"beijing", "hangzhou"}, tagRouter.tagRouterRule.getTagNames()) + assert.Equal(t, []string{"192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4"}, tagRouter.tagRouterRule.getAddresses()) + assert.Equal(t, []string{"192.168.1.3", "192.168.1.4"}, tagRouter.tagRouterRule.getTagNameToAddresses()["hangzhou"]) + assert.Equal(t, []string{"beijing"}, tagRouter.tagRouterRule.getAddressToTagNames()["192.168.1.1"]) + tagRouter.Process(&config_center.ConfigChangeEvent{ConfigType: remoting.EventTypeDel}) + assert.Nil(t, tagRouter.tagRouterRule) +} From a31a4e211fc9696741aa22fa792a16d87b55256d Mon Sep 17 00:00:00 2001 From: watermelo <80680489@qq.com> Date: Mon, 3 Aug 2020 14:48:48 +0800 Subject: [PATCH 7/7] Add: add zk jar for tag test --- before_ut.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/before_ut.sh b/before_ut.sh index 210e9e723b..b55e424ef7 100755 --- a/before_ut.sh +++ b/before_ut.sh @@ -36,5 +36,8 @@ cp ${zkJar} cluster/router/chain/zookeeper-4unittest/contrib/fatjar mkdir -p cluster/router/condition/zookeeper-4unittest/contrib/fatjar cp ${zkJar} cluster/router/condition/zookeeper-4unittest/contrib/fatjar +mkdir -p cluster/router/tag/zookeeper-4unittest/contrib/fatjar +cp ${zkJar} cluster/router/tag/zookeeper-4unittest/contrib/fatjar + mkdir -p metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar cp ${zkJar} metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar \ No newline at end of file