diff --git a/common/extension/metadata_service.go b/common/extension/metadata_service.go index 1823273b8f..e35677d148 100644 --- a/common/extension/metadata_service.go +++ b/common/extension/metadata_service.go @@ -21,6 +21,10 @@ import ( "fmt" ) +import ( + perrors "github.com/pkg/errors" +) + import ( "github.com/apache/dubbo-go/metadata/service" ) @@ -36,12 +40,11 @@ func SetMetadataService(msType string, creator func() (service.MetadataService, } // GetMetadataService will create a MetadataService instance -// it will panic if msType not found func GetMetadataService(msType string) (service.MetadataService, error) { if creator, ok := metadataServiceInsMap[msType]; ok { return creator() } - panic(fmt.Sprintf("could not find the metadata service creator for metadataType: %s, please check whether you have imported relative packages, \n"+ + return nil, perrors.New(fmt.Sprintf("could not find the metadata service creator for metadataType: %s, please check whether you have imported relative packages, \n"+ "local - github.com/apache/dubbo-go/metadata/service/inmemory, \n"+ "remote - github.com/apache/dubbo-go/metadata/service/remote", msType)) } diff --git a/config/config_loader.go b/config/config_loader.go index 8b196305b9..75b82628d6 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -21,11 +21,14 @@ import ( "fmt" "log" "os" + "reflect" + "strconv" "sync" "time" ) import ( + gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" ) @@ -35,6 +38,7 @@ import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" _ "github.com/apache/dubbo-go/common/observer/dispatcher" + "github.com/apache/dubbo-go/registry" ) var ( @@ -206,6 +210,97 @@ func loadProviderConfig() { panic(fmt.Sprintf("service %s export failed! err: %#v", key, err)) } } + registerServiceInstance() +} + +// registerServiceInstance register service instance +func registerServiceInstance() { + url := selectMetadataServiceExportedURL() + if url == nil { + return + } + instance, err := createInstance(*url) + if err != nil { + panic(err) + } + p := extension.GetProtocol(constant.REGISTRY_KEY) + var rp registry.RegistryFactory + var ok bool + if rp, ok = p.(registry.RegistryFactory); !ok { + panic("dubbo registry protocol{" + reflect.TypeOf(p).String() + "} is invalid") + } + rs := rp.GetRegistries() + for _, r := range rs { + var sdr registry.ServiceDiscoveryHolder + if sdr, ok = r.(registry.ServiceDiscoveryHolder); !ok { + continue + } + err := sdr.GetServiceDiscovery().Register(instance) + if err != nil { + panic(err) + } + } +} + +// nolint +func createInstance(url common.URL) (registry.ServiceInstance, error) { + appConfig := GetApplicationConfig() + port, err := strconv.ParseInt(url.Port, 10, 32) + if err != nil { + return nil, perrors.WithMessage(err, "invalid port: "+url.Port) + } + + host := url.Ip + if len(host) == 0 { + host, err = gxnet.GetLocalIP() + if err != nil { + return nil, perrors.WithMessage(err, "could not get the local Ip") + } + } + + // usually we will add more metadata + metadata := make(map[string]string, 8) + metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType + + return ®istry.DefaultServiceInstance{ + ServiceName: appConfig.Name, + Host: host, + Port: int(port), + Id: host + constant.KEY_SEPARATOR + url.Port, + Enable: true, + Healthy: true, + Metadata: metadata, + }, nil +} + +// selectMetadataServiceExportedURL get already be exported url +func selectMetadataServiceExportedURL() *common.URL { + var selectedUrl common.URL + metaDataService, err := extension.GetMetadataService(GetApplicationConfig().MetadataType) + if err != nil { + logger.Warn(err) + return nil + } + list, err := metaDataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE) + if err != nil { + panic(err) + } + if len(list) == 0 { + return nil + } + for _, urlStr := range list { + url, err := common.NewURL(urlStr.(string)) + if err != nil { + logger.Errorf("url format error {%v}", url) + continue + } + selectedUrl = url + // rest first + if url.Protocol == "rest" { + break + } + } + return &selectedUrl } func initRouter() { diff --git a/config/config_loader_test.go b/config/config_loader_test.go index a219b9f465..461e607c1e 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -19,10 +19,16 @@ package config import ( "path/filepath" + "sort" + "sync" "testing" ) import ( + cm "github.com/Workiva/go-datastructures/common" + "github.com/Workiva/go-datastructures/slice/skip" + gxset "github.com/dubbogo/gost/container/set" + gxpage "github.com/dubbogo/gost/page" "github.com/stretchr/testify/assert" "go.uber.org/atomic" ) @@ -33,8 +39,11 @@ import ( "github.com/apache/dubbo-go/common/config" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/metadata/service" + "github.com/apache/dubbo-go/registry" ) const mockConsumerConfigPath = "./testdata/consumer_config.yml" @@ -74,7 +83,17 @@ func TestLoad(t *testing.T) { extension.SetProtocol("registry", GetProtocol) extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) - + GetApplicationConfig().MetadataType = "mock" + var mm *mockMetadataService + extension.SetMetadataService("mock", func() (metadataService service.MetadataService, err error) { + if mm == nil { + mm = &mockMetadataService{ + exportedServiceURLs: new(sync.Map), + lock: new(sync.RWMutex), + } + } + return mm, nil + }) Load() assert.Equal(t, ms, GetRPCService(ms.Reference())) @@ -103,7 +122,17 @@ func TestLoadWithSingleReg(t *testing.T) { extension.SetProtocol("registry", GetProtocol) extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) - + var mm *mockMetadataService + GetApplicationConfig().MetadataType = "mock" + extension.SetMetadataService("mock", func() (metadataService service.MetadataService, err error) { + if mm == nil { + mm = &mockMetadataService{ + exportedServiceURLs: new(sync.Map), + lock: new(sync.RWMutex), + } + } + return mm, nil + }) Load() assert.Equal(t, ms, GetRPCService(ms.Reference())) @@ -132,7 +161,17 @@ func TestWithNoRegLoad(t *testing.T) { extension.SetProtocol("registry", GetProtocol) extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) - + var mm *mockMetadataService + GetApplicationConfig().MetadataType = "mock" + extension.SetMetadataService("mock", func() (metadataService service.MetadataService, err error) { + if mm == nil { + mm = &mockMetadataService{ + exportedServiceURLs: new(sync.Map), + lock: new(sync.RWMutex), + } + } + return mm, nil + }) Load() assert.Equal(t, ms, GetRPCService(ms.Reference())) @@ -300,3 +339,234 @@ func mockInitProviderWithSingleRegistry() { }, } } + +type mockMetadataService struct { + exportedServiceURLs *sync.Map + lock *sync.RWMutex +} + +func (m *mockMetadataService) Reference() string { + panic("implement me") +} + +func (m *mockMetadataService) ServiceName() (string, error) { + panic("implement me") +} + +func (m *mockMetadataService) ExportURL(url common.URL) (bool, error) { + return m.addURL(m.exportedServiceURLs, &url), nil +} + +func (m *mockMetadataService) UnexportURL(url common.URL) error { + panic("implement me") +} + +func (m *mockMetadataService) SubscribeURL(url common.URL) (bool, error) { + panic("implement me") +} + +func (m *mockMetadataService) UnsubscribeURL(url common.URL) error { + panic("implement me") +} + +func (m *mockMetadataService) PublishServiceDefinition(url common.URL) error { + return nil +} + +func (m *mockMetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) { + return ConvertURLArrToIntfArr(m.getAllService(m.exportedServiceURLs)), nil +} + +func (m *mockMetadataService) MethodMapper() map[string]string { + panic("implement me") +} + +func (m *mockMetadataService) GetSubscribedURLs() ([]common.URL, error) { + panic("implement me") +} + +func (m *mockMetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { + panic("implement me") +} + +func (m *mockMetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { + panic("implement me") +} + +func (m *mockMetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) { + panic("implement me") +} + +func (m *mockMetadataService) Version() (string, error) { + panic("implement me") +} + +func (mts *mockMetadataService) addURL(targetMap *sync.Map, url *common.URL) bool { + var ( + urlSet interface{} + loaded bool + ) + logger.Debug(url.ServiceKey()) + if urlSet, loaded = targetMap.LoadOrStore(url.ServiceKey(), skip.New(uint64(0))); loaded { + mts.lock.RLock() + wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url)) + if len(wantedUrl) > 0 && wantedUrl[0] != nil { + mts.lock.RUnlock() + return false + } + mts.lock.RUnlock() + } + mts.lock.Lock() + // double chk + wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url)) + if len(wantedUrl) > 0 && wantedUrl[0] != nil { + mts.lock.Unlock() + return false + } + urlSet.(*skip.SkipList).Insert(Comparator(*url)) + mts.lock.Unlock() + return true +} + +func (m *mockMetadataService) getAllService(services *sync.Map) []common.URL { + // using skip list to dedup and sorting + res := make([]common.URL, 0) + services.Range(func(key, value interface{}) bool { + urls := value.(*skip.SkipList) + for i := uint64(0); i < urls.Len(); i++ { + url := common.URL(urls.ByPosition(i).(Comparator)) + if url.GetParam(constant.INTERFACE_KEY, url.Path) != constant.METADATA_SERVICE_NAME { + res = append(res, url) + } + } + return true + }) + sort.Sort(common.URLSlice(res)) + return res +} + +type Comparator common.URL + +// Compare is defined as Comparator for skip list to compare the URL +func (c Comparator) Compare(comp cm.Comparator) int { + a := common.URL(c).String() + b := common.URL(comp.(Comparator)).String() + switch { + case a > b: + return 1 + case a < b: + return -1 + default: + return 0 + } +} + +type mockServiceDiscoveryRegistry struct { +} + +func (mr *mockServiceDiscoveryRegistry) GetUrl() common.URL { + panic("implement me") +} + +func (mr *mockServiceDiscoveryRegistry) IsAvailable() bool { + panic("implement me") +} + +func (mr *mockServiceDiscoveryRegistry) Destroy() { + panic("implement me") +} + +func (mr *mockServiceDiscoveryRegistry) Register(url common.URL) error { + panic("implement me") +} + +func (mr *mockServiceDiscoveryRegistry) UnRegister(url common.URL) error { + panic("implement me") +} + +func (mr *mockServiceDiscoveryRegistry) Subscribe(*common.URL, registry.NotifyListener) error { + panic("implement me") +} + +func (mr *mockServiceDiscoveryRegistry) UnSubscribe(*common.URL, registry.NotifyListener) error { + panic("implement me") +} + +func (s *mockServiceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery { + return &mockServiceDiscovery{} +} + +type mockServiceDiscovery struct { +} + +func (m *mockServiceDiscovery) String() string { + panic("implement me") +} + +func (m *mockServiceDiscovery) Destroy() error { + panic("implement me") +} + +func (m *mockServiceDiscovery) Register(instance registry.ServiceInstance) error { + return nil +} + +func (m *mockServiceDiscovery) Update(instance registry.ServiceInstance) error { + panic("implement me") +} + +func (m *mockServiceDiscovery) Unregister(instance registry.ServiceInstance) error { + panic("implement me") +} + +func (m *mockServiceDiscovery) GetDefaultPageSize() int { + panic("implement me") +} + +func (m *mockServiceDiscovery) GetServices() *gxset.HashSet { + panic("implement me") +} + +func (m *mockServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { + panic("implement me") +} + +func (m *mockServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { + panic("implement me") +} + +func (m *mockServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { + panic("implement me") +} + +func (m *mockServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { + panic("implement me") +} + +func (m *mockServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { + panic("implement me") +} + +func (m *mockServiceDiscovery) DispatchEventByServiceName(serviceName string) error { + panic("implement me") +} + +func (m *mockServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { + panic("implement me") +} + +func (m *mockServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { + panic("implement me") +} + +func ConvertURLArrToIntfArr(urls []common.URL) []interface{} { + if len(urls) == 0 { + return []interface{}{} + } + + res := make([]interface{}, 0, len(urls)) + for _, u := range urls { + res = append(res, u.String()) + } + return res +} diff --git a/config/reference_config_test.go b/config/reference_config_test.go index e457801596..a4345ad13d 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -32,6 +32,7 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/registry" ) var regProtocol protocol.Protocol @@ -338,9 +339,37 @@ func (*mockRegistryProtocol) Refer(url common.URL) protocol.Invoker { } func (*mockRegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporter { + registryUrl := getRegistryUrl(invoker) + if registryUrl.Protocol == "service-discovery" { + metaDataService, err := extension.GetMetadataService(GetApplicationConfig().MetadataType) + if err != nil { + panic(err) + } + ok, err := metaDataService.ExportURL(*invoker.GetUrl().SubURL.Clone()) + if err != nil { + panic(err) + } + if !ok { + panic("The URL has been registry!") + } + } return protocol.NewBaseExporter("test", invoker, &sync.Map{}) } func (*mockRegistryProtocol) Destroy() { // Destroy is a mock function } +func getRegistryUrl(invoker protocol.Invoker) *common.URL { + // here add * for return a new url + url := invoker.GetUrl() + // if the protocol == registry ,set protocol the registry value in url.params + if url.Protocol == constant.REGISTRY_PROTOCOL { + protocol := url.GetParam(constant.REGISTRY_KEY, "") + url.Protocol = protocol + } + return &url +} + +func (p *mockRegistryProtocol) GetRegistries() []registry.Registry { + return []registry.Registry{&mockServiceDiscoveryRegistry{}} +} diff --git a/config/service_config_test.go b/config/service_config_test.go index d2bbda0c49..4d4122ee70 100644 --- a/config/service_config_test.go +++ b/config/service_config_test.go @@ -40,13 +40,33 @@ func doInitProvider() { Module: "module", Version: "2.6.0", Owner: "dubbo", - Environment: "test"}, + Environment: "test", + }, + Remotes: map[string]*RemoteConfig{ + "test1": { + Address: "127.0.0.5:2181", + TimeoutStr: "5s", + Username: "user1", + Password: "pwd1", + Params: nil, + }, + }, + ServiceDiscoveries: map[string]*ServiceDiscoveryConfig{ + "mock_servicediscovery": { + Protocol: "mock", + RemoteRef: "test1", + }, + }, + MetadataReportConfig: &MetadataReportConfig{ + Protocol: "mock", + RemoteRef: "test1", + }, }, Services: map[string]*ServiceConfig{ "MockService": { InterfaceName: "com.MockService", Protocol: "mock", - Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2", + Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2,hangzhou_service_discovery_reg", Cluster: "failover", Loadbalance: "random", Retries: "3", @@ -71,7 +91,7 @@ func doInitProvider() { "MockServiceNoRightProtocol": { InterfaceName: "com.MockService", Protocol: "mock1", - Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2", + Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2,hangzhou_service_discovery_reg", Cluster: "failover", Loadbalance: "random", Retries: "3", @@ -128,6 +148,14 @@ func doInitProvider() { Username: "user1", Password: "pwd1", }, + "hangzhou_service_discovery_reg": { + Protocol: "service-discovery", + Params: map[string]string{ + "service_discovery": "mock_servicediscovery", + "name_mapping": "in-memory", + "metadata": "default", + }, + }, }, Protocols: map[string]*ProtocolConfig{ diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 4c669b2cee..cbaafd7731 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -117,6 +117,18 @@ func (proto *registryProtocol) initConfigurationListeners() { proto.providerConfigurationListener = newProviderConfigurationListener(proto.overrideListeners) } +// nolint +func (proto *registryProtocol) GetRegistries() []registry.Registry { + var rs []registry.Registry + proto.registries.Range(func(_, v interface{}) bool { + if r, ok := v.(registry.Registry); ok { + rs = append(rs, r) + } + return true + }) + return rs +} + // Refer provider service from registry center func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { var registryUrl = url @@ -373,7 +385,7 @@ func setProviderUrl(regURL *common.URL, providerURL *common.URL) { regURL.SubURL = providerURL } -// GetProtocol return the singleton RegistryProtocol +// GetProtocol return the singleton registryProtocol func GetProtocol() protocol.Protocol { once.Do(func() { regProtocol = newRegistryProtocol() diff --git a/registry/registry_factory.go b/registry/registry_factory.go new file mode 100644 index 0000000000..caefbce2eb --- /dev/null +++ b/registry/registry_factory.go @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package registry + +// RegistryFactory +type RegistryFactory interface { + // GetRegistries get registries + GetRegistries() []Registry +} diff --git a/registry/service_discovery_factory.go b/registry/service_discovery_factory.go new file mode 100644 index 0000000000..3bcf72612a --- /dev/null +++ b/registry/service_discovery_factory.go @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package registry + +// ServiceDiscoveryHolder we can get a service discovery +// it always be a service discovery registry +type ServiceDiscoveryHolder interface { + // GetServiceDiscovery get service discovery + GetServiceDiscovery() ServiceDiscovery +} diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 061d832b03..cdb586c137 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -28,7 +28,6 @@ import ( import ( cm "github.com/Workiva/go-datastructures/common" gxset "github.com/dubbogo/gost/container/set" - gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" "go.uber.org/atomic" ) @@ -176,18 +175,6 @@ func (s *serviceDiscoveryRegistry) Register(url common.URL) error { logger.Warnf("The URL[%s] has been registry!", url.String()) } - // we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap - // But we don't want to design a similar bootstrap class. - ins, err := createInstance(url) - if err != nil { - return perrors.WithMessage(err, "could not create servcie instance, please check your service url") - } - - err = s.serviceDiscovery.Register(ins) - if err != nil { - return perrors.WithMessage(err, "register the service failed") - } - err = s.metaDataService.PublishServiceDefinition(url) if err != nil { return perrors.WithMessage(err, "publish the service definition failed. ") @@ -198,36 +185,6 @@ func (s *serviceDiscoveryRegistry) Register(url common.URL) error { url.Protocol) } -func createInstance(url common.URL) (registry.ServiceInstance, error) { - appConfig := config.GetApplicationConfig() - port, err := strconv.ParseInt(url.Port, 10, 32) - if err != nil { - return nil, perrors.WithMessage(err, "invalid port: "+url.Port) - } - - host := url.Ip - if len(host) == 0 { - host, err = gxnet.GetLocalIP() - if err != nil { - return nil, perrors.WithMessage(err, "could not get the local Ip") - } - } - - // usually we will add more metadata - metadata := make(map[string]string, 8) - metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType - - return ®istry.DefaultServiceInstance{ - ServiceName: appConfig.Name, - Host: host, - Port: int(port), - Id: host + constant.KEY_SEPARATOR + url.Port, - Enable: true, - Healthy: true, - Metadata: metadata, - }, nil -} - func shouldRegister(url common.URL) bool { side := url.GetParam(constant.SIDE_KEY, "") if side == constant.PROVIDER_PROTOCOL {