Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature-graceful-shutdwon #1675

Merged
merged 17 commits into from
Jan 17, 2022
6 changes: 3 additions & 3 deletions common/extension/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ func SetFilter(name string, v func() filter.Filter) {
}

// GetFilter finds the filter extension with @name
func GetFilter(name string) filter.Filter {
func GetFilter(name string) (filter.Filter, bool) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

原来返回一个参数感觉更舒服,panic去掉就可以了

if filters[name] == nil {
panic("filter for " + name + " is not existing, make sure you have imported the package.")
return nil, false
}
return filters[name]()
return filters[name](), true
}

// SetRejectedExecutionHandler sets the RejectedExecutionHandler with @name
Expand Down
78 changes: 50 additions & 28 deletions config/graceful_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,36 +53,48 @@ import (
*/
const defaultShutDownTime = time.Second * 60

// GracefulShutdownInit todo GracefulShutdownInit in 3.0 should be discusesed.
func GracefulShutdownInit() {
signals := make(chan os.Signal, 1)

signal.Notify(signals, ShutdownSignals...)

func gracefulShutdownInit() {
// retrieve ShutdownConfig for gracefulShutdownFilter
if filter, ok := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(Setter); ok && rootConfig.Shutdown != nil {
filter.Set(constant.GracefulShutdownFilterShutdownConfig, rootConfig.Shutdown)
cGracefulShutdownFilter, existcGracefulShutdownFilter := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey)
if !existcGracefulShutdownFilter {
return
}
sGracefulShutdownFilter, existsGracefulShutdownFilter := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey)
if !existsGracefulShutdownFilter {
return
}
if filter, ok := cGracefulShutdownFilter.(Setter); ok && rootConfig.Shutdown != nil {
filter.Set(constant.GracefulShutdownFilterShutdownConfig, GetShutDown())
}

go func() {
select {
case sig := <-signals:
logger.Infof("get signal %s, applicationConfig will shutdown.", sig)
// gracefulShutdownOnce.Do(func() {
time.AfterFunc(totalTimeout(), func() {
logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ")
os.Exit(0)
})
BeforeShutdown()
// those signals' original behavior is exit with dump ths stack, so we try to keep the behavior
for _, dumpSignal := range DumpHeapShutdownSignals {
if sig == dumpSignal {
debug.WriteHeapDump(os.Stdout.Fd())
if filter, ok := sGracefulShutdownFilter.(Setter); ok && rootConfig.Shutdown != nil {
filter.Set(constant.GracefulShutdownFilterShutdownConfig, GetShutDown())
}

if GetShutDown().InternalSignal {
signals := make(chan os.Signal, 1)
signal.Notify(signals, ShutdownSignals...)

go func() {
select {
case sig := <-signals:
logger.Infof("get signal %s, applicationConfig will shutdown.", sig)
// gracefulShutdownOnce.Do(func() {
time.AfterFunc(totalTimeout(), func() {
logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ")
os.Exit(0)
})
BeforeShutdown()
// those signals' original behavior is exit with dump ths stack, so we try to keep the behavior
for _, dumpSignal := range DumpHeapShutdownSignals {
if sig == dumpSignal {
debug.WriteHeapDump(os.Stdout.Fd())
}
}
os.Exit(0)
}
os.Exit(0)
}
}()
}()
}
}

// BeforeShutdown provides processing flow before shutdown
Expand Down Expand Up @@ -115,22 +127,32 @@ func destroyAllRegistries() {
// First we destroy provider's protocols, and then we destroy the consumer protocols.
func destroyProtocols() {
logger.Info("Graceful shutdown --- Destroy protocols. ")
logger.Info("Graceful shutdown --- First destroy provider's protocols. ")

consumerProtocols := getConsumerProtocols()
if rootConfig.Protocols == nil {
return
}

consumerProtocols := getConsumerProtocols()

destroyProviderProtocols(consumerProtocols)
destroyConsumerProtocols(consumerProtocols)
}

// destroyProviderProtocols destroys the provider's protocol.
// if the protocol is consumer's protocol too, we will keep it
func destroyProviderProtocols(consumerProtocols *gxset.HashSet) {
logger.Info("Graceful shutdown --- First destroy provider's protocols. ")
for _, protocol := range rootConfig.Protocols {
// the protocol is the consumer's protocol too, we can not destroy it.
if consumerProtocols.Contains(protocol.Name) {
continue
}
extension.GetProtocol(protocol.Name).Destroy()
}
}

logger.Info("Graceful shutdown --- Second destroy consumer's protocols. ")
func destroyConsumerProtocols(consumerProtocols *gxset.HashSet) {
logger.Info("Graceful shutdown --- Second Destroy consumer's protocols. ")
for name := range consumerProtocols.Items {
extension.GetProtocol(name.(string)).Destroy()
}
Expand Down
11 changes: 9 additions & 2 deletions config/graceful_shutdown_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

const (
defaultTimeout = 60 * time.Second
defaultStepTimeout = 10 * time.Second
defaultStepTimeout = 3 * time.Second
)

// ShutdownConfig is used as configuration for graceful shutdown
Expand All @@ -47,14 +47,16 @@ type ShutdownConfig struct {
* and the 99.9% requests will return response in 2s, so the StepTimeout will be bigger than(10+2) * 1000ms,
* maybe (10 + 2*3) * 1000ms is a good choice.
*/
StepTimeout string `default:"10s" yaml:"step_timeout" json:"step.timeout,omitempty" property:"step.timeout"`
StepTimeout string `default:"3s" yaml:"step_timeout" json:"step.timeout,omitempty" property:"step.timeout"`
// when we try to shutdown the applicationConfig, we will reject the new requests. In most cases, you don't need to configure this.
RejectRequestHandler string `yaml:"reject_handler" json:"reject_handler,omitempty" property:"reject_handler"`
// true -> new request will be rejected.
RejectRequest bool
// true -> all requests had been processed. In provider side it means that all requests are returned response to clients
// In consumer side, it means that all requests getting response from servers
RequestsFinished bool
// internal listen kill signal,the default is true.
InternalSignal bool `default:"true" yaml:"internal_signal" json:"internal.signal,omitempty" property:"internal.signal"`
}

// Prefix dubbo.shutdown
Expand Down Expand Up @@ -117,6 +119,11 @@ func (scb *ShutdownConfigBuilder) SetRejectRequest(rejectRequest bool) *Shutdown
return scb
}

func (scb *ShutdownConfigBuilder) SetInternalSignal(internalSignal bool) *ShutdownConfigBuilder {
scb.shutdownConfig.InternalSignal = internalSignal
return scb
}

func (scb *ShutdownConfigBuilder) Build() *ShutdownConfig {
return scb.shutdownConfig
}
17 changes: 17 additions & 0 deletions config/root_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ func GetApplicationConfig() *ApplicationConfig {
return rootConfig.Application
}

func GetShutDown() *ShutdownConfig {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这样写也许更整洁点
if err := check(); err == nil && rootConfig.Shutdown != nil {
return rootConfig.Shutdown
}
return NewShutDownConfigBuilder().Build()

if err := check(); err != nil {
return NewShutDownConfigBuilder().Build()
}
if rootConfig.Shutdown != nil {
return rootConfig.Shutdown
}
return NewShutDownConfigBuilder().Build()
}

// getRegistryIds get registry ids
func (rc *RootConfig) getRegistryIds() []string {
ids := make([]string, 0)
Expand Down Expand Up @@ -220,6 +230,7 @@ func (rc *RootConfig) Start() {
// todo if register consumer instance or has exported services
exportMetadataService()
registerServiceInstance()
gracefulShutdownInit()
})
}

Expand All @@ -237,6 +248,7 @@ func newEmptyRootConfig() *RootConfig {
Metric: NewMetricConfigBuilder().Build(),
Logger: NewLoggerConfigBuilder().Build(),
Custom: NewCustomConfigBuilder().Build(),
Shutdown: NewShutDownConfigBuilder().Build(),
}
return newRootConfig
}
Expand Down Expand Up @@ -329,6 +341,11 @@ func (rb *RootConfigBuilder) SetCustom(customConfig *CustomConfig) *RootConfigBu
return rb
}

func (rb *RootConfigBuilder) SetShutDown(shutDownConfig *ShutdownConfig) *RootConfigBuilder {
rb.rootConfig.Shutdown = shutDownConfig
return rb
}

func (rb *RootConfigBuilder) Build() *RootConfig {
return rb.rootConfig
}
Expand Down
2 changes: 2 additions & 0 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"container/list"
"fmt"
"net/url"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -403,6 +404,7 @@ func (svc *ServiceConfig) getUrlMap() url.Values {

// whether to export or not
urlMap.Set(constant.ExportKey, strconv.FormatBool(svc.export))
urlMap.Set(constant.PIDKey, fmt.Sprintf("%d", os.Getpid()))

for _, v := range svc.Methods {
prefix := "methods." + v.Name + "."
Expand Down
2 changes: 1 addition & 1 deletion filter/filter_impl/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/filter/echo"
_ "dubbo.apache.org/dubbo-go/v3/filter/exec_limit"
_ "dubbo.apache.org/dubbo-go/v3/filter/generic"
_ "dubbo.apache.org/dubbo-go/v3/filter/gshutdown"
_ "dubbo.apache.org/dubbo-go/v3/filter/graceful_shutdown"
_ "dubbo.apache.org/dubbo-go/v3/filter/hystrix"
_ "dubbo.apache.org/dubbo-go/v3/filter/metrics"
_ "dubbo.apache.org/dubbo-go/v3/filter/seata"
Expand Down
90 changes: 90 additions & 0 deletions filter/graceful_shutdown/consumer_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package graceful_shutdown

import (
"context"
"sync"
"sync/atomic"
)

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

var (
csfOnce sync.Once
csf *consumerGracefulShutdownFilter
)

func init() {
// `init()` is performed before config.Load(), so shutdownConfig will be retrieved after config was loaded.
extension.SetFilter(constant.GracefulShutdownConsumerFilterKey, func() filter.Filter {
return newConsumerGracefulShutdownFilter()
})

}

type consumerGracefulShutdownFilter struct {
activeCount int32
shutdownConfig *config.ShutdownConfig
}

func newConsumerGracefulShutdownFilter() filter.Filter {
if csf == nil {
csfOnce.Do(func() {
csf = &consumerGracefulShutdownFilter{}

})
}
return csf
}

// Invoke adds the requests count and block the new requests if application is closing
func (f *consumerGracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
atomic.AddInt32(&f.activeCount, 1)
return invoker.Invoke(ctx, invocation)
}

// OnResponse reduces the number of active processes then return the process result
func (f *consumerGracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
atomic.AddInt32(&f.activeCount, -1)
// although this isn't thread safe, it won't be a problem if the f.rejectNewRequest() is true.
if f.shutdownConfig != nil && f.shutdownConfig.RejectRequest && f.activeCount <= 0 {
f.shutdownConfig.RequestsFinished = true
}
return result
}

func (f *consumerGracefulShutdownFilter) Set(name string, conf interface{}) {
switch name {
case constant.GracefulShutdownFilterShutdownConfig:
if shutdownConfig, ok := conf.(*config.ShutdownConfig); ok {
f.shutdownConfig = shutdownConfig
return
}
logger.Warnf("the type of config for {%s} should be *config.ShutdownConfig", constant.GracefulShutdownFilterShutdownConfig)
default:
// do nothing
}
}
59 changes: 59 additions & 0 deletions filter/graceful_shutdown/consumer_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package graceful_shutdown

import (
"context"
"net/url"
"testing"
)

import (
"github.com/stretchr/testify/assert"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)

func TestConusmerFilterInvoke(t *testing.T) {
url := common.NewURLWithOptions(common.WithParams(url.Values{}))
invocation := invocation.NewRPCInvocation("GetUser", []interface{}{"OK"}, make(map[string]interface{}))

rootConfig := config.NewRootConfigBuilder().
SetShutDown(config.NewShutDownConfigBuilder().
SetTimeout("60s").
SetStepTimeout("3s").
Build()).Build()

config.SetRootConfig(*rootConfig)

filterValue, _ := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey)
filter := filterValue.(*consumerGracefulShutdownFilter)
filter.Set(constant.GracefulShutdownFilterShutdownConfig, config.GetShutDown())
assert.Equal(t, filter.shutdownConfig, config.GetShutDown())

result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(url), invocation)
assert.NotNil(t, result)
assert.Nil(t, result.Error())
}
Loading