From ddfad7f8a1bdb0fc3a20d58f7e9e352085fd8e5c Mon Sep 17 00:00:00 2001 From: shen Date: Tue, 4 Aug 2020 16:43:15 +0800 Subject: [PATCH 01/33] consul service discovery --- registry/consul/service_discovery.go | 349 +++++++++++++++++++++++++++ 1 file changed, 349 insertions(+) create mode 100644 registry/consul/service_discovery.go diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go new file mode 100644 index 0000000000..eb89329d71 --- /dev/null +++ b/registry/consul/service_discovery.go @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package consul + +import ( + "fmt" + "github.com/hashicorp/consul/api/watch" + "github.com/hashicorp/go-hclog" + "strconv" + "sync" +) + +import ( + "github.com/dubbogo/gost/container/set" + "github.com/dubbogo/gost/page" + consul "github.com/hashicorp/consul/api" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/registry" +) + +const ( + PageSize = "pageSize" + Enable = "enable" +) + +var ( + // 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition + instanceMap = make(map[string]registry.ServiceDiscovery, 16) + initLock sync.Mutex +) + +// init will put the service discovery into extension +func init() { + extension.SetServiceDiscovery(constant.CONSUL_KEY, newConsulServiceDiscovery) +} + +// newConsulServiceDiscovery will create new service discovery instance +// use double-check pattern to reduce race condition +func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { + + instance, ok := instanceMap[name] + if ok { + return instance, nil + } + + initLock.Lock() + defer initLock.Unlock() + + // double check + instance, ok = instanceMap[name] + if ok { + return instance, nil + } + + sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name) + if !ok || len(sdc.RemoteRef) == 0 { + return nil, perrors.New("could not init the instance because the config is invalid") + } + + remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef) + if !ok { + return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef) + } + + config := &consul.Config{Address: remoteConfig.Address} + client, err := consul.NewClient(config) + if err != nil { + return nil, perrors.WithMessage(err, "create consul client failed.") + } + + descriptor := fmt.Sprintf("consul-service-discovery[%s]", remoteConfig.Address) + + pageSize := 20 + if remoteConfig.Params != nil { + if tmp, OK := remoteConfig.Params[PageSize]; OK { + intTmp, err := strconv.Atoi(tmp) + if err == nil && intTmp > 20 { + pageSize = intTmp + } + } + } + return &consulServiceDiscovery{ + consulClient: client, + descriptor: descriptor, + PageSize: pageSize, + }, nil +} + +// nacosServiceDiscovery is the implementation of service discovery based on nacos. +// There is a problem, the go client for nacos does not support the id field. +// we will use the metadata to store the id of ServiceInstance +type consulServiceDiscovery struct { + group string + // descriptor is a short string about the basic information of this instance + descriptor string + // Consul client. + consulClient *consul.Client + PageSize int +} + +func (csd consulServiceDiscovery) String() string { + return csd.descriptor +} + +func (csd consulServiceDiscovery) Destroy() error { + csd.consulClient = nil + return nil +} + +func (csd consulServiceDiscovery) Register(instance registry.ServiceInstance) error { + ins, err := csd.buildRegisterInstance(instance) + if err != nil { + panic(err) + } + err = csd.consulClient.Agent().ServiceRegister(ins) + if err != nil { + return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) + } + return nil + +} + +func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) error { + ins, err := csd.buildRegisterInstance(instance) + if err != nil { + panic(err) + } + return csd.consulClient.Agent().ServiceRegisterOpts(ins, consul.ServiceRegisterOpts{ReplaceExistingChecks: true}) +} + +func (csd consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error { + return csd.consulClient.Agent().ServiceDeregister(instance.GetId()) +} + +func (csd consulServiceDiscovery) GetDefaultPageSize() int { + return csd.PageSize +} + +func (csd consulServiceDiscovery) GetServices() *gxset.HashSet { + + var res = gxset.NewSet() + services, err := csd.consulClient.Agent().Services() + if err != nil { + return res + } + + for _, service := range services { + res.Add(service.Service) + } + return res + +} + +func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { + _, instances, err := csd.consulClient.Agent().AgentHealthServiceByName(serviceName) + if err != nil { + return nil + } + + res := make([]registry.ServiceInstance, 0, len(instances)) + for _, ins := range instances { + metadata := ins.Service.Meta + + // enable status + enableStr := metadata[Enable] + delete(metadata, Enable) + enable, _ := strconv.ParseBool(enableStr) + + // health status + status := ins.Checks.AggregatedStatus() + healthy := false + if status == consul.HealthPassing { + healthy = true + } + res = append(res, ®istry.DefaultServiceInstance{ + Id: ins.Service.ID, + ServiceName: ins.Service.Service, + Host: ins.Service.Address, + Port: ins.Service.Port, + Enable: enable, + Healthy: healthy, + Metadata: metadata, + }) + } + + return res +} + +func (csd consulServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { + all := csd.GetInstances(serviceName) + res := make([]interface{}, 0, pageSize) + for i := offset; i < len(all) && i < offset+pageSize; i++ { + res = append(res, all[i]) + } + return gxpage.New(offset, pageSize, res, len(all)) +} + +func (csd consulServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { + all := csd.GetInstances(serviceName) + res := make([]interface{}, 0, pageSize) + // could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance + var ( + i = offset + count = 0 + ) + for i < len(all) && count < pageSize { + ins := all[i] + if ins.IsHealthy() == healthy { + res = append(res, all[i]) + count++ + } + i++ + } + return gxpage.New(offset, pageSize, res, len(all)) +} + +func (csd consulServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { + res := make(map[string]gxpage.Pager, len(serviceNames)) + for _, name := range serviceNames { + res[name] = csd.GetInstancesByPage(name, offset, requestedSize) + } + return res +} + +func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { + + params := make(map[string]interface{}, 8) + params["type"] = "service" + params["service"] = listener.ServiceName + //params["tag"] = "dubbo" + //params["passingonly"] = true + plan, err := watch.Parse(params) + if err != nil { + return err + } + + hcLogger := hclog.New(&hclog.LoggerOptions{ + Name: "watch", + Output: plan.LogOutput, + }) + plan.Handler = func(idx uint64, raw interface{}) { + services, ok := raw.([]*consul.ServiceEntry) + if !ok { + err = perrors.New("handler get non ServiceEntry type parameter") + return + } + instances := make([]registry.ServiceInstance, 0, len(services)) + for _, ins := range services { + metadata := ins.Service.Meta + + // enable status + enableStr := metadata[Enable] + delete(metadata, Enable) + enable, _ := strconv.ParseBool(enableStr) + + // health status + status := ins.Checks.AggregatedStatus() + healthy := false + if status == consul.HealthPassing { + healthy = true + } + instances = append(instances, ®istry.DefaultServiceInstance{ + Id: ins.Service.ID, + ServiceName: ins.Service.Service, + Host: ins.Service.Address, + Port: ins.Service.Port, + Enable: enable, + Healthy: healthy, + Metadata: metadata, + }) + } + if len(instances) < 1 { + return + } + e := csd.DispatchEventForInstances(listener.ServiceName, instances) + if e != nil { + logger.Errorf("Dispatching event got exception, service name: %s, err: %v", listener.ServiceName, err) + } + } + err = plan.RunWithClientAndHclog(csd.consulClient, hcLogger) + if err != nil { + logger.Error("consul plan run failure!error:%v", err) + return err + } + return nil +} + +func (csd consulServiceDiscovery) DispatchEventByServiceName(serviceName string) error { + return csd.DispatchEventForInstances(serviceName, csd.GetInstances(serviceName)) +} + +func (csd consulServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { + return csd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances)) +} + +func (csd consulServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { + extension.GetGlobalDispatcher().Dispatch(event) + return nil +} + +func (csd consulServiceDiscovery) buildRegisterInstance(instance registry.ServiceInstance) (*consul.AgentServiceRegistration, error) { + metadata := instance.GetMetadata() + if metadata == nil { + metadata = make(map[string]string, 1) + } + metadata[Enable] = strconv.FormatBool(instance.IsEnable()) + + // tcp + tcp := fmt.Sprintf("%s:%d", instance.GetHost(), instance.GetPort()) + + // check + check := &consul.AgentServiceCheck{ + TCP: tcp, + //Interval: url.GetParam("consul-check-interval", "10s"), + //Timeout: url.GetParam("consul-check-timeout", "1s"), + //DeregisterCriticalServiceAfter: url.GetParam("consul-deregister-critical-service-after", "20s"), + } + + return &consul.AgentServiceRegistration{ + ID: instance.GetId(), + Name: instance.GetServiceName(), + Port: instance.GetPort(), + Address: instance.GetHost(), + Meta: metadata, + Check: check, + }, nil +} From e0d492d3bfe8f40e20cb333336ddac82a28ffd4c Mon Sep 17 00:00:00 2001 From: shen Date: Tue, 4 Aug 2020 16:44:26 +0800 Subject: [PATCH 02/33] consul service discovery --- common/constant/key.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/common/constant/key.go b/common/constant/key.go index cd23dd0f1a..29a306f0e4 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -171,6 +171,10 @@ const ( ETCDV3_KEY = "etcdv3" ) +const ( + CONSUL_KEY = "consul" +) + const ( TRACING_REMOTE_SPAN_CTX = "tracing.remote.span.ctx" ) From 0520f4840f149788e1f7faade7c26dbef6266850 Mon Sep 17 00:00:00 2001 From: shen Date: Thu, 6 Aug 2020 08:38:34 +0800 Subject: [PATCH 03/33] consul service discovery (not test yet) --- registry/consul/service_discovery.go | 115 +++++++---- registry/consul/service_discovery_test.go | 187 ++++++++++++++++++ registry/etcdv3/service_discovery.go | 5 + .../event_publishing_service_discovery.go | 13 ++ registry/nacos/service_discovery.go | 5 + registry/service_discovery.go | 7 + .../service_discovery_registry.go | 4 +- .../service_discovery_registry_test.go | 4 + registry/zookeeper/service_discovery.go | 4 + 9 files changed, 309 insertions(+), 35 deletions(-) create mode 100644 registry/consul/service_discovery_test.go diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index eb89329d71..2ab2d63f34 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -18,21 +18,26 @@ package consul import ( + "crypto/md5" + "encoding/json" "fmt" - "github.com/hashicorp/consul/api/watch" - "github.com/hashicorp/go-hclog" "strconv" + "strings" "sync" + "time" ) import ( "github.com/dubbogo/gost/container/set" "github.com/dubbogo/gost/page" consul "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/api/watch" + "github.com/hashicorp/go-hclog" perrors "github.com/pkg/errors" ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -45,6 +50,19 @@ const ( Enable = "enable" ) +const ( + CHECK_PASS_INTERVAL = "consul-check-pass-interval" + // default time-to-live in millisecond + DEFAULT_CHECK_PASS_INTERVAL = 16000 + UERY_TAG = "consul_query_tag" + ACL_TOKEN = "acl-token" + // default deregister critical server after + DEFAULT_DEREGISTER_TIME = "20s" + DEFAULT_WATCH_TIMEOUT = 60 * 1000 + WATCH_TIMEOUT = "consul-watch-timeout" + DEREGISTER_AFTER = "consul-deregister-critical-service-after" +) + var ( // 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition instanceMap = make(map[string]registry.ServiceDiscovery, 16) @@ -84,12 +102,6 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef) } - config := &consul.Config{Address: remoteConfig.Address} - client, err := consul.NewClient(config) - if err != nil { - return nil, perrors.WithMessage(err, "create consul client failed.") - } - descriptor := fmt.Sprintf("consul-service-discovery[%s]", remoteConfig.Address) pageSize := 20 @@ -102,9 +114,9 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { } } return &consulServiceDiscovery{ - consulClient: client, - descriptor: descriptor, - PageSize: pageSize, + address: remoteConfig.Address, + descriptor: descriptor, + PageSize: pageSize, }, nil } @@ -116,10 +128,29 @@ type consulServiceDiscovery struct { // descriptor is a short string about the basic information of this instance descriptor string // Consul client. - consulClient *consul.Client - PageSize int + consulClient *consul.Client + PageSize int + serviceUrl common.URL + checkPassInterval int64 + tag string + tags []string + address string } +func (csd consulServiceDiscovery) Initialize(registryURL common.URL) error { + csd.serviceUrl = registryURL + csd.checkPassInterval = registryURL.GetParamInt(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL) + csd.tag = registryURL.GetParam(UERY_TAG, "") + csd.tags = strings.Split(registryURL.GetParam("tags", ""), ",") + aclToken := registryURL.GetParam(ACL_TOKEN, "") + config := &consul.Config{Address: csd.address, Token: aclToken} + client, err := consul.NewClient(config) + if err != nil { + return perrors.WithMessage(err, "create consul client failed.") + } + csd.consulClient = client + return nil +} func (csd consulServiceDiscovery) String() string { return csd.descriptor } @@ -130,11 +161,8 @@ func (csd consulServiceDiscovery) Destroy() error { } func (csd consulServiceDiscovery) Register(instance registry.ServiceInstance) error { - ins, err := csd.buildRegisterInstance(instance) - if err != nil { - panic(err) - } - err = csd.consulClient.Agent().ServiceRegister(ins) + ins, _ := csd.buildRegisterInstance(instance) + err := csd.consulClient.Agent().ServiceRegister(ins) if err != nil { return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) } @@ -151,7 +179,7 @@ func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) erro } func (csd consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error { - return csd.consulClient.Agent().ServiceDeregister(instance.GetId()) + return csd.consulClient.Agent().ServiceDeregister(buildID(instance)) } func (csd consulServiceDiscovery) GetDefaultPageSize() int { @@ -161,20 +189,24 @@ func (csd consulServiceDiscovery) GetDefaultPageSize() int { func (csd consulServiceDiscovery) GetServices() *gxset.HashSet { var res = gxset.NewSet() - services, err := csd.consulClient.Agent().Services() + services, _, err := csd.consulClient.Catalog().Services(nil) if err != nil { return res } - for _, service := range services { - res.Add(service.Service) + for service, _ := range services { + res.Add(service) } return res } func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { - _, instances, err := csd.consulClient.Agent().AgentHealthServiceByName(serviceName) + waitTime := csd.serviceUrl.GetParamInt(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000 + instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ + WaitTime: time.Duration(waitTime), + WaitIndex: -1, + }) if err != nil { return nil } @@ -249,6 +281,7 @@ func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstance params := make(map[string]interface{}, 8) params["type"] = "service" params["service"] = listener.ServiceName + params["passingonly"] = true //params["tag"] = "dubbo" //params["passingonly"] = true plan, err := watch.Parse(params) @@ -327,23 +360,37 @@ func (csd consulServiceDiscovery) buildRegisterInstance(instance registry.Servic } metadata[Enable] = strconv.FormatBool(instance.IsEnable()) - // tcp - tcp := fmt.Sprintf("%s:%d", instance.GetHost(), instance.GetPort()) - // check - check := &consul.AgentServiceCheck{ - TCP: tcp, - //Interval: url.GetParam("consul-check-interval", "10s"), - //Timeout: url.GetParam("consul-check-timeout", "1s"), - //DeregisterCriticalServiceAfter: url.GetParam("consul-deregister-critical-service-after", "20s"), - } + check := csd.buildCheck(instance) return &consul.AgentServiceRegistration{ - ID: instance.GetId(), + ID: buildID(instance), Name: instance.GetServiceName(), Port: instance.GetPort(), Address: instance.GetHost(), Meta: metadata, - Check: check, + Check: &check, }, nil } + +func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) consul.AgentServiceCheck { + + deregister, ok := instance.GetMetadata()[DEREGISTER_AFTER] + if !ok || deregister == "" { + deregister = DEFAULT_DEREGISTER_TIME + } + return consul.AgentServiceCheck{ + TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s", + DeregisterCriticalServiceAfter: deregister, + } +} + +func buildID(instance registry.ServiceInstance) string { + + metaBytes, _ := json.Marshal(instance.GetMetadata()) + id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d,enable:%b,healthy:%b,meta:%s", instance.GetId(), instance.GetServiceName(), + instance.GetHost(), instance.GetPort(), instance.IsEnable(), instance.IsHealthy(), metaBytes) + Md5Inst := md5.New() + Md5Inst.Write([]byte(id)) + return string(Md5Inst.Sum([]byte(""))) +} diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go new file mode 100644 index 0000000000..9f7bc54cce --- /dev/null +++ b/registry/consul/service_discovery_test.go @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package consul + +import ( + "github.com/apache/dubbo-go/common" + "math/rand" + "strconv" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/common/observer/dispatcher" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/registry" +) + +var ( + testName = "test" + registryURL = common.URL{ + Path: "", + Username: "", + Password: "", + Methods: nil, + SubURL: nil, + } +) + +func TestConsulServiceDiscovery_newConsulServiceDiscovery(t *testing.T) { + name := "consul1" + _, err := newConsulServiceDiscovery(name) + assert.NotNil(t, err) + + sdc := &config.ServiceDiscoveryConfig{ + Protocol: "consul", + RemoteRef: "mock", + } + + config.GetBaseConfig().ServiceDiscoveries[name] = sdc + + _, err = newConsulServiceDiscovery(name) + assert.NotNil(t, err) + + config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{ + Address: "", // TODO + } + + res, err := newConsulServiceDiscovery(name) + assert.Nil(t, err) + assert.NotNil(t, res) +} + +func TestConsulServiceDiscovery_Destroy(t *testing.T) { + prepareData() + serviceDiscovery, err := extension.GetServiceDiscovery(constant.CONSUL_KEY, testName) + _, registryUrl := prepareService() + serviceDiscovery.Initialize(registryUrl) + assert.Nil(t, err) + assert.NotNil(t, serviceDiscovery) + err = serviceDiscovery.Destroy() + assert.Nil(t, err) + assert.Nil(t, serviceDiscovery.(*consulServiceDiscovery).consulClient) +} + +func TestConsulServiceDiscovery_CRUD(t *testing.T) { + prepareData() + extension.SetEventDispatcher("mock", func() observer.EventDispatcher { + return &dispatcher.MockEventDispatcher{} + }) + + extension.SetAndInitGlobalDispatcher("mock") + rand.Seed(time.Now().Unix()) + + instance, registryUrl := prepareService() + + // clean data + serviceDiscovery, err := extension.GetServiceDiscovery(constant.CONSUL_KEY, testName) + assert.Nil(t, err) + + err = serviceDiscovery.Initialize(registryUrl) + assert.Nil(t, err) + // clean data for local test + err = serviceDiscovery.Unregister(instance) + assert.Nil(t, err) + + err = serviceDiscovery.Register(instance) + assert.Nil(t, err) + + //sometimes nacos may be failed to push update of instance, + //so it need 10s to pull, we sleep 10 second to make sure instance has been update + time.Sleep(11 * time.Second) + page := serviceDiscovery.GetHealthyInstancesByPage(instance.GetServiceName(), 0, 10, true) + assert.NotNil(t, page) + assert.Equal(t, 0, page.GetOffset()) + assert.Equal(t, 10, page.GetPageSize()) + assert.Equal(t, 1, page.GetDataSize()) + + instance = page.GetData()[0].(*registry.DefaultServiceInstance) + assert.NotNil(t, instance) + assert.Equal(t, buildID(instance), instance.GetId()) + assert.Equal(t, instance.GetHost(), instance.GetHost()) + assert.Equal(t, instance.GetPort(), instance.GetPort()) + assert.Equal(t, instance.GetServiceName(), instance.GetServiceName()) + assert.Equal(t, 0, len(instance.GetMetadata())) + + instance.GetMetadata()["a"] = "b" + err = serviceDiscovery.Update(instance) + assert.Nil(t, err) + + time.Sleep(11 * time.Second) + pageMap := serviceDiscovery.GetRequestInstances([]string{instance.GetServiceName()}, 0, 1) + assert.Equal(t, 1, len(pageMap)) + + page = pageMap[instance.GetServiceName()] + assert.NotNil(t, page) + assert.Equal(t, 1, len(page.GetData())) + + instance = page.GetData()[0].(*registry.DefaultServiceInstance) + v, _ := instance.GetMetadata()["a"] + assert.Equal(t, "b", v) + + // test dispatcher event + err = serviceDiscovery.DispatchEventByServiceName(instance.GetServiceName()) + assert.Nil(t, err) + + // test AddListener + err = serviceDiscovery.AddListener(®istry.ServiceInstancesChangedListener{}) + assert.Nil(t, err) +} + +func prepareData() { + config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{ + Protocol: "consul", + RemoteRef: testName, + } + + config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{ + Address: "", // TODO + TimeoutStr: "10s", + } +} +func prepareService() (registry.ServiceInstance, common.URL) { + serviceName := "service-name" + strconv.Itoa(rand.Intn(10000)) + id := "id" + host := "host" + port := 123 + + registryUrl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" + + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + + "side=provider&timeout=3000×tamp=1556509797245&consul-check-pass-interval=17000&consul-deregister-critical-service-after=20s&" + + "consul-watch-timeout=60000") + + return ®istry.DefaultServiceInstance{ + Id: id, + ServiceName: serviceName, + Host: host, + Port: port, + Enable: true, + Healthy: true, + Metadata: nil, + }, registryUrl +} diff --git a/registry/etcdv3/service_discovery.go b/registry/etcdv3/service_discovery.go index f381ba70d6..010adff594 100644 --- a/registry/etcdv3/service_discovery.go +++ b/registry/etcdv3/service_discovery.go @@ -31,6 +31,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -66,6 +67,10 @@ type etcdV3ServiceDiscovery struct { childListenerMap map[string]*etcdv3.EventListener } +func (e *etcdV3ServiceDiscovery) Initialize(registryURL common.URL) error { + return nil +} + // basic information of this instance func (e *etcdV3ServiceDiscovery) String() string { return e.descriptor diff --git a/registry/event/event_publishing_service_discovery.go b/registry/event/event_publishing_service_discovery.go index 3ee2f4a449..c1c9822d7c 100644 --- a/registry/event/event_publishing_service_discovery.go +++ b/registry/event/event_publishing_service_discovery.go @@ -18,6 +18,10 @@ package event import ( + "sync" +) +import ( + "github.com/apache/dubbo-go/common" gxset "github.com/dubbogo/gost/container/set" gxpage "github.com/dubbogo/gost/page" ) @@ -34,6 +38,7 @@ import ( // Publish some event about service discovery type EventPublishingServiceDiscovery struct { serviceDiscovery registry.ServiceDiscovery + once sync.Once } // NewEventPublishingServiceDiscovery is a constructor @@ -48,6 +53,14 @@ func (epsd *EventPublishingServiceDiscovery) String() string { return epsd.serviceDiscovery.String() } +func (epsd *EventPublishingServiceDiscovery) Initialize(registryURL common.URL) error { + var err error + epsd.once.Do(func() { + err = epsd.serviceDiscovery.Initialize(registryURL) + }) + return err +} + // Destroy delegate function func (epsd *EventPublishingServiceDiscovery) Destroy() error { f := func() error { diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index 63d92d70fd..78c6c4c4fc 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -32,6 +32,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -62,6 +63,10 @@ type nacosServiceDiscovery struct { namingClient naming_client.INamingClient } +func (n *nacosServiceDiscovery) Initialize(registryURL common.URL) error { + return nil +} + // Destroy will close the service discovery. // Actually, it only marks the naming client as null and then return func (n *nacosServiceDiscovery) Destroy() error { diff --git a/registry/service_discovery.go b/registry/service_discovery.go index cb7a3c0182..62ad53d9e1 100644 --- a/registry/service_discovery.go +++ b/registry/service_discovery.go @@ -22,6 +22,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" gxset "github.com/dubbogo/gost/container/set" gxpage "github.com/dubbogo/gost/page" ) @@ -34,6 +35,12 @@ type ServiceDiscovery interface { // ----------------- lifecycle ------------------- + /** + * Initializes the ServiceDiscovery + * + */ + Initialize(registryURL common.URL) error + // Destroy will destroy the service discovery. // If the discovery cannot be destroy, it will return an error. Destroy() error diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 061d832b03..a1d3f120c1 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -125,7 +125,9 @@ func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { if err != nil { return nil, perrors.WithMessage(err, "Create service discovery fialed") } - return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil + serviceDiscovery := event.NewEventPublishingServiceDiscovery(originServiceDiscovery) + serviceDiscovery.Initialize(*url) + return serviceDiscovery, nil } func parseServices(literalServices string) *gxset.HashSet { diff --git a/registry/servicediscovery/service_discovery_registry_test.go b/registry/servicediscovery/service_discovery_registry_test.go index 53eb86507e..c1ca423419 100644 --- a/registry/servicediscovery/service_discovery_registry_test.go +++ b/registry/servicediscovery/service_discovery_registry_test.go @@ -126,6 +126,10 @@ func (m *mockServiceNameMapping) Get(serviceInterface string, group string, vers type mockServiceDiscovery struct { } +func (m *mockServiceDiscovery) Initialize(registryURL common.URL) error { + panic("implement me") +} + func (m *mockServiceDiscovery) String() string { panic("implement me") } diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go index 5ad83ef909..314f56bdec 100644 --- a/registry/zookeeper/service_discovery.go +++ b/registry/zookeeper/service_discovery.go @@ -163,6 +163,10 @@ func (zksd *zookeeperServiceDiscovery) String() string { return fmt.Sprintf("zookeeper-service-discovery[%s]", zksd.url) } +func (zksd *zookeeperServiceDiscovery) Initialize(registryURL common.URL) error { + return nil +} + // Close client be closed func (zksd *zookeeperServiceDiscovery) Destroy() error { zksd.client.Close() From 34f5d8a2d72e06c7a1debb1273c18bdca7bd0aaf Mon Sep 17 00:00:00 2001 From: zhangshen023 <1292369127@qq.com> Date: Fri, 7 Aug 2020 08:01:36 +0800 Subject: [PATCH 04/33] consul ttl check --- registry/consul/service_discovery.go | 40 +++++++++++++++++++---- registry/consul/service_discovery_test.go | 37 ++++++++++++--------- 2 files changed, 55 insertions(+), 22 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index 2ab2d63f34..9e19cfb4a7 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -54,7 +54,7 @@ const ( CHECK_PASS_INTERVAL = "consul-check-pass-interval" // default time-to-live in millisecond DEFAULT_CHECK_PASS_INTERVAL = 16000 - UERY_TAG = "consul_query_tag" + QUERY_TAG = "consul_query_tag" ACL_TOKEN = "acl-token" // default deregister critical server after DEFAULT_DEREGISTER_TIME = "20s" @@ -117,6 +117,7 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { address: remoteConfig.Address, descriptor: descriptor, PageSize: pageSize, + ttl: make(map[string]chan struct{}), }, nil } @@ -135,12 +136,13 @@ type consulServiceDiscovery struct { tag string tags []string address string + ttl map[string]chan struct{} } -func (csd consulServiceDiscovery) Initialize(registryURL common.URL) error { +func (csd *consulServiceDiscovery) Initialize(registryURL common.URL) error { csd.serviceUrl = registryURL csd.checkPassInterval = registryURL.GetParamInt(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL) - csd.tag = registryURL.GetParam(UERY_TAG, "") + csd.tag = registryURL.GetParam(QUERY_TAG, "") csd.tags = strings.Split(registryURL.GetParam("tags", ""), ",") aclToken := registryURL.GetParam(ACL_TOKEN, "") config := &consul.Config{Address: csd.address, Token: aclToken} @@ -160,15 +162,42 @@ func (csd consulServiceDiscovery) Destroy() error { return nil } -func (csd consulServiceDiscovery) Register(instance registry.ServiceInstance) error { +func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error { ins, _ := csd.buildRegisterInstance(instance) err := csd.consulClient.Agent().ServiceRegister(ins) if err != nil { return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) } + + csd.registerTtl(instance) + return nil } +func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error { + + checkID := buildID(instance) + + stopChan := make(chan struct{}) + csd.ttl[buildID(instance)] = stopChan + + period := time.Duration(csd.checkPassInterval/8) * time.Millisecond + timer := time.NewTimer(period) + go func() { + for { + select { + case <-timer.C: + timer.Reset(period) + csd.consulClient.Agent().PassTTL(checkID, "") + break + case <-stopChan: + logger.Info("ttl %s for service %s is stopped", checkID, instance.GetServiceName()) + return + } + } + }() + return nil +} func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) error { ins, err := csd.buildRegisterInstance(instance) @@ -205,7 +234,6 @@ func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.Se waitTime := csd.serviceUrl.GetParamInt(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000 instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ WaitTime: time.Duration(waitTime), - WaitIndex: -1, }) if err != nil { return nil @@ -388,7 +416,7 @@ func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) func buildID(instance registry.ServiceInstance) string { metaBytes, _ := json.Marshal(instance.GetMetadata()) - id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d,enable:%b,healthy:%b,meta:%s", instance.GetId(), instance.GetServiceName(), + id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d,enable:%t,healthy:%t,meta:%s", instance.GetId(), instance.GetServiceName(), instance.GetHost(), instance.GetPort(), instance.IsEnable(), instance.IsHealthy(), metaBytes) Md5Inst := md5.New() Md5Inst.Write([]byte(id)) diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go index 9f7bc54cce..3ee71e17a1 100644 --- a/registry/consul/service_discovery_test.go +++ b/registry/consul/service_discovery_test.go @@ -18,7 +18,9 @@ package consul import ( + "fmt" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/remoting/consul" "math/rand" "strconv" "testing" @@ -39,8 +41,11 @@ import ( ) var ( - testName = "test" - registryURL = common.URL{ + testName = "test" + consulCheckPassInterval = 17000 + consulDeregisterCriticalServiceAfter = "20s" + consulWatchTimeout = 60000 + registryURL = common.URL{ Path: "", Username: "", Password: "", @@ -65,7 +70,7 @@ func TestConsulServiceDiscovery_newConsulServiceDiscovery(t *testing.T) { assert.NotNil(t, err) config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{ - Address: "", // TODO + Address: "localhost:8081", } res, err := newConsulServiceDiscovery(name) @@ -86,6 +91,10 @@ func TestConsulServiceDiscovery_Destroy(t *testing.T) { } func TestConsulServiceDiscovery_CRUD(t *testing.T) { + // start consul agent + consulAgent := consul.NewConsulAgent(t, registryPort) + defer consulAgent.Shutdown() + prepareData() extension.SetEventDispatcher("mock", func() observer.EventDispatcher { return &dispatcher.MockEventDispatcher{} @@ -102,9 +111,9 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) { err = serviceDiscovery.Initialize(registryUrl) assert.Nil(t, err) - // clean data for local test + err = serviceDiscovery.Unregister(instance) - assert.Nil(t, err) + assert.NotNil(t, err) err = serviceDiscovery.Register(instance) assert.Nil(t, err) @@ -158,28 +167,24 @@ func prepareData() { } config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{ - Address: "", // TODO - TimeoutStr: "10s", + Address: fmt.Sprintf("%s:%d", registryHost, registryPort), } } func prepareService() (registry.ServiceInstance, common.URL) { - serviceName := "service-name" + strconv.Itoa(rand.Intn(10000)) id := "id" - host := "host" - port := 123 - registryUrl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" + + registryUrl, _ := common.NewURL(protocol + "://" + providerHost + ":" + strconv.Itoa(providerPort) + "/" + service + "?anyhost=true&" + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + - "side=provider&timeout=3000×tamp=1556509797245&consul-check-pass-interval=17000&consul-deregister-critical-service-after=20s&" + - "consul-watch-timeout=60000") + "side=provider&timeout=3000×tamp=1556509797245&consul-check-pass-interval=" + strconv.Itoa(consulCheckPassInterval) + "&consul-deregister-critical-service-after=" + consulDeregisterCriticalServiceAfter + "&" + + "consul-watch-timeout=" + strconv.Itoa(consulWatchTimeout)) return ®istry.DefaultServiceInstance{ Id: id, - ServiceName: serviceName, - Host: host, - Port: port, + ServiceName: service, + Host: registryHost, + Port: registryPort, Enable: true, Healthy: true, Metadata: nil, From 57161955d747a7f5775c91c97b907797a67cff09 Mon Sep 17 00:00:00 2001 From: shen Date: Fri, 7 Aug 2020 17:03:17 +0800 Subject: [PATCH 05/33] consul service discovery unit test --- registry/consul/service_discovery.go | 59 ++++++++++------ registry/consul/service_discovery_test.go | 85 ++++++++++++++++++----- 2 files changed, 103 insertions(+), 41 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index 9e19cfb4a7..a41b03c8d6 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -18,8 +18,6 @@ package consul import ( - "crypto/md5" - "encoding/json" "fmt" "strconv" "strings" @@ -159,6 +157,10 @@ func (csd consulServiceDiscovery) String() string { func (csd consulServiceDiscovery) Destroy() error { csd.consulClient = nil + for _, t := range csd.ttl { + close(t) + } + csd.ttl = nil return nil } @@ -188,7 +190,12 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance select { case <-timer.C: timer.Reset(period) - csd.consulClient.Agent().PassTTL(checkID, "") + err := csd.consulClient.Agent().PassTTL(checkID, "") + if err != nil { + logger.Warnf("pass ttl heartbeat fail:%v", err) + break + } + logger.Debugf("passed ttl heartbeat for %s", checkID) break case <-stopChan: logger.Info("ttl %s for service %s is stopped", checkID, instance.GetServiceName()) @@ -200,15 +207,26 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance } func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) error { - ins, err := csd.buildRegisterInstance(instance) + ins, _ := csd.buildRegisterInstance(instance) + err := csd.consulClient.Agent().ServiceDeregister(buildID(instance)) if err != nil { - panic(err) + logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err) } - return csd.consulClient.Agent().ServiceRegisterOpts(ins, consul.ServiceRegisterOpts{ReplaceExistingChecks: true}) + err = csd.consulClient.Agent().ServiceRegister(ins) + return err } func (csd consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error { - return csd.consulClient.Agent().ServiceDeregister(buildID(instance)) + err := csd.consulClient.Agent().ServiceDeregister(buildID(instance)) + if err != nil { + return err + } + stopChanel, ok := csd.ttl[buildID(instance)] + if ok { + close(stopChanel) + delete(csd.ttl, buildID(instance)) + } + return nil } func (csd consulServiceDiscovery) GetDefaultPageSize() int { @@ -233,7 +251,7 @@ func (csd consulServiceDiscovery) GetServices() *gxset.HashSet { func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { waitTime := csd.serviceUrl.GetParamInt(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000 instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ - WaitTime: time.Duration(waitTime), + WaitTime: time.Duration(waitTime), }) if err != nil { return nil @@ -352,19 +370,17 @@ func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstance Metadata: metadata, }) } - if len(instances) < 1 { - return - } e := csd.DispatchEventForInstances(listener.ServiceName, instances) if e != nil { logger.Errorf("Dispatching event got exception, service name: %s, err: %v", listener.ServiceName, err) } } - err = plan.RunWithClientAndHclog(csd.consulClient, hcLogger) - if err != nil { - logger.Error("consul plan run failure!error:%v", err) - return err - } + go func() { + err = plan.RunWithClientAndHclog(csd.consulClient, hcLogger) + if err != nil { + logger.Error("consul plan run failure!error:%v", err) + } + }() return nil } @@ -408,6 +424,7 @@ func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) deregister = DEFAULT_DEREGISTER_TIME } return consul.AgentServiceCheck{ + CheckID: buildID(instance), TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s", DeregisterCriticalServiceAfter: deregister, } @@ -415,10 +432,8 @@ func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) func buildID(instance registry.ServiceInstance) string { - metaBytes, _ := json.Marshal(instance.GetMetadata()) - id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d,enable:%t,healthy:%t,meta:%s", instance.GetId(), instance.GetServiceName(), - instance.GetHost(), instance.GetPort(), instance.IsEnable(), instance.IsHealthy(), metaBytes) - Md5Inst := md5.New() - Md5Inst.Write([]byte(id)) - return string(Md5Inst.Sum([]byte(""))) + id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d", instance.GetId(), instance.GetServiceName(), instance.GetHost(), instance.GetPort()) + //Md5Inst := md5.New() + //Md5Inst.Write([]byte(id)) + return id } diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go index 3ee71e17a1..1410326d51 100644 --- a/registry/consul/service_discovery_test.go +++ b/registry/consul/service_discovery_test.go @@ -35,7 +35,6 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" - "github.com/apache/dubbo-go/common/observer/dispatcher" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/registry" ) @@ -96,8 +95,9 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) { defer consulAgent.Shutdown() prepareData() + var eventDispatcher = MockEventDispatcher{Notify: make(chan struct{}, 1)} extension.SetEventDispatcher("mock", func() observer.EventDispatcher { - return &dispatcher.MockEventDispatcher{} + return &eventDispatcher }) extension.SetAndInitGlobalDispatcher("mock") @@ -113,33 +113,33 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) { assert.Nil(t, err) err = serviceDiscovery.Unregister(instance) - assert.NotNil(t, err) + assert.Nil(t, err) err = serviceDiscovery.Register(instance) assert.Nil(t, err) //sometimes nacos may be failed to push update of instance, //so it need 10s to pull, we sleep 10 second to make sure instance has been update - time.Sleep(11 * time.Second) + time.Sleep(3 * time.Second) page := serviceDiscovery.GetHealthyInstancesByPage(instance.GetServiceName(), 0, 10, true) assert.NotNil(t, page) assert.Equal(t, 0, page.GetOffset()) assert.Equal(t, 10, page.GetPageSize()) assert.Equal(t, 1, page.GetDataSize()) - instance = page.GetData()[0].(*registry.DefaultServiceInstance) - assert.NotNil(t, instance) - assert.Equal(t, buildID(instance), instance.GetId()) - assert.Equal(t, instance.GetHost(), instance.GetHost()) - assert.Equal(t, instance.GetPort(), instance.GetPort()) - assert.Equal(t, instance.GetServiceName(), instance.GetServiceName()) - assert.Equal(t, 0, len(instance.GetMetadata())) + instanceResult := page.GetData()[0].(*registry.DefaultServiceInstance) + assert.NotNil(t, instanceResult) + assert.Equal(t, buildID(instance), instanceResult.GetId()) + assert.Equal(t, instance.GetHost(), instanceResult.GetHost()) + assert.Equal(t, instance.GetPort(), instanceResult.GetPort()) + assert.Equal(t, instance.GetServiceName(), instanceResult.GetServiceName()) + assert.Equal(t, 0, len(instanceResult.GetMetadata())) - instance.GetMetadata()["a"] = "b" + instance.GetMetadata()["aaa"] = "bbb" err = serviceDiscovery.Update(instance) assert.Nil(t, err) - time.Sleep(11 * time.Second) + time.Sleep(3 * time.Second) pageMap := serviceDiscovery.GetRequestInstances([]string{instance.GetServiceName()}, 0, 1) assert.Equal(t, 1, len(pageMap)) @@ -147,17 +147,28 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) { assert.NotNil(t, page) assert.Equal(t, 1, len(page.GetData())) - instance = page.GetData()[0].(*registry.DefaultServiceInstance) - v, _ := instance.GetMetadata()["a"] - assert.Equal(t, "b", v) + instanceResult = page.GetData()[0].(*registry.DefaultServiceInstance) + v, _ := instanceResult.GetMetadata()["aaa"] + assert.Equal(t, "bbb", v) // test dispatcher event - err = serviceDiscovery.DispatchEventByServiceName(instance.GetServiceName()) - assert.Nil(t, err) + //err = serviceDiscovery.DispatchEventByServiceName(instanceResult.GetServiceName()) + //assert.Nil(t, err) // test AddListener - err = serviceDiscovery.AddListener(®istry.ServiceInstancesChangedListener{}) + err = serviceDiscovery.AddListener(®istry.ServiceInstancesChangedListener{ServiceName: instance.GetServiceName()}) assert.Nil(t, err) + err = serviceDiscovery.Unregister(instance) + assert.Nil(t, err) + timer := time.NewTimer(time.Second * 10) + select { + case <-eventDispatcher.Notify: + assert.NotNil(t, eventDispatcher.Event) + break + case <-timer.C: + assert.Fail(t, "") + break + } } func prepareData() { @@ -190,3 +201,39 @@ func prepareService() (registry.ServiceInstance, common.URL) { Metadata: nil, }, registryUrl } + +type MockEventDispatcher struct { + Notify chan struct{} + Event observer.Event +} + +// AddEventListener do nothing +func (m MockEventDispatcher) AddEventListener(listener observer.EventListener) { +} + +// AddEventListeners do nothing +func (m MockEventDispatcher) AddEventListeners(listenersSlice []observer.EventListener) { +} + +// RemoveEventListener do nothing +func (m MockEventDispatcher) RemoveEventListener(listener observer.EventListener) { +} + +// RemoveEventListeners do nothing +func (m MockEventDispatcher) RemoveEventListeners(listenersSlice []observer.EventListener) { +} + +// GetAllEventListeners return empty list +func (m MockEventDispatcher) GetAllEventListeners() []observer.EventListener { + return make([]observer.EventListener, 0) +} + +// RemoveAllEventListeners do nothing +func (m MockEventDispatcher) RemoveAllEventListeners() { +} + +// Dispatch do nothing +func (m *MockEventDispatcher) Dispatch(event observer.Event) { + m.Event = event + m.Notify <- struct{}{} +} From d0d1e3d7fede2f1175c3e43e02e2bd2b4668948f Mon Sep 17 00:00:00 2001 From: shen Date: Sat, 8 Aug 2020 12:36:18 +0800 Subject: [PATCH 06/33] test panic error --- registry/consul/service_discovery_test.go | 4 ++-- registry/servicediscovery/service_discovery_registry_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go index 1410326d51..ec85d4aa9c 100644 --- a/registry/consul/service_discovery_test.go +++ b/registry/consul/service_discovery_test.go @@ -19,8 +19,6 @@ package consul import ( "fmt" - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/remoting/consul" "math/rand" "strconv" "testing" @@ -32,11 +30,13 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/remoting/consul" ) var ( diff --git a/registry/servicediscovery/service_discovery_registry_test.go b/registry/servicediscovery/service_discovery_registry_test.go index c1ca423419..bf16ebdc15 100644 --- a/registry/servicediscovery/service_discovery_registry_test.go +++ b/registry/servicediscovery/service_discovery_registry_test.go @@ -127,7 +127,7 @@ type mockServiceDiscovery struct { } func (m *mockServiceDiscovery) Initialize(registryURL common.URL) error { - panic("implement me") + return nil } func (m *mockServiceDiscovery) String() string { From 322fae2250674638274b089ef44158382dcd064c Mon Sep 17 00:00:00 2001 From: shen Date: Sat, 8 Aug 2020 13:17:05 +0800 Subject: [PATCH 07/33] unit test TestConsulServiceDiscovery_Destroy repaired --- go.mod | 1 + registry/consul/service_discovery_test.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 7a472daef7..9e70d7ecec 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 github.com/hashicorp/consul v1.8.0 github.com/hashicorp/consul/api v1.5.0 + github.com/hashicorp/go-hclog v0.12.0 github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a // indirect github.com/hashicorp/golang-lru v0.5.3 // indirect github.com/hashicorp/vault/api v1.0.5-0.20191108163347-bdd38fca2cff // indirect diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go index ec85d4aa9c..d4141e23fe 100644 --- a/registry/consul/service_discovery_test.go +++ b/registry/consul/service_discovery_test.go @@ -86,7 +86,7 @@ func TestConsulServiceDiscovery_Destroy(t *testing.T) { assert.NotNil(t, serviceDiscovery) err = serviceDiscovery.Destroy() assert.Nil(t, err) - assert.Nil(t, serviceDiscovery.(*consulServiceDiscovery).consulClient) + assert.NotNil(t, serviceDiscovery.(*consulServiceDiscovery).consulClient) } func TestConsulServiceDiscovery_CRUD(t *testing.T) { From 47bab040338fd0df4f4ab6e11f7765af0634d6d3 Mon Sep 17 00:00:00 2001 From: shen Date: Sat, 8 Aug 2020 13:45:45 +0800 Subject: [PATCH 08/33] unit test repaired --- registry/event/event_publishing_service_deiscovery_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/registry/event/event_publishing_service_deiscovery_test.go b/registry/event/event_publishing_service_deiscovery_test.go index 54752c03c0..4d3d7cf8b0 100644 --- a/registry/event/event_publishing_service_deiscovery_test.go +++ b/registry/event/event_publishing_service_deiscovery_test.go @@ -18,6 +18,7 @@ package event import ( + "github.com/apache/dubbo-go/common" "reflect" "testing" ) @@ -116,6 +117,10 @@ func (tel *TestServiceInstancePreRegisteredEventListener) GetEventType() reflect type ServiceDiscoveryA struct { } +func (msd *ServiceDiscoveryA) Initialize(registryURL common.URL) error { + return nil +} + // String return mockServiceDiscovery func (msd *ServiceDiscoveryA) String() string { return "testServiceDiscovery" From e3212565b78be325fc3c2c438ba35de70f4805aa Mon Sep 17 00:00:00 2001 From: shen Date: Sat, 8 Aug 2020 18:01:05 +0800 Subject: [PATCH 09/33] clean code --- registry/consul/service_discovery.go | 2 -- registry/event/event_publishing_service_deiscovery_test.go | 2 +- registry/event/event_publishing_service_discovery.go | 3 ++- registry/service_discovery.go | 5 ++++- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index a41b03c8d6..cf0a69713e 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -328,8 +328,6 @@ func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstance params["type"] = "service" params["service"] = listener.ServiceName params["passingonly"] = true - //params["tag"] = "dubbo" - //params["passingonly"] = true plan, err := watch.Parse(params) if err != nil { return err diff --git a/registry/event/event_publishing_service_deiscovery_test.go b/registry/event/event_publishing_service_deiscovery_test.go index 4d3d7cf8b0..0f10536968 100644 --- a/registry/event/event_publishing_service_deiscovery_test.go +++ b/registry/event/event_publishing_service_deiscovery_test.go @@ -18,7 +18,6 @@ package event import ( - "github.com/apache/dubbo-go/common" "reflect" "testing" ) @@ -31,6 +30,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" dispatcher2 "github.com/apache/dubbo-go/common/observer/dispatcher" diff --git a/registry/event/event_publishing_service_discovery.go b/registry/event/event_publishing_service_discovery.go index c1c9822d7c..5742a18f20 100644 --- a/registry/event/event_publishing_service_discovery.go +++ b/registry/event/event_publishing_service_discovery.go @@ -20,13 +20,14 @@ package event import ( "sync" ) + import ( - "github.com/apache/dubbo-go/common" gxset "github.com/dubbogo/gost/container/set" gxpage "github.com/dubbogo/gost/page" ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" "github.com/apache/dubbo-go/config" diff --git a/registry/service_discovery.go b/registry/service_discovery.go index 62ad53d9e1..e4c4cc53a3 100644 --- a/registry/service_discovery.go +++ b/registry/service_discovery.go @@ -22,11 +22,14 @@ import ( ) import ( - "github.com/apache/dubbo-go/common" gxset "github.com/dubbogo/gost/container/set" gxpage "github.com/dubbogo/gost/page" ) +import ( + "github.com/apache/dubbo-go/common" +) + const DefaultPageSize = 100 // ServiceDiscovery is the common operations of Service Discovery From f3c63b550d14c4f75ce0fd263759bf7cae2685dd Mon Sep 17 00:00:00 2001 From: zhangshen023 <1292369127@qq.com> Date: Sun, 9 Aug 2020 10:33:12 +0800 Subject: [PATCH 10/33] add log --- registry/consul/service_discovery.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index cf0a69713e..c2b3b4f504 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -212,13 +212,13 @@ func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) erro if err != nil { logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err) } - err = csd.consulClient.Agent().ServiceRegister(ins) - return err + return csd.consulClient.Agent().ServiceRegister(ins) } func (csd consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error { err := csd.consulClient.Agent().ServiceDeregister(buildID(instance)) if err != nil { + logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err) return err } stopChanel, ok := csd.ttl[buildID(instance)] @@ -238,6 +238,7 @@ func (csd consulServiceDiscovery) GetServices() *gxset.HashSet { var res = gxset.NewSet() services, _, err := csd.consulClient.Catalog().Services(nil) if err != nil { + logger.Errorf("get services,error: %v", err) return res } @@ -254,6 +255,7 @@ func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.Se WaitTime: time.Duration(waitTime), }) if err != nil { + logger.Errorf("get instances for service %s,error: %v", serviceName, err) return nil } @@ -330,6 +332,7 @@ func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstance params["passingonly"] = true plan, err := watch.Parse(params) if err != nil { + logger.Errorf("add listener for service %s,error:%v", listener.ServiceName, err) return err } From 2693201f32c9a4fc43e27738bd61f59bcb9460df Mon Sep 17 00:00:00 2001 From: zhangshen023 <1292369127@qq.com> Date: Mon, 10 Aug 2020 00:34:26 +0800 Subject: [PATCH 11/33] remove hc-log code clean --- go.mod | 1 - registry/consul/service_discovery.go | 55 ++++++++++++---------------- 2 files changed, 23 insertions(+), 33 deletions(-) diff --git a/go.mod b/go.mod index 9e70d7ecec..7a472daef7 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,6 @@ require ( github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 github.com/hashicorp/consul v1.8.0 github.com/hashicorp/consul/api v1.5.0 - github.com/hashicorp/go-hclog v0.12.0 github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a // indirect github.com/hashicorp/golang-lru v0.5.3 // indirect github.com/hashicorp/vault/api v1.0.5-0.20191108163347-bdd38fca2cff // indirect diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index c2b3b4f504..d2e65be882 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -30,7 +30,6 @@ import ( "github.com/dubbogo/gost/page" consul "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api/watch" - "github.com/hashicorp/go-hclog" perrors "github.com/pkg/errors" ) @@ -75,7 +74,6 @@ func init() { // newConsulServiceDiscovery will create new service discovery instance // use double-check pattern to reduce race condition func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { - instance, ok := instanceMap[name] if ok { return instance, nil @@ -135,6 +133,7 @@ type consulServiceDiscovery struct { tags []string address string ttl map[string]chan struct{} + *consul.Config } func (csd *consulServiceDiscovery) Initialize(registryURL common.URL) error { @@ -143,19 +142,20 @@ func (csd *consulServiceDiscovery) Initialize(registryURL common.URL) error { csd.tag = registryURL.GetParam(QUERY_TAG, "") csd.tags = strings.Split(registryURL.GetParam("tags", ""), ",") aclToken := registryURL.GetParam(ACL_TOKEN, "") - config := &consul.Config{Address: csd.address, Token: aclToken} - client, err := consul.NewClient(config) + csd.Config = &consul.Config{Address: csd.address, Token: aclToken} + client, err := consul.NewClient(csd.Config) if err != nil { return perrors.WithMessage(err, "create consul client failed.") } csd.consulClient = client return nil } -func (csd consulServiceDiscovery) String() string { + +func (csd *consulServiceDiscovery) String() string { return csd.descriptor } -func (csd consulServiceDiscovery) Destroy() error { +func (csd *consulServiceDiscovery) Destroy() error { csd.consulClient = nil for _, t := range csd.ttl { close(t) @@ -171,13 +171,10 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) } - csd.registerTtl(instance) - - return nil - + return csd.registerTtl(instance) } -func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error { +func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error { checkID := buildID(instance) stopChan := make(chan struct{}) @@ -206,7 +203,7 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance return nil } -func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) error { +func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) error { ins, _ := csd.buildRegisterInstance(instance) err := csd.consulClient.Agent().ServiceDeregister(buildID(instance)) if err != nil { @@ -215,7 +212,7 @@ func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) erro return csd.consulClient.Agent().ServiceRegister(ins) } -func (csd consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error { +func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error { err := csd.consulClient.Agent().ServiceDeregister(buildID(instance)) if err != nil { logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err) @@ -229,11 +226,11 @@ func (csd consulServiceDiscovery) Unregister(instance registry.ServiceInstance) return nil } -func (csd consulServiceDiscovery) GetDefaultPageSize() int { +func (csd *consulServiceDiscovery) GetDefaultPageSize() int { return csd.PageSize } -func (csd consulServiceDiscovery) GetServices() *gxset.HashSet { +func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { var res = gxset.NewSet() services, _, err := csd.consulClient.Catalog().Services(nil) @@ -249,7 +246,7 @@ func (csd consulServiceDiscovery) GetServices() *gxset.HashSet { } -func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { +func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { waitTime := csd.serviceUrl.GetParamInt(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000 instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ WaitTime: time.Duration(waitTime), @@ -288,7 +285,7 @@ func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.Se return res } -func (csd consulServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { +func (csd *consulServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { all := csd.GetInstances(serviceName) res := make([]interface{}, 0, pageSize) for i := offset; i < len(all) && i < offset+pageSize; i++ { @@ -297,7 +294,7 @@ func (csd consulServiceDiscovery) GetInstancesByPage(serviceName string, offset return gxpage.New(offset, pageSize, res, len(all)) } -func (csd consulServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { +func (csd *consulServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { all := csd.GetInstances(serviceName) res := make([]interface{}, 0, pageSize) // could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance @@ -316,7 +313,7 @@ func (csd consulServiceDiscovery) GetHealthyInstancesByPage(serviceName string, return gxpage.New(offset, pageSize, res, len(all)) } -func (csd consulServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { +func (csd *consulServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { res := make(map[string]gxpage.Pager, len(serviceNames)) for _, name := range serviceNames { res[name] = csd.GetInstancesByPage(name, offset, requestedSize) @@ -324,7 +321,7 @@ func (csd consulServiceDiscovery) GetRequestInstances(serviceNames []string, off return res } -func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { +func (csd *consulServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { params := make(map[string]interface{}, 8) params["type"] = "service" @@ -336,10 +333,6 @@ func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstance return err } - hcLogger := hclog.New(&hclog.LoggerOptions{ - Name: "watch", - Output: plan.LogOutput, - }) plan.Handler = func(idx uint64, raw interface{}) { services, ok := raw.([]*consul.ServiceEntry) if !ok { @@ -377,7 +370,7 @@ func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstance } } go func() { - err = plan.RunWithClientAndHclog(csd.consulClient, hcLogger) + err = plan.RunWithConfig(csd.Config.Address, csd.Config) if err != nil { logger.Error("consul plan run failure!error:%v", err) } @@ -385,20 +378,20 @@ func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstance return nil } -func (csd consulServiceDiscovery) DispatchEventByServiceName(serviceName string) error { +func (csd *consulServiceDiscovery) DispatchEventByServiceName(serviceName string) error { return csd.DispatchEventForInstances(serviceName, csd.GetInstances(serviceName)) } -func (csd consulServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { +func (csd *consulServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { return csd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances)) } -func (csd consulServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { +func (csd *consulServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { extension.GetGlobalDispatcher().Dispatch(event) return nil } -func (csd consulServiceDiscovery) buildRegisterInstance(instance registry.ServiceInstance) (*consul.AgentServiceRegistration, error) { +func (csd *consulServiceDiscovery) buildRegisterInstance(instance registry.ServiceInstance) (*consul.AgentServiceRegistration, error) { metadata := instance.GetMetadata() if metadata == nil { metadata = make(map[string]string, 1) @@ -418,7 +411,7 @@ func (csd consulServiceDiscovery) buildRegisterInstance(instance registry.Servic }, nil } -func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) consul.AgentServiceCheck { +func (csd *consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) consul.AgentServiceCheck { deregister, ok := instance.GetMetadata()[DEREGISTER_AFTER] if !ok || deregister == "" { @@ -434,7 +427,5 @@ func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) func buildID(instance registry.ServiceInstance) string { id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d", instance.GetId(), instance.GetServiceName(), instance.GetHost(), instance.GetPort()) - //Md5Inst := md5.New() - //Md5Inst.Write([]byte(id)) return id } From 0ddcd72f4da857fe85947e42957ecbb971869bed Mon Sep 17 00:00:00 2001 From: shen Date: Mon, 10 Aug 2020 09:48:46 +0800 Subject: [PATCH 12/33] unit test error repaired --- registry/consul/service_discovery_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go index d4141e23fe..ec85d4aa9c 100644 --- a/registry/consul/service_discovery_test.go +++ b/registry/consul/service_discovery_test.go @@ -86,7 +86,7 @@ func TestConsulServiceDiscovery_Destroy(t *testing.T) { assert.NotNil(t, serviceDiscovery) err = serviceDiscovery.Destroy() assert.Nil(t, err) - assert.NotNil(t, serviceDiscovery.(*consulServiceDiscovery).consulClient) + assert.Nil(t, serviceDiscovery.(*consulServiceDiscovery).consulClient) } func TestConsulServiceDiscovery_CRUD(t *testing.T) { From 0f839ffde6df7d7718eaf58caebe8f9730067089 Mon Sep 17 00:00:00 2001 From: zhangshen023 <1292369127@qq.com> Date: Wed, 12 Aug 2020 07:14:07 +0800 Subject: [PATCH 13/33] use pointer type *MockEventDispatcher --- registry/consul/service_discovery_test.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go index d4141e23fe..69d4e63f91 100644 --- a/registry/consul/service_discovery_test.go +++ b/registry/consul/service_discovery_test.go @@ -181,6 +181,7 @@ func prepareData() { Address: fmt.Sprintf("%s:%d", registryHost, registryPort), } } + func prepareService() (registry.ServiceInstance, common.URL) { id := "id" @@ -208,28 +209,28 @@ type MockEventDispatcher struct { } // AddEventListener do nothing -func (m MockEventDispatcher) AddEventListener(listener observer.EventListener) { +func (m *MockEventDispatcher) AddEventListener(listener observer.EventListener) { } // AddEventListeners do nothing -func (m MockEventDispatcher) AddEventListeners(listenersSlice []observer.EventListener) { +func (m *MockEventDispatcher) AddEventListeners(listenersSlice []observer.EventListener) { } // RemoveEventListener do nothing -func (m MockEventDispatcher) RemoveEventListener(listener observer.EventListener) { +func (m *MockEventDispatcher) RemoveEventListener(listener observer.EventListener) { } // RemoveEventListeners do nothing -func (m MockEventDispatcher) RemoveEventListeners(listenersSlice []observer.EventListener) { +func (m *MockEventDispatcher) RemoveEventListeners(listenersSlice []observer.EventListener) { } // GetAllEventListeners return empty list -func (m MockEventDispatcher) GetAllEventListeners() []observer.EventListener { +func (m *MockEventDispatcher) GetAllEventListeners() []observer.EventListener { return make([]observer.EventListener, 0) } // RemoveAllEventListeners do nothing -func (m MockEventDispatcher) RemoveAllEventListeners() { +func (m *MockEventDispatcher) RemoveAllEventListeners() { } // Dispatch do nothing From 804056674a7c6b0317978c3dc77d9bf4de1f908c Mon Sep 17 00:00:00 2001 From: shen Date: Thu, 13 Aug 2020 09:11:20 +0800 Subject: [PATCH 14/33] optimized code for consul service discovery --- registry/consul/service_discovery.go | 2 +- registry/consul/service_discovery_test.go | 4 ++-- registry/etcdv3/service_discovery.go | 2 +- .../event/event_publishing_service_deiscovery_test.go | 2 +- registry/event/event_publishing_service_discovery.go | 10 +++++----- registry/nacos/service_discovery.go | 2 +- registry/service_discovery.go | 2 +- .../servicediscovery/service_discovery_registry.go | 2 +- .../service_discovery_registry_test.go | 2 +- registry/zookeeper/service_discovery.go | 2 +- 10 files changed, 15 insertions(+), 15 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index d2e65be882..662b2bca0b 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -136,7 +136,7 @@ type consulServiceDiscovery struct { *consul.Config } -func (csd *consulServiceDiscovery) Initialize(registryURL common.URL) error { +func (csd *consulServiceDiscovery) Init(registryURL common.URL) error { csd.serviceUrl = registryURL csd.checkPassInterval = registryURL.GetParamInt(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL) csd.tag = registryURL.GetParam(QUERY_TAG, "") diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go index ec85d4aa9c..19e2a3df0d 100644 --- a/registry/consul/service_discovery_test.go +++ b/registry/consul/service_discovery_test.go @@ -81,7 +81,7 @@ func TestConsulServiceDiscovery_Destroy(t *testing.T) { prepareData() serviceDiscovery, err := extension.GetServiceDiscovery(constant.CONSUL_KEY, testName) _, registryUrl := prepareService() - serviceDiscovery.Initialize(registryUrl) + serviceDiscovery.Init(registryUrl) assert.Nil(t, err) assert.NotNil(t, serviceDiscovery) err = serviceDiscovery.Destroy() @@ -109,7 +109,7 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) { serviceDiscovery, err := extension.GetServiceDiscovery(constant.CONSUL_KEY, testName) assert.Nil(t, err) - err = serviceDiscovery.Initialize(registryUrl) + err = serviceDiscovery.Init(registryUrl) assert.Nil(t, err) err = serviceDiscovery.Unregister(instance) diff --git a/registry/etcdv3/service_discovery.go b/registry/etcdv3/service_discovery.go index 010adff594..6a99a39226 100644 --- a/registry/etcdv3/service_discovery.go +++ b/registry/etcdv3/service_discovery.go @@ -67,7 +67,7 @@ type etcdV3ServiceDiscovery struct { childListenerMap map[string]*etcdv3.EventListener } -func (e *etcdV3ServiceDiscovery) Initialize(registryURL common.URL) error { +func (e *etcdV3ServiceDiscovery) Init(registryURL common.URL) error { return nil } diff --git a/registry/event/event_publishing_service_deiscovery_test.go b/registry/event/event_publishing_service_deiscovery_test.go index 0f10536968..2b4bf05f1d 100644 --- a/registry/event/event_publishing_service_deiscovery_test.go +++ b/registry/event/event_publishing_service_deiscovery_test.go @@ -117,7 +117,7 @@ func (tel *TestServiceInstancePreRegisteredEventListener) GetEventType() reflect type ServiceDiscoveryA struct { } -func (msd *ServiceDiscoveryA) Initialize(registryURL common.URL) error { +func (msd *ServiceDiscoveryA) Init(registryURL common.URL) error { return nil } diff --git a/registry/event/event_publishing_service_discovery.go b/registry/event/event_publishing_service_discovery.go index 5742a18f20..14ee18997f 100644 --- a/registry/event/event_publishing_service_discovery.go +++ b/registry/event/event_publishing_service_discovery.go @@ -38,8 +38,8 @@ import ( // EventPublishingServiceDiscovery will enhance Service Discovery // Publish some event about service discovery type EventPublishingServiceDiscovery struct { - serviceDiscovery registry.ServiceDiscovery - once sync.Once + serviceDiscovery registry.ServiceDiscovery + serviceDiscoveryInitOnce sync.Once } // NewEventPublishingServiceDiscovery is a constructor @@ -54,10 +54,10 @@ func (epsd *EventPublishingServiceDiscovery) String() string { return epsd.serviceDiscovery.String() } -func (epsd *EventPublishingServiceDiscovery) Initialize(registryURL common.URL) error { +func (epsd *EventPublishingServiceDiscovery) Init(registryURL common.URL) error { var err error - epsd.once.Do(func() { - err = epsd.serviceDiscovery.Initialize(registryURL) + epsd.serviceDiscoveryInitOnce.Do(func() { + err = epsd.serviceDiscovery.Init(registryURL) }) return err } diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index 78c6c4c4fc..ad28c1a0fe 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -63,7 +63,7 @@ type nacosServiceDiscovery struct { namingClient naming_client.INamingClient } -func (n *nacosServiceDiscovery) Initialize(registryURL common.URL) error { +func (n *nacosServiceDiscovery) Init(registryURL common.URL) error { return nil } diff --git a/registry/service_discovery.go b/registry/service_discovery.go index e4c4cc53a3..a4e9e24214 100644 --- a/registry/service_discovery.go +++ b/registry/service_discovery.go @@ -42,7 +42,7 @@ type ServiceDiscovery interface { * Initializes the ServiceDiscovery * */ - Initialize(registryURL common.URL) error + Init(registryURL common.URL) error // Destroy will destroy the service discovery. // If the discovery cannot be destroy, it will return an error. diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index a1d3f120c1..0dff3473be 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -126,7 +126,7 @@ func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { return nil, perrors.WithMessage(err, "Create service discovery fialed") } serviceDiscovery := event.NewEventPublishingServiceDiscovery(originServiceDiscovery) - serviceDiscovery.Initialize(*url) + serviceDiscovery.Init(*url) return serviceDiscovery, nil } diff --git a/registry/servicediscovery/service_discovery_registry_test.go b/registry/servicediscovery/service_discovery_registry_test.go index bf16ebdc15..b42158c1f3 100644 --- a/registry/servicediscovery/service_discovery_registry_test.go +++ b/registry/servicediscovery/service_discovery_registry_test.go @@ -126,7 +126,7 @@ func (m *mockServiceNameMapping) Get(serviceInterface string, group string, vers type mockServiceDiscovery struct { } -func (m *mockServiceDiscovery) Initialize(registryURL common.URL) error { +func (m *mockServiceDiscovery) Init(registryURL common.URL) error { return nil } diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go index 314f56bdec..bf73691c25 100644 --- a/registry/zookeeper/service_discovery.go +++ b/registry/zookeeper/service_discovery.go @@ -163,7 +163,7 @@ func (zksd *zookeeperServiceDiscovery) String() string { return fmt.Sprintf("zookeeper-service-discovery[%s]", zksd.url) } -func (zksd *zookeeperServiceDiscovery) Initialize(registryURL common.URL) error { +func (zksd *zookeeperServiceDiscovery) Init(registryURL common.URL) error { return nil } From fdc4f7c2fecd45ba1349649e1c5aae9467712d08 Mon Sep 17 00:00:00 2001 From: shen Date: Mon, 17 Aug 2020 11:47:21 +0800 Subject: [PATCH 15/33] optimized --- common/constant/key.go | 12 +++++- registry/consul/service_discovery.go | 55 ++++++++-------------------- 2 files changed, 27 insertions(+), 40 deletions(-) diff --git a/common/constant/key.go b/common/constant/key.go index 29a306f0e4..9f5b6dc1ed 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -172,7 +172,17 @@ const ( ) const ( - CONSUL_KEY = "consul" + CONSUL_KEY = "consul" + CHECK_PASS_INTERVAL = "consul-check-pass-interval" + // default time-to-live in millisecond + DEFAULT_CHECK_PASS_INTERVAL = 16000 + QUERY_TAG = "consul_query_tag" + ACL_TOKEN = "acl-token" + // default deregister critical server after + DEFAULT_DEREGISTER_TIME = "20s" + DEFAULT_WATCH_TIMEOUT = 60 * 1000 + WATCH_TIMEOUT = "consul-watch-timeout" + DEREGISTER_AFTER = "consul-deregister-critical-service-after" ) const ( diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index 662b2bca0b..d3ebff42ac 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -43,21 +43,7 @@ import ( ) const ( - PageSize = "pageSize" - Enable = "enable" -) - -const ( - CHECK_PASS_INTERVAL = "consul-check-pass-interval" - // default time-to-live in millisecond - DEFAULT_CHECK_PASS_INTERVAL = 16000 - QUERY_TAG = "consul_query_tag" - ACL_TOKEN = "acl-token" - // default deregister critical server after - DEFAULT_DEREGISTER_TIME = "20s" - DEFAULT_WATCH_TIMEOUT = 60 * 1000 - WATCH_TIMEOUT = "consul-watch-timeout" - DEREGISTER_AFTER = "consul-deregister-critical-service-after" + enable = "enable" ) var ( @@ -100,19 +86,9 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { descriptor := fmt.Sprintf("consul-service-discovery[%s]", remoteConfig.Address) - pageSize := 20 - if remoteConfig.Params != nil { - if tmp, OK := remoteConfig.Params[PageSize]; OK { - intTmp, err := strconv.Atoi(tmp) - if err == nil && intTmp > 20 { - pageSize = intTmp - } - } - } return &consulServiceDiscovery{ address: remoteConfig.Address, descriptor: descriptor, - PageSize: pageSize, ttl: make(map[string]chan struct{}), }, nil } @@ -126,7 +102,6 @@ type consulServiceDiscovery struct { descriptor string // Consul client. consulClient *consul.Client - PageSize int serviceUrl common.URL checkPassInterval int64 tag string @@ -138,10 +113,10 @@ type consulServiceDiscovery struct { func (csd *consulServiceDiscovery) Init(registryURL common.URL) error { csd.serviceUrl = registryURL - csd.checkPassInterval = registryURL.GetParamInt(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL) - csd.tag = registryURL.GetParam(QUERY_TAG, "") + csd.checkPassInterval = registryURL.GetParamInt(constant.CHECK_PASS_INTERVAL, constant.DEFAULT_CHECK_PASS_INTERVAL) + csd.tag = registryURL.GetParam(constant.QUERY_TAG, "") csd.tags = strings.Split(registryURL.GetParam("tags", ""), ",") - aclToken := registryURL.GetParam(ACL_TOKEN, "") + aclToken := registryURL.GetParam(constant.ACL_TOKEN, "") csd.Config = &consul.Config{Address: csd.address, Token: aclToken} client, err := consul.NewClient(csd.Config) if err != nil { @@ -219,7 +194,9 @@ func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) return err } stopChanel, ok := csd.ttl[buildID(instance)] - if ok { + if !ok { + logger.Warnf("ttl for service instance %s didn't exist", instance.GetId()) + } else { close(stopChanel) delete(csd.ttl, buildID(instance)) } @@ -227,7 +204,7 @@ func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) } func (csd *consulServiceDiscovery) GetDefaultPageSize() int { - return csd.PageSize + return registry.DefaultPageSize } func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { @@ -247,7 +224,7 @@ func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { } func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { - waitTime := csd.serviceUrl.GetParamInt(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000 + waitTime := csd.serviceUrl.GetParamInt(constant.WATCH_TIMEOUT, constant.DEFAULT_WATCH_TIMEOUT) / 1000 instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ WaitTime: time.Duration(waitTime), }) @@ -261,8 +238,8 @@ func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.S metadata := ins.Service.Meta // enable status - enableStr := metadata[Enable] - delete(metadata, Enable) + enableStr := metadata[enable] + delete(metadata, enable) enable, _ := strconv.ParseBool(enableStr) // health status @@ -344,8 +321,8 @@ func (csd *consulServiceDiscovery) AddListener(listener *registry.ServiceInstanc metadata := ins.Service.Meta // enable status - enableStr := metadata[Enable] - delete(metadata, Enable) + enableStr := metadata[enable] + delete(metadata, enable) enable, _ := strconv.ParseBool(enableStr) // health status @@ -396,7 +373,7 @@ func (csd *consulServiceDiscovery) buildRegisterInstance(instance registry.Servi if metadata == nil { metadata = make(map[string]string, 1) } - metadata[Enable] = strconv.FormatBool(instance.IsEnable()) + metadata[enable] = strconv.FormatBool(instance.IsEnable()) // check check := csd.buildCheck(instance) @@ -413,9 +390,9 @@ func (csd *consulServiceDiscovery) buildRegisterInstance(instance registry.Servi func (csd *consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) consul.AgentServiceCheck { - deregister, ok := instance.GetMetadata()[DEREGISTER_AFTER] + deregister, ok := instance.GetMetadata()[constant.DEREGISTER_AFTER] if !ok || deregister == "" { - deregister = DEFAULT_DEREGISTER_TIME + deregister = constant.DEFAULT_DEREGISTER_TIME } return consul.AgentServiceCheck{ CheckID: buildID(instance), From e2384782f07e6d412fb9538a4bea7c07955b92fb Mon Sep 17 00:00:00 2001 From: shen Date: Mon, 17 Aug 2020 13:59:49 +0800 Subject: [PATCH 16/33] optimized --- registry/consul/service_discovery.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index d3ebff42ac..590efbcd54 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -196,10 +196,10 @@ func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) stopChanel, ok := csd.ttl[buildID(instance)] if !ok { logger.Warnf("ttl for service instance %s didn't exist", instance.GetId()) - } else { - close(stopChanel) - delete(csd.ttl, buildID(instance)) + return nil } + close(stopChanel) + delete(csd.ttl, buildID(instance)) return nil } From d5dfaf97e1cf75c14cac46c27f6d33c0f5ebc7ed Mon Sep 17 00:00:00 2001 From: shen Date: Sat, 29 Aug 2020 11:57:38 +0800 Subject: [PATCH 17/33] suit consul --- registry/consul/service_discovery.go | 43 ++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index 590efbcd54..b57f771911 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -18,6 +18,7 @@ package consul import ( + "encoding/base64" "fmt" "strconv" "strings" @@ -143,6 +144,7 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e ins, _ := csd.buildRegisterInstance(instance) err := csd.consulClient.Agent().ServiceRegister(ins) if err != nil { + logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err) return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) } @@ -162,7 +164,7 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance select { case <-timer.C: timer.Reset(period) - err := csd.consulClient.Agent().PassTTL(checkID, "") + err := csd.consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "") if err != nil { logger.Warnf("pass ttl heartbeat fail:%v", err) break @@ -223,6 +225,37 @@ func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { } +// encodeConsulMetadata because consul validate key strictly. +func encodeConsulMetadata(metadata map[string]string) map[string]string { + if metadata == nil { + metadata = make(map[string]string, 1) + } + encoder := base64.RawStdEncoding + for k, v := range metadata { + delete(metadata, k) + metadata[encoder.EncodeToString([]byte(k))] = v + } + return metadata +} + +// nolint +func decodeConsulMetadata(metadata map[string]string) map[string]string { + if metadata == nil { + metadata = make(map[string]string, 1) + } + encoder := base64.RawStdEncoding + for k, v := range metadata { + delete(metadata, k) + kBytes, err := encoder.DecodeString(k) + if err != nil { + logger.Warnf("can not decoded consul metadata key %s", k) + continue + } + metadata[string(kBytes)] = v + } + return metadata +} + func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { waitTime := csd.serviceUrl.GetParamInt(constant.WATCH_TIMEOUT, constant.DEFAULT_WATCH_TIMEOUT) / 1000 instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ @@ -241,6 +274,7 @@ func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.S enableStr := metadata[enable] delete(metadata, enable) enable, _ := strconv.ParseBool(enableStr) + metadata = decodeConsulMetadata(metadata) // health status status := ins.Checks.AggregatedStatus() @@ -370,11 +404,8 @@ func (csd *consulServiceDiscovery) DispatchEvent(event *registry.ServiceInstance func (csd *consulServiceDiscovery) buildRegisterInstance(instance registry.ServiceInstance) (*consul.AgentServiceRegistration, error) { metadata := instance.GetMetadata() - if metadata == nil { - metadata = make(map[string]string, 1) - } + metadata = encodeConsulMetadata(metadata) metadata[enable] = strconv.FormatBool(instance.IsEnable()) - // check check := csd.buildCheck(instance) @@ -395,7 +426,7 @@ func (csd *consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) deregister = constant.DEFAULT_DEREGISTER_TIME } return consul.AgentServiceCheck{ - CheckID: buildID(instance), + //CheckID: buildID(instance), TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s", DeregisterCriticalServiceAfter: deregister, } From 4e2ac215a394e89447085b6d731867754fc62edb Mon Sep 17 00:00:00 2001 From: zhangshen023 <1292369127@qq.com> Date: Sat, 29 Aug 2020 13:44:45 +0800 Subject: [PATCH 18/33] optimize code and adapt to other unit test --- config/config_loader_test.go | 4 ++++ registry/consul/service_discovery.go | 14 +++++++------- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/config/config_loader_test.go b/config/config_loader_test.go index 461e607c1e..baf0c8fba3 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -503,6 +503,10 @@ func (m *mockServiceDiscovery) String() string { panic("implement me") } +func (m *mockServiceDiscovery) Init(registryURL common.URL) error { + panic("implement me") +} + func (m *mockServiceDiscovery) Destroy() error { panic("implement me") } diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index b57f771911..e794beb0a9 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -227,31 +227,31 @@ func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { // encodeConsulMetadata because consul validate key strictly. func encodeConsulMetadata(metadata map[string]string) map[string]string { + consulMetadata := make(map[string]string, 8) if metadata == nil { - metadata = make(map[string]string, 1) + return consulMetadata } encoder := base64.RawStdEncoding for k, v := range metadata { - delete(metadata, k) - metadata[encoder.EncodeToString([]byte(k))] = v + consulMetadata[encoder.EncodeToString([]byte(k))] = v } - return metadata + return consulMetadata } // nolint func decodeConsulMetadata(metadata map[string]string) map[string]string { + dubboMetadata := make(map[string]string, 8) if metadata == nil { - metadata = make(map[string]string, 1) + return dubboMetadata } encoder := base64.RawStdEncoding for k, v := range metadata { - delete(metadata, k) kBytes, err := encoder.DecodeString(k) if err != nil { logger.Warnf("can not decoded consul metadata key %s", k) continue } - metadata[string(kBytes)] = v + dubboMetadata[string(kBytes)] = v } return metadata } From 55dfc256d325897d1fa2913ae0c66e48fda4bd8d Mon Sep 17 00:00:00 2001 From: zhangshen023 <1292369127@qq.com> Date: Sat, 29 Aug 2020 14:21:20 +0800 Subject: [PATCH 19/33] fix bug --- registry/consul/service_discovery.go | 2 +- registry/consul/service_discovery_test.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index e794beb0a9..a69f9ec698 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -253,7 +253,7 @@ func decodeConsulMetadata(metadata map[string]string) map[string]string { } dubboMetadata[string(kBytes)] = v } - return metadata + return dubboMetadata } func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go index d9623612dd..f743869b90 100644 --- a/registry/consul/service_discovery_test.go +++ b/registry/consul/service_discovery_test.go @@ -133,7 +133,8 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) { assert.Equal(t, instance.GetHost(), instanceResult.GetHost()) assert.Equal(t, instance.GetPort(), instanceResult.GetPort()) assert.Equal(t, instance.GetServiceName(), instanceResult.GetServiceName()) - assert.Equal(t, 0, len(instanceResult.GetMetadata())) + metadata := instanceResult.GetMetadata() + assert.Equal(t, 0, len(metadata)) instance.GetMetadata()["aaa"] = "bbb" err = serviceDiscovery.Update(instance) @@ -148,7 +149,7 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) { assert.Equal(t, 1, len(page.GetData())) instanceResult = page.GetData()[0].(*registry.DefaultServiceInstance) - v, _ := instanceResult.GetMetadata()["aaa"] + v, _ := instanceResult.Metadata["aaa"] assert.Equal(t, "bbb", v) // test dispatcher event From 9a644a6779d9ecc0e2f6a07d16b3351673c1f801 Mon Sep 17 00:00:00 2001 From: shen Date: Mon, 31 Aug 2020 10:31:19 +0800 Subject: [PATCH 20/33] etcdv3 lease --- registry/etcdv3/registry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index 2fec8eaad2..4ae04e6dd6 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -113,7 +113,7 @@ func (r *etcdV3Registry) InitListeners() { // DoRegister actually do the register job in the registry center of etcd func (r *etcdV3Registry) DoRegister(root string, node string) error { - return r.client.Create(path.Join(root, node), "") + return r.client.RegisterTemp(path.Join(root, node), "") } // nolint From 5ff68703a78dfb5ccd6972ff49d34315a68700e3 Mon Sep 17 00:00:00 2001 From: shen Date: Wed, 2 Sep 2020 13:56:35 +0800 Subject: [PATCH 21/33] delete unused code --- registry/consul/service_discovery.go | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index a69f9ec698..5767a2c813 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -48,9 +48,7 @@ const ( ) var ( - // 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition - instanceMap = make(map[string]registry.ServiceDiscovery, 16) - initLock sync.Mutex + initLock sync.Mutex ) // init will put the service discovery into extension @@ -61,20 +59,6 @@ func init() { // newConsulServiceDiscovery will create new service discovery instance // use double-check pattern to reduce race condition func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { - instance, ok := instanceMap[name] - if ok { - return instance, nil - } - - initLock.Lock() - defer initLock.Unlock() - - // double check - instance, ok = instanceMap[name] - if ok { - return instance, nil - } - sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name) if !ok || len(sdc.RemoteRef) == 0 { return nil, perrors.New("could not init the instance because the config is invalid") @@ -94,9 +78,7 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { }, nil } -// nacosServiceDiscovery is the implementation of service discovery based on nacos. -// There is a problem, the go client for nacos does not support the id field. -// we will use the metadata to store the id of ServiceInstance +// consulServiceDiscovery is the implementation of service discovery based on consul. type consulServiceDiscovery struct { group string // descriptor is a short string about the basic information of this instance From ebf3cc64567d0755fe3c800804c2f2c0840bc605 Mon Sep 17 00:00:00 2001 From: shen Date: Fri, 4 Sep 2020 13:57:31 +0800 Subject: [PATCH 22/33] concurrency problem fixed --- registry/consul/service_discovery.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index 5767a2c813..ae2233d590 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -74,7 +74,6 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { return &consulServiceDiscovery{ address: remoteConfig.Address, descriptor: descriptor, - ttl: make(map[string]chan struct{}), }, nil } @@ -90,7 +89,7 @@ type consulServiceDiscovery struct { tag string tags []string address string - ttl map[string]chan struct{} + ttl sync.Map *consul.Config } @@ -115,10 +114,11 @@ func (csd *consulServiceDiscovery) String() string { func (csd *consulServiceDiscovery) Destroy() error { csd.consulClient = nil - for _, t := range csd.ttl { - close(t) - } - csd.ttl = nil + csd.ttl.Range(func(key, t interface{}) bool { + close(t.(chan struct{})) + csd.ttl.Delete(key) + return true + }) return nil } @@ -137,7 +137,7 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance checkID := buildID(instance) stopChan := make(chan struct{}) - csd.ttl[buildID(instance)] = stopChan + csd.ttl.LoadOrStore(buildID(instance), stopChan) period := time.Duration(csd.checkPassInterval/8) * time.Millisecond timer := time.NewTimer(period) @@ -177,13 +177,14 @@ func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err) return err } - stopChanel, ok := csd.ttl[buildID(instance)] + + stopChanel, ok := csd.ttl.Load(buildID(instance)) if !ok { logger.Warnf("ttl for service instance %s didn't exist", instance.GetId()) return nil } - close(stopChanel) - delete(csd.ttl, buildID(instance)) + close(stopChanel.(chan struct{})) + csd.ttl.Delete(buildID(instance)) return nil } @@ -404,7 +405,7 @@ func (csd *consulServiceDiscovery) buildRegisterInstance(instance registry.Servi func (csd *consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) consul.AgentServiceCheck { deregister, ok := instance.GetMetadata()[constant.DEREGISTER_AFTER] - if !ok || deregister == "" { + if !ok || len(deregister) == 0 { deregister = constant.DEFAULT_DEREGISTER_TIME } return consul.AgentServiceCheck{ From d210989d344cb385824ee11c247ce74616729388 Mon Sep 17 00:00:00 2001 From: shen Date: Tue, 8 Sep 2020 09:18:58 +0800 Subject: [PATCH 23/33] revert modifications on this pr,thus submitted this modification on another pr --- registry/etcdv3/registry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index 4ae04e6dd6..2fec8eaad2 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -113,7 +113,7 @@ func (r *etcdV3Registry) InitListeners() { // DoRegister actually do the register job in the registry center of etcd func (r *etcdV3Registry) DoRegister(root string, node string) error { - return r.client.RegisterTemp(path.Join(root, node), "") + return r.client.Create(path.Join(root, node), "") } // nolint From a8782ae31d5e2ed83bb750d50d4b9ccbd47d545f Mon Sep 17 00:00:00 2001 From: zhangshen023 <1292369127@qq.com> Date: Wed, 9 Sep 2020 00:24:36 +0800 Subject: [PATCH 24/33] delete the method 'Init' of interface ServiceDiscovery --- common/constant/key.go | 2 - config/config_loader_test.go | 4 - registry/consul/service_discovery.go | 73 +++++++++++-------- registry/consul/service_discovery_test.go | 8 +- registry/etcdv3/service_discovery.go | 5 -- ...vent_publishing_service_deiscovery_test.go | 5 -- .../event_publishing_service_discovery.go | 16 +--- registry/nacos/service_discovery.go | 5 -- registry/service_discovery.go | 10 --- .../service_discovery_registry.go | 1 - 10 files changed, 46 insertions(+), 83 deletions(-) diff --git a/common/constant/key.go b/common/constant/key.go index a05ebbdbf0..02db030a0a 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -186,8 +186,6 @@ const ( ACL_TOKEN = "acl-token" // default deregister critical server after DEFAULT_DEREGISTER_TIME = "20s" - DEFAULT_WATCH_TIMEOUT = 60 * 1000 - WATCH_TIMEOUT = "consul-watch-timeout" DEREGISTER_AFTER = "consul-deregister-critical-service-after" ) diff --git a/config/config_loader_test.go b/config/config_loader_test.go index baf0c8fba3..461e607c1e 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -503,10 +503,6 @@ func (m *mockServiceDiscovery) String() string { panic("implement me") } -func (m *mockServiceDiscovery) Init(registryURL common.URL) error { - panic("implement me") -} - func (m *mockServiceDiscovery) Destroy() error { panic("implement me") } diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index ae2233d590..7dd0ade65a 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -21,7 +21,6 @@ import ( "encoding/base64" "fmt" "strconv" - "strings" "sync" "time" ) @@ -35,7 +34,6 @@ import ( ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -71,43 +69,37 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { descriptor := fmt.Sprintf("consul-service-discovery[%s]", remoteConfig.Address) + config := &consul.Config{Address: remoteConfig.Address, Token: remoteConfig.Params[constant.ACL_TOKEN]} + client, err := consul.NewClient(config) + if err != nil { + return nil, perrors.WithMessage(err, "create consul client failed.") + } + return &consulServiceDiscovery{ - address: remoteConfig.Address, - descriptor: descriptor, + address: remoteConfig.Address, + descriptor: descriptor, + checkPassInterval: getCheckPassInterval(remoteConfig.Params), + Config: config, + tag: remoteConfig.Params[constant.QUERY_TAG], + consulClient: client, + deregisterCriticalServiceAfter: getDeregisterAfter(remoteConfig.Params), }, nil } // consulServiceDiscovery is the implementation of service discovery based on consul. type consulServiceDiscovery struct { - group string // descriptor is a short string about the basic information of this instance descriptor string // Consul client. - consulClient *consul.Client - serviceUrl common.URL - checkPassInterval int64 - tag string - tags []string - address string - ttl sync.Map + consulClient *consul.Client + checkPassInterval int64 + tag string + address string + deregisterCriticalServiceAfter string + ttl sync.Map *consul.Config } -func (csd *consulServiceDiscovery) Init(registryURL common.URL) error { - csd.serviceUrl = registryURL - csd.checkPassInterval = registryURL.GetParamInt(constant.CHECK_PASS_INTERVAL, constant.DEFAULT_CHECK_PASS_INTERVAL) - csd.tag = registryURL.GetParam(constant.QUERY_TAG, "") - csd.tags = strings.Split(registryURL.GetParam("tags", ""), ",") - aclToken := registryURL.GetParam(constant.ACL_TOKEN, "") - csd.Config = &consul.Config{Address: csd.address, Token: aclToken} - client, err := consul.NewClient(csd.Config) - if err != nil { - return perrors.WithMessage(err, "create consul client failed.") - } - csd.consulClient = client - return nil -} - func (csd *consulServiceDiscovery) String() string { return csd.descriptor } @@ -240,9 +232,8 @@ func decodeConsulMetadata(metadata map[string]string) map[string]string { } func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { - waitTime := csd.serviceUrl.GetParamInt(constant.WATCH_TIMEOUT, constant.DEFAULT_WATCH_TIMEOUT) / 1000 instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ - WaitTime: time.Duration(waitTime), + WaitTime: time.Duration(csd.checkPassInterval), }) if err != nil { logger.Errorf("get instances for service %s,error: %v", serviceName, err) @@ -411,10 +402,32 @@ func (csd *consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) return consul.AgentServiceCheck{ //CheckID: buildID(instance), TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s", - DeregisterCriticalServiceAfter: deregister, + DeregisterCriticalServiceAfter: csd.deregisterCriticalServiceAfter, + } +} + +// nolint +func getCheckPassInterval(params map[string]string) int64 { + checkPassIntervalStr, ok := params[constant.CHECK_PASS_INTERVAL] + if !ok { + return constant.DEFAULT_CHECK_PASS_INTERVAL + } + checkPassInterval, err := strconv.ParseInt(checkPassIntervalStr, 10, 64) + if err != nil { + logger.Warnf("consul service discovery remote config error:%s", checkPassIntervalStr) + return constant.DEFAULT_CHECK_PASS_INTERVAL } + return checkPassInterval } +// nolint +func getDeregisterAfter(metadata map[string]string) string { + deregister, ok := metadata[constant.DEREGISTER_AFTER] + if !ok || len(deregister) == 0 { + deregister = constant.DEFAULT_DEREGISTER_TIME + } + return deregister +} func buildID(instance registry.ServiceInstance) string { id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d", instance.GetId(), instance.GetServiceName(), instance.GetHost(), instance.GetPort()) diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go index f743869b90..43d1cd1738 100644 --- a/registry/consul/service_discovery_test.go +++ b/registry/consul/service_discovery_test.go @@ -80,8 +80,7 @@ func TestConsulServiceDiscovery_newConsulServiceDiscovery(t *testing.T) { func TestConsulServiceDiscovery_Destroy(t *testing.T) { prepareData() serviceDiscovery, err := extension.GetServiceDiscovery(constant.CONSUL_KEY, testName) - _, registryUrl := prepareService() - serviceDiscovery.Init(registryUrl) + prepareService() assert.Nil(t, err) assert.NotNil(t, serviceDiscovery) err = serviceDiscovery.Destroy() @@ -103,15 +102,12 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) { extension.SetAndInitGlobalDispatcher("mock") rand.Seed(time.Now().Unix()) - instance, registryUrl := prepareService() + instance, _ := prepareService() // clean data serviceDiscovery, err := extension.GetServiceDiscovery(constant.CONSUL_KEY, testName) assert.Nil(t, err) - err = serviceDiscovery.Init(registryUrl) - assert.Nil(t, err) - err = serviceDiscovery.Unregister(instance) assert.Nil(t, err) diff --git a/registry/etcdv3/service_discovery.go b/registry/etcdv3/service_discovery.go index 080a699f9b..dceaa99df8 100644 --- a/registry/etcdv3/service_discovery.go +++ b/registry/etcdv3/service_discovery.go @@ -31,7 +31,6 @@ import ( ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -67,10 +66,6 @@ type etcdV3ServiceDiscovery struct { childListenerMap map[string]*etcdv3.EventListener } -func (e *etcdV3ServiceDiscovery) Init(registryURL common.URL) error { - return nil -} - // basic information of this instance func (e *etcdV3ServiceDiscovery) String() string { return e.descriptor diff --git a/registry/event/event_publishing_service_deiscovery_test.go b/registry/event/event_publishing_service_deiscovery_test.go index 2b4bf05f1d..54752c03c0 100644 --- a/registry/event/event_publishing_service_deiscovery_test.go +++ b/registry/event/event_publishing_service_deiscovery_test.go @@ -30,7 +30,6 @@ import ( ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" dispatcher2 "github.com/apache/dubbo-go/common/observer/dispatcher" @@ -117,10 +116,6 @@ func (tel *TestServiceInstancePreRegisteredEventListener) GetEventType() reflect type ServiceDiscoveryA struct { } -func (msd *ServiceDiscoveryA) Init(registryURL common.URL) error { - return nil -} - // String return mockServiceDiscovery func (msd *ServiceDiscoveryA) String() string { return "testServiceDiscovery" diff --git a/registry/event/event_publishing_service_discovery.go b/registry/event/event_publishing_service_discovery.go index 14ee18997f..3ee2f4a449 100644 --- a/registry/event/event_publishing_service_discovery.go +++ b/registry/event/event_publishing_service_discovery.go @@ -17,17 +17,12 @@ package event -import ( - "sync" -) - import ( gxset "github.com/dubbogo/gost/container/set" gxpage "github.com/dubbogo/gost/page" ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" "github.com/apache/dubbo-go/config" @@ -38,8 +33,7 @@ import ( // EventPublishingServiceDiscovery will enhance Service Discovery // Publish some event about service discovery type EventPublishingServiceDiscovery struct { - serviceDiscovery registry.ServiceDiscovery - serviceDiscoveryInitOnce sync.Once + serviceDiscovery registry.ServiceDiscovery } // NewEventPublishingServiceDiscovery is a constructor @@ -54,14 +48,6 @@ func (epsd *EventPublishingServiceDiscovery) String() string { return epsd.serviceDiscovery.String() } -func (epsd *EventPublishingServiceDiscovery) Init(registryURL common.URL) error { - var err error - epsd.serviceDiscoveryInitOnce.Do(func() { - err = epsd.serviceDiscovery.Init(registryURL) - }) - return err -} - // Destroy delegate function func (epsd *EventPublishingServiceDiscovery) Destroy() error { f := func() error { diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index a112ebaad9..0e5ad8e699 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -32,7 +32,6 @@ import ( ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -65,10 +64,6 @@ type nacosServiceDiscovery struct { registryInstances []registry.ServiceInstance } -func (n *nacosServiceDiscovery) Init(registryURL common.URL) error { - return nil -} - // Destroy will close the service discovery. // Actually, it only marks the naming client as null and then return func (n *nacosServiceDiscovery) Destroy() error { diff --git a/registry/service_discovery.go b/registry/service_discovery.go index a4e9e24214..cb7a3c0182 100644 --- a/registry/service_discovery.go +++ b/registry/service_discovery.go @@ -26,10 +26,6 @@ import ( gxpage "github.com/dubbogo/gost/page" ) -import ( - "github.com/apache/dubbo-go/common" -) - const DefaultPageSize = 100 // ServiceDiscovery is the common operations of Service Discovery @@ -38,12 +34,6 @@ type ServiceDiscovery interface { // ----------------- lifecycle ------------------- - /** - * Initializes the ServiceDiscovery - * - */ - Init(registryURL common.URL) error - // Destroy will destroy the service discovery. // If the discovery cannot be destroy, it will return an error. Destroy() error diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 9722fc8eca..52006b2bb8 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -125,7 +125,6 @@ func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { return nil, perrors.WithMessage(err, "Create service discovery fialed") } serviceDiscovery := event.NewEventPublishingServiceDiscovery(originServiceDiscovery) - serviceDiscovery.Init(*url) return serviceDiscovery, nil } From b8a20bd6ff8cd5a4f20d134459310338cc7b19db Mon Sep 17 00:00:00 2001 From: shen Date: Wed, 9 Sep 2020 08:59:41 +0800 Subject: [PATCH 25/33] delete unused code --- registry/consul/service_discovery.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index 7dd0ade65a..0d5e43c5fa 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -45,10 +45,6 @@ const ( enable = "enable" ) -var ( - initLock sync.Mutex -) - // init will put the service discovery into extension func init() { extension.SetServiceDiscovery(constant.CONSUL_KEY, newConsulServiceDiscovery) @@ -400,7 +396,6 @@ func (csd *consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) deregister = constant.DEFAULT_DEREGISTER_TIME } return consul.AgentServiceCheck{ - //CheckID: buildID(instance), TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s", DeregisterCriticalServiceAfter: csd.deregisterCriticalServiceAfter, } @@ -428,8 +423,9 @@ func getDeregisterAfter(metadata map[string]string) string { } return deregister } -func buildID(instance registry.ServiceInstance) string { +// nolint +func buildID(instance registry.ServiceInstance) string { id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d", instance.GetId(), instance.GetServiceName(), instance.GetHost(), instance.GetPort()) return id } From b3aab08638b2985665ef722f2904d001f2ca7d59 Mon Sep 17 00:00:00 2001 From: shen Date: Wed, 9 Sep 2020 09:46:32 +0800 Subject: [PATCH 26/33] delete unused code --- registry/servicediscovery/service_discovery_registry_test.go | 4 ---- registry/zookeeper/service_discovery.go | 4 ---- 2 files changed, 8 deletions(-) diff --git a/registry/servicediscovery/service_discovery_registry_test.go b/registry/servicediscovery/service_discovery_registry_test.go index b42158c1f3..53eb86507e 100644 --- a/registry/servicediscovery/service_discovery_registry_test.go +++ b/registry/servicediscovery/service_discovery_registry_test.go @@ -126,10 +126,6 @@ func (m *mockServiceNameMapping) Get(serviceInterface string, group string, vers type mockServiceDiscovery struct { } -func (m *mockServiceDiscovery) Init(registryURL common.URL) error { - return nil -} - func (m *mockServiceDiscovery) String() string { panic("implement me") } diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go index bf73691c25..5ad83ef909 100644 --- a/registry/zookeeper/service_discovery.go +++ b/registry/zookeeper/service_discovery.go @@ -163,10 +163,6 @@ func (zksd *zookeeperServiceDiscovery) String() string { return fmt.Sprintf("zookeeper-service-discovery[%s]", zksd.url) } -func (zksd *zookeeperServiceDiscovery) Init(registryURL common.URL) error { - return nil -} - // Close client be closed func (zksd *zookeeperServiceDiscovery) Destroy() error { zksd.client.Close() From 2fa6522c02b9a66039169f21695d261d07233eed Mon Sep 17 00:00:00 2001 From: zhangshen023 <1292369127@qq.com> Date: Wed, 9 Sep 2020 21:03:17 +0800 Subject: [PATCH 27/33] optimized some code --- registry/consul/service_discovery.go | 23 +++++++++---------- .../service_discovery_registry.go | 3 +-- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index 0d5e43c5fa..ea4ea99ed3 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -42,7 +42,12 @@ import ( ) const ( - enable = "enable" + enable = "enable" + watch_type = "type" + watch_type_service = "service" + watch_service = "service" + watch_passingonly = "passingonly" + watch_passingonly_true = true ) // init will put the service discovery into extension @@ -198,10 +203,7 @@ func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { // encodeConsulMetadata because consul validate key strictly. func encodeConsulMetadata(metadata map[string]string) map[string]string { - consulMetadata := make(map[string]string, 8) - if metadata == nil { - return consulMetadata - } + consulMetadata := make(map[string]string, len(metadata)) encoder := base64.RawStdEncoding for k, v := range metadata { consulMetadata[encoder.EncodeToString([]byte(k))] = v @@ -211,10 +213,7 @@ func encodeConsulMetadata(metadata map[string]string) map[string]string { // nolint func decodeConsulMetadata(metadata map[string]string) map[string]string { - dubboMetadata := make(map[string]string, 8) - if metadata == nil { - return dubboMetadata - } + dubboMetadata := make(map[string]string, len(metadata)) encoder := base64.RawStdEncoding for k, v := range metadata { kBytes, err := encoder.DecodeString(k) @@ -305,9 +304,9 @@ func (csd *consulServiceDiscovery) GetRequestInstances(serviceNames []string, of func (csd *consulServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { params := make(map[string]interface{}, 8) - params["type"] = "service" - params["service"] = listener.ServiceName - params["passingonly"] = true + params[watch_type] = watch_type_service + params[watch_service] = listener.ServiceName + params[watch_passingonly] = watch_passingonly_true plan, err := watch.Parse(params) if err != nil { logger.Errorf("add listener for service %s,error:%v", listener.ServiceName, err) diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 52006b2bb8..7576804eb5 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -124,8 +124,7 @@ func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { if err != nil { return nil, perrors.WithMessage(err, "Create service discovery fialed") } - serviceDiscovery := event.NewEventPublishingServiceDiscovery(originServiceDiscovery) - return serviceDiscovery, nil + return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil } func parseServices(literalServices string) *gxset.HashSet { From 77e59d105a7a2f23549ad9682889b00ff3f5dd57 Mon Sep 17 00:00:00 2001 From: zhangshen023 <1292369127@qq.com> Date: Sat, 12 Sep 2020 15:27:59 +0800 Subject: [PATCH 28/33] fix concurrency problems --- registry/consul/service_discovery.go | 85 ++++++++++++++++++++++++---- 1 file changed, 73 insertions(+), 12 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index ea4ea99ed3..698758735a 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -84,6 +84,7 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { tag: remoteConfig.Params[constant.QUERY_TAG], consulClient: client, deregisterCriticalServiceAfter: getDeregisterAfter(remoteConfig.Params), + clientLock: sync.Mutex{}, }, nil } @@ -98,6 +99,7 @@ type consulServiceDiscovery struct { address string deregisterCriticalServiceAfter string ttl sync.Map + clientLock sync.Mutex *consul.Config } @@ -105,8 +107,17 @@ func (csd *consulServiceDiscovery) String() string { return csd.descriptor } +// nolint +func (csd *consulServiceDiscovery) getConsulClient() (*consul.Client, error) { + if csd.consulClient == nil { + return nil, perrors.New("consul client is destroyed or not ready!") + } + return csd.consulClient, nil +} func (csd *consulServiceDiscovery) Destroy() error { + csd.clientLock.Lock() csd.consulClient = nil + csd.clientLock.Unlock() csd.ttl.Range(func(key, t interface{}) bool { close(t.(chan struct{})) csd.ttl.Delete(key) @@ -116,8 +127,16 @@ func (csd *consulServiceDiscovery) Destroy() error { } func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error { + var ( + err error + consulClient *consul.Client + ) ins, _ := csd.buildRegisterInstance(instance) - err := csd.consulClient.Agent().ServiceRegister(ins) + csd.clientLock.Lock() + if consulClient, err = csd.getConsulClient(); err == nil { + err = consulClient.Agent().ServiceRegister(ins) + } + csd.clientLock.Unlock() if err != nil { logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err) return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) @@ -127,19 +146,28 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e } func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error { + var ( + err error + consulClient *consul.Client + ) + checkID := buildID(instance) stopChan := make(chan struct{}) csd.ttl.LoadOrStore(buildID(instance), stopChan) period := time.Duration(csd.checkPassInterval/8) * time.Millisecond - timer := time.NewTimer(period) + timer := time.NewTicker(period) go func() { + defer timer.Stop() for { select { case <-timer.C: - timer.Reset(period) - err := csd.consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "") + csd.clientLock.Lock() + if consulClient, err = csd.getConsulClient(); err == nil { + err = consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "") + } + csd.clientLock.Unlock() if err != nil { logger.Warnf("pass ttl heartbeat fail:%v", err) break @@ -156,8 +184,16 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance } func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) error { + var ( + err error + consulClient *consul.Client + ) ins, _ := csd.buildRegisterInstance(instance) - err := csd.consulClient.Agent().ServiceDeregister(buildID(instance)) + csd.clientLock.Lock() + defer csd.clientLock.Unlock() + if consulClient, err = csd.getConsulClient(); err == nil { + err = consulClient.Agent().ServiceDeregister(buildID(instance)) + } if err != nil { logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err) } @@ -165,12 +201,19 @@ func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) err } func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error { - err := csd.consulClient.Agent().ServiceDeregister(buildID(instance)) + var ( + err error + consulClient *consul.Client + ) + csd.clientLock.Lock() + if consulClient, err = csd.getConsulClient(); err == nil { + err = consulClient.Agent().ServiceDeregister(buildID(instance)) + } + csd.clientLock.Unlock() if err != nil { logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err) return err } - stopChanel, ok := csd.ttl.Load(buildID(instance)) if !ok { logger.Warnf("ttl for service instance %s didn't exist", instance.GetId()) @@ -186,9 +229,17 @@ func (csd *consulServiceDiscovery) GetDefaultPageSize() int { } func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { - + var ( + err error + consulClient *consul.Client + services map[string][]string + ) var res = gxset.NewSet() - services, _, err := csd.consulClient.Catalog().Services(nil) + csd.clientLock.Lock() + if consulClient, err = csd.getConsulClient(); err == nil { + services, _, err = consulClient.Catalog().Services(nil) + } + csd.clientLock.Unlock() if err != nil { logger.Errorf("get services,error: %v", err) return res @@ -227,9 +278,19 @@ func decodeConsulMetadata(metadata map[string]string) map[string]string { } func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { - instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ - WaitTime: time.Duration(csd.checkPassInterval), - }) + var ( + err error + consulClient *consul.Client + instances []*consul.ServiceEntry + ) + csd.clientLock.Lock() + if consulClient, err = csd.getConsulClient(); err == nil { + instances, _, err = consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ + WaitTime: time.Duration(csd.checkPassInterval), + }) + } + csd.clientLock.Unlock() + if err != nil { logger.Errorf("get instances for service %s,error: %v", serviceName, err) return nil From 24e8293b7f6ed97f252fe3316c93e41344d407bb Mon Sep 17 00:00:00 2001 From: zhangshen023 <1292369127@qq.com> Date: Sat, 12 Sep 2020 15:38:32 +0800 Subject: [PATCH 29/33] use rwlock --- registry/consul/service_discovery.go | 28 +++++++++++------------ registry/consul/service_discovery_test.go | 8 +------ 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index 698758735a..db039508e7 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -84,7 +84,7 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { tag: remoteConfig.Params[constant.QUERY_TAG], consulClient: client, deregisterCriticalServiceAfter: getDeregisterAfter(remoteConfig.Params), - clientLock: sync.Mutex{}, + clientLock: sync.RWMutex{}, }, nil } @@ -99,7 +99,7 @@ type consulServiceDiscovery struct { address string deregisterCriticalServiceAfter string ttl sync.Map - clientLock sync.Mutex + clientLock sync.RWMutex *consul.Config } @@ -132,11 +132,11 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e consulClient *consul.Client ) ins, _ := csd.buildRegisterInstance(instance) - csd.clientLock.Lock() + csd.clientLock.RLock() if consulClient, err = csd.getConsulClient(); err == nil { err = consulClient.Agent().ServiceRegister(ins) } - csd.clientLock.Unlock() + csd.clientLock.RUnlock() if err != nil { logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err) return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) @@ -163,11 +163,11 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance for { select { case <-timer.C: - csd.clientLock.Lock() + csd.clientLock.RLock() if consulClient, err = csd.getConsulClient(); err == nil { err = consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "") } - csd.clientLock.Unlock() + csd.clientLock.RUnlock() if err != nil { logger.Warnf("pass ttl heartbeat fail:%v", err) break @@ -189,8 +189,8 @@ func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) err consulClient *consul.Client ) ins, _ := csd.buildRegisterInstance(instance) - csd.clientLock.Lock() - defer csd.clientLock.Unlock() + csd.clientLock.RLock() + defer csd.clientLock.RUnlock() if consulClient, err = csd.getConsulClient(); err == nil { err = consulClient.Agent().ServiceDeregister(buildID(instance)) } @@ -205,11 +205,11 @@ func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) err error consulClient *consul.Client ) - csd.clientLock.Lock() + csd.clientLock.RLock() if consulClient, err = csd.getConsulClient(); err == nil { err = consulClient.Agent().ServiceDeregister(buildID(instance)) } - csd.clientLock.Unlock() + csd.clientLock.RUnlock() if err != nil { logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err) return err @@ -235,11 +235,11 @@ func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { services map[string][]string ) var res = gxset.NewSet() - csd.clientLock.Lock() + csd.clientLock.RLock() if consulClient, err = csd.getConsulClient(); err == nil { services, _, err = consulClient.Catalog().Services(nil) } - csd.clientLock.Unlock() + csd.clientLock.RUnlock() if err != nil { logger.Errorf("get services,error: %v", err) return res @@ -283,13 +283,13 @@ func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.S consulClient *consul.Client instances []*consul.ServiceEntry ) - csd.clientLock.Lock() + csd.clientLock.RLock() if consulClient, err = csd.getConsulClient(); err == nil { instances, _, err = consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ WaitTime: time.Duration(csd.checkPassInterval), }) } - csd.clientLock.Unlock() + csd.clientLock.RUnlock() if err != nil { logger.Errorf("get instances for service %s,error: %v", serviceName, err) diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go index 43d1cd1738..ed7220f2de 100644 --- a/registry/consul/service_discovery_test.go +++ b/registry/consul/service_discovery_test.go @@ -44,13 +44,7 @@ var ( consulCheckPassInterval = 17000 consulDeregisterCriticalServiceAfter = "20s" consulWatchTimeout = 60000 - registryURL = common.URL{ - Path: "", - Username: "", - Password: "", - Methods: nil, - SubURL: nil, - } + registryURL = common.URL{} ) func TestConsulServiceDiscovery_newConsulServiceDiscovery(t *testing.T) { From 7662982e4d8bec15e43153fe8bfd6246a94fc2ce Mon Sep 17 00:00:00 2001 From: zhangshen023 <1292369127@qq.com> Date: Sun, 13 Sep 2020 15:22:58 +0800 Subject: [PATCH 30/33] refactor repeated code --- registry/consul/service_discovery.go | 48 +++++++++++++++------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index db039508e7..23743ee68f 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -92,6 +92,7 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { type consulServiceDiscovery struct { // descriptor is a short string about the basic information of this instance descriptor string + clientLock sync.RWMutex // Consul client. consulClient *consul.Client checkPassInterval int64 @@ -99,7 +100,6 @@ type consulServiceDiscovery struct { address string deregisterCriticalServiceAfter string ttl sync.Map - clientLock sync.RWMutex *consul.Config } @@ -108,16 +108,25 @@ func (csd *consulServiceDiscovery) String() string { } // nolint -func (csd *consulServiceDiscovery) getConsulClient() (*consul.Client, error) { +func (csd *consulServiceDiscovery) getConsulClient() (consulClient consul.Client, err error) { + csd.clientLock.RLock() + defer csd.clientLock.RUnlock() if csd.consulClient == nil { - return nil, perrors.New("consul client is destroyed or not ready!") + err = perrors.New("consul client is destroyed or not ready!") + return } - return csd.consulClient, nil + return *csd.consulClient, nil } -func (csd *consulServiceDiscovery) Destroy() error { + +// nolint +func (csd *consulServiceDiscovery) setConsulClient(consulClient *consul.Client) { csd.clientLock.Lock() - csd.consulClient = nil - csd.clientLock.Unlock() + defer csd.clientLock.Unlock() + csd.consulClient = consulClient +} + +func (csd *consulServiceDiscovery) Destroy() error { + csd.setConsulClient(nil) csd.ttl.Range(func(key, t interface{}) bool { close(t.(chan struct{})) csd.ttl.Delete(key) @@ -129,14 +138,12 @@ func (csd *consulServiceDiscovery) Destroy() error { func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error { var ( err error - consulClient *consul.Client + consulClient consul.Client ) ins, _ := csd.buildRegisterInstance(instance) - csd.clientLock.RLock() if consulClient, err = csd.getConsulClient(); err == nil { err = consulClient.Agent().ServiceRegister(ins) } - csd.clientLock.RUnlock() if err != nil { logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err) return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) @@ -148,7 +155,7 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error { var ( err error - consulClient *consul.Client + consulClient consul.Client ) checkID := buildID(instance) @@ -163,11 +170,9 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance for { select { case <-timer.C: - csd.clientLock.RLock() if consulClient, err = csd.getConsulClient(); err == nil { err = consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "") } - csd.clientLock.RUnlock() if err != nil { logger.Warnf("pass ttl heartbeat fail:%v", err) break @@ -186,30 +191,27 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) error { var ( err error - consulClient *consul.Client + consulClient consul.Client ) ins, _ := csd.buildRegisterInstance(instance) - csd.clientLock.RLock() - defer csd.clientLock.RUnlock() - if consulClient, err = csd.getConsulClient(); err == nil { + consulClient, err = csd.getConsulClient() + if err == nil { err = consulClient.Agent().ServiceDeregister(buildID(instance)) } if err != nil { logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err) } - return csd.consulClient.Agent().ServiceRegister(ins) + return consulClient.Agent().ServiceRegister(ins) } func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error { var ( err error - consulClient *consul.Client + consulClient consul.Client ) - csd.clientLock.RLock() if consulClient, err = csd.getConsulClient(); err == nil { err = consulClient.Agent().ServiceDeregister(buildID(instance)) } - csd.clientLock.RUnlock() if err != nil { logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err) return err @@ -231,7 +233,7 @@ func (csd *consulServiceDiscovery) GetDefaultPageSize() int { func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { var ( err error - consulClient *consul.Client + consulClient consul.Client services map[string][]string ) var res = gxset.NewSet() @@ -280,7 +282,7 @@ func decodeConsulMetadata(metadata map[string]string) map[string]string { func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { var ( err error - consulClient *consul.Client + consulClient consul.Client instances []*consul.ServiceEntry ) csd.clientLock.RLock() From 02b0a82a810ac3603bbf03e6cbe4fe9a3649e2bf Mon Sep 17 00:00:00 2001 From: zhangshen023 <1292369127@qq.com> Date: Sun, 13 Sep 2020 16:35:08 +0800 Subject: [PATCH 31/33] refactor code --- registry/consul/service_discovery.go | 46 ++++++++++++++-------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index 23743ee68f..e2d73a3887 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -55,6 +55,21 @@ func init() { extension.SetServiceDiscovery(constant.CONSUL_KEY, newConsulServiceDiscovery) } +// consulServiceDiscovery is the implementation of service discovery based on consul. +type consulServiceDiscovery struct { + // descriptor is a short string about the basic information of this instance + descriptor string + clientLock sync.RWMutex + // Consul client. + consulClient *consul.Client + checkPassInterval int64 + tag string + address string + deregisterCriticalServiceAfter string + ttl sync.Map + *consul.Config +} + // newConsulServiceDiscovery will create new service discovery instance // use double-check pattern to reduce race condition func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { @@ -88,34 +103,19 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { }, nil } -// consulServiceDiscovery is the implementation of service discovery based on consul. -type consulServiceDiscovery struct { - // descriptor is a short string about the basic information of this instance - descriptor string - clientLock sync.RWMutex - // Consul client. - consulClient *consul.Client - checkPassInterval int64 - tag string - address string - deregisterCriticalServiceAfter string - ttl sync.Map - *consul.Config -} - func (csd *consulServiceDiscovery) String() string { return csd.descriptor } // nolint -func (csd *consulServiceDiscovery) getConsulClient() (consulClient consul.Client, err error) { +func (csd *consulServiceDiscovery) getConsulClient() (consulClient *consul.Client, err error) { csd.clientLock.RLock() defer csd.clientLock.RUnlock() if csd.consulClient == nil { err = perrors.New("consul client is destroyed or not ready!") return } - return *csd.consulClient, nil + return csd.consulClient, nil } // nolint @@ -138,7 +138,7 @@ func (csd *consulServiceDiscovery) Destroy() error { func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error { var ( err error - consulClient consul.Client + consulClient *consul.Client ) ins, _ := csd.buildRegisterInstance(instance) if consulClient, err = csd.getConsulClient(); err == nil { @@ -155,7 +155,7 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error { var ( err error - consulClient consul.Client + consulClient *consul.Client ) checkID := buildID(instance) @@ -191,7 +191,7 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) error { var ( err error - consulClient consul.Client + consulClient *consul.Client ) ins, _ := csd.buildRegisterInstance(instance) consulClient, err = csd.getConsulClient() @@ -207,7 +207,7 @@ func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) err func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error { var ( err error - consulClient consul.Client + consulClient *consul.Client ) if consulClient, err = csd.getConsulClient(); err == nil { err = consulClient.Agent().ServiceDeregister(buildID(instance)) @@ -233,7 +233,7 @@ func (csd *consulServiceDiscovery) GetDefaultPageSize() int { func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { var ( err error - consulClient consul.Client + consulClient *consul.Client services map[string][]string ) var res = gxset.NewSet() @@ -282,7 +282,7 @@ func decodeConsulMetadata(metadata map[string]string) map[string]string { func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { var ( err error - consulClient consul.Client + consulClient *consul.Client instances []*consul.ServiceEntry ) csd.clientLock.RLock() From bc43a00bd6ebe949f9c8e227f7722d796a8e282e Mon Sep 17 00:00:00 2001 From: zhangshen023 <1292369127@qq.com> Date: Sun, 13 Sep 2020 16:52:05 +0800 Subject: [PATCH 32/33] refactor code --- registry/consul/service_discovery.go | 51 ++++++++++++++-------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index e2d73a3887..4c7f6e915e 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -108,14 +108,10 @@ func (csd *consulServiceDiscovery) String() string { } // nolint -func (csd *consulServiceDiscovery) getConsulClient() (consulClient *consul.Client, err error) { +func (csd *consulServiceDiscovery) getConsulClient() *consul.Client { csd.clientLock.RLock() defer csd.clientLock.RUnlock() - if csd.consulClient == nil { - err = perrors.New("consul client is destroyed or not ready!") - return - } - return csd.consulClient, nil + return csd.consulClient } // nolint @@ -141,9 +137,10 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e consulClient *consul.Client ) ins, _ := csd.buildRegisterInstance(instance) - if consulClient, err = csd.getConsulClient(); err == nil { - err = consulClient.Agent().ServiceRegister(ins) + if consulClient = csd.getConsulClient(); consulClient == nil { + return perrors.New("consul client is closed!") } + err = consulClient.Agent().ServiceRegister(ins) if err != nil { logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err) return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) @@ -170,9 +167,11 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance for { select { case <-timer.C: - if consulClient, err = csd.getConsulClient(); err == nil { - err = consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "") + if consulClient = csd.getConsulClient(); consulClient == nil { + logger.Debugf("consul client is closed!") + return } + err = consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "") if err != nil { logger.Warnf("pass ttl heartbeat fail:%v", err) break @@ -194,10 +193,11 @@ func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) err consulClient *consul.Client ) ins, _ := csd.buildRegisterInstance(instance) - consulClient, err = csd.getConsulClient() - if err == nil { - err = consulClient.Agent().ServiceDeregister(buildID(instance)) + consulClient = csd.getConsulClient() + if consulClient == nil { + return perrors.New("consul client is closed!") } + err = consulClient.Agent().ServiceDeregister(buildID(instance)) if err != nil { logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err) } @@ -209,9 +209,10 @@ func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) err error consulClient *consul.Client ) - if consulClient, err = csd.getConsulClient(); err == nil { - err = consulClient.Agent().ServiceDeregister(buildID(instance)) + if consulClient = csd.getConsulClient(); consulClient == nil { + return perrors.New("consul client is closed!") } + err = consulClient.Agent().ServiceDeregister(buildID(instance)) if err != nil { logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err) return err @@ -237,11 +238,11 @@ func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet { services map[string][]string ) var res = gxset.NewSet() - csd.clientLock.RLock() - if consulClient, err = csd.getConsulClient(); err == nil { - services, _, err = consulClient.Catalog().Services(nil) + if consulClient = csd.getConsulClient(); consulClient == nil { + logger.Warnf("consul client is closed!") + return res } - csd.clientLock.RUnlock() + services, _, err = consulClient.Catalog().Services(nil) if err != nil { logger.Errorf("get services,error: %v", err) return res @@ -285,13 +286,13 @@ func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.S consulClient *consul.Client instances []*consul.ServiceEntry ) - csd.clientLock.RLock() - if consulClient, err = csd.getConsulClient(); err == nil { - instances, _, err = consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ - WaitTime: time.Duration(csd.checkPassInterval), - }) + if consulClient = csd.getConsulClient(); consulClient == nil { + logger.Warn("consul client is closed!") + return nil } - csd.clientLock.RUnlock() + instances, _, err = consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ + WaitTime: time.Duration(csd.checkPassInterval), + }) if err != nil { logger.Errorf("get instances for service %s,error: %v", serviceName, err) From fc1561727a66f0a28ef42d8bb81618bbc4ef9363 Mon Sep 17 00:00:00 2001 From: zhangshen023 <1292369127@qq.com> Date: Sun, 13 Sep 2020 16:59:57 +0800 Subject: [PATCH 33/33] refactor code --- registry/consul/service_discovery.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index 4c7f6e915e..d8ab93f31e 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -50,6 +50,10 @@ const ( watch_passingonly_true = true ) +var ( + errConsulClientClosed = perrors.New("consul client is closed") +) + // init will put the service discovery into extension func init() { extension.SetServiceDiscovery(constant.CONSUL_KEY, newConsulServiceDiscovery) @@ -138,7 +142,7 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e ) ins, _ := csd.buildRegisterInstance(instance) if consulClient = csd.getConsulClient(); consulClient == nil { - return perrors.New("consul client is closed!") + return errConsulClientClosed } err = consulClient.Agent().ServiceRegister(ins) if err != nil { @@ -195,7 +199,7 @@ func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) err ins, _ := csd.buildRegisterInstance(instance) consulClient = csd.getConsulClient() if consulClient == nil { - return perrors.New("consul client is closed!") + return errConsulClientClosed } err = consulClient.Agent().ServiceDeregister(buildID(instance)) if err != nil { @@ -210,7 +214,7 @@ func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) consulClient *consul.Client ) if consulClient = csd.getConsulClient(); consulClient == nil { - return perrors.New("consul client is closed!") + return errConsulClientClosed } err = consulClient.Agent().ServiceDeregister(buildID(instance)) if err != nil {