From ba4c7cfeb9f9ca25570ffe16ec7a1f23308f4741 Mon Sep 17 00:00:00 2001 From: cvictory Date: Tue, 29 Jun 2021 15:39:50 +0800 Subject: [PATCH 1/4] support Key Func in ServiceEvent --- registry/event.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/registry/event.go b/registry/event.go index 41a899549e..143a6c7f40 100644 --- a/registry/event.go +++ b/registry/event.go @@ -45,6 +45,7 @@ type ServiceEvent struct { key string // If the url is updated, such as Merged. updated bool + KeyFunc func(*common.URL) string } // String return the description of event @@ -69,7 +70,11 @@ func (e *ServiceEvent) Key() string { if len(e.key) > 0 { return e.key } - e.key = e.Service.GetCacheInvokerMapKey() + if e.KeyFunc == nil { + e.key = e.Service.GetCacheInvokerMapKey() + } else { + e.key = e.KeyFunc(e.Service) + } return e.key } From 3cbda804acf117ec993ca78a8bf965b3a370c669 Mon Sep 17 00:00:00 2001 From: cvictory Date: Tue, 29 Jun 2021 20:50:39 +0800 Subject: [PATCH 2/4] FIX review issue and add unit test --- registry/event.go | 4 +++- registry/event_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) create mode 100644 registry/event_test.go diff --git a/registry/event.go b/registry/event.go index 143a6c7f40..76a9600a42 100644 --- a/registry/event.go +++ b/registry/event.go @@ -29,6 +29,8 @@ import ( "dubbo.apache.org/dubbo-go/v3/remoting" ) +type KeyFunc func(*common.URL) string + func init() { rand.Seed(time.Now().UnixNano()) } @@ -45,7 +47,7 @@ type ServiceEvent struct { key string // If the url is updated, such as Merged. updated bool - KeyFunc func(*common.URL) string + KeyFunc KeyFunc } // String return the description of event diff --git a/registry/event_test.go b/registry/event_test.go new file mode 100644 index 0000000000..349dc3d476 --- /dev/null +++ b/registry/event_test.go @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package registry + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common" +) + +func TestKey(t *testing.T) { + u1, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.0") + se := ServiceEvent{ + Service: u1, + } + assert.Equal(t, se.Key(), "dubbo://:@127.0.0.1:20000/?interface=com.ikurento.user.UserProvider&group=&version=2.0×tamp=") + + se2 := ServiceEvent{ + Service: u1, + KeyFunc: defineKey, + } + assert.Equal(t, se2.Key(), "Hello Key") +} + +func defineKey(url *common.URL) string { + return "Hello Key" +} From bd950327a9ffc61bdba8f817fd5b7346730d39a7 Mon Sep 17 00:00:00 2001 From: cvictory Date: Tue, 29 Jun 2021 22:36:55 +0800 Subject: [PATCH 3/4] fix : make all getCacheKey from ServiceEvent --- registry/directory/directory.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 2b0ff056a8..402fbd6c16 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -193,7 +193,7 @@ func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent if event != nil && event.Service != nil && constant.ROUTER_PROTOCOL == event.Service.Protocol { dir.configRouters() } - if oldInvoker, _ := dir.doCacheInvoker(event.Service); oldInvoker != nil { + if oldInvoker, _ := dir.doCacheInvoker(event.Service, event); oldInvoker != nil { oldInvokers = append(oldInvokers, oldInvoker) } } @@ -224,7 +224,7 @@ func (dir *RegistryDirectory) invokerCacheKey(event *registry.ServiceEvent) stri referenceUrl := dir.GetDirectoryUrl().SubURL newUrl := common.MergeURL(event.Service, referenceUrl) event.Update(newUrl) - return newUrl.GetCacheInvokerMapKey() + return event.Key() } // setNewInvokers groups the invokers from the cache first, then set the result to both directory and router chain. @@ -240,17 +240,18 @@ func (dir *RegistryDirectory) setNewInvokers() { func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) (protocol.Invoker, error) { // judge is override or others if event != nil { - u := dir.convertUrl(event) + switch event.Action { case remoting.EventTypeAdd, remoting.EventTypeUpdate: + u := dir.convertUrl(event) logger.Infof("selector add service url{%s}", event.Service) if u != nil && constant.ROUTER_PROTOCOL == u.Protocol { dir.configRouters() } - return dir.cacheInvoker(u), nil + return dir.cacheInvoker(u, event), nil case remoting.EventTypeDel: logger.Infof("selector delete service url{%s}", event.Service) - return dir.uncacheInvoker(u), nil + return dir.uncacheInvoker(event), nil default: return nil, fmt.Errorf("illegal event type: %v", event.Action) } @@ -316,8 +317,8 @@ func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker { } // uncacheInvoker will return abandoned Invoker, if no Invoker to be abandoned, return nil -func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { - return dir.uncacheInvokerWithKey(url.GetCacheInvokerMapKey()) +func (dir *RegistryDirectory) uncacheInvoker(event *registry.ServiceEvent) protocol.Invoker { + return dir.uncacheInvokerWithKey(event.Key()) } func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker { @@ -331,7 +332,7 @@ func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker } // cacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil -func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { +func (dir *RegistryDirectory) cacheInvoker(url *common.URL, event *registry.ServiceEvent) protocol.Invoker { dir.overrideUrl(dir.GetDirectoryUrl()) referenceUrl := dir.GetDirectoryUrl().SubURL @@ -348,15 +349,15 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" { newUrl := common.MergeURL(url, referenceUrl) dir.overrideUrl(newUrl) - if v, ok := dir.doCacheInvoker(newUrl); ok { + if v, ok := dir.doCacheInvoker(newUrl, event); ok { return v } } return nil } -func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL) (protocol.Invoker, bool) { - key := newUrl.GetCacheInvokerMapKey() +func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL, event *registry.ServiceEvent) (protocol.Invoker, bool) { + key := event.Key() if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); !ok { logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl) newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl) From ba494dd38cf74a6234ef4c4f98fb0280553099e3 Mon Sep 17 00:00:00 2001 From: cvictory Date: Wed, 30 Jun 2021 11:38:58 +0800 Subject: [PATCH 4/4] fix override url notify bug --- registry/directory/directory.go | 1 + 1 file changed, 1 insertion(+) diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 402fbd6c16..7974ad4be6 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -349,6 +349,7 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL, event *registry.Serv if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" { newUrl := common.MergeURL(url, referenceUrl) dir.overrideUrl(newUrl) + event.Update(newUrl) if v, ok := dir.doCacheInvoker(newUrl, event); ok { return v }