From b87dbe226895fc45fe2f5eb81b14b4319682c371 Mon Sep 17 00:00:00 2001 From: "vito.he" Date: Wed, 6 May 2020 17:44:57 +0800 Subject: [PATCH 1/7] Add:metadata report delegate & remote metadata service --- common/constant/key.go | 4 + config/instance/metedata_report.go | 25 +- config/metadata_report_config.go | 2 +- go.mod | 1 + go.sum | 11 + metadata/definition/definition.go | 40 +++ metadata/definition/definition_test.go | 52 ++++ metadata/definition/mock.go | 46 +++ .../identifier/service_metadata_identifier.go | 13 + .../subscribe_metadata_identifier.go | 2 +- metadata/report/delegate/delegate_report.go | 281 ++++++++++++++++++ .../report/delegate/delegate_report_test.go | 123 ++++++++ metadata/report/report.go | 13 +- metadata/service/inmemory/service.go | 39 ++- metadata/service/inmemory/service_test.go | 27 +- metadata/service/remote/service.go | 195 ++++++++++++ metadata/service/remote/service_test.go | 139 +++++++++ metadata/service/service.go | 7 + 18 files changed, 961 insertions(+), 59 deletions(-) create mode 100644 metadata/definition/definition_test.go create mode 100644 metadata/definition/mock.go create mode 100644 metadata/report/delegate/delegate_report.go create mode 100644 metadata/report/delegate/delegate_report_test.go create mode 100644 metadata/service/remote/service.go create mode 100644 metadata/service/remote/service_test.go diff --git a/common/constant/key.go b/common/constant/key.go index da21a3a9e1..6acb2299c4 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -79,6 +79,10 @@ const ( EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler" PROVIDER_SHUTDOWN_FILTER = "pshutdown" CONSUMER_SHUTDOWN_FILTER = "cshutdown" + SYNC_REPORT_KEY = "sync.report" + RETRY_PERIOD_KEY = "retry.period" + RETRY_TIMES_KEY = "retry.times" + CYCLE_REPORT_KEY = "cycle.report" ) const ( diff --git a/config/instance/metedata_report.go b/config/instance/metedata_report.go index 9cf435bc9d..70e6ffc23d 100644 --- a/config/instance/metedata_report.go +++ b/config/instance/metedata_report.go @@ -28,14 +28,31 @@ import ( ) var ( - instance report.MetadataReport - once sync.Once + instance report.MetadataReport + reportUrl common.URL + once sync.Once ) -// GetMetadataReportInstance ... -func GetMetadataReportInstance(url *common.URL) report.MetadataReport { +// InitMetadataReportInstance will create the metadata report instance by the specified metadata report url +func InitMetadataReportInstance(url *common.URL) report.MetadataReport { once.Do(func() { instance = extension.GetMetadataReportFactory(url.Protocol).CreateMetadataReport(url) + reportUrl = *url }) return instance } + +// GetMetadataReportInstance will return the instance +func GetMetadataReportInstance() report.MetadataReport { + return instance +} + +// GetMetadataReportUrl will return the report instance url +func GetMetadataReportUrl() common.URL { + return reportUrl +} + +// SetMetadataReportUrl will only can be used by unit test to mock url +func SetMetadataReportUrl(url common.URL) { + reportUrl = url +} diff --git a/config/metadata_report_config.go b/config/metadata_report_config.go index 41fb6b4769..95506462a2 100644 --- a/config/metadata_report_config.go +++ b/config/metadata_report_config.go @@ -101,7 +101,7 @@ func startMetadataReport(metadataType string, metadataReportConfig *MetadataRepo } if url, err := metadataReportConfig.ToUrl(); err == nil { - instance.GetMetadataReportInstance(url) + instance.InitMetadataReportInstance(url) } else { return perrors.New("MetadataConfig is invalid!") } diff --git a/go.mod b/go.mod index fe1891ea6e..54e5ebcd93 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/dubbogo/gost v1.8.0 github.com/emicklei/go-restful/v3 v3.0.0 github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect + github.com/go-co-op/gocron v0.1.1 github.com/go-errors/errors v1.0.1 // indirect github.com/go-resty/resty/v2 v2.1.0 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect diff --git a/go.sum b/go.sum index 73c3da87a0..46235271ab 100644 --- a/go.sum +++ b/go.sum @@ -137,6 +137,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-co-op/gocron v0.1.1 h1:OfDmkqkCguFtFMsm6Eaayci3DADLa8pXvdmOlPU/JcU= +github.com/go-co-op/gocron v0.1.1/go.mod h1:Y9PWlYqDChf2Nbgg7kfS+ZsXHDTZbMZYPEQ0MILqH+M= github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-ini/ini v1.25.4 h1:Mujh4R/dH6YL8bxuISne3xX2+qcQ9p0IxKAP6ExWoUo= @@ -152,6 +154,7 @@ github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+ github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg= github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc= github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dpr1UfpPtxFw+EFuQ41HhCWZfha5jSVRG7C7I= +github.com/go-redis/redis v6.15.5+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-resty/resty/v2 v2.1.0 h1:Z6IefCpUMfnvItVJaJXWv/pMiiD11So35QgwEELsldE= github.com/go-resty/resty/v2 v2.1.0/go.mod h1:dZGr0i9PLlaaTD4H/hoZIDjQ+r6xq8mgbRzHZf7f2J8= github.com/go-sql-driver/mysql v0.0.0-20180618115901-749ddf1598b4 h1:1LlmVz15APoKz9dnm5j2ePptburJlwEH+/v/pUuoxck= @@ -391,10 +394,14 @@ github.com/oklog/run v0.0.0-20180308005104-6934b124db28/go.mod h1:dlhp/R75TPv97u github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= +github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I= github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opencontainers/go-digest v1.0.0-rc1 h1:WzifXhOVOEOuFYOJAW6aQqW0TooG2iki3E3Ii+WN7gQ= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVojFA6h/TRcI= @@ -526,6 +533,8 @@ golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b h1:0mm1VjtFUOIlE1SbDlwjYaDxZVDP2S5ou6y0gSgXHu8= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20170807180024-9a379c6b3e95/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -534,6 +543,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/metadata/definition/definition.go b/metadata/definition/definition.go index 8d4a584ee5..18578cb2c1 100644 --- a/metadata/definition/definition.go +++ b/metadata/definition/definition.go @@ -19,6 +19,8 @@ package definition import ( "bytes" + "encoding/json" + "fmt" ) import ( @@ -26,6 +28,11 @@ import ( "github.com/apache/dubbo-go/common/constant" ) +// ServiceDefinition is a interface of service's definition +type ServiceDefiner interface { + ToBytes() ([]byte, error) +} + // ServiceDefinition is the describer of service definition type ServiceDefinition struct { CanonicalName string @@ -34,6 +41,39 @@ type ServiceDefinition struct { Types []TypeDefinition } +func (def ServiceDefinition) ToBytes() ([]byte, error) { + return json.Marshal(def) + +} + +func (def ServiceDefinition) String() string { + var methodStr string + for _, m := range def.Methods { + var paramType string + for _, p := range m.ParameterTypes { + paramType = paramType + fmt.Sprintf("{type:%v}", p) + } + var param string + for _, d := range m.Parameters { + param = param + fmt.Sprintf("{id:%v,type:%v,builderName:%v}", d.Id, d.Type, d.TypeBuilderName) + } + methodStr = methodStr + fmt.Sprintf("{name:%v,parameterTypes:[%v],returnType:%v,params:[%v] }", m.Name, paramType, m.ReturnType, param) + + } + var types string + for _, d := range def.Types { + types = types + fmt.Sprintf("{id:%v,type:%v,builderName:%v}", d.Id, d.Type, d.TypeBuilderName) + } + + return fmt.Sprintf("{canonicalName:%v, codeSource:%v, methods:[%v], types:[%v]}", def.CanonicalName, def.CodeSource, methodStr, types) +} + +// FullServiceDefinition is the describer of service definition with parameters +type FullServiceDefinition struct { + ServiceDefinition + Params map[string]string +} + // MethodDefinition is the describer of method definition type MethodDefinition struct { Name string diff --git a/metadata/definition/definition_test.go b/metadata/definition/definition_test.go new file mode 100644 index 0000000000..958f9324d0 --- /dev/null +++ b/metadata/definition/definition_test.go @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package definition + +import ( + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" +) + +func TestBuildServiceDefinition(t *testing.T) { + serviceName := "com.ikurento.user.UserProvider" + group := "group1" + version := "0.0.1" + protocol := "dubbo" + beanName := "UserProvider" + url, err := common.NewURL(fmt.Sprintf( + "%v://127.0.0.1:20000/com.ikurento.user.UserProvider1?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&module=dubbogo+user-info+server&org=ikurento.com&"+ + "owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000×tamp=1556509797245&group=%v&version=%v&bean.name=%v", + protocol, serviceName, group, version, beanName)) + assert.NoError(t, err) + _, err = common.ServiceMap.Register(serviceName, protocol, &UserProvider{}) + assert.NoError(t, err) + service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) + sd := BuildServiceDefinition(*service, url) + assert.Equal(t, "{canonicalName:com.ikurento.user.UserProvider, codeSource:, methods:[{name:GetUser,parameterTypes:[{type:slice}],returnType:ptr,params:[] }], types:[]}", sd.String()) +} diff --git a/metadata/definition/mock.go b/metadata/definition/mock.go new file mode 100644 index 0000000000..ca9e125a74 --- /dev/null +++ b/metadata/definition/mock.go @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package definition + +import ( + "context" + "time" +) + +type User struct { + Id string + Name string + Age int32 + Time time.Time +} + +type UserProvider struct { +} + +func (u *UserProvider) GetUser(ctx context.Context, req []interface{}) (*User, error) { + rsp := User{"A001", "Alex Stocks", 18, time.Now()} + return &rsp, nil +} + +func (u *UserProvider) Reference() string { + return "UserProvider" +} + +func (u User) JavaClassName() string { + return "com.ikurento.user.User" +} diff --git a/metadata/identifier/service_metadata_identifier.go b/metadata/identifier/service_metadata_identifier.go index 92c15704db..7cdb55e53d 100644 --- a/metadata/identifier/service_metadata_identifier.go +++ b/metadata/identifier/service_metadata_identifier.go @@ -18,6 +18,7 @@ package identifier import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" ) @@ -28,6 +29,18 @@ type ServiceMetadataIdentifier struct { BaseMetadataIdentifier } +func NewServiceMetadataIdentifier(url common.URL) *ServiceMetadataIdentifier { + return &ServiceMetadataIdentifier{ + BaseMetadataIdentifier: BaseMetadataIdentifier{ + ServiceInterface: url.Service(), + Version: url.GetParam(constant.VERSION_KEY, ""), + Group: url.GetParam(constant.GROUP_KEY, ""), + Side: url.GetParam(constant.SIDE_KEY, ""), + }, + Protocol: url.Protocol, + } +} + // GetIdentifierKey will return string format as service:Version:Group:Side:Protocol:"revision"+Revision func (mdi *ServiceMetadataIdentifier) GetIdentifierKey() string { return mdi.BaseMetadataIdentifier.getIdentifierKey(mdi.Protocol, constant.KEY_REVISON_PREFIX+mdi.Revision) diff --git a/metadata/identifier/subscribe_metadata_identifier.go b/metadata/identifier/subscribe_metadata_identifier.go index e599fc9e0d..fa35ab79d6 100644 --- a/metadata/identifier/subscribe_metadata_identifier.go +++ b/metadata/identifier/subscribe_metadata_identifier.go @@ -20,7 +20,7 @@ package identifier // SubscriberMetadataIdentifier is inherit baseMetaIdentifier with service params: Revision type SubscriberMetadataIdentifier struct { Revision string - BaseMetadataIdentifier + MetadataIdentifier } // GetIdentifierKey will return string format as service:Version:Group:Side:Revision diff --git a/metadata/report/delegate/delegate_report.go b/metadata/report/delegate/delegate_report.go new file mode 100644 index 0000000000..870625b9cb --- /dev/null +++ b/metadata/report/delegate/delegate_report.go @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package delegate + +import ( + "encoding/json" + "sync" + "time" +) + +import ( + "github.com/go-co-op/gocron" + "go.uber.org/atomic" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config/instance" + "github.com/apache/dubbo-go/metadata/definition" + "github.com/apache/dubbo-go/metadata/identifier" +) + +const ( + // defaultMetadataReportRetryTimes is defined for max times to retry + defaultMetadataReportRetryTimes int64 = 100 + // defaultMetadataReportRetryPeriod is defined for cycle interval to retry, the unit is second + defaultMetadataReportRetryPeriod int64 = 3 + // defaultMetadataReportRetryPeriod is defined for cycle report or not + defaultMetadataReportCycleReport bool = true +) + +// metadataReportRetry is a scheduler for retrying task +type metadataReportRetry struct { + retryPeriod int64 + retryLimit int64 + scheduler *gocron.Scheduler + job *gocron.Job + retryCounter *atomic.Int64 + // if no failed report, wait how many times to run retry task. + retryTimesIfNonFail int64 +} + +// newMetadataReportRetry will create a scheduler for retry task +func newMetadataReportRetry(retryPeriod int64, retryLimit int64, retryFunc func() bool) (*metadataReportRetry, error) { + s1 := gocron.NewScheduler(time.UTC) + + mrr := &metadataReportRetry{ + retryPeriod: retryPeriod, + retryLimit: retryLimit, + scheduler: s1, + retryCounter: atomic.NewInt64(0), + retryTimesIfNonFail: 600, + } + + newJob, err := mrr.scheduler.Every(uint64(mrr.retryPeriod)).Seconds().Do( + func() { + mrr.retryCounter.Inc() + logger.Infof("start to retry task for metadata report. retry times: %v", mrr.retryCounter.Load()) + if mrr.retryCounter.Load() > mrr.retryLimit { + mrr.scheduler.Clear() + } else if retryFunc() && mrr.retryCounter.Load() > mrr.retryTimesIfNonFail { + mrr.scheduler.Clear() // may not interrupt the running job + } + }) + + mrr.job = newJob + return mrr, err +} + +// startRetryTask will make scheduler with retry task run +func (mrr *metadataReportRetry) startRetryTask() { + mrr.scheduler.StartAt(time.Now().Add(500 * time.Millisecond)) + mrr.scheduler.Start() +} + +// MetadataReport is a absolute delegate for MetadataReport +type MetadataReport struct { + reportUrl common.URL + syncReport bool + metadataReportRetry *metadataReportRetry + + failedReports map[*identifier.MetadataIdentifier]interface{} + failedReportsLock sync.RWMutex + + // allMetadataReports store all the metdadata reports records in memory + allMetadataReports map[*identifier.MetadataIdentifier]interface{} + allMetadataReportsLock sync.RWMutex +} + +// NewMetadataReport will create a MetadataReport with initiation +func NewMetadataReport() (*MetadataReport, error) { + url := instance.GetMetadataReportUrl() + bmr := &MetadataReport{ + reportUrl: url, + syncReport: url.GetParamBool(constant.SYNC_REPORT_KEY, false), + failedReports: make(map[*identifier.MetadataIdentifier]interface{}, 4), + allMetadataReports: make(map[*identifier.MetadataIdentifier]interface{}, 4), + } + + mrr, err := newMetadataReportRetry( + url.GetParamInt(constant.RETRY_PERIOD_KEY, defaultMetadataReportRetryPeriod), + url.GetParamInt(constant.RETRY_TIMES_KEY, defaultMetadataReportRetryTimes), + bmr.retry, + ) + + if err != nil { + return nil, err + } + + bmr.metadataReportRetry = mrr + if url.GetParamBool(constant.CYCLE_REPORT_KEY, defaultMetadataReportCycleReport) { + scheduler := gocron.NewScheduler(time.UTC) + _, err := scheduler.Every(1).Day().Do( + func() { + logger.Info("start to publish all metadata.") + bmr.allMetadataReportsLock.RLock() + bmr.doHandlerMetadataCollection(bmr.allMetadataReports) + bmr.allMetadataReportsLock.RUnlock() + + }) + if err != nil { + return nil, err + } + scheduler.StartAt(time.Now().Add(500 * time.Millisecond)) + scheduler.Start() + } + return bmr, nil +} + +// retry will do metadata failed reports collection by call metadata report sdk +func (bmr *MetadataReport) retry() bool { + bmr.failedReportsLock.RLock() + bmr.failedReportsLock.Unlock() + return bmr.doHandlerMetadataCollection(bmr.failedReports) +} + +// StoreProviderMetadata will delegate to call remote metadata's sdk to store provider service definition +func (bmr *MetadataReport) StoreProviderMetadata(identifier *identifier.MetadataIdentifier, definer definition.ServiceDefiner) { + if bmr.syncReport { + bmr.storeMetadataTask(common.PROVIDER, identifier, definer) + } else { + go bmr.storeMetadataTask(common.PROVIDER, identifier, definer) + } +} + +// storeMetadataTask will delegate to call remote metadata's sdk to store +func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.MetadataIdentifier, definer interface{}) { + logger.Infof("store provider metadata. Identifier :%v ; definition: %v .", identifier, definer) + bmr.allMetadataReportsLock.Lock() + bmr.allMetadataReports[identifier] = definer + bmr.allMetadataReportsLock.Unlock() + + bmr.failedReportsLock.Lock() + delete(bmr.failedReports, identifier) + bmr.failedReportsLock.Unlock() + // data is store the json marshaled definition + var ( + data []byte + err error + ) + + defer func() { + if r := recover(); r != nil { + bmr.failedReportsLock.Lock() + bmr.failedReports[identifier] = definer + bmr.failedReportsLock.Unlock() + bmr.metadataReportRetry.startRetryTask() + logger.Errorf("Failed to put provider metadata %v in %v, cause: %v", identifier, string(data), r) + } + }() + + data, err = json.Marshal(definer) + if err != nil { + logger.Errorf("storeProviderMetadataTask error in stage json.Marshal, msg is %v", err) + panic(err) + } + report := instance.GetMetadataReportInstance() + if role == common.PROVIDER { + err = report.StoreProviderMetadata(identifier, string(data)) + } else if role == common.CONSUMER { + err = report.StoreConsumerMetadata(identifier, string(data)) + } + + if err != nil { + logger.Errorf("storeProviderMetadataTask error in stage call metadata report to StoreProviderMetadata, msg is %v", err) + panic(err) + } + +} + +// StoreConsumerMetadata will delegate to call remote metadata's sdk to store consumer side service definition +func (bmr *MetadataReport) StoreConsumerMetadata(identifier *identifier.MetadataIdentifier, definer map[string]string) { + if bmr.syncReport { + bmr.storeMetadataTask(common.CONSUMER, identifier, definer) + } else { + go bmr.storeMetadataTask(common.CONSUMER, identifier, definer) + } +} + +// SaveServiceMetadata will delegate to call remote metadata's sdk to save service metadata +func (bmr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier, url common.URL) error { + report := instance.GetMetadataReportInstance() + if bmr.syncReport { + return report.SaveServiceMetadata(identifier, url) + } else { + go report.SaveServiceMetadata(identifier, url) + return nil + } +} + +// RemoveServiceMetadata will delegate to call remote metadata's sdk to remove service metadata +func (bmr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier) error { + report := instance.GetMetadataReportInstance() + if bmr.syncReport { + return report.RemoveServiceMetadata(identifier) + } else { + go report.RemoveServiceMetadata(identifier) + return nil + } +} + +// GetExportedURLs will delegate to call remote metadata's sdk to get exported urls +func (bmr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) []string { + report := instance.GetMetadataReportInstance() + return report.GetExportedURLs(identifier) +} + +// SaveSubscribedData will delegate to call remote metadata's sdk to save subscribed data +func (bmr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error { + report := instance.GetMetadataReportInstance() + if bmr.syncReport { + return report.SaveSubscribedData(identifier, urls) + } else { + go report.SaveSubscribedData(identifier, urls) + return nil + } +} + +// GetSubscribedURLs will delegate to call remote metadata's sdk to get subscribed urls +func (MetadataReport) GetSubscribedURLs(identifier *identifier.SubscriberMetadataIdentifier) []string { + report := instance.GetMetadataReportInstance() + return report.GetSubscribedURLs(identifier) +} + +// GetServiceDefinition will delegate to call remote metadata's sdk to get service definitions +func (MetadataReport) GetServiceDefinition(identifier *identifier.MetadataIdentifier) string { + report := instance.GetMetadataReportInstance() + return report.GetServiceDefinition(identifier) +} + +// doHandlerMetadataCollection will store metadata to metadata support with given metadataMap +func (bmr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifier.MetadataIdentifier]interface{}) bool { + if len(metadataMap) == 0 { + return true + } + for e := range metadataMap { + if common.RoleType(common.PROVIDER).Role() == e.Side { + bmr.StoreProviderMetadata(e, metadataMap[e].(*definition.FullServiceDefinition)) + } else if common.RoleType(common.CONSUMER).Role() == e.Side { + bmr.StoreConsumerMetadata(e, metadataMap[e].(map[string]string)) + } + } + return false +} diff --git a/metadata/report/delegate/delegate_report_test.go b/metadata/report/delegate/delegate_report_test.go new file mode 100644 index 0000000000..b5f8551a1a --- /dev/null +++ b/metadata/report/delegate/delegate_report_test.go @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package delegate + +import ( + "fmt" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config/instance" + "github.com/apache/dubbo-go/metadata/definition" + "github.com/apache/dubbo-go/metadata/identifier" +) + +func TestMetadataReport_MetadataReportRetry(t *testing.T) { + counter := 1 + + retry, err := newMetadataReportRetry(1, 10, func() bool { + counter++ + return true + }) + assert.NoError(t, err) + retry.startRetryTask() + itsTime := time.After(2500 * time.Millisecond) + select { + case <-itsTime: + retry.scheduler.Clear() + assert.Equal(t, counter, 3) + logger.Info("over") + } +} + +func TestMetadataReport_MetadataReportRetryWithLimit(t *testing.T) { + counter := 1 + + retry, err := newMetadataReportRetry(1, 1, func() bool { + counter++ + return true + }) + assert.NoError(t, err) + retry.startRetryTask() + itsTime := time.After(2500 * time.Millisecond) + select { + case <-itsTime: + retry.scheduler.Clear() + assert.Equal(t, counter, 2) + logger.Info("over") + } + +} + +func mockNewMetadataReport(t *testing.T) *MetadataReport { + syncReportKey := "false" + retryPeroidKey := "3" + retryTimesKey := "100" + cycleReportKey := "true" + + url, err := common.NewURL(fmt.Sprintf( + "test://127.0.0.1:20000/?"+constant.SYNC_REPORT_KEY+"=%v&"+constant.RETRY_PERIOD_KEY+"=%v&"+ + constant.RETRY_TIMES_KEY+"=%v&"+constant.CYCLE_REPORT_KEY+"=%v", + syncReportKey, retryPeroidKey, retryTimesKey, cycleReportKey)) + assert.NoError(t, err) + instance.SetMetadataReportUrl(url) + mtr, err := NewMetadataReport() + assert.NoError(t, err) + assert.NotNil(t, mtr) + return mtr +} + +func TestMetadataReport_StoreProviderMetadata(t *testing.T) { + mtr := mockNewMetadataReport(t) + var metadataId = &identifier.MetadataIdentifier{ + Application: "app", + BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ + ServiceInterface: "com.ikurento.user.UserProvider", + Version: "0.0.1", + Group: "group1", + Side: "provider", + }, + } + + mtr.StoreProviderMetadata(metadataId, getMockDefinition(metadataId, t)) +} + +func getMockDefinition(id *identifier.MetadataIdentifier, t *testing.T) definition.ServiceDefinition { + protocol := "dubbo" + beanName := "UserProvider" + url, err := common.NewURL(fmt.Sprintf( + "%v://127.0.0.1:20000/com.ikurento.user.UserProvider1?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&module=dubbogo+user-info+server&org=ikurento.com&"+ + "owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000×tamp=1556509797245&group=%v&version=%v&bean.name=%v", + protocol, id.ServiceInterface, id.Group, id.Version, beanName)) + assert.NoError(t, err) + _, err = common.ServiceMap.Register(id.ServiceInterface, protocol, &definition.UserProvider{}) + assert.NoError(t, err) + service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) + return definition.BuildServiceDefinition(*service, url) +} diff --git a/metadata/report/report.go b/metadata/report/report.go index 81227e0c76..61cdda1f96 100644 --- a/metadata/report/report.go +++ b/metadata/report/report.go @@ -19,18 +19,17 @@ package report import ( "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/metadata/definition" "github.com/apache/dubbo-go/metadata/identifier" ) // MetadataReport is an interface of remote metadata report type MetadataReport interface { - StoreProviderMetadata(*identifier.MetadataIdentifier, *definition.ServiceDefinition) - StoreConsumerMetadata(*identifier.MetadataIdentifier, map[string]string) - SaveServiceMetadata(*identifier.ServiceMetadataIdentifier, *common.URL) - RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) + StoreProviderMetadata(*identifier.MetadataIdentifier, string) error + StoreConsumerMetadata(*identifier.MetadataIdentifier, string) error + SaveServiceMetadata(*identifier.ServiceMetadataIdentifier, common.URL) error + RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) error GetExportedURLs(*identifier.ServiceMetadataIdentifier) []string - SaveSubscribedData(*identifier.SubscriberMetadataIdentifier, []*common.URL) + SaveSubscribedData(*identifier.SubscriberMetadataIdentifier, []common.URL) error GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) []string - GetServiceDefinition(*identifier.MetadataIdentifier) + GetServiceDefinition(*identifier.MetadataIdentifier) string } diff --git a/metadata/service/inmemory/service.go b/metadata/service/inmemory/service.go index c59949401f..df0ec7a4a7 100644 --- a/metadata/service/inmemory/service.go +++ b/metadata/service/inmemory/service.go @@ -17,7 +17,6 @@ package inmemory import ( - "encoding/json" "sync" ) @@ -53,13 +52,13 @@ func NewMetadataService() *MetadataService { } } -// comparator is defined as Comparator for skip list to compare the URL -type comparator common.URL +// Comparator is defined as Comparator for skip list to compare the URL +type Comparator common.URL // Compare is defined as Comparator for skip list to compare the URL -func (c comparator) Compare(comp cm.Comparator) int { +func (c Comparator) Compare(comp cm.Comparator) int { a := common.URL(c).String() - b := common.URL(comp.(comparator)).String() + b := common.URL(comp.(Comparator)).String() switch { case a > b: return 1 @@ -79,7 +78,7 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool { logger.Debug(url.ServiceKey()) if urlSet, loaded = targetMap.LoadOrStore(url.ServiceKey(), skip.New(uint64(0))); loaded { mts.lock.RLock() - wantedUrl := urlSet.(*skip.SkipList).Get(comparator(*url)) + wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url)) if len(wantedUrl) > 0 && wantedUrl[0] != nil { mts.lock.RUnlock() return false @@ -88,12 +87,12 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool { } mts.lock.Lock() //double chk - wantedUrl := urlSet.(*skip.SkipList).Get(comparator(*url)) + wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url)) if len(wantedUrl) > 0 && wantedUrl[0] != nil { mts.lock.Unlock() return false } - urlSet.(*skip.SkipList).Insert(comparator(*url)) + urlSet.(*skip.SkipList).Insert(Comparator(*url)) mts.lock.Unlock() return true } @@ -102,7 +101,7 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool { func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) { if value, loaded := targetMap.Load(url.ServiceKey()); loaded { mts.lock.Lock() - value.(*skip.SkipList).Delete(comparator(*url)) + value.(*skip.SkipList).Delete(Comparator(*url)) mts.lock.Unlock() mts.lock.RLock() defer mts.lock.RUnlock() @@ -118,9 +117,9 @@ func (mts *MetadataService) getAllService(services *sync.Map) *skip.SkipList { services.Range(func(key, value interface{}) bool { urls := value.(*skip.SkipList) for i := uint64(0); i < urls.Len(); i++ { - url := common.URL(urls.ByPosition(i).(comparator)) + url := common.URL(urls.ByPosition(i).(Comparator)) if url.GetParam(constant.INTERFACE_KEY, url.Path) != "MetadataService" { - skipList.Insert(comparator(url)) + skipList.Insert(Comparator(url)) } } return true @@ -135,9 +134,9 @@ func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey s if loaded { urls := serviceList.(*skip.SkipList) for i := uint64(0); i < urls.Len(); i++ { - url := common.URL(urls.ByPosition(i).(comparator)) + url := common.URL(urls.ByPosition(i).(Comparator)) if len(protocol) == 0 || url.Protocol == protocol || url.GetParam(constant.PROTOCOL_KEY, "") == protocol { - skipList.Insert(comparator(url)) + skipList.Insert(Comparator(url)) } } } @@ -182,9 +181,9 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error { // //TODO:generate the service definition and store it //} sd := definition.BuildServiceDefinition(*service, url) - data, err := json.Marshal(sd) + data, err := sd.ToBytes() if err != nil { - logger.Errorf("publishProvider getServiceDescriptor error. providerUrl:%v , error: ", url, err) + logger.Errorf("publishProvider getServiceDescriptor error. providerUrl:%v , error:%v ", url, err) } mts.serviceDefinitions.Store(url.ServiceKey(), string(data)) return nil @@ -221,12 +220,12 @@ func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) return v.(string), nil } -// Version will return the version of metadata service -func (mts *MetadataService) Version() string { - return "1.0.0" +// RefreshMetadata will always return true because it will be implement by remote service +func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool { + return true } // Version will return the version of metadata service -func (mts *MetadataService) Reference() string { - return "MetadataService" +func (mts *MetadataService) Version() string { + return "1.0.0" } diff --git a/metadata/service/inmemory/service_test.go b/metadata/service/inmemory/service_test.go index 9e593db282..fc0410ecca 100644 --- a/metadata/service/inmemory/service_test.go +++ b/metadata/service/inmemory/service_test.go @@ -18,10 +18,8 @@ package inmemory import ( - "context" "fmt" "testing" - "time" ) import ( @@ -33,29 +31,6 @@ import ( "github.com/apache/dubbo-go/metadata/definition" ) -type User struct { - Id string - Name string - Age int32 - Time time.Time -} - -type UserProvider struct { -} - -func (u *UserProvider) GetUser(ctx context.Context, req []interface{}) (*User, error) { - rsp := User{"A001", "Alex Stocks", 18, time.Now()} - return &rsp, nil -} - -func (u *UserProvider) Reference() string { - return "UserProvider" -} - -func (u User) JavaClassName() string { - return "com.ikurento.user.User" -} - func TestMetadataService(t *testing.T) { mts := NewMetadataService() serviceName := "com.ikurento.user.UserProvider" @@ -111,7 +86,7 @@ func TestMetadataService(t *testing.T) { list4, _ := mts.GetSubscribedURLs() assert.Equal(t, uint64(0), list4.Len()) - userProvider := &UserProvider{} + userProvider := &definition.UserProvider{} common.ServiceMap.Register(serviceName, protocol, userProvider) mts.PublishServiceDefinition(u) expected := "{\"CanonicalName\":\"com.ikurento.user.UserProvider\",\"CodeSource\":\"\"," + diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go new file mode 100644 index 0000000000..706f2f919c --- /dev/null +++ b/metadata/service/remote/service.go @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package remote + +import ( + "github.com/Workiva/go-datastructures/slice/skip" + "go.uber.org/atomic" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/metadata/definition" + "github.com/apache/dubbo-go/metadata/identifier" + "github.com/apache/dubbo-go/metadata/report/delegate" + "github.com/apache/dubbo-go/metadata/service" + "github.com/apache/dubbo-go/metadata/service/inmemory" +) + +// MetadataService is a implement of metadata service which will delegate the remote metadata report +type MetadataService struct { + service.BaseMetadataService + inMemoryMetadataService *inmemory.MetadataService + exportedRevision atomic.String + subscribedRevision atomic.String + delegateReport *delegate.MetadataReport +} + +// NewMetadataService will create a new remote MetadataService instance +func NewMetadataService() (*MetadataService, error) { + mr, err := delegate.NewMetadataReport() + if err != nil { + return nil, err + } + return &MetadataService{ + inMemoryMetadataService: inmemory.NewMetadataService(), + delegateReport: mr, + }, nil +} + +// setInMemoryMetadataService will replace the in memory metadata service by the specific param +func (mts *MetadataService) setInMemoryMetadataService(metadata *inmemory.MetadataService) { + mts.inMemoryMetadataService = metadata +} + +// ExportURL will be implemented by in memory service +func (mts *MetadataService) ExportURL(url common.URL) (bool, error) { + return true, nil +} + +// UnexportURL +func (mts *MetadataService) UnexportURL(url common.URL) error { + smi := identifier.NewServiceMetadataIdentifier(url) + smi.Revision = mts.exportedRevision.Load() + return mts.delegateReport.RemoveServiceMetadata(smi) +} + +// SubscribeURL will be implemented by in memory service +func (MetadataService) SubscribeURL(url common.URL) (bool, error) { + return true, nil +} + +// UnsubscribeURL will be implemented by in memory service +func (MetadataService) UnsubscribeURL(url common.URL) error { + return nil +} + +// PublishServiceDefinition will call remote metadata's StoreProviderMetadata to store url info and service definition +func (mts *MetadataService) PublishServiceDefinition(url common.URL) error { + interfaceName := url.GetParam(constant.INTERFACE_KEY, "") + isGeneric := url.GetParamBool(constant.GENERIC_KEY, false) + if len(interfaceName) > 0 && !isGeneric { + service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) + sd := definition.BuildServiceDefinition(*service, url) + id := &identifier.MetadataIdentifier{ + BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ + ServiceInterface: interfaceName, + Version: url.GetParam(constant.VERSION_KEY, ""), + Group: url.GetParam(constant.GROUP_KEY, ""), + }, + } + mts.delegateReport.StoreProviderMetadata(id, sd) + } + logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url) + return nil +} + +// GetExportedURLs will be implemented by in memory service +func (MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) (*skip.SkipList, error) { + return nil, nil +} + +// GetSubscribedURLs will be implemented by in memory service +func (MetadataService) GetSubscribedURLs() (*skip.SkipList, error) { + return nil, nil +} + +// GetServiceDefinition will be implemented by in memory service +func (MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { + return "", nil +} + +// GetServiceDefinitionByServiceKey will be implemented by in memory service +func (MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { + return "", nil +} + +// RefreshMetadata will refresh the exported & subscribed metadata to remote metadata report from the inmemory metadata service +func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool { + result := true + if len(exportedRevision) != 0 && exportedRevision != mts.exportedRevision.Load() { + mts.exportedRevision.Store(exportedRevision) + urls, err := mts.inMemoryMetadataService.GetExportedURLs(constant.ANY_VALUE, "", "", "") + if err != nil { + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + result = false + } + iterator := urls.Iter(inmemory.Comparator{}) + logger.Infof("urls length = %v", urls.Len()) + for { + if !iterator.Next() { + break + } + url := iterator.Value().(inmemory.Comparator) + id := identifier.NewServiceMetadataIdentifier(common.URL(url)) + id.Revision = mts.exportedRevision.Load() + if err := mts.delegateReport.SaveServiceMetadata(id, common.URL(url)); err != nil { + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + result = false + } + + } + } + + if len(subscribedRevision) != 0 && subscribedRevision != mts.subscribedRevision.Load() { + mts.subscribedRevision.Store(subscribedRevision) + urls, err := mts.inMemoryMetadataService.GetSubscribedURLs() + if err != nil { + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + result = false + } + if urls != nil && urls.Len() > 0 { + id := &identifier.SubscriberMetadataIdentifier{ + MetadataIdentifier: identifier.MetadataIdentifier{ + Application: config.GetApplicationConfig().Name, + }, + Revision: subscribedRevision, + } + if err := mts.delegateReport.SaveSubscribedData(id, convertUrls(urls)); err != nil { + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + result = false + } + } + } + return result +} + +// Version will return the remote service version +func (MetadataService) Version() string { + return "1.0.0" +} + +// convertUrls will convert the skip list to slice +func convertUrls(list *skip.SkipList) []common.URL { + urls := make([]common.URL, list.Len()) + iterator := list.Iter(inmemory.Comparator{}) + for { + if iterator.Value() == nil { + break + } + url := iterator.Value().(inmemory.Comparator) + urls = append(urls, common.URL(url)) + if !iterator.Next() { + break + } + } + return urls +} diff --git a/metadata/service/remote/service_test.go b/metadata/service/remote/service_test.go new file mode 100644 index 0000000000..05378cb8b1 --- /dev/null +++ b/metadata/service/remote/service_test.go @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package remote + +import ( + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config/instance" + "github.com/apache/dubbo-go/metadata/definition" + "github.com/apache/dubbo-go/metadata/identifier" + "github.com/apache/dubbo-go/metadata/report" + "github.com/apache/dubbo-go/metadata/report/factory" + "github.com/apache/dubbo-go/metadata/service/inmemory" +) + +var serviceMetadata = make(map[*identifier.ServiceMetadataIdentifier]common.URL, 4) +var subscribedMetadata = make(map[*identifier.SubscriberMetadataIdentifier][]common.URL, 4) + +func getMetadataReportFactory() factory.MetadataReportFactory { + return &metadataReportFactory{} +} + +type metadataReportFactory struct { +} + +func (mrf *metadataReportFactory) CreateMetadataReport(*common.URL) report.MetadataReport { + return &metadataReport{} +} + +type metadataReport struct { +} + +func (metadataReport) StoreProviderMetadata(*identifier.MetadataIdentifier, string) error { + return nil +} + +func (metadataReport) StoreConsumerMetadata(*identifier.MetadataIdentifier, string) error { + return nil +} + +func (mr *metadataReport) SaveServiceMetadata(id *identifier.ServiceMetadataIdentifier, url common.URL) error { + logger.Infof("SaveServiceMetadata , url is %v", url) + serviceMetadata[id] = url + return nil +} + +func (metadataReport) RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) error { + return nil +} + +func (metadataReport) GetExportedURLs(*identifier.ServiceMetadataIdentifier) []string { + return nil +} + +func (mr *metadataReport) SaveSubscribedData(id *identifier.SubscriberMetadataIdentifier, urls []common.URL) error { + logger.Infof("SaveSubscribedData, , url is %v", urls) + subscribedMetadata[id] = urls + return nil +} + +func (metadataReport) GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) []string { + return nil +} + +func (metadataReport) GetServiceDefinition(*identifier.MetadataIdentifier) string { + return "" +} + +func TestMetadataService(t *testing.T) { + extension.SetMetadataReportFactory("mock", getMetadataReportFactory) + u, err := common.NewURL(fmt.Sprintf( + "mock://127.0.0.1:20000/?sync.report=true")) + assert.NoError(t, err) + instance.InitMetadataReportInstance(&u) + mts, err := NewMetadataService() + assert.NoError(t, err) + mts.setInMemoryMetadataService(mockInmemoryProc(t)) + mts.RefreshMetadata("0.0.1", "0.0.1") + assert.Equal(t, 1, len(serviceMetadata)) + assert.Equal(t, 1, len(subscribedMetadata)) +} + +func mockInmemoryProc(t *testing.T) *inmemory.MetadataService { + mts := inmemory.NewMetadataService() + serviceName := "com.ikurento.user.UserProvider" + group := "group1" + version := "0.0.1" + protocol := "dubbo" + beanName := "UserProvider" + + u, err := common.NewURL(fmt.Sprintf( + "%v://127.0.0.1:20000/com.ikurento.user.UserProvider1?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&module=dubbogo+user-info+server&org=ikurento.com&"+ + "owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000×tamp=1556509797245&group=%v&version=%v&bean.name=%v", + protocol, serviceName, group, version, beanName)) + assert.NoError(t, err) + mts.ExportURL(u) + + mts.SubscribeURL(u) + + userProvider := &definition.UserProvider{} + common.ServiceMap.Register(serviceName, protocol, userProvider) + mts.PublishServiceDefinition(u) + expected := "{\"CanonicalName\":\"com.ikurento.user.UserProvider\",\"CodeSource\":\"\"," + + "\"Methods\":[{\"Name\":\"GetUser\",\"ParameterTypes\":[\"slice\"],\"ReturnType\":\"ptr\"," + + "\"Parameters\":null}],\"Types\":null}" + def1, _ := mts.GetServiceDefinition(serviceName, group, version) + assert.Equal(t, expected, def1) + serviceKey := definition.ServiceDescriperBuild(serviceName, group, version) + def2, _ := mts.GetServiceDefinitionByServiceKey(serviceKey) + assert.Equal(t, expected, def2) + return mts +} diff --git a/metadata/service/service.go b/metadata/service/service.go index bc526c5411..a1b812c0ac 100644 --- a/metadata/service/service.go +++ b/metadata/service/service.go @@ -49,6 +49,8 @@ type MetadataService interface { GetServiceDefinition(interfaceName string, group string, version string) (string, error) // GetServiceDefinition will get the target service info store in metadata by service key GetServiceDefinitionByServiceKey(serviceKey string) (string, error) + // RefreshMetadata will refresh the metadata + RefreshMetadata(exportedRevision string, subscribedRevision string) bool // Version will return the metadata service version Version() string } @@ -61,3 +63,8 @@ type BaseMetadataService struct { func (mts *BaseMetadataService) ServiceName() (string, error) { return config.GetApplicationConfig().Name, nil } + +// Version will return the version of metadata service +func (mts *BaseMetadataService) Reference() string { + return "MetadataService" +} From 73f21fa3e1316b2e80a982b1374de8f9bf1c3e96 Mon Sep 17 00:00:00 2001 From: "vito.he" Date: Wed, 6 May 2020 17:47:31 +0800 Subject: [PATCH 2/7] Mod:for ut --- .../identifier/subscribe_metadata_identifier_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/metadata/identifier/subscribe_metadata_identifier_test.go b/metadata/identifier/subscribe_metadata_identifier_test.go index 9c9ef70641..215aa3c569 100644 --- a/metadata/identifier/subscribe_metadata_identifier_test.go +++ b/metadata/identifier/subscribe_metadata_identifier_test.go @@ -27,11 +27,13 @@ import ( var subscribeMetadataId = &SubscriberMetadataIdentifier{ Revision: "1.0", - BaseMetadataIdentifier: BaseMetadataIdentifier{ - ServiceInterface: "org.apache.pkg.mockService", - Version: "1.0.0", - Group: "Group", - Side: "provider", + MetadataIdentifier: MetadataIdentifier{ + BaseMetadataIdentifier: BaseMetadataIdentifier{ + ServiceInterface: "org.apache.pkg.mockService", + Version: "1.0.0", + Group: "Group", + Side: "provider", + }, }, } From 0048609d9bedde83cca98c2d03b98d376d50dca4 Mon Sep 17 00:00:00 2001 From: "vito.he" Date: Sun, 17 May 2020 16:27:09 +0800 Subject: [PATCH 3/7] Mod:resolve pr review --- common/constant/default.go | 4 ++++ config/instance/metedata_report.go | 18 +++++++-------- config/metadata_report_config.go | 2 +- go.mod | 2 -- go.sum | 6 ----- metadata/definition/definition.go | 22 +++++++++---------- metadata/report/delegate/delegate_report.go | 1 - .../report/delegate/delegate_report_test.go | 14 ++++++------ .../service/exporter/configurable/exporter.go | 2 +- metadata/service/inmemory/service.go | 7 ++++-- metadata/service/remote/service.go | 6 +++-- metadata/service/remote/service_test.go | 2 +- metadata/service/service.go | 3 ++- 13 files changed, 44 insertions(+), 45 deletions(-) diff --git a/common/constant/default.go b/common/constant/default.go index 3c889158e4..6b9d914c87 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -74,3 +74,7 @@ const ( const ( COMMA_SPLIT_PATTERN = "\\s*[,]+\\s*" ) + +const ( + SIMPLE_METADATA_SERVICE_NAME = "MetadataService" +) diff --git a/config/instance/metedata_report.go b/config/instance/metedata_report.go index 70e6ffc23d..8e833dd70b 100644 --- a/config/instance/metedata_report.go +++ b/config/instance/metedata_report.go @@ -33,20 +33,20 @@ var ( once sync.Once ) -// InitMetadataReportInstance will create the metadata report instance by the specified metadata report url -func InitMetadataReportInstance(url *common.URL) report.MetadataReport { +// GetMetadataReportInstance will return the instance in lazy mode. Be careful the instance create will only +// execute once. +func GetMetadataReportInstance(selectiveUrl ...*common.URL) report.MetadataReport { once.Do(func() { - instance = extension.GetMetadataReportFactory(url.Protocol).CreateMetadataReport(url) - reportUrl = *url + var url *common.URL + if len(selectiveUrl) > 0 { + url = selectiveUrl[0] + instance = extension.GetMetadataReportFactory(url.Protocol).CreateMetadataReport(url) + reportUrl = *url + } }) return instance } -// GetMetadataReportInstance will return the instance -func GetMetadataReportInstance() report.MetadataReport { - return instance -} - // GetMetadataReportUrl will return the report instance url func GetMetadataReportUrl() common.URL { return reportUrl diff --git a/config/metadata_report_config.go b/config/metadata_report_config.go index 95506462a2..41fb6b4769 100644 --- a/config/metadata_report_config.go +++ b/config/metadata_report_config.go @@ -101,7 +101,7 @@ func startMetadataReport(metadataType string, metadataReportConfig *MetadataRepo } if url, err := metadataReportConfig.ToUrl(); err == nil { - instance.InitMetadataReportInstance(url) + instance.GetMetadataReportInstance(url) } else { return perrors.New("MetadataConfig is invalid!") } diff --git a/go.mod b/go.mod index 5de123bdb3..ba3cf3e219 100644 --- a/go.mod +++ b/go.mod @@ -14,9 +14,7 @@ require ( github.com/dubbogo/go-zookeeper v1.0.0 github.com/dubbogo/gost v1.9.0 github.com/emicklei/go-restful/v3 v3.0.0 - github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect github.com/go-co-op/gocron v0.1.1 - github.com/go-errors/errors v1.0.1 // indirect github.com/go-resty/resty/v2 v2.1.0 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/mock v1.3.1 diff --git a/go.sum b/go.sum index 5e07006205..eb84bde1fb 100644 --- a/go.sum +++ b/go.sum @@ -35,11 +35,8 @@ github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 h1:rFw4nCn9iMW+Vaj github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0= -github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ= github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA= github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk= -github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/apache/dubbo-go-hessian2 v1.5.0 h1:fzulDG5G7nX0ccgKdiN9XipJ7tZ4WXKgmk4stdlDS6s= github.com/apache/dubbo-go-hessian2 v1.5.0/go.mod h1:VwEnsOMidkM1usya2uPfGpSLO9XUF//WQcWn3y+jFz8= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA= @@ -53,7 +50,6 @@ github.com/asaskevich/govalidator v0.0.0-20180319081651-7d2e70ef918f h1:/8NcnxL6 github.com/asaskevich/govalidator v0.0.0-20180319081651-7d2e70ef918f/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/aws/aws-sdk-go v1.15.24 h1:xLAdTA/ore6xdPAljzZRed7IGqQgC+nY+ERS5vaj4Ro= github.com/aws/aws-sdk-go v1.15.24/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= -github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -392,8 +388,6 @@ github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c h1:WoCa3AvgQMVKNs+RIFlWPRgY9QVJwUxJDrGxHs0fcRo= -github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo= github.com/nacos-group/nacos-sdk-go v0.3.1 h1:MI7bNDAN5m9UFcRRUTSPfJi4dCQo+TYG85qVB1rCHeg= github.com/nacos-group/nacos-sdk-go v0.3.1/go.mod h1:ESKb6yF0gxSc8GuS+0jaMBe+n8rJ5/k4ya6LyFG2xi8= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW7hr4IVovMwWg0E0PYcyW8CzqDcVmaew9cujU4s= diff --git a/metadata/definition/definition.go b/metadata/definition/definition.go index 18578cb2c1..11e137a14b 100644 --- a/metadata/definition/definition.go +++ b/metadata/definition/definition.go @@ -21,6 +21,7 @@ import ( "bytes" "encoding/json" "fmt" + "strings" ) import ( @@ -43,29 +44,26 @@ type ServiceDefinition struct { func (def ServiceDefinition) ToBytes() ([]byte, error) { return json.Marshal(def) - } func (def ServiceDefinition) String() string { - var methodStr string + var methodStr strings.Builder for _, m := range def.Methods { - var paramType string + var paramType strings.Builder for _, p := range m.ParameterTypes { - paramType = paramType + fmt.Sprintf("{type:%v}", p) + paramType.WriteString(fmt.Sprintf("{type:%v}", p)) } - var param string + var param strings.Builder for _, d := range m.Parameters { - param = param + fmt.Sprintf("{id:%v,type:%v,builderName:%v}", d.Id, d.Type, d.TypeBuilderName) + param.WriteString(fmt.Sprintf("{id:%v,type:%v,builderName:%v}", d.Id, d.Type, d.TypeBuilderName)) } - methodStr = methodStr + fmt.Sprintf("{name:%v,parameterTypes:[%v],returnType:%v,params:[%v] }", m.Name, paramType, m.ReturnType, param) - + methodStr.WriteString(fmt.Sprintf("{name:%v,parameterTypes:[%v],returnType:%v,params:[%v] }", m.Name, paramType.String(), m.ReturnType, param.String())) } - var types string + var types strings.Builder for _, d := range def.Types { - types = types + fmt.Sprintf("{id:%v,type:%v,builderName:%v}", d.Id, d.Type, d.TypeBuilderName) + types.WriteString(fmt.Sprintf("{id:%v,type:%v,builderName:%v}", d.Id, d.Type, d.TypeBuilderName)) } - - return fmt.Sprintf("{canonicalName:%v, codeSource:%v, methods:[%v], types:[%v]}", def.CanonicalName, def.CodeSource, methodStr, types) + return fmt.Sprintf("{canonicalName:%v, codeSource:%v, methods:[%v], types:[%v]}", def.CanonicalName, def.CodeSource, methodStr.String(), types.String()) } // FullServiceDefinition is the describer of service definition with parameters diff --git a/metadata/report/delegate/delegate_report.go b/metadata/report/delegate/delegate_report.go index 870625b9cb..a363047988 100644 --- a/metadata/report/delegate/delegate_report.go +++ b/metadata/report/delegate/delegate_report.go @@ -202,7 +202,6 @@ func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.Me logger.Errorf("storeProviderMetadataTask error in stage call metadata report to StoreProviderMetadata, msg is %v", err) panic(err) } - } // StoreConsumerMetadata will delegate to call remote metadata's sdk to store consumer side service definition diff --git a/metadata/report/delegate/delegate_report_test.go b/metadata/report/delegate/delegate_report_test.go index b5f8551a1a..0e8da60700 100644 --- a/metadata/report/delegate/delegate_report_test.go +++ b/metadata/report/delegate/delegate_report_test.go @@ -25,6 +25,7 @@ import ( import ( "github.com/stretchr/testify/assert" + "go.uber.org/atomic" ) import ( @@ -37,10 +38,10 @@ import ( ) func TestMetadataReport_MetadataReportRetry(t *testing.T) { - counter := 1 + counter := atomic.NewInt64(1) retry, err := newMetadataReportRetry(1, 10, func() bool { - counter++ + counter.Add(1) return true }) assert.NoError(t, err) @@ -49,16 +50,16 @@ func TestMetadataReport_MetadataReportRetry(t *testing.T) { select { case <-itsTime: retry.scheduler.Clear() - assert.Equal(t, counter, 3) + assert.Equal(t, counter.Load(), int64(3)) logger.Info("over") } } func TestMetadataReport_MetadataReportRetryWithLimit(t *testing.T) { - counter := 1 + counter := atomic.NewInt64(1) retry, err := newMetadataReportRetry(1, 1, func() bool { - counter++ + counter.Add(1) return true }) assert.NoError(t, err) @@ -67,10 +68,9 @@ func TestMetadataReport_MetadataReportRetryWithLimit(t *testing.T) { select { case <-itsTime: retry.scheduler.Clear() - assert.Equal(t, counter, 2) + assert.Equal(t, counter.Load(), int64(2)) logger.Info("over") } - } func mockNewMetadataReport(t *testing.T) *MetadataReport { diff --git a/metadata/service/exporter/configurable/exporter.go b/metadata/service/exporter/configurable/exporter.go index 3d12e0ecd4..ec3f8ec2d0 100644 --- a/metadata/service/exporter/configurable/exporter.go +++ b/metadata/service/exporter/configurable/exporter.go @@ -49,7 +49,7 @@ func NewMetadataServiceExporter(metadataService service.MetadataService) exporte func (exporter *MetadataServiceExporter) Export() error { if !exporter.IsExported() { - serviceConfig := config.NewServiceConfig("MetadataService", context.Background()) + serviceConfig := config.NewServiceConfig(constant.SIMPLE_METADATA_SERVICE_NAME, context.Background()) serviceConfig.Protocol = constant.DEFAULT_PROTOCOL serviceConfig.Protocols = map[string]*config.ProtocolConfig{ constant.DEFAULT_PROTOCOL: generateMetadataProtocol(), diff --git a/metadata/service/inmemory/service.go b/metadata/service/inmemory/service.go index df0ec7a4a7..4b6f4330a1 100644 --- a/metadata/service/inmemory/service.go +++ b/metadata/service/inmemory/service.go @@ -33,6 +33,9 @@ import ( "github.com/apache/dubbo-go/metadata/service" ) +// version will be used by Version func +const version = "1.0.0" + // MetadataService is store and query the metadata info in memory when each service registry type MetadataService struct { service.BaseMetadataService @@ -118,7 +121,7 @@ func (mts *MetadataService) getAllService(services *sync.Map) *skip.SkipList { urls := value.(*skip.SkipList) for i := uint64(0); i < urls.Len(); i++ { url := common.URL(urls.ByPosition(i).(Comparator)) - if url.GetParam(constant.INTERFACE_KEY, url.Path) != "MetadataService" { + if url.GetParam(constant.INTERFACE_KEY, url.Path) != constant.SIMPLE_METADATA_SERVICE_NAME { skipList.Insert(Comparator(url)) } } @@ -227,5 +230,5 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR // Version will return the version of metadata service func (mts *MetadataService) Version() string { - return "1.0.0" + return version } diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go index 706f2f919c..f4587638ef 100644 --- a/metadata/service/remote/service.go +++ b/metadata/service/remote/service.go @@ -34,6 +34,9 @@ import ( "github.com/apache/dubbo-go/metadata/service/inmemory" ) +// version will be used by Version func +const version = "1.0.0" + // MetadataService is a implement of metadata service which will delegate the remote metadata report type MetadataService struct { service.BaseMetadataService @@ -145,7 +148,6 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) result = false } - } } @@ -174,7 +176,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR // Version will return the remote service version func (MetadataService) Version() string { - return "1.0.0" + return version } // convertUrls will convert the skip list to slice diff --git a/metadata/service/remote/service_test.go b/metadata/service/remote/service_test.go index 05378cb8b1..308c631e41 100644 --- a/metadata/service/remote/service_test.go +++ b/metadata/service/remote/service_test.go @@ -96,7 +96,7 @@ func TestMetadataService(t *testing.T) { u, err := common.NewURL(fmt.Sprintf( "mock://127.0.0.1:20000/?sync.report=true")) assert.NoError(t, err) - instance.InitMetadataReportInstance(&u) + instance.GetMetadataReportInstance(&u) mts, err := NewMetadataService() assert.NoError(t, err) mts.setInMemoryMetadataService(mockInmemoryProc(t)) diff --git a/metadata/service/service.go b/metadata/service/service.go index a1b812c0ac..13464087ed 100644 --- a/metadata/service/service.go +++ b/metadata/service/service.go @@ -23,6 +23,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/config" ) @@ -66,5 +67,5 @@ func (mts *BaseMetadataService) ServiceName() (string, error) { // Version will return the version of metadata service func (mts *BaseMetadataService) Reference() string { - return "MetadataService" + return constant.SIMPLE_METADATA_SERVICE_NAME } From 235edf5881a731a49af24442b4eb8016a4f4bed7 Mon Sep 17 00:00:00 2001 From: "vito.he" Date: Sun, 17 May 2020 16:54:18 +0800 Subject: [PATCH 4/7] Mod:resolve pr review --- metadata/report/delegate/delegate_report.go | 23 ++++++++------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/metadata/report/delegate/delegate_report.go b/metadata/report/delegate/delegate_report.go index a363047988..4e3995d2ea 100644 --- a/metadata/report/delegate/delegate_report.go +++ b/metadata/report/delegate/delegate_report.go @@ -147,7 +147,7 @@ func NewMetadataReport() (*MetadataReport, error) { // retry will do metadata failed reports collection by call metadata report sdk func (bmr *MetadataReport) retry() bool { bmr.failedReportsLock.RLock() - bmr.failedReportsLock.Unlock() + defer bmr.failedReportsLock.RUnlock() return bmr.doHandlerMetadataCollection(bmr.failedReports) } @@ -155,9 +155,8 @@ func (bmr *MetadataReport) retry() bool { func (bmr *MetadataReport) StoreProviderMetadata(identifier *identifier.MetadataIdentifier, definer definition.ServiceDefiner) { if bmr.syncReport { bmr.storeMetadataTask(common.PROVIDER, identifier, definer) - } else { - go bmr.storeMetadataTask(common.PROVIDER, identifier, definer) } + go bmr.storeMetadataTask(common.PROVIDER, identifier, definer) } // storeMetadataTask will delegate to call remote metadata's sdk to store @@ -208,9 +207,8 @@ func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.Me func (bmr *MetadataReport) StoreConsumerMetadata(identifier *identifier.MetadataIdentifier, definer map[string]string) { if bmr.syncReport { bmr.storeMetadataTask(common.CONSUMER, identifier, definer) - } else { - go bmr.storeMetadataTask(common.CONSUMER, identifier, definer) } + go bmr.storeMetadataTask(common.CONSUMER, identifier, definer) } // SaveServiceMetadata will delegate to call remote metadata's sdk to save service metadata @@ -218,10 +216,9 @@ func (bmr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMet report := instance.GetMetadataReportInstance() if bmr.syncReport { return report.SaveServiceMetadata(identifier, url) - } else { - go report.SaveServiceMetadata(identifier, url) - return nil } + go report.SaveServiceMetadata(identifier, url) + return nil } // RemoveServiceMetadata will delegate to call remote metadata's sdk to remove service metadata @@ -229,10 +226,9 @@ func (bmr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceM report := instance.GetMetadataReportInstance() if bmr.syncReport { return report.RemoveServiceMetadata(identifier) - } else { - go report.RemoveServiceMetadata(identifier) - return nil } + go report.RemoveServiceMetadata(identifier) + return nil } // GetExportedURLs will delegate to call remote metadata's sdk to get exported urls @@ -246,10 +242,9 @@ func (bmr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberM report := instance.GetMetadataReportInstance() if bmr.syncReport { return report.SaveSubscribedData(identifier, urls) - } else { - go report.SaveSubscribedData(identifier, urls) - return nil } + go report.SaveSubscribedData(identifier, urls) + return nil } // GetSubscribedURLs will delegate to call remote metadata's sdk to get subscribed urls From c312cc8588f8a5c0f5bb3fb63b60a04d2e95652f Mon Sep 17 00:00:00 2001 From: "vito.he" Date: Sat, 23 May 2020 23:55:51 +0800 Subject: [PATCH 5/7] Mod:modify for code review --- metadata/definition/definition.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metadata/definition/definition.go b/metadata/definition/definition.go index 11e137a14b..c8dd86b18f 100644 --- a/metadata/definition/definition.go +++ b/metadata/definition/definition.go @@ -29,7 +29,7 @@ import ( "github.com/apache/dubbo-go/common/constant" ) -// ServiceDefinition is a interface of service's definition +// ServiceDefiner is a interface of service's definition type ServiceDefiner interface { ToBytes() ([]byte, error) } @@ -42,11 +42,11 @@ type ServiceDefinition struct { Types []TypeDefinition } -func (def ServiceDefinition) ToBytes() ([]byte, error) { +func (def *ServiceDefinition) ToBytes() ([]byte, error) { return json.Marshal(def) } -func (def ServiceDefinition) String() string { +func (def *ServiceDefinition) String() string { var methodStr strings.Builder for _, m := range def.Methods { var paramType strings.Builder From 0d78b2a65f1c35e93682adeed0d62b0d5e3c679e Mon Sep 17 00:00:00 2001 From: "vito.he" Date: Sun, 24 May 2020 12:02:51 +0800 Subject: [PATCH 6/7] Mod:for code review --- metadata/definition/definition.go | 4 ++-- metadata/report/delegate/delegate_report_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/metadata/definition/definition.go b/metadata/definition/definition.go index c8dd86b18f..fa195d09d7 100644 --- a/metadata/definition/definition.go +++ b/metadata/definition/definition.go @@ -91,8 +91,8 @@ type TypeDefinition struct { } // BuildServiceDefinition can build service definition which will be used to describe a service -func BuildServiceDefinition(service common.Service, url common.URL) ServiceDefinition { - sd := ServiceDefinition{} +func BuildServiceDefinition(service common.Service, url common.URL) *ServiceDefinition { + sd := &ServiceDefinition{} sd.CanonicalName = url.Service() for k, m := range service.Method() { diff --git a/metadata/report/delegate/delegate_report_test.go b/metadata/report/delegate/delegate_report_test.go index 0e8da60700..04c9e64839 100644 --- a/metadata/report/delegate/delegate_report_test.go +++ b/metadata/report/delegate/delegate_report_test.go @@ -106,7 +106,7 @@ func TestMetadataReport_StoreProviderMetadata(t *testing.T) { mtr.StoreProviderMetadata(metadataId, getMockDefinition(metadataId, t)) } -func getMockDefinition(id *identifier.MetadataIdentifier, t *testing.T) definition.ServiceDefinition { +func getMockDefinition(id *identifier.MetadataIdentifier, t *testing.T) *definition.ServiceDefinition { protocol := "dubbo" beanName := "UserProvider" url, err := common.NewURL(fmt.Sprintf( From c024ce17b3fcd636f4f9e690bfb3a5a3a7f39c32 Mon Sep 17 00:00:00 2001 From: "vito.he" Date: Sun, 24 May 2020 22:42:32 +0800 Subject: [PATCH 7/7] Mod:code review --- metadata/report/delegate/delegate_report.go | 76 +++++++++++---------- metadata/service/remote/service.go | 8 +-- 2 files changed, 43 insertions(+), 41 deletions(-) diff --git a/metadata/report/delegate/delegate_report.go b/metadata/report/delegate/delegate_report.go index 4e3995d2ea..cb7e42030b 100644 --- a/metadata/report/delegate/delegate_report.go +++ b/metadata/report/delegate/delegate_report.go @@ -19,6 +19,7 @@ package delegate import ( "encoding/json" + "runtime/debug" "sync" "time" ) @@ -129,7 +130,7 @@ func NewMetadataReport() (*MetadataReport, error) { scheduler := gocron.NewScheduler(time.UTC) _, err := scheduler.Every(1).Day().Do( func() { - logger.Info("start to publish all metadata.") + logger.Info("start to publish all metadata in metadata report %v.", url) bmr.allMetadataReportsLock.RLock() bmr.doHandlerMetadataCollection(bmr.allMetadataReports) bmr.allMetadataReportsLock.RUnlock() @@ -145,30 +146,30 @@ func NewMetadataReport() (*MetadataReport, error) { } // retry will do metadata failed reports collection by call metadata report sdk -func (bmr *MetadataReport) retry() bool { - bmr.failedReportsLock.RLock() - defer bmr.failedReportsLock.RUnlock() - return bmr.doHandlerMetadataCollection(bmr.failedReports) +func (mr *MetadataReport) retry() bool { + mr.failedReportsLock.RLock() + defer mr.failedReportsLock.RUnlock() + return mr.doHandlerMetadataCollection(mr.failedReports) } // StoreProviderMetadata will delegate to call remote metadata's sdk to store provider service definition -func (bmr *MetadataReport) StoreProviderMetadata(identifier *identifier.MetadataIdentifier, definer definition.ServiceDefiner) { - if bmr.syncReport { - bmr.storeMetadataTask(common.PROVIDER, identifier, definer) +func (mr *MetadataReport) StoreProviderMetadata(identifier *identifier.MetadataIdentifier, definer definition.ServiceDefiner) { + if mr.syncReport { + mr.storeMetadataTask(common.PROVIDER, identifier, definer) } - go bmr.storeMetadataTask(common.PROVIDER, identifier, definer) + go mr.storeMetadataTask(common.PROVIDER, identifier, definer) } // storeMetadataTask will delegate to call remote metadata's sdk to store -func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.MetadataIdentifier, definer interface{}) { +func (mr *MetadataReport) storeMetadataTask(role int, identifier *identifier.MetadataIdentifier, definer interface{}) { logger.Infof("store provider metadata. Identifier :%v ; definition: %v .", identifier, definer) - bmr.allMetadataReportsLock.Lock() - bmr.allMetadataReports[identifier] = definer - bmr.allMetadataReportsLock.Unlock() + mr.allMetadataReportsLock.Lock() + mr.allMetadataReports[identifier] = definer + mr.allMetadataReportsLock.Unlock() - bmr.failedReportsLock.Lock() - delete(bmr.failedReports, identifier) - bmr.failedReportsLock.Unlock() + mr.failedReportsLock.Lock() + delete(mr.failedReports, identifier) + mr.failedReportsLock.Unlock() // data is store the json marshaled definition var ( data []byte @@ -177,17 +178,18 @@ func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.Me defer func() { if r := recover(); r != nil { - bmr.failedReportsLock.Lock() - bmr.failedReports[identifier] = definer - bmr.failedReportsLock.Unlock() - bmr.metadataReportRetry.startRetryTask() - logger.Errorf("Failed to put provider metadata %v in %v, cause: %v", identifier, string(data), r) + mr.failedReportsLock.Lock() + mr.failedReports[identifier] = definer + mr.failedReportsLock.Unlock() + mr.metadataReportRetry.startRetryTask() + logger.Errorf("Failed to put provider metadata %v in %v, cause: %v\n%s\n", + identifier, string(data), r, string(debug.Stack())) } }() data, err = json.Marshal(definer) if err != nil { - logger.Errorf("storeProviderMetadataTask error in stage json.Marshal, msg is %v", err) + logger.Errorf("storeProviderMetadataTask error in stage json.Marshal, msg is %+v", err) panic(err) } report := instance.GetMetadataReportInstance() @@ -198,23 +200,23 @@ func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.Me } if err != nil { - logger.Errorf("storeProviderMetadataTask error in stage call metadata report to StoreProviderMetadata, msg is %v", err) + logger.Errorf("storeProviderMetadataTask error in stage call metadata report to StoreProviderMetadata, msg is %+v", err) panic(err) } } // StoreConsumerMetadata will delegate to call remote metadata's sdk to store consumer side service definition -func (bmr *MetadataReport) StoreConsumerMetadata(identifier *identifier.MetadataIdentifier, definer map[string]string) { - if bmr.syncReport { - bmr.storeMetadataTask(common.CONSUMER, identifier, definer) +func (mr *MetadataReport) StoreConsumerMetadata(identifier *identifier.MetadataIdentifier, definer map[string]string) { + if mr.syncReport { + mr.storeMetadataTask(common.CONSUMER, identifier, definer) } - go bmr.storeMetadataTask(common.CONSUMER, identifier, definer) + go mr.storeMetadataTask(common.CONSUMER, identifier, definer) } // SaveServiceMetadata will delegate to call remote metadata's sdk to save service metadata -func (bmr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier, url common.URL) error { +func (mr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier, url common.URL) error { report := instance.GetMetadataReportInstance() - if bmr.syncReport { + if mr.syncReport { return report.SaveServiceMetadata(identifier, url) } go report.SaveServiceMetadata(identifier, url) @@ -222,9 +224,9 @@ func (bmr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMet } // RemoveServiceMetadata will delegate to call remote metadata's sdk to remove service metadata -func (bmr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier) error { +func (mr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier) error { report := instance.GetMetadataReportInstance() - if bmr.syncReport { + if mr.syncReport { return report.RemoveServiceMetadata(identifier) } go report.RemoveServiceMetadata(identifier) @@ -232,15 +234,15 @@ func (bmr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceM } // GetExportedURLs will delegate to call remote metadata's sdk to get exported urls -func (bmr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) []string { +func (mr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) []string { report := instance.GetMetadataReportInstance() return report.GetExportedURLs(identifier) } // SaveSubscribedData will delegate to call remote metadata's sdk to save subscribed data -func (bmr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error { +func (mr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error { report := instance.GetMetadataReportInstance() - if bmr.syncReport { + if mr.syncReport { return report.SaveSubscribedData(identifier, urls) } go report.SaveSubscribedData(identifier, urls) @@ -260,15 +262,15 @@ func (MetadataReport) GetServiceDefinition(identifier *identifier.MetadataIdenti } // doHandlerMetadataCollection will store metadata to metadata support with given metadataMap -func (bmr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifier.MetadataIdentifier]interface{}) bool { +func (mr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifier.MetadataIdentifier]interface{}) bool { if len(metadataMap) == 0 { return true } for e := range metadataMap { if common.RoleType(common.PROVIDER).Role() == e.Side { - bmr.StoreProviderMetadata(e, metadataMap[e].(*definition.FullServiceDefinition)) + mr.StoreProviderMetadata(e, metadataMap[e].(*definition.FullServiceDefinition)) } else if common.RoleType(common.CONSUMER).Role() == e.Side { - bmr.StoreConsumerMetadata(e, metadataMap[e].(map[string]string)) + mr.StoreConsumerMetadata(e, metadataMap[e].(map[string]string)) } } return false diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go index f4587638ef..f55c482ad8 100644 --- a/metadata/service/remote/service.go +++ b/metadata/service/remote/service.go @@ -132,7 +132,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR mts.exportedRevision.Store(exportedRevision) urls, err := mts.inMemoryMetadataService.GetExportedURLs(constant.ANY_VALUE, "", "", "") if err != nil { - logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) result = false } iterator := urls.Iter(inmemory.Comparator{}) @@ -145,7 +145,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR id := identifier.NewServiceMetadataIdentifier(common.URL(url)) id.Revision = mts.exportedRevision.Load() if err := mts.delegateReport.SaveServiceMetadata(id, common.URL(url)); err != nil { - logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) result = false } } @@ -155,7 +155,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR mts.subscribedRevision.Store(subscribedRevision) urls, err := mts.inMemoryMetadataService.GetSubscribedURLs() if err != nil { - logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v+", err) result = false } if urls != nil && urls.Len() > 0 { @@ -166,7 +166,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR Revision: subscribedRevision, } if err := mts.delegateReport.SaveSubscribedData(id, convertUrls(urls)); err != nil { - logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) result = false } }