From 41d1203993b886cc8cc760994edc8f2d0e49c15e Mon Sep 17 00:00:00 2001 From: XiaoWeiKIN <2484713618@qq.com> Date: Sun, 19 Dec 2021 21:00:24 +0800 Subject: [PATCH 01/15] =?UTF-8?q?=E4=BC=98=E9=9B=85=E4=B8=8B=E7=BA=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/graceful_shutdown.go | 20 ++- config/graceful_shutdown_config.go | 4 +- config/service_config.go | 2 + filter/filter_impl/import.go | 2 +- filter/graceful_shutdown/consumer_filter.go | 90 ++++++++++ .../provider_filter.go} | 47 +++--- filter/gshutdown/filter_test.go | 75 --------- imports/imports.go | 2 +- protocol/grpc/grpc_protocol.go | 6 +- protocol/grpc/server.go | 5 + registry/base_registry.go | 126 +++++--------- registry/protocol/protocol.go | 158 +++++++++++------- 12 files changed, 286 insertions(+), 251 deletions(-) create mode 100644 filter/graceful_shutdown/consumer_filter.go rename filter/{gshutdown/filter.go => graceful_shutdown/provider_filter.go} (71%) delete mode 100644 filter/gshutdown/filter_test.go diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go index dfcc79ec4d..b359c36221 100644 --- a/config/graceful_shutdown.go +++ b/config/graceful_shutdown.go @@ -64,6 +64,10 @@ func GracefulShutdownInit() { filter.Set(constant.GracefulShutdownFilterShutdownConfig, rootConfig.Shutdown) } + if filter, ok := extension.GetFilter(constant.GracefulShutdownProviderFilterKey).(Setter); ok && rootConfig.Shutdown != nil { + filter.Set(constant.GracefulShutdownFilterShutdownConfig, rootConfig.Shutdown) + } + go func() { select { case sig := <-signals: @@ -115,13 +119,21 @@ func destroyAllRegistries() { // First we destroy provider's protocols, and then we destroy the consumer protocols. func destroyProtocols() { logger.Info("Graceful shutdown --- Destroy protocols. ") - logger.Info("Graceful shutdown --- First destroy provider's protocols. ") - consumerProtocols := getConsumerProtocols() if rootConfig.Protocols == nil { return } + consumerProtocols := getConsumerProtocols() + + destroyProviderProtocols(consumerProtocols) + destroyConsumerProtocols(consumerProtocols) +} + +// destroyProviderProtocols destroys the provider's protocol. +// if the protocol is consumer's protocol too, we will keep it +func destroyProviderProtocols(consumerProtocols *gxset.HashSet) { + logger.Info("Graceful shutdown --- First destroy provider's protocols. ") for _, protocol := range rootConfig.Protocols { // the protocol is the consumer's protocol too, we can not destroy it. if consumerProtocols.Contains(protocol.Name) { @@ -129,8 +141,10 @@ func destroyProtocols() { } extension.GetProtocol(protocol.Name).Destroy() } +} - logger.Info("Graceful shutdown --- Second destroy consumer's protocols. ") +func destroyConsumerProtocols(consumerProtocols *gxset.HashSet) { + logger.Info("Graceful shutdown --- Second Destroy consumer's protocols. ") for name := range consumerProtocols.Items { extension.GetProtocol(name.(string)).Destroy() } diff --git a/config/graceful_shutdown_config.go b/config/graceful_shutdown_config.go index fdf2376cbe..ff3a233fc3 100644 --- a/config/graceful_shutdown_config.go +++ b/config/graceful_shutdown_config.go @@ -28,7 +28,7 @@ import ( const ( defaultTimeout = 60 * time.Second - defaultStepTimeout = 10 * time.Second + defaultStepTimeout = 3 * time.Second ) // ShutdownConfig is used as configuration for graceful shutdown @@ -47,7 +47,7 @@ type ShutdownConfig struct { * and the 99.9% requests will return response in 2s, so the StepTimeout will be bigger than(10+2) * 1000ms, * maybe (10 + 2*3) * 1000ms is a good choice. */ - StepTimeout string `default:"10s" yaml:"step_timeout" json:"step.timeout,omitempty" property:"step.timeout"` + StepTimeout string `default:"3s" yaml:"step_timeout" json:"step.timeout,omitempty" property:"step.timeout"` // when we try to shutdown the applicationConfig, we will reject the new requests. In most cases, you don't need to configure this. RejectRequestHandler string `yaml:"reject_handler" json:"reject_handler,omitempty" property:"reject_handler"` // true -> new request will be rejected. diff --git a/config/service_config.go b/config/service_config.go index 9bc8e52962..3afd3acca3 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -21,6 +21,7 @@ import ( "container/list" "fmt" "net/url" + "os" "strconv" "strings" "sync" @@ -403,6 +404,7 @@ func (svc *ServiceConfig) getUrlMap() url.Values { // whether to export or not urlMap.Set(constant.ExportKey, strconv.FormatBool(svc.export)) + urlMap.Set(constant.PIDKey, fmt.Sprintf("%d", os.Getpid())) for _, v := range svc.Methods { prefix := "methods." + v.Name + "." diff --git a/filter/filter_impl/import.go b/filter/filter_impl/import.go index 015a5da987..2a415a17e2 100644 --- a/filter/filter_impl/import.go +++ b/filter/filter_impl/import.go @@ -28,7 +28,7 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/filter/echo" _ "dubbo.apache.org/dubbo-go/v3/filter/exec_limit" _ "dubbo.apache.org/dubbo-go/v3/filter/generic" - _ "dubbo.apache.org/dubbo-go/v3/filter/gshutdown" + _ "dubbo.apache.org/dubbo-go/v3/filter/graceful_shutdown" _ "dubbo.apache.org/dubbo-go/v3/filter/hystrix" _ "dubbo.apache.org/dubbo-go/v3/filter/metrics" _ "dubbo.apache.org/dubbo-go/v3/filter/seata" diff --git a/filter/graceful_shutdown/consumer_filter.go b/filter/graceful_shutdown/consumer_filter.go new file mode 100644 index 0000000000..9471d44783 --- /dev/null +++ b/filter/graceful_shutdown/consumer_filter.go @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package graceful_shutdown + +import ( + "context" + "sync" + "sync/atomic" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/common/logger" + "dubbo.apache.org/dubbo-go/v3/config" + "dubbo.apache.org/dubbo-go/v3/filter" + "dubbo.apache.org/dubbo-go/v3/protocol" +) + +var ( + csfOnce sync.Once + csf *consumerGracefulShutdownFilter +) + +func init() { + // `init()` is performed before config.Load(), so shutdownConfig will be retrieved after config was loaded. + extension.SetFilter(constant.GracefulShutdownConsumerFilterKey, func() filter.Filter { + return newConsumerGracefulShutdownFilter() + }) + +} + +type consumerGracefulShutdownFilter struct { + activeCount int32 + shutdownConfig *config.ShutdownConfig +} + +func newConsumerGracefulShutdownFilter() filter.Filter { + if csf == nil { + csfOnce.Do(func() { + csf = &consumerGracefulShutdownFilter{} + + }) + } + return csf +} + +// Invoke adds the requests count and block the new requests if application is closing +func (f *consumerGracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + atomic.AddInt32(&f.activeCount, 1) + return invoker.Invoke(ctx, invocation) +} + +// OnResponse reduces the number of active processes then return the process result +func (f *consumerGracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + atomic.AddInt32(&f.activeCount, -1) + // although this isn't thread safe, it won't be a problem if the f.rejectNewRequest() is true. + if f.shutdownConfig != nil && f.shutdownConfig.RejectRequest && f.activeCount <= 0 { + f.shutdownConfig.RequestsFinished = true + } + return result +} + +func (f *consumerGracefulShutdownFilter) Set(name string, conf interface{}) { + switch name { + case constant.GracefulShutdownFilterShutdownConfig: + if shutdownConfig, ok := conf.(*config.ShutdownConfig); ok { + f.shutdownConfig = shutdownConfig + return + } + logger.Warnf("the type of config for {%s} should be *config.ShutdownConfig", constant.GracefulShutdownFilterShutdownConfig) + default: + // do nothing + } +} diff --git a/filter/gshutdown/filter.go b/filter/graceful_shutdown/provider_filter.go similarity index 71% rename from filter/gshutdown/filter.go rename to filter/graceful_shutdown/provider_filter.go index f2c3bae606..461b477211 100644 --- a/filter/gshutdown/filter.go +++ b/filter/graceful_shutdown/provider_filter.go @@ -15,11 +15,11 @@ * limitations under the License. */ -package gshutdown +package graceful_shutdown import ( "context" - "sync/atomic" + "sync" ) import ( @@ -31,44 +31,49 @@ import ( "dubbo.apache.org/dubbo-go/v3/protocol" ) +var ( + psfOnce sync.Once + psf *providerGracefulShutdownFilter +) + func init() { // `init()` is performed before config.Load(), so shutdownConfig will be retrieved after config was loaded. - var csf = &Filter{} - var psf = &Filter{} - extension.SetFilter(constant.GracefulShutdownConsumerFilterKey, func() filter.Filter { - return csf - }) + extension.SetFilter(constant.GracefulShutdownProviderFilterKey, func() filter.Filter { - return psf + return newProviderGracefulShutdownFilter() }) } -type Filter struct { - activeCount int32 +type providerGracefulShutdownFilter struct { shutdownConfig *config.ShutdownConfig } +func newProviderGracefulShutdownFilter() filter.Filter { + if psf == nil { + psfOnce.Do(func() { + psf = &providerGracefulShutdownFilter{} + + }) + } + return psf +} + // Invoke adds the requests count and block the new requests if application is closing -func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (f *providerGracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { if f.rejectNewRequest() { logger.Info("The application is closing, new request will be rejected.") return f.getRejectHandler().RejectedExecution(invoker.GetURL(), invocation) } - atomic.AddInt32(&f.activeCount, 1) return invoker.Invoke(ctx, invocation) } // OnResponse reduces the number of active processes then return the process result -func (f *Filter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - atomic.AddInt32(&f.activeCount, -1) - // although this isn't thread safe, it won't be a problem if the f.rejectNewRequest() is true. - if f.shutdownConfig != nil && f.shutdownConfig.RejectRequest && f.activeCount <= 0 { - f.shutdownConfig.RequestsFinished = true - } +func (f *providerGracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + return result } -func (f *Filter) Set(name string, conf interface{}) { +func (f *providerGracefulShutdownFilter) Set(name string, conf interface{}) { switch name { case constant.GracefulShutdownFilterShutdownConfig: if shutdownConfig, ok := conf.(*config.ShutdownConfig); ok { @@ -81,14 +86,14 @@ func (f *Filter) Set(name string, conf interface{}) { } } -func (f *Filter) rejectNewRequest() bool { +func (f *providerGracefulShutdownFilter) rejectNewRequest() bool { if f.shutdownConfig == nil { return false } return f.shutdownConfig.RejectRequest } -func (f *Filter) getRejectHandler() filter.RejectedExecutionHandler { +func (f *providerGracefulShutdownFilter) getRejectHandler() filter.RejectedExecutionHandler { handler := constant.DefaultKey if f.shutdownConfig != nil && len(f.shutdownConfig.RejectRequestHandler) > 0 { handler = f.shutdownConfig.RejectRequestHandler diff --git a/filter/gshutdown/filter_test.go b/filter/gshutdown/filter_test.go deleted file mode 100644 index 36ddccb1cf..0000000000 --- a/filter/gshutdown/filter_test.go +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package gshutdown - -import ( - "context" - "net/url" - "testing" -) - -import ( - "github.com/stretchr/testify/assert" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/common/constant" - "dubbo.apache.org/dubbo-go/v3/common/extension" - "dubbo.apache.org/dubbo-go/v3/config" - "dubbo.apache.org/dubbo-go/v3/filter" - common2 "dubbo.apache.org/dubbo-go/v3/filter/handler" - "dubbo.apache.org/dubbo-go/v3/protocol" - "dubbo.apache.org/dubbo-go/v3/protocol/invocation" -) - -func TestGenericFilterInvoke(t *testing.T) { - invoc := invocation.NewRPCInvocation("GetUser", []interface{}{"OK"}, make(map[string]interface{})) - invokeUrl := common.NewURLWithOptions(common.WithParams(url.Values{})) - - shutdownFilter := extension.GetFilter(constant.GracefulShutdownProviderFilterKey).(*Filter) - - rootConfig := config.GetRootConfig() - - assert.False(t, shutdownFilter.rejectNewRequest()) - assert.Nil(t, rootConfig.Shutdown) - - assert.Equal(t, extension.GetRejectedExecutionHandler(constant.DefaultKey), - shutdownFilter.getRejectHandler()) - - result := shutdownFilter.Invoke(context.Background(), protocol.NewBaseInvoker(invokeUrl), invoc) - assert.NotNil(t, result) - assert.Nil(t, result.Error()) - - rootConfig.Shutdown = &config.ShutdownConfig{ - RejectRequest: true, - RejectRequestHandler: "mock", - } - shutdownFilter.shutdownConfig = rootConfig.Shutdown - - assert.True(t, shutdownFilter.rejectNewRequest()) - result = shutdownFilter.OnResponse(context.Background(), nil, protocol.NewBaseInvoker(invokeUrl), invoc) - assert.Nil(t, result) - - rejectHandler := &common2.OnlyLogRejectedExecutionHandler{} - extension.SetRejectedExecutionHandler("mock", func() filter.RejectedExecutionHandler { - return rejectHandler - }) - assert.True(t, rootConfig.Shutdown.RequestsFinished) - assert.Equal(t, rejectHandler, shutdownFilter.getRejectHandler()) -} diff --git a/imports/imports.go b/imports/imports.go index 6e0cc1e6f4..23f9d676f4 100644 --- a/imports/imports.go +++ b/imports/imports.go @@ -44,7 +44,7 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/filter/echo" _ "dubbo.apache.org/dubbo-go/v3/filter/exec_limit" _ "dubbo.apache.org/dubbo-go/v3/filter/generic" - _ "dubbo.apache.org/dubbo-go/v3/filter/gshutdown" + _ "dubbo.apache.org/dubbo-go/v3/filter/graceful_shutdown" _ "dubbo.apache.org/dubbo-go/v3/filter/hystrix" _ "dubbo.apache.org/dubbo-go/v3/filter/metrics" _ "dubbo.apache.org/dubbo-go/v3/filter/seata" diff --git a/protocol/grpc/grpc_protocol.go b/protocol/grpc/grpc_protocol.go index 048d8a023f..6b8596b725 100644 --- a/protocol/grpc/grpc_protocol.go +++ b/protocol/grpc/grpc_protocol.go @@ -103,14 +103,14 @@ func (gp *GrpcProtocol) Refer(url *common.URL) protocol.Invoker { func (gp *GrpcProtocol) Destroy() { logger.Infof("GrpcProtocol destroy.") - gp.BaseProtocol.Destroy() - gp.serverLock.Lock() defer gp.serverLock.Unlock() for key, server := range gp.serverMap { delete(gp.serverMap, key) - server.Stop() + server.GracefulStop() } + + gp.BaseProtocol.Destroy() } // GetProtocol gets gRPC protocol, will create if null. diff --git a/protocol/grpc/server.go b/protocol/grpc/server.go index d8bfa6acc5..dc16074f7c 100644 --- a/protocol/grpc/server.go +++ b/protocol/grpc/server.go @@ -165,3 +165,8 @@ func registerService(providerServices map[string]*config.ServiceConfig, server * func (s *Server) Stop() { s.grpcServer.Stop() } + +// GracefulStop gRPC server +func (s *Server) GracefulStop() { + s.grpcServer.GracefulStop() +} diff --git a/registry/base_registry.go b/registry/base_registry.go index c6151e8c88..eb860b2f1e 100644 --- a/registry/base_registry.go +++ b/registry/base_registry.go @@ -95,11 +95,11 @@ type BaseRegistry struct { // context context.Context facadeBasedRegistry FacadeBasedRegistry *common.URL - birth int64 // time of file birth, seconds since Epoch; 0 if unknown - wg sync.WaitGroup // wg+done for zk restart - done chan struct{} - cltLock sync.RWMutex // ctl lock is a lock for services map - services map[string]*common.URL // service name + protocol -> service config, for store the service registered + birth int64 // time of file birth, seconds since Epoch; 0 if unknown + wg sync.WaitGroup // wg+done for zk restart + done chan struct{} + registered *sync.Map + cltLock sync.RWMutex } // InitBaseRegistry for init some local variables and set BaseRegistry's subclass to it @@ -107,7 +107,7 @@ func (r *BaseRegistry) InitBaseRegistry(url *common.URL, facadeRegistry FacadeBa r.URL = url r.birth = time.Now().UnixNano() r.done = make(chan struct{}) - r.services = make(map[string]*common.URL) + r.registered = &sync.Map{} r.facadeBasedRegistry = facadeRegistry return r } @@ -131,77 +131,46 @@ func (r *BaseRegistry) Destroy() { } // Register implement interface registry to register -func (r *BaseRegistry) Register(conf *common.URL) error { - var ( - ok bool - err error - ) +func (r *BaseRegistry) Register(url *common.URL) error { // if developer define registry port and ip, use it first. - if ipToRegistry := os.Getenv("DUBBO_IP_TO_REGISTRY"); ipToRegistry != "" { - conf.Ip = ipToRegistry + if ipToRegistry := os.Getenv("DUBBO_IP_TO_REGISTRY"); len(ipToRegistry) > 0 { + url.Ip = ipToRegistry + } else { + url.Ip = common.GetLocalIp() } - if portToRegistry := os.Getenv("DUBBO_PORT_TO_REGISTRY"); portToRegistry != "" { - conf.Port = portToRegistry + if portToRegistry := os.Getenv("DUBBO_PORT_TO_REGISTRY"); len(portToRegistry) > 0 { + url.Port = portToRegistry } // todo bug when provider、consumer simultaneous initialization - //role, _ := strconv.Atoi(r.URL.GetParam(constant.RegistryRoleKey, "")) - role, _ := strconv.Atoi(conf.GetParam(constant.RegistryRoleKey, "")) - // Check if the service has been registered - r.cltLock.Lock() - _, ok = r.services[conf.Key()] - r.cltLock.Unlock() - if ok { - return perrors.Errorf("Path{%s} has been registered", conf.Key()) + if _, ok := r.registered.Load(url.Key()); ok { + return perrors.Errorf("Service {%s} has been registered", url.Key()) } - err = r.register(conf) - if err != nil { - return perrors.WithMessagef(err, "register(conf:%+v)", conf) - } + err := r.register(url) + if err == nil { + r.registered.Store(url.Key(), url) - r.cltLock.Lock() - r.services[conf.Key()] = conf - r.cltLock.Unlock() - logger.Debugf("(%sRegistry)Register(conf{%#v})", common.DubboRole[role], conf) + } else { + err = perrors.WithMessagef(err, "register(url:%+v)", url) + } - return nil + return err } // UnRegister implement interface registry to unregister -func (r *BaseRegistry) UnRegister(conf *common.URL) error { - var ( - ok bool - err error - oldURL *common.URL - ) - - func() { - r.cltLock.Lock() - defer r.cltLock.Unlock() - oldURL, ok = r.services[conf.Key()] - - if !ok { - err = perrors.Errorf("Path{%s} has not registered", conf.Key()) - } - - delete(r.services, conf.Key()) - }() - - if err != nil { - return err +func (r *BaseRegistry) UnRegister(url *common.URL) error { + if _, ok := r.registered.Load(url.Key()); !ok { + return perrors.Errorf("Service {%s} has not registered", url.Key()) } - err = r.unregister(conf) - if err != nil { - func() { - r.cltLock.Lock() - defer r.cltLock.Unlock() - r.services[conf.Key()] = oldURL - }() - return perrors.WithMessagef(err, "register(conf:%+v)", conf) + err := r.unregister(url) + if err == nil { + r.registered.Delete(url.Key()) + } else { + err = perrors.WithMessagef(err, "unregister(url:%+v)", url) } - return nil + return err } // service is for getting service path stored in url @@ -211,23 +180,20 @@ func (r *BaseRegistry) service(c *common.URL) string { // RestartCallBack for reregister when reconnect func (r *BaseRegistry) RestartCallBack() bool { - // copy r.services - services := make([]*common.URL, 0, len(r.services)) - for _, confIf := range r.services { - services = append(services, confIf) - } - flag := true - for _, confIf := range services { - err := r.register(confIf) + r.registered.Range(func(key, value interface{}) bool { + registeredUrl := value.(*common.URL) + err := r.register(registeredUrl) if err != nil { - logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}", - confIf, perrors.WithStack(err)) flag = false - break + logger.Errorf("failed to re-register service :%v, error{%#v}", + registeredUrl, perrors.WithStack(err)) + return flag } - logger.Infof("success to re-register service :%v", confIf.Key()) - } + + logger.Infof("success to re-register service :%v", registeredUrl.Key()) + return flag + }) if flag { r.facadeBasedRegistry.InitListeners() @@ -266,14 +232,8 @@ func (r *BaseRegistry) processURL(c *common.URL, f func(string, string) error, c return true }) - params.Add("pid", processID) - params.Add("ip", localIP) - // params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6)) - role, _ := strconv.Atoi(c.GetParam(constant.RegistryRoleKey, "")) - //role, _ := strconv.Atoi(r.URL.GetParam(constant.RegistryRoleKey, "")) switch role { - case common.PROVIDER: dubboPath, rawURL, err = r.providerRegistry(c, params, cpf) case common.CONSUMER: @@ -330,7 +290,7 @@ func (r *BaseRegistry) providerRegistry(c *common.URL, params url.Values, f crea } logger.Debugf("provider url params:%#v", params) var host string - if c.Ip == "" { + if len(c.Ip) > 0 { host = localIP } else { host = c.Ip @@ -458,7 +418,7 @@ func (r *BaseRegistry) closeRegisters() { // Close and remove(set to nil) the registry client r.facadeBasedRegistry.CloseAndNilClient() // reset the services map - r.services = nil + r.registered = nil } // IsAvailable judge to is registry not closed by chan r.done diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 56606a0722..76730b1e33 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -21,6 +21,7 @@ import ( "context" "strings" "sync" + "time" ) import ( @@ -72,12 +73,6 @@ func init() { extension.SetProtocol("registry", GetProtocol) } -func getCacheKey(invoker protocol.Invoker) string { - url := getProviderUrl(invoker) - delKeys := gxset.NewSet("dynamic", "enabled") - return url.CloneExceptParams(delKeys).String() -} - func newRegistryProtocol() *registryProtocol { return ®istryProtocol{ registries: &sync.Map{}, @@ -85,13 +80,24 @@ func newRegistryProtocol() *registryProtocol { } } -func getRegistry(regUrl *common.URL) registry.Registry { - reg, err := extension.GetRegistry(regUrl.Protocol, regUrl) - if err != nil { - logger.Errorf("Registry can not connect success, program is going to panic.Error message is %s", err.Error()) - panic(err.Error()) +func (proto *registryProtocol) getRegistry(registryUrl *common.URL) registry.Registry { + var err error + reg, loaded := proto.registries.Load(registryUrl.Location) + if !loaded { + reg, err = extension.GetRegistry(registryUrl.Protocol, registryUrl) + if err != nil { + logger.Errorf("Registry can not connect success, program is going to panic.Error message is %s", err.Error()) + panic(err.Error()) + } + proto.registries.Store(registryUrl.Location, reg) } - return reg + return reg.(registry.Registry) +} + +func getCacheKey(invoker protocol.Invoker) string { + url := getProviderUrl(invoker) + delKeys := gxset.NewSet("dynamic", "enabled") + return url.CloneExceptParams(delKeys).String() } func getUrlToRegistry(providerUrl *common.URL, registryUrl *common.URL) *common.URL { @@ -140,13 +146,7 @@ func (proto *registryProtocol) Refer(url *common.URL) protocol.Invoker { registryUrl.Protocol = registryUrl.GetParam(constant.RegistryKey, "") } - var reg registry.Registry - if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded { - reg = getRegistry(registryUrl) - proto.registries.Store(registryUrl.Key(), reg) - } else { - reg = regI.(registry.Registry) - } + reg := proto.getRegistry(url) // new registry directory for store service url from registry directory, err := extension.GetDefaultRegistryDirectory(registryUrl, reg) @@ -170,66 +170,71 @@ func (proto *registryProtocol) Refer(url *common.URL) protocol.Invoker { } // Export provider service to registry center -func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporter { +func (proto *registryProtocol) Export(originInvoker protocol.Invoker) protocol.Exporter { proto.once.Do(func() { proto.initConfigurationListeners() }) - registryUrl := getRegistryUrl(invoker) - providerUrl := getProviderUrl(invoker) + registryUrl := getRegistryUrl(originInvoker) + providerUrl := getProviderUrl(originInvoker) overriderUrl := getSubscribedOverrideUrl(providerUrl) // Deprecated! subscribe to override rules in 2.6.x or before. - overrideSubscribeListener := newOverrideSubscribeListener(overriderUrl, invoker, proto) + overrideSubscribeListener := newOverrideSubscribeListener(overriderUrl, originInvoker, proto) proto.overrideListeners.Store(overriderUrl, overrideSubscribeListener) proto.providerConfigurationListener.OverrideUrl(providerUrl) serviceConfigurationListener := newServiceConfigurationListener(overrideSubscribeListener, providerUrl) proto.serviceConfigurationListeners.Store(providerUrl.ServiceKey(), serviceConfigurationListener) serviceConfigurationListener.OverrideUrl(providerUrl) - var reg registry.Registry - if registryUrl.Protocol != "" { - if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded { - reg = getRegistry(registryUrl) - proto.registries.Store(registryUrl.Key(), reg) - logger.Debugf("Export proto:%p registries address:%p", proto, proto.registries) - } else { - reg = regI.(registry.Registry) - } + // export invoker + exporter := proto.doLocalExport(originInvoker, providerUrl) + + if len(registryUrl.Protocol) > 0 { + // url to registry + reg := proto.getRegistry(registryUrl) registeredProviderUrl := getUrlToRegistry(providerUrl, registryUrl) + err := reg.Register(registeredProviderUrl) if err != nil { logger.Errorf("provider service %v register registry %v error, error message is %s", providerUrl.Key(), registryUrl.Key(), err.Error()) return nil } - } - - key := getCacheKey(invoker) - logger.Debugf("The cached exporter keys is %v!", key) - cachedExporter, loaded := proto.bounds.Load(key) - if loaded { - logger.Debugf("The exporter has been cached, and will return cached exporter!") - } else { - wrappedInvoker := newWrappedInvoker(invoker, providerUrl) - cachedExporter = extension.GetProtocol(protocolwrapper.FILTER).Export(wrappedInvoker) - proto.bounds.Store(key, cachedExporter) - logger.Debugf("The exporter has not been cached, and will return a new exporter!") - } - if registryUrl.Protocol != "" { go func() { if err := reg.Subscribe(overriderUrl, overrideSubscribeListener); err != nil { logger.Warnf("reg.subscribe(overriderUrl:%v) = error:%v", overriderUrl, err) } }() + + exporter.SetRegisterUrl(registeredProviderUrl) + exporter.SetSubscribeUrl(overriderUrl) + + } else { + logger.Warnf("provider service %v do not regist to registry %v. possible direct connection provider", + providerUrl.Key(), registryUrl.Key()) + } + + return exporter +} + +func (proto *registryProtocol) doLocalExport(originInvoker protocol.Invoker, providerUrl *common.URL) *exporterChangeableWrapper { + key := getCacheKey(originInvoker) + cachedExporter, loaded := proto.bounds.Load(key) + if !loaded { + // new Exporter + invokerDelegate := newInvokerDelegate(originInvoker, providerUrl) + cachedExporter = newExporterChangeableWrapper(originInvoker, + extension.GetProtocol(protocolwrapper.FILTER).Export(invokerDelegate)) + proto.bounds.Store(key, cachedExporter) } - return cachedExporter.(protocol.Exporter) + return cachedExporter.(*exporterChangeableWrapper) } func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common.URL) { key := getCacheKey(invoker) if oldExporter, loaded := proto.bounds.Load(key); loaded { - wrappedNewInvoker := newWrappedInvoker(invoker, newUrl) + wrappedNewInvoker := newInvokerDelegate(invoker, newUrl) oldExporter.(protocol.Exporter).Unexport() proto.bounds.Delete(key) // oldExporter Unexport function unRegister rpcService from the serviceMap, so need register it again as far as possible @@ -389,20 +394,22 @@ func getSubscribedOverrideUrl(providerUrl *common.URL) *common.URL { // Destroy registry protocol func (proto *registryProtocol) Destroy() { - // invoker.Destroy() should be performed in config.destroyConsumerProtocols(). - proto.invokers = []protocol.Invoker{} proto.bounds.Range(func(key, value interface{}) bool { // protocol holds the exporters actually, instead, registry holds them in order to avoid export repeatedly, so // the work for unexport should be finished in protocol.Unexport(), see also config.destroyProviderProtocols(). - proto.bounds.Delete(key) - return true - }) - proto.registries.Range(func(key, value interface{}) bool { - reg := value.(registry.Registry) - if reg.IsAvailable() { - reg.Destroy() + exporter := value.(*exporterChangeableWrapper) + reg := proto.getRegistry(getRegistryUrl(exporter.originInvoker)) + if err := reg.UnRegister(exporter.registerUrl); err != nil { + panic(err) + } + // TODO unsubscribeUrl + + // TODO shutdwon phase + select { + case <-time.After(3 * time.Second): + exporter.Unexport() + proto.bounds.Delete(key) } - proto.registries.Delete(key) return true }) } @@ -435,23 +442,50 @@ func GetProtocol() protocol.Protocol { return regProtocol } -type wrappedInvoker struct { +type invokerDelegate struct { invoker protocol.Invoker protocol.BaseInvoker } -func newWrappedInvoker(invoker protocol.Invoker, url *common.URL) *wrappedInvoker { - return &wrappedInvoker{ +func newInvokerDelegate(invoker protocol.Invoker, url *common.URL) *invokerDelegate { + return &invokerDelegate{ invoker: invoker, BaseInvoker: *protocol.NewBaseInvoker(url), } } // Invoke remote service base on URL of wrappedInvoker -func (ivk *wrappedInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { +func (ivk *invokerDelegate) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { return ivk.invoker.Invoke(ctx, invocation) } +type exporterChangeableWrapper struct { + protocol.Exporter + originInvoker protocol.Invoker + exporter protocol.Exporter + registerUrl *common.URL + subscribeUrl *common.URL +} + +func (e *exporterChangeableWrapper) Unexport() { + e.exporter.Unexport() +} + +func (e *exporterChangeableWrapper) SetRegisterUrl(registerUrl *common.URL) { + e.registerUrl = registerUrl +} + +func (e *exporterChangeableWrapper) SetSubscribeUrl(subscribeUrl *common.URL) { + e.subscribeUrl = subscribeUrl +} + +func newExporterChangeableWrapper(originInvoker protocol.Invoker, exporter protocol.Exporter) *exporterChangeableWrapper { + return &exporterChangeableWrapper{ + originInvoker: originInvoker, + exporter: exporter, + } +} + type providerConfigurationListener struct { registry.BaseConfigurationListener overrideListeners *sync.Map From 5759c77018508a14a54ce0ff54636623966e597f Mon Sep 17 00:00:00 2001 From: XiaoWeiKIN <2484713618@qq.com> Date: Sun, 26 Dec 2021 20:54:58 +0800 Subject: [PATCH 02/15] =?UTF-8?q?=E4=BC=98=E9=9B=85=E4=B8=8B=E7=BA=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/graceful_shutdown.go | 50 ++++++++-------- config/graceful_shutdown_config.go | 7 +++ config/root_config.go | 17 ++++++ .../graceful_shutdown/consumer_filter_test.go | 36 +++++++++++ .../graceful_shutdown/procider_filter_test.go | 60 +++++++++++++++++++ filter/graceful_shutdown/provider_filter.go | 1 - registry/protocol/protocol.go | 3 +- 7 files changed, 146 insertions(+), 28 deletions(-) create mode 100644 filter/graceful_shutdown/consumer_filter_test.go create mode 100644 filter/graceful_shutdown/procider_filter_test.go diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go index b359c36221..05e78f0518 100644 --- a/config/graceful_shutdown.go +++ b/config/graceful_shutdown.go @@ -53,40 +53,40 @@ import ( */ const defaultShutDownTime = time.Second * 60 -// GracefulShutdownInit todo GracefulShutdownInit in 3.0 should be discusesed. -func GracefulShutdownInit() { - signals := make(chan os.Signal, 1) - - signal.Notify(signals, ShutdownSignals...) - +func gracefulShutdownInit() { // retrieve ShutdownConfig for gracefulShutdownFilter if filter, ok := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(Setter); ok && rootConfig.Shutdown != nil { - filter.Set(constant.GracefulShutdownFilterShutdownConfig, rootConfig.Shutdown) + filter.Set(constant.GracefulShutdownFilterShutdownConfig, GetShutDown()) } if filter, ok := extension.GetFilter(constant.GracefulShutdownProviderFilterKey).(Setter); ok && rootConfig.Shutdown != nil { - filter.Set(constant.GracefulShutdownFilterShutdownConfig, rootConfig.Shutdown) + filter.Set(constant.GracefulShutdownFilterShutdownConfig, GetShutDown()) } - go func() { - select { - case sig := <-signals: - logger.Infof("get signal %s, applicationConfig will shutdown.", sig) - // gracefulShutdownOnce.Do(func() { - time.AfterFunc(totalTimeout(), func() { - logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ") - os.Exit(0) - }) - BeforeShutdown() - // those signals' original behavior is exit with dump ths stack, so we try to keep the behavior - for _, dumpSignal := range DumpHeapShutdownSignals { - if sig == dumpSignal { - debug.WriteHeapDump(os.Stdout.Fd()) + if GetShutDown().InternalSignal { + signals := make(chan os.Signal, 1) + signal.Notify(signals, ShutdownSignals...) + + go func() { + select { + case sig := <-signals: + logger.Infof("get signal %s, applicationConfig will shutdown.", sig) + // gracefulShutdownOnce.Do(func() { + time.AfterFunc(totalTimeout(), func() { + logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ") + os.Exit(0) + }) + BeforeShutdown() + // those signals' original behavior is exit with dump ths stack, so we try to keep the behavior + for _, dumpSignal := range DumpHeapShutdownSignals { + if sig == dumpSignal { + debug.WriteHeapDump(os.Stdout.Fd()) + } } + os.Exit(0) } - os.Exit(0) - } - }() + }() + } } // BeforeShutdown provides processing flow before shutdown diff --git a/config/graceful_shutdown_config.go b/config/graceful_shutdown_config.go index ff3a233fc3..2760e35d18 100644 --- a/config/graceful_shutdown_config.go +++ b/config/graceful_shutdown_config.go @@ -55,6 +55,8 @@ type ShutdownConfig struct { // true -> all requests had been processed. In provider side it means that all requests are returned response to clients // In consumer side, it means that all requests getting response from servers RequestsFinished bool + // internal listen kill signal,the default is true. + InternalSignal bool `default:"true" yaml:"internal_signal" json:"internal.signal,omitempty" property:"internal.signal"` } // Prefix dubbo.shutdown @@ -117,6 +119,11 @@ func (scb *ShutdownConfigBuilder) SetRejectRequest(rejectRequest bool) *Shutdown return scb } +func (scb *ShutdownConfigBuilder) SetInternalSignal(internalSignal bool) *ShutdownConfigBuilder { + scb.shutdownConfig.InternalSignal = internalSignal + return scb +} + func (scb *ShutdownConfigBuilder) Build() *ShutdownConfig { return scb.shutdownConfig } diff --git a/config/root_config.go b/config/root_config.go index ad3c33acea..fb061923b4 100644 --- a/config/root_config.go +++ b/config/root_config.go @@ -121,6 +121,16 @@ func GetApplicationConfig() *ApplicationConfig { return rootConfig.Application } +func GetShutDown() *ShutdownConfig { + if err := check(); err != nil { + return NewShutDownConfigBuilder().Build() + } + if rootConfig.Shutdown != nil { + return rootConfig.Shutdown + } + return NewShutDownConfigBuilder().Build() +} + // getRegistryIds get registry ids func (rc *RootConfig) getRegistryIds() []string { ids := make([]string, 0) @@ -217,6 +227,7 @@ func (rc *RootConfig) Start() { // todo if register consumer instance or has exported services exportMetadataService() registerServiceInstance() + gracefulShutdownInit() }) } @@ -234,6 +245,7 @@ func newEmptyRootConfig() *RootConfig { Metric: NewMetricConfigBuilder().Build(), Logger: NewLoggerConfigBuilder().Build(), Custom: NewCustomConfigBuilder().Build(), + Shutdown: NewShutDownConfigBuilder().Build(), } return newRootConfig } @@ -326,6 +338,11 @@ func (rb *RootConfigBuilder) SetCustom(customConfig *CustomConfig) *RootConfigBu return rb } +func (rb *RootConfigBuilder) SetShutDown(shutDownConfig *ShutdownConfig) *RootConfigBuilder { + rb.rootConfig.Shutdown = shutDownConfig + return rb +} + func (rb *RootConfigBuilder) Build() *RootConfig { return rb.rootConfig } diff --git a/filter/graceful_shutdown/consumer_filter_test.go b/filter/graceful_shutdown/consumer_filter_test.go new file mode 100644 index 0000000000..3e551d4a79 --- /dev/null +++ b/filter/graceful_shutdown/consumer_filter_test.go @@ -0,0 +1,36 @@ +package graceful_shutdown + +import ( + "context" + "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/config" + "dubbo.apache.org/dubbo-go/v3/protocol" + "dubbo.apache.org/dubbo-go/v3/protocol/invocation" + "github.com/stretchr/testify/assert" + "net/url" + "testing" +) + +func TestConusmerFilterInvoke(t *testing.T) { + url := common.NewURLWithOptions(common.WithParams(url.Values{})) + invocation := invocation.NewRPCInvocation("GetUser", []interface{}{"OK"}, make(map[string]interface{})) + + rootConfig := config.NewRootConfigBuilder(). + SetShutDown(config.NewShutDownConfigBuilder(). + SetTimeout("60s"). + SetStepTimeout("3s"). + Build()).Build() + + config.SetRootConfig(*rootConfig) + + filter := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(*consumerGracefulShutdownFilter) + filter.Set(constant.GracefulShutdownFilterShutdownConfig, config.GetShutDown()) + + assert.Equal(t, filter.shutdownConfig, config.GetShutDown()) + + result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(url), invocation) + assert.NotNil(t, result) + assert.Nil(t, result.Error()) +} diff --git a/filter/graceful_shutdown/procider_filter_test.go b/filter/graceful_shutdown/procider_filter_test.go new file mode 100644 index 0000000000..21b61553a1 --- /dev/null +++ b/filter/graceful_shutdown/procider_filter_test.go @@ -0,0 +1,60 @@ +package graceful_shutdown + +import ( + "context" + "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/config" + "dubbo.apache.org/dubbo-go/v3/filter" + "dubbo.apache.org/dubbo-go/v3/protocol" + "dubbo.apache.org/dubbo-go/v3/protocol/invocation" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "net/url" + "testing" +) + +func TestProviderFilterInvoke(t *testing.T) { + url := common.NewURLWithOptions(common.WithParams(url.Values{})) + invocation := invocation.NewRPCInvocation("GetUser", []interface{}{"OK"}, make(map[string]interface{})) + + extension.SetRejectedExecutionHandler("test", func() filter.RejectedExecutionHandler { + return &TestRejectedExecutionHandler{} + }) + + rootConfig := config.NewRootConfigBuilder(). + SetShutDown(config.NewShutDownConfigBuilder(). + SetTimeout("60s"). + SetStepTimeout("3s"). + SetRejectRequestHandler("test"). + Build()).Build() + + config.SetRootConfig(*rootConfig) + + filter := extension.GetFilter(constant.GracefulShutdownProviderFilterKey).(*providerGracefulShutdownFilter) + filter.Set(constant.GracefulShutdownFilterShutdownConfig, config.GetShutDown()) + + assert.Equal(t, filter.shutdownConfig, config.GetShutDown()) + + result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(url), invocation) + assert.NotNil(t, result) + assert.Nil(t, result.Error()) + + config.GetShutDown().RejectRequest = true + result = filter.Invoke(context.Background(), protocol.NewBaseInvoker(url), invocation) + assert.NotNil(t, result) + assert.NotNil(t, result.Error().Error(), "Rejected") + +} + +type TestRejectedExecutionHandler struct{} + +// RejectedExecution will do nothing, it only log the invocation. +func (handler *TestRejectedExecutionHandler) RejectedExecution(url *common.URL, + _ protocol.Invocation) protocol.Result { + + return &protocol.RPCResult{ + Err: perrors.New("Rejected"), + } +} diff --git a/filter/graceful_shutdown/provider_filter.go b/filter/graceful_shutdown/provider_filter.go index 461b477211..56bbedb2a7 100644 --- a/filter/graceful_shutdown/provider_filter.go +++ b/filter/graceful_shutdown/provider_filter.go @@ -38,7 +38,6 @@ var ( func init() { // `init()` is performed before config.Load(), so shutdownConfig will be retrieved after config was loaded. - extension.SetFilter(constant.GracefulShutdownProviderFilterKey, func() filter.Filter { return newProviderGracefulShutdownFilter() }) diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 76730b1e33..280073a421 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -404,9 +404,8 @@ func (proto *registryProtocol) Destroy() { } // TODO unsubscribeUrl - // TODO shutdwon phase select { - case <-time.After(3 * time.Second): + case <-time.After(config.GetShutDown().GetStepTimeout()): exporter.Unexport() proto.bounds.Delete(key) } From 9b7b8970d7f981090ba6f0e971b8eae96f072024 Mon Sep 17 00:00:00 2001 From: XiaoWeiKIN <2484713618@qq.com> Date: Sun, 26 Dec 2021 21:02:25 +0800 Subject: [PATCH 03/15] unit test add --- filter/graceful_shutdown/procider_filter_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/filter/graceful_shutdown/procider_filter_test.go b/filter/graceful_shutdown/procider_filter_test.go index 21b61553a1..386d8ddfad 100644 --- a/filter/graceful_shutdown/procider_filter_test.go +++ b/filter/graceful_shutdown/procider_filter_test.go @@ -34,7 +34,6 @@ func TestProviderFilterInvoke(t *testing.T) { filter := extension.GetFilter(constant.GracefulShutdownProviderFilterKey).(*providerGracefulShutdownFilter) filter.Set(constant.GracefulShutdownFilterShutdownConfig, config.GetShutDown()) - assert.Equal(t, filter.shutdownConfig, config.GetShutDown()) result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(url), invocation) From 941351d192ec7be2c4330ead291b8048e28ac84b Mon Sep 17 00:00:00 2001 From: XiaoWeiKIN <2484713618@qq.com> Date: Sun, 26 Dec 2021 21:11:54 +0800 Subject: [PATCH 04/15] add test --- filter/graceful_shutdown/consumer_filter_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/filter/graceful_shutdown/consumer_filter_test.go b/filter/graceful_shutdown/consumer_filter_test.go index 3e551d4a79..91821ae0b1 100644 --- a/filter/graceful_shutdown/consumer_filter_test.go +++ b/filter/graceful_shutdown/consumer_filter_test.go @@ -27,7 +27,6 @@ func TestConusmerFilterInvoke(t *testing.T) { filter := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(*consumerGracefulShutdownFilter) filter.Set(constant.GracefulShutdownFilterShutdownConfig, config.GetShutDown()) - assert.Equal(t, filter.shutdownConfig, config.GetShutDown()) result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(url), invocation) From da25c24740bcfb916cc786a3fad0ce5622f04898 Mon Sep 17 00:00:00 2001 From: XiaoWeiKIN <2484713618@qq.com> Date: Sun, 26 Dec 2021 21:58:15 +0800 Subject: [PATCH 05/15] import format --- filter/graceful_shutdown/consumer_filter_test.go | 12 +++++++++--- filter/graceful_shutdown/procider_filter_test.go | 14 ++++++++++---- registry/protocol/protocol.go | 1 - 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/filter/graceful_shutdown/consumer_filter_test.go b/filter/graceful_shutdown/consumer_filter_test.go index 91821ae0b1..ed3371cdb1 100644 --- a/filter/graceful_shutdown/consumer_filter_test.go +++ b/filter/graceful_shutdown/consumer_filter_test.go @@ -2,15 +2,21 @@ package graceful_shutdown import ( "context" + "net/url" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/invocation" - "github.com/stretchr/testify/assert" - "net/url" - "testing" ) func TestConusmerFilterInvoke(t *testing.T) { diff --git a/filter/graceful_shutdown/procider_filter_test.go b/filter/graceful_shutdown/procider_filter_test.go index 386d8ddfad..098bfc2aaa 100644 --- a/filter/graceful_shutdown/procider_filter_test.go +++ b/filter/graceful_shutdown/procider_filter_test.go @@ -2,6 +2,16 @@ package graceful_shutdown import ( "context" + "net/url" + "testing" +) + +import ( + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" @@ -9,10 +19,6 @@ import ( "dubbo.apache.org/dubbo-go/v3/filter" "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/invocation" - perrors "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "net/url" - "testing" ) func TestProviderFilterInvoke(t *testing.T) { diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 280073a421..785d399e43 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -26,7 +26,6 @@ import ( import ( gxset "github.com/dubbogo/gost/container/set" - perrors "github.com/pkg/errors" ) From e8791264ba3e5574eec42948fefe0dc86bd5534b Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Wed, 5 Jan 2022 16:39:23 +0800 Subject: [PATCH 06/15] Fix: import formatter --- filter/graceful_shutdown/procider_filter_test.go | 1 + registry/protocol/protocol.go | 1 + 2 files changed, 2 insertions(+) diff --git a/filter/graceful_shutdown/procider_filter_test.go b/filter/graceful_shutdown/procider_filter_test.go index 098bfc2aaa..1327aac8d0 100644 --- a/filter/graceful_shutdown/procider_filter_test.go +++ b/filter/graceful_shutdown/procider_filter_test.go @@ -8,6 +8,7 @@ import ( import ( perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" ) diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 785d399e43..280073a421 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -26,6 +26,7 @@ import ( import ( gxset "github.com/dubbogo/gost/container/set" + perrors "github.com/pkg/errors" ) From e2ff2ec2abca2006fc74297f3cc0f5d93d875207 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Wed, 5 Jan 2022 16:54:00 +0800 Subject: [PATCH 07/15] Fix: license --- .../graceful_shutdown/consumer_filter_test.go | 17 +++++++++++++++++ .../graceful_shutdown/procider_filter_test.go | 17 +++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/filter/graceful_shutdown/consumer_filter_test.go b/filter/graceful_shutdown/consumer_filter_test.go index ed3371cdb1..65b5848d32 100644 --- a/filter/graceful_shutdown/consumer_filter_test.go +++ b/filter/graceful_shutdown/consumer_filter_test.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package graceful_shutdown import ( diff --git a/filter/graceful_shutdown/procider_filter_test.go b/filter/graceful_shutdown/procider_filter_test.go index 1327aac8d0..48a44ba042 100644 --- a/filter/graceful_shutdown/procider_filter_test.go +++ b/filter/graceful_shutdown/procider_filter_test.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package graceful_shutdown import ( From 3dd5c1ff9dd3a160f3a542c90faca2fe0eecb721 Mon Sep 17 00:00:00 2001 From: wangxiaowei14227 Date: Wed, 5 Jan 2022 17:28:56 +0800 Subject: [PATCH 08/15] invoker --- registry/protocol/protocol.go | 4 ++++ registry/protocol/protocol_test.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 280073a421..4377f6eefd 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -478,6 +478,10 @@ func (e *exporterChangeableWrapper) SetSubscribeUrl(subscribeUrl *common.URL) { e.subscribeUrl = subscribeUrl } +func (e *exporterChangeableWrapper) GetInvoker() protocol.Invoker { + return e.originInvoker +} + func newExporterChangeableWrapper(originInvoker protocol.Invoker, exporter protocol.Exporter) *exporterChangeableWrapper { return &exporterChangeableWrapper{ originInvoker: originInvoker, diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go index 0bef773252..e069759b57 100644 --- a/registry/protocol/protocol_test.go +++ b/registry/protocol/protocol_test.go @@ -133,7 +133,7 @@ func exporterNormal(t *testing.T, regProtocol *registryProtocol) *common.URL { invoker := protocol.NewBaseInvoker(url) exporter := regProtocol.Export(invoker) - assert.IsType(t, &protocol.BaseExporter{}, exporter) + assert.IsType(t, &exporterChangeableWrapper{}, exporter) assert.Equal(t, exporter.GetInvoker().GetURL().String(), suburl.String()) return url } From 5b8939ef4eab52ecbf4aef70ddf0b5f45843e1f5 Mon Sep 17 00:00:00 2001 From: wangxiaowei14227 Date: Wed, 5 Jan 2022 17:57:58 +0800 Subject: [PATCH 09/15] invoker --- registry/protocol/protocol.go | 4 +--- registry/protocol/protocol_test.go | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 4377f6eefd..6b85a76cf0 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -56,7 +56,6 @@ var ( ) type registryProtocol struct { - invokers []protocol.Invoker // Registry Map registries *sync.Map // To solve the problem of RMI repeated exposure port conflicts, @@ -165,7 +164,6 @@ func (proto *registryProtocol) Refer(url *common.URL) protocol.Invoker { // new cluster invoker cluster := extension.GetCluster(serviceUrl.GetParam(constant.ClusterKey, constant.DefaultCluster)) invoker := cluster.Join(directory) - proto.invokers = append(proto.invokers, invoker) return invoker } @@ -479,7 +477,7 @@ func (e *exporterChangeableWrapper) SetSubscribeUrl(subscribeUrl *common.URL) { } func (e *exporterChangeableWrapper) GetInvoker() protocol.Invoker { - return e.originInvoker + return e.exporter.GetInvoker() } func newExporterChangeableWrapper(originInvoker protocol.Invoker, exporter protocol.Exporter) *exporterChangeableWrapper { diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go index e069759b57..7e0713487e 100644 --- a/registry/protocol/protocol_test.go +++ b/registry/protocol/protocol_test.go @@ -46,6 +46,7 @@ import ( func init() { config.SetRootConfig(config.RootConfig{ Application: &config.ApplicationConfig{Name: "test-application"}, + Shutdown: &config.ShutdownConfig{StepTimeout: "0s"}, }) } @@ -209,7 +210,6 @@ func TestDestry(t *testing.T) { exporterNormal(t, regProtocol) regProtocol.Destroy() - assert.Equal(t, len(regProtocol.invokers), 0) var count int regProtocol.registries.Range(func(key, value interface{}) bool { From 6d6a61a359751fd2cd3a13cac56fad0255f8e824 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Fri, 7 Jan 2022 16:35:03 +0800 Subject: [PATCH 10/15] fix: ut --- common/extension/filter.go | 6 +++--- config/graceful_shutdown.go | 12 ++++++++++-- filter/graceful_shutdown/consumer_filter_test.go | 3 ++- filter/graceful_shutdown/procider_filter_test.go | 3 ++- filter/sentinel/filter_test.go | 7 +++---- protocol/protocolwrapper/protocol_filter_wrapper.go | 2 +- registry/protocol/protocol.go | 5 +++++ registry/protocol/protocol_test.go | 12 ++++++------ 8 files changed, 32 insertions(+), 18 deletions(-) diff --git a/common/extension/filter.go b/common/extension/filter.go index a85d8e7efc..d99d16442c 100644 --- a/common/extension/filter.go +++ b/common/extension/filter.go @@ -33,11 +33,11 @@ func SetFilter(name string, v func() filter.Filter) { } // GetFilter finds the filter extension with @name -func GetFilter(name string) filter.Filter { +func GetFilter(name string) (filter.Filter, bool) { if filters[name] == nil { - panic("filter for " + name + " is not existing, make sure you have imported the package.") + return nil, false } - return filters[name]() + return filters[name](), true } // SetRejectedExecutionHandler sets the RejectedExecutionHandler with @name diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go index 05e78f0518..941aa1e573 100644 --- a/config/graceful_shutdown.go +++ b/config/graceful_shutdown.go @@ -55,11 +55,19 @@ const defaultShutDownTime = time.Second * 60 func gracefulShutdownInit() { // retrieve ShutdownConfig for gracefulShutdownFilter - if filter, ok := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(Setter); ok && rootConfig.Shutdown != nil { + cGracefulShutdownFilter, existcGracefulShutdownFilter := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey) + if !existcGracefulShutdownFilter { + return + } + sGracefulShutdownFilter, existsGracefulShutdownFilter := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey) + if !existsGracefulShutdownFilter { + return + } + if filter, ok := cGracefulShutdownFilter.(Setter); ok && rootConfig.Shutdown != nil { filter.Set(constant.GracefulShutdownFilterShutdownConfig, GetShutDown()) } - if filter, ok := extension.GetFilter(constant.GracefulShutdownProviderFilterKey).(Setter); ok && rootConfig.Shutdown != nil { + if filter, ok := sGracefulShutdownFilter.(Setter); ok && rootConfig.Shutdown != nil { filter.Set(constant.GracefulShutdownFilterShutdownConfig, GetShutDown()) } diff --git a/filter/graceful_shutdown/consumer_filter_test.go b/filter/graceful_shutdown/consumer_filter_test.go index 65b5848d32..e437b51184 100644 --- a/filter/graceful_shutdown/consumer_filter_test.go +++ b/filter/graceful_shutdown/consumer_filter_test.go @@ -48,7 +48,8 @@ func TestConusmerFilterInvoke(t *testing.T) { config.SetRootConfig(*rootConfig) - filter := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(*consumerGracefulShutdownFilter) + filterValue, _ := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey) + filter := filterValue.(*consumerGracefulShutdownFilter) filter.Set(constant.GracefulShutdownFilterShutdownConfig, config.GetShutDown()) assert.Equal(t, filter.shutdownConfig, config.GetShutDown()) diff --git a/filter/graceful_shutdown/procider_filter_test.go b/filter/graceful_shutdown/procider_filter_test.go index 48a44ba042..086373aea2 100644 --- a/filter/graceful_shutdown/procider_filter_test.go +++ b/filter/graceful_shutdown/procider_filter_test.go @@ -56,7 +56,8 @@ func TestProviderFilterInvoke(t *testing.T) { config.SetRootConfig(*rootConfig) - filter := extension.GetFilter(constant.GracefulShutdownProviderFilterKey).(*providerGracefulShutdownFilter) + filterValue, _ := extension.GetFilter(constant.GracefulShutdownProviderFilterKey) + filter := filterValue.(*providerGracefulShutdownFilter) filter.Set(constant.GracefulShutdownFilterShutdownConfig, config.GetShutDown()) assert.Equal(t, filter.shutdownConfig, config.GetShutDown()) diff --git a/filter/sentinel/filter_test.go b/filter/sentinel/filter_test.go index e981513529..36d9fa730f 100644 --- a/filter/sentinel/filter_test.go +++ b/filter/sentinel/filter_test.go @@ -22,7 +22,6 @@ import ( "sync" "sync/atomic" "testing" - "time" ) import ( @@ -81,9 +80,9 @@ func TestSentinelFilter_QPS(t *testing.T) { }() } wg.Wait() - time.Sleep(time.Second) - assert.True(t, atomic.LoadInt64(&pass) == 100) - assert.True(t, atomic.LoadInt64(&block) == 200) + // todo sentinel can't assure the passed count is 100, sometimes is 101 + assert.True(t, atomic.LoadInt64(&pass) <= 105 && atomic.LoadInt64(&pass) >= 95) + assert.True(t, atomic.LoadInt64(&block) <= 205 && atomic.LoadInt64(&block) >= 195) } func TestConsumerFilter_Invoke(t *testing.T) { diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go index 2ad6cb0b85..e5f8165b69 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -82,7 +82,7 @@ func BuildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker { // The order of filters is from left to right, so loading from right to left next := invoker for i := len(filterNames) - 1; i >= 0; i-- { - flt := extension.GetFilter(strings.TrimSpace(filterNames[i])) + flt, _ := extension.GetFilter(strings.TrimSpace(filterNames[i])) fi := &FilterInvoker{next: next, invoker: invoker, filter: flt} next = fi } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 6b85a76cf0..bfac03d48b 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -409,6 +409,11 @@ func (proto *registryProtocol) Destroy() { } return true }) + + proto.registries.Range(func(key, value interface{}) bool { + proto.registries.Delete(key) + return true + }) } func getRegistryUrl(invoker protocol.Invoker) *common.URL { diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go index 7e0713487e..9c932700f1 100644 --- a/registry/protocol/protocol_test.go +++ b/registry/protocol/protocol_test.go @@ -204,7 +204,7 @@ func TestOneRegAndProtoExporter(t *testing.T) { assert.Equal(t, count2, 1) } -func TestDestry(t *testing.T) { +func TestDestroy(t *testing.T) { regProtocol := newRegistryProtocol() referNormal(t, regProtocol) exporterNormal(t, regProtocol) @@ -216,14 +216,14 @@ func TestDestry(t *testing.T) { count++ return true }) - assert.Equal(t, count, 0) + assert.Equal(t, 0, count) var count2 int regProtocol.bounds.Range(func(key, value interface{}) bool { count2++ return true }) - assert.Equal(t, count2, 0) + assert.Equal(t, 0, count2) } func TestExportWithOverrideListener(t *testing.T) { @@ -232,7 +232,7 @@ func TestExportWithOverrideListener(t *testing.T) { regProtocol := newRegistryProtocol() url := exporterNormal(t, regProtocol) var reg *registry.MockRegistry - if regI, loaded := regProtocol.registries.Load(url.Key()); loaded { + if regI, loaded := regProtocol.registries.Load(url.Location); loaded { reg = regI.(*registry.MockRegistry) } else { assert.Fail(t, "regProtocol.registries.Load can not be loaded") @@ -259,7 +259,7 @@ func TestExportWithServiceConfig(t *testing.T) { common_cfg.GetEnvInstance().SetDynamicConfiguration(dc) regProtocol := newRegistryProtocol() url := exporterNormal(t, regProtocol) - if _, loaded := regProtocol.registries.Load(url.Key()); !loaded { + if _, loaded := regProtocol.registries.Load(url.Location); !loaded { assert.Fail(t, "regProtocol.registries.Load can not be loaded") return } @@ -282,7 +282,7 @@ func TestExportWithApplicationConfig(t *testing.T) { common_cfg.GetEnvInstance().SetDynamicConfiguration(dc) regProtocol := newRegistryProtocol() url := exporterNormal(t, regProtocol) - if _, loaded := regProtocol.registries.Load(url.Key()); !loaded { + if _, loaded := regProtocol.registries.Load(url.Location); !loaded { assert.Fail(t, "regProtocol.registries.Load can not be loaded") return } From 97fb3d51d90ea2ac12eed49a97a095f839dc8b94 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Sun, 16 Jan 2022 19:00:03 +0800 Subject: [PATCH 11/15] ftr: finish graceful shugdown --- common/constant/default.go | 5 ++- config/graceful_shutdown.go | 28 +++++++++--- config/graceful_shutdown_config.go | 50 +++++++++++++++------ config/graceful_shutdown_config_test.go | 1 - config/root_config.go | 5 ++- filter/graceful_shutdown/consumer_filter.go | 10 +---- filter/graceful_shutdown/provider_filter.go | 5 ++- protocol/dubbo3/dubbo3_exporter.go | 3 +- registry/protocol/protocol.go | 14 +++--- 9 files changed, 80 insertions(+), 41 deletions(-) diff --git a/common/constant/default.go b/common/constant/default.go index 70a7eba50d..136f7d3cf3 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -60,8 +60,9 @@ const ( // that put the AdaptiveServiceProviderFilterKey at the end. DefaultServiceFilters = EchoFilterKey + "," + MetricsFilterKey + "," + TokenFilterKey + "," + AccessLogFilterKey + "," + TpsLimitFilterKey + "," + - GenericServiceFilterKey + "," + ExecuteLimitFilterKey + "," + GracefulShutdownProviderFilterKey + "," + - AdaptiveServiceProviderFilterKey + GenericServiceFilterKey + "," + ExecuteLimitFilterKey + "," + GracefulShutdownProviderFilterKey + //+ "," + + // AdaptiveServiceProviderFilterKey DefaultReferenceFilters = GracefulShutdownConsumerFilterKey ) diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go index 941aa1e573..0e27ceeeb6 100644 --- a/config/graceful_shutdown.go +++ b/config/graceful_shutdown.go @@ -59,7 +59,7 @@ func gracefulShutdownInit() { if !existcGracefulShutdownFilter { return } - sGracefulShutdownFilter, existsGracefulShutdownFilter := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey) + sGracefulShutdownFilter, existsGracefulShutdownFilter := extension.GetFilter(constant.GracefulShutdownProviderFilterKey) if !existsGracefulShutdownFilter { return } @@ -164,13 +164,28 @@ func waitAndAcceptNewRequests() { return } - timeout := rootConfig.Shutdown.GetStepTimeout() + time.Sleep(rootConfig.Shutdown.GetConsumerUpdateWaitTime()) + timeout := rootConfig.Shutdown.GetStepTimeout() // ignore this step if timeout < 0 { return } - time.Sleep(timeout) + waitingProviderProcessedTimeout(rootConfig.Shutdown) +} + +func waitingProviderProcessedTimeout(shutdownConfig *ShutdownConfig) { + timeout := shutdownConfig.GetStepTimeout() + if timeout <= 0 { + return + } + deadline := time.Now().Add(timeout) + + for time.Now().Before(deadline) && shutdownConfig.ProviderActiveCount > 0 { + // sleep 10 ms and then we check it again + time.Sleep(10 * time.Millisecond) + logger.Infof("waiting for provider active invocation count = %d", shutdownConfig.ProviderActiveCount) + } } //for provider. It will wait for processing receiving requests @@ -181,19 +196,20 @@ func waitForSendingAndReceivingRequests() { return } rootConfig.Shutdown.RejectRequest = true - waitingProcessedTimeout(rootConfig.Shutdown) + waitingConsumerProcessedTimeout(rootConfig.Shutdown) } -func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) { +func waitingConsumerProcessedTimeout(shutdownConfig *ShutdownConfig) { timeout := shutdownConfig.GetStepTimeout() if timeout <= 0 { return } deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) && !shutdownConfig.RequestsFinished { + for time.Now().Before(deadline) && shutdownConfig.ConsumerActiveCount > 0 { // sleep 10 ms and then we check it again time.Sleep(10 * time.Millisecond) + logger.Infof("waiting for consumer active invocation count = %d", shutdownConfig.ConsumerActiveCount) } } diff --git a/config/graceful_shutdown_config.go b/config/graceful_shutdown_config.go index 2760e35d18..cab02ce41e 100644 --- a/config/graceful_shutdown_config.go +++ b/config/graceful_shutdown_config.go @@ -21,14 +21,19 @@ import ( "time" ) +import ( + "github.com/creasty/defaults" +) + import ( "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/logger" ) const ( - defaultTimeout = 60 * time.Second - defaultStepTimeout = 3 * time.Second + defaultTimeout = 60 * time.Second + defaultStepTimeout = 3 * time.Second + defaultConsumerUpdateWaitTime = 3 * time.Second ) // ShutdownConfig is used as configuration for graceful shutdown @@ -47,16 +52,23 @@ type ShutdownConfig struct { * and the 99.9% requests will return response in 2s, so the StepTimeout will be bigger than(10+2) * 1000ms, * maybe (10 + 2*3) * 1000ms is a good choice. */ - StepTimeout string `default:"3s" yaml:"step_timeout" json:"step.timeout,omitempty" property:"step.timeout"` + StepTimeout string `default:"3s" yaml:"step-timeout" json:"step.timeout,omitempty" property:"step.timeout"` + + /* + * ConsumerUpdateWaitTime means when provider is shutting down, after the unregister, time to wait for client to + * update invokers. During this time, incoming invocation can be treated normally. + */ + ConsumerUpdateWaitTime string `default:"3s" yaml:"consumer-update-wait-time" json:"consumerUpdate.waitTIme,omitempty" property:"consumerUpdate.waitTIme"` // when we try to shutdown the applicationConfig, we will reject the new requests. In most cases, you don't need to configure this. - RejectRequestHandler string `yaml:"reject_handler" json:"reject_handler,omitempty" property:"reject_handler"` + RejectRequestHandler string `yaml:"reject-handler" json:"reject-handler,omitempty" property:"reject_handler"` + // internal listen kill signal,the default is true. + InternalSignal bool `default:"true" yaml:"internal-signal" json:"internal.signal,omitempty" property:"internal.signal"` + // true -> new request will be rejected. RejectRequest bool - // true -> all requests had been processed. In provider side it means that all requests are returned response to clients - // In consumer side, it means that all requests getting response from servers - RequestsFinished bool - // internal listen kill signal,the default is true. - InternalSignal bool `default:"true" yaml:"internal_signal" json:"internal.signal,omitempty" property:"internal.signal"` + // active invocation + ConsumerActiveCount int32 + ProviderActiveCount int32 } // Prefix dubbo.shutdown @@ -86,6 +98,20 @@ func (config *ShutdownConfig) GetStepTimeout() time.Duration { return result } +func (config *ShutdownConfig) GetConsumerUpdateWaitTime() time.Duration { + result, err := time.ParseDuration(config.ConsumerUpdateWaitTime) + if err != nil { + logger.Errorf("The ConsumerUpdateTimeout configuration is invalid: %s, and we will use the default value: %s, err: %v", + config.ConsumerActiveCount, defaultConsumerUpdateWaitTime.String(), err) + return defaultConsumerUpdateWaitTime + } + return result +} + +func (config *ShutdownConfig) Init() error { + return defaults.Set(config) +} + type ShutdownConfigBuilder struct { shutdownConfig *ShutdownConfig } @@ -109,11 +135,6 @@ func (scb *ShutdownConfigBuilder) SetRejectRequestHandler(rejectRequestHandler s return scb } -func (scb *ShutdownConfigBuilder) SetRequestsFinished(requestsFinished bool) *ShutdownConfigBuilder { - scb.shutdownConfig.RequestsFinished = requestsFinished - return scb -} - func (scb *ShutdownConfigBuilder) SetRejectRequest(rejectRequest bool) *ShutdownConfigBuilder { scb.shutdownConfig.RejectRequest = rejectRequest return scb @@ -125,5 +146,6 @@ func (scb *ShutdownConfigBuilder) SetInternalSignal(internalSignal bool) *Shutdo } func (scb *ShutdownConfigBuilder) Build() *ShutdownConfig { + defaults.Set(scb) return scb.shutdownConfig } diff --git a/config/graceful_shutdown_config_test.go b/config/graceful_shutdown_config_test.go index 43d2fc3c1b..1c77a2bb7b 100644 --- a/config/graceful_shutdown_config_test.go +++ b/config/graceful_shutdown_config_test.go @@ -29,7 +29,6 @@ import ( func TestShutdownConfigGetTimeout(t *testing.T) { config := ShutdownConfig{} assert.False(t, config.RejectRequest) - assert.False(t, config.RequestsFinished) config = ShutdownConfig{ Timeout: "60s", diff --git a/config/root_config.go b/config/root_config.go index affeffb449..e143b11dc3 100644 --- a/config/root_config.go +++ b/config/root_config.go @@ -218,6 +218,9 @@ func (rc *RootConfig) Init() error { if err := rc.Consumer.Init(rc); err != nil { return err } + if err := rc.Shutdown.Init(); err != nil { + return err + } // todo if we can remove this from Init in the future? rc.Start() return nil @@ -225,12 +228,12 @@ func (rc *RootConfig) Init() error { func (rc *RootConfig) Start() { startOnce.Do(func() { + gracefulShutdownInit() rc.Consumer.Load() rc.Provider.Load() // todo if register consumer instance or has exported services exportMetadataService() registerServiceInstance() - gracefulShutdownInit() }) } diff --git a/filter/graceful_shutdown/consumer_filter.go b/filter/graceful_shutdown/consumer_filter.go index 9471d44783..65ceda306c 100644 --- a/filter/graceful_shutdown/consumer_filter.go +++ b/filter/graceful_shutdown/consumer_filter.go @@ -46,7 +46,6 @@ func init() { } type consumerGracefulShutdownFilter struct { - activeCount int32 shutdownConfig *config.ShutdownConfig } @@ -54,7 +53,6 @@ func newConsumerGracefulShutdownFilter() filter.Filter { if csf == nil { csfOnce.Do(func() { csf = &consumerGracefulShutdownFilter{} - }) } return csf @@ -62,17 +60,13 @@ func newConsumerGracefulShutdownFilter() filter.Filter { // Invoke adds the requests count and block the new requests if application is closing func (f *consumerGracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - atomic.AddInt32(&f.activeCount, 1) + atomic.AddInt32(&f.shutdownConfig.ConsumerActiveCount, 1) return invoker.Invoke(ctx, invocation) } // OnResponse reduces the number of active processes then return the process result func (f *consumerGracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - atomic.AddInt32(&f.activeCount, -1) - // although this isn't thread safe, it won't be a problem if the f.rejectNewRequest() is true. - if f.shutdownConfig != nil && f.shutdownConfig.RejectRequest && f.activeCount <= 0 { - f.shutdownConfig.RequestsFinished = true - } + atomic.AddInt32(&f.shutdownConfig.ConsumerActiveCount, -1) return result } diff --git a/filter/graceful_shutdown/provider_filter.go b/filter/graceful_shutdown/provider_filter.go index 56bbedb2a7..85f0c487fe 100644 --- a/filter/graceful_shutdown/provider_filter.go +++ b/filter/graceful_shutdown/provider_filter.go @@ -20,6 +20,7 @@ package graceful_shutdown import ( "context" "sync" + "sync/atomic" ) import ( @@ -51,7 +52,6 @@ func newProviderGracefulShutdownFilter() filter.Filter { if psf == nil { psfOnce.Do(func() { psf = &providerGracefulShutdownFilter{} - }) } return psf @@ -63,12 +63,13 @@ func (f *providerGracefulShutdownFilter) Invoke(ctx context.Context, invoker pro logger.Info("The application is closing, new request will be rejected.") return f.getRejectHandler().RejectedExecution(invoker.GetURL(), invocation) } + atomic.AddInt32(&f.shutdownConfig.ProviderActiveCount, 1) return invoker.Invoke(ctx, invocation) } // OnResponse reduces the number of active processes then return the process result func (f *providerGracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - + atomic.AddInt32(&f.shutdownConfig.ProviderActiveCount, -1) return result } diff --git a/protocol/dubbo3/dubbo3_exporter.go b/protocol/dubbo3/dubbo3_exporter.go index e82423833a..a8ac595433 100644 --- a/protocol/dubbo3/dubbo3_exporter.go +++ b/protocol/dubbo3/dubbo3_exporter.go @@ -50,10 +50,9 @@ func NewDubboExporter(key string, invoker protocol.Invoker, exporterMap *sync.Ma // Unexport unexport dubbo3 service exporter. func (de *DubboExporter) Unexport() { url := de.GetInvoker().GetURL() - serviceId := url.GetParam(constant.BeanNameKey, "") interfaceName := url.GetParam(constant.InterfaceKey, "") de.BaseExporter.Unexport() - err := common.ServiceMap.UnRegister(interfaceName, tripleConstant.TRIPLE, serviceId) + err := common.ServiceMap.UnRegister(interfaceName, tripleConstant.TRIPLE, url.ServiceKey()) if err != nil { logger.Errorf("[DubboExporter.Unexport] error: %v", err) } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index bfac03d48b..c1f22ee478 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -402,11 +402,15 @@ func (proto *registryProtocol) Destroy() { } // TODO unsubscribeUrl - select { - case <-time.After(config.GetShutDown().GetStepTimeout()): - exporter.Unexport() - proto.bounds.Delete(key) - } + // close all protocol server after consumerUpdateWait + stepTimeout(max time wait during + // waitAndAcceptNewRequests procedure) + go func() { + select { + case <-time.After(config.GetShutDown().GetStepTimeout() + config.GetShutDown().GetConsumerUpdateWaitTime()): + exporter.Unexport() + proto.bounds.Delete(key) + } + }() return true }) From 0a137fd2d91f1f698b510c397eea5d4cfd601f74 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Sun, 16 Jan 2022 19:05:43 +0800 Subject: [PATCH 12/15] fix: update triple --- common/constant/default.go | 5 ++--- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/common/constant/default.go b/common/constant/default.go index 136f7d3cf3..70a7eba50d 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -60,9 +60,8 @@ const ( // that put the AdaptiveServiceProviderFilterKey at the end. DefaultServiceFilters = EchoFilterKey + "," + MetricsFilterKey + "," + TokenFilterKey + "," + AccessLogFilterKey + "," + TpsLimitFilterKey + "," + - GenericServiceFilterKey + "," + ExecuteLimitFilterKey + "," + GracefulShutdownProviderFilterKey - //+ "," + - // AdaptiveServiceProviderFilterKey + GenericServiceFilterKey + "," + ExecuteLimitFilterKey + "," + GracefulShutdownProviderFilterKey + "," + + AdaptiveServiceProviderFilterKey DefaultReferenceFilters = GracefulShutdownConsumerFilterKey ) diff --git a/go.mod b/go.mod index 69aba07e9e..f1ccb1b1a9 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/dubbogo/go-zookeeper v1.0.4-0.20211212162352-f9d2183d89d5 github.com/dubbogo/gost v1.11.22 github.com/dubbogo/grpc-go v1.42.7 - github.com/dubbogo/triple v1.1.7 + github.com/dubbogo/triple v1.1.8-rc2 github.com/emicklei/go-restful/v3 v3.7.3 github.com/fsnotify/fsnotify v1.5.1 github.com/ghodss/yaml v1.0.0 diff --git a/go.sum b/go.sum index 1b79c356d3..7af0a7d688 100644 --- a/go.sum +++ b/go.sum @@ -196,8 +196,8 @@ github.com/dubbogo/grpc-go v1.42.7/go.mod h1:F1T9hnUvYGW4JLK1QNriavpOkhusU677ovP github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU= github.com/dubbogo/net v0.0.4/go.mod h1:1CGOnM7X3he+qgGNqjeADuE5vKZQx/eMSeUkpU3ujIc= github.com/dubbogo/triple v1.0.9/go.mod h1:1t9me4j4CTvNDcsMZy6/OGarbRyAUSY0tFXGXHCp7Iw= -github.com/dubbogo/triple v1.1.7 h1:6KdOUJYwTaCrXtosir2eoQ++JzP5AGy/0zuE4UXZAG4= -github.com/dubbogo/triple v1.1.7/go.mod h1:7IZLmz0sWZuceYM4urzyZemIcqvoksrFq4CZZ8/wBjQ= +github.com/dubbogo/triple v1.1.8-rc2 h1:taPtjwH6J7WeixkoIjufd3mpYc1IESXMpgoH9gzhlw4= +github.com/dubbogo/triple v1.1.8-rc2/go.mod h1:7IZLmz0sWZuceYM4urzyZemIcqvoksrFq4CZZ8/wBjQ= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= From 6d34c665bc853819d44104b40fb32e3ab6203c95 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Sun, 16 Jan 2022 19:13:18 +0800 Subject: [PATCH 13/15] fix: add nolint --- protocol/dubbo3/reflection/serverreflection.go | 1 + 1 file changed, 1 insertion(+) diff --git a/protocol/dubbo3/reflection/serverreflection.go b/protocol/dubbo3/reflection/serverreflection.go index 74c7a92a39..629538fe58 100644 --- a/protocol/dubbo3/reflection/serverreflection.go +++ b/protocol/dubbo3/reflection/serverreflection.go @@ -22,6 +22,7 @@ The service implemented is defined in: https://github.com/grpc/grpc/blob/master/src/proto/grpc/reflection/v1alpha/reflection.proto. */ +// nolint package reflection import ( From e8677d166c41efacae949b5c864bd7e13e0afb50 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Sun, 16 Jan 2022 19:23:53 +0800 Subject: [PATCH 14/15] fix: lint --- protocol/dubbo3/dubbo3_invoker.go | 2 +- protocol/dubbo3/reflection/serverreflection.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/protocol/dubbo3/dubbo3_invoker.go b/protocol/dubbo3/dubbo3_invoker.go index bc07957bed..1e33bf2eb0 100644 --- a/protocol/dubbo3/dubbo3_invoker.go +++ b/protocol/dubbo3/dubbo3_invoker.go @@ -174,7 +174,7 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati for _, k := range attachmentKey { if v := di.GetURL().GetParam(k, ""); len(v) > 0 { - invocation.SetAttachments(k, v) + invocation.SetAttachment(k, v) } } diff --git a/protocol/dubbo3/reflection/serverreflection.go b/protocol/dubbo3/reflection/serverreflection.go index 629538fe58..74c7a92a39 100644 --- a/protocol/dubbo3/reflection/serverreflection.go +++ b/protocol/dubbo3/reflection/serverreflection.go @@ -22,7 +22,6 @@ The service implemented is defined in: https://github.com/grpc/grpc/blob/master/src/proto/grpc/reflection/v1alpha/reflection.proto. */ -// nolint package reflection import ( From 39916bb2566ced247f339975af867953f7e9540a Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Sun, 16 Jan 2022 19:39:46 +0800 Subject: [PATCH 15/15] fix: ut --- config/testdata/consumer_config.yml | 2 +- config/testdata/consumer_config_with_configcenter.yml | 2 +- config/testdata/consumer_config_withoutProtocol.yml | 2 +- config/testdata/provider_config.yml | 2 +- config/testdata/provider_config_withoutProtocol.yml | 2 +- protocol/rest/config/reader/testdata/consumer_config.yml | 2 +- registry/protocol/protocol_test.go | 7 ------- 7 files changed, 6 insertions(+), 13 deletions(-) diff --git a/config/testdata/consumer_config.yml b/config/testdata/consumer_config.yml index 0c47782357..f646e3cdae 100644 --- a/config/testdata/consumer_config.yml +++ b/config/testdata/consumer_config.yml @@ -53,7 +53,7 @@ references: shutdown_conf: timeout: 60s - step_timeout: 10s + step-timeout: 10s protocol_conf: # when you choose the Dubbo protocol, the following configuration takes effect diff --git a/config/testdata/consumer_config_with_configcenter.yml b/config/testdata/consumer_config_with_configcenter.yml index 1f15830baa..3f552aa66b 100644 --- a/config/testdata/consumer_config_with_configcenter.yml +++ b/config/testdata/consumer_config_with_configcenter.yml @@ -19,7 +19,7 @@ references: shutdown_conf: timeout: 60s - step_timeout: 10s + step-timeout: 10s protocol_conf: dubbo: diff --git a/config/testdata/consumer_config_withoutProtocol.yml b/config/testdata/consumer_config_withoutProtocol.yml index 669120e2ad..2432f96598 100644 --- a/config/testdata/consumer_config_withoutProtocol.yml +++ b/config/testdata/consumer_config_withoutProtocol.yml @@ -50,7 +50,7 @@ references: shutdown_conf: timeout: 60s - step_timeout: 10s + step-timeout: 10s protocol_conf: dubbo: diff --git a/config/testdata/provider_config.yml b/config/testdata/provider_config.yml index 843a500de0..4d81e5c0c0 100644 --- a/config/testdata/provider_config.yml +++ b/config/testdata/provider_config.yml @@ -73,7 +73,7 @@ protocols: shutdown_conf: timeout: 60s - step_timeout: 10s + step-timeout: 10s protocol_conf: dubbo: diff --git a/config/testdata/provider_config_withoutProtocol.yml b/config/testdata/provider_config_withoutProtocol.yml index b3d8921f9e..f88b6461ee 100644 --- a/config/testdata/provider_config_withoutProtocol.yml +++ b/config/testdata/provider_config_withoutProtocol.yml @@ -53,7 +53,7 @@ protocols: shutdown_conf: timeout: 60s - step_timeout: 10s + step-timeout: 10s protocol_conf: dubbo: diff --git a/protocol/rest/config/reader/testdata/consumer_config.yml b/protocol/rest/config/reader/testdata/consumer_config.yml index 127bc966e7..fa4ce3bd6b 100644 --- a/protocol/rest/config/reader/testdata/consumer_config.yml +++ b/protocol/rest/config/reader/testdata/consumer_config.yml @@ -70,5 +70,5 @@ references: shutdown_conf: timeout: 60s - step_timeout: 10s + step-timeout: 10s diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go index 9c932700f1..7ed3711f92 100644 --- a/registry/protocol/protocol_test.go +++ b/registry/protocol/protocol_test.go @@ -217,13 +217,6 @@ func TestDestroy(t *testing.T) { return true }) assert.Equal(t, 0, count) - - var count2 int - regProtocol.bounds.Range(func(key, value interface{}) bool { - count2++ - return true - }) - assert.Equal(t, 0, count2) } func TestExportWithOverrideListener(t *testing.T) {