Skip to content

Commit

Permalink
make cluster interceptor a chain (#1211)
Browse files Browse the repository at this point in the history
  • Loading branch information
beiwei30 authored May 20, 2021
1 parent 5a0a0dd commit 0f83b1a
Show file tree
Hide file tree
Showing 15 changed files with 210 additions and 67 deletions.
2 changes: 1 addition & 1 deletion cluster/cluster_impl/available_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ func NewAvailableCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *availableCluster) Join(directory cluster.Directory) protocol.Invoker {
return NewAvailableClusterInvoker(directory)
return buildInterceptorChain(NewAvailableClusterInvoker(directory))
}
19 changes: 0 additions & 19 deletions cluster/cluster_impl/base_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package cluster_impl

import (
"context"
)

import (
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
Expand All @@ -40,7 +36,6 @@ type baseClusterInvoker struct {
availablecheck bool
destroyed *atomic.Bool
stickyInvoker protocol.Invoker
interceptor cluster.ClusterInterceptor
}

func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
Expand Down Expand Up @@ -165,20 +160,6 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
return nil
}

func (invoker *baseClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
if invoker.interceptor != nil {
invoker.interceptor.BeforeInvoker(ctx, invocation)

result := invoker.interceptor.DoInvoke(ctx, invocation)

invoker.interceptor.AfterInvoker(ctx, invocation)

return result
}

return nil
}

func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool {
for _, i := range invoked {
if i == selectedInvoker {
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster_impl/broadcast_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewBroadcastCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *broadcastCluster) Join(directory cluster.Directory) protocol.Invoker {
return newBroadcastClusterInvoker(directory)
return buildInterceptorChain(newBroadcastClusterInvoker(directory))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failback_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewFailbackCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailbackClusterInvoker(directory)
return buildInterceptorChain(newFailbackClusterInvoker(directory))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failfast_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewFailFastCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *failfastCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailFastClusterInvoker(directory)
return buildInterceptorChain(newFailFastClusterInvoker(directory))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failover_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewFailoverCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *failoverCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailoverClusterInvoker(directory)
return buildInterceptorChain(newFailoverClusterInvoker(directory))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failsafe_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewFailsafeCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *failsafeCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailsafeClusterInvoker(directory)
return buildInterceptorChain(newFailsafeClusterInvoker(directory))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/forking_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewForkingCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *forkingCluster) Join(directory cluster.Directory) protocol.Invoker {
return newForkingClusterInvoker(directory)
return buildInterceptorChain(newForkingClusterInvoker(directory))
}
76 changes: 76 additions & 0 deletions cluster/cluster_impl/interceptor_invoker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cluster_impl

import (
"context"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

// InterceptorInvoker mocks cluster interceptor as an invoker
type InterceptorInvoker struct {
next protocol.Invoker
interceptor cluster.Interceptor
}

// GetURL is used to get url from InterceptorInvoker
func (i *InterceptorInvoker) GetURL() *common.URL {
return i.next.GetURL()
}

// IsAvailable is used to get available status
func (i *InterceptorInvoker) IsAvailable() bool {
return i.next.IsAvailable()
}

// Invoke is used to call service method by invocation
func (i *InterceptorInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
return i.interceptor.Invoke(ctx, i.next, invocation)
}

// Destroy will destroy invoker
func (i *InterceptorInvoker) Destroy() {
i.next.Destroy()
}

func buildInterceptorChain(invoker protocol.Invoker, builtins ...cluster.Interceptor) protocol.Invoker {
// The order of interceptors is from left to right, so loading from right to left
next := invoker
interceptors := extension.GetClusterInterceptors()
if len(interceptors) != 0 {
for i := len(interceptors) - 1; i >= 0; i-- {
v := &InterceptorInvoker{next: next, interceptor: interceptors[i]}
next = v
}
}

if builtins != nil && len(builtins) > 0 {
for i := len(builtins) - 1; i >= 0; i-- {
v := &InterceptorInvoker{next: next, interceptor: builtins[i]}
next = v
}
}

return next
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/mock_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ func NewMockCluster() cluster.Cluster {

// nolint
func (cluster *mockCluster) Join(directory cluster.Directory) protocol.Invoker {
return protocol.NewBaseInvoker(directory.GetURL())
return buildInterceptorChain(protocol.NewBaseInvoker(directory.GetURL()))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/zone_aware_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ func NewZoneAwareCluster() cluster.Cluster {

// Join returns a zoneAwareClusterInvoker instance
func (cluster *zoneAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
return newZoneAwareClusterInvoker(directory)
return buildInterceptorChain(newZoneAwareClusterInvoker(directory), getZoneAwareInterceptor())
}
57 changes: 57 additions & 0 deletions cluster/cluster_impl/zone_aware_cluster_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cluster_impl

import (
"context"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type zoneAwareInterceptor struct {
}

func (z *zoneAwareInterceptor) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
key := constant.REGISTRY_KEY + "." + constant.ZONE_FORCE_KEY
force := ctx.Value(key)

if force != nil {
switch value := force.(type) {
case bool:
if value {
invocation.SetAttachments(key, "true")
}
case string:
if "true" == value {
invocation.SetAttachments(key, "true")
}
default:
// ignore
}
}

return invoker.Invoke(ctx, invocation)
}

func getZoneAwareInterceptor() cluster.Interceptor {
return &zoneAwareInterceptor{}
}
31 changes: 3 additions & 28 deletions cluster/cluster_impl/zone_aware_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,14 @@ type zoneAwareClusterInvoker struct {
}

func newZoneAwareClusterInvoker(directory cluster.Directory) protocol.Invoker {
invoke := &zoneAwareClusterInvoker{
invoker := &zoneAwareClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
// add local to interceptor
invoke.interceptor = invoke
return invoke
return invoker
}

// nolint
func (invoker *zoneAwareClusterInvoker) DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
func (invoker *zoneAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)

err := invoker.checkInvokers(invokers, invocation)
Expand Down Expand Up @@ -104,29 +102,6 @@ func (invoker *zoneAwareClusterInvoker) DoInvoke(ctx context.Context, invocation
}
}

func (invoker *zoneAwareClusterInvoker) BeforeInvoker(ctx context.Context, invocation protocol.Invocation) {
key := constant.REGISTRY_KEY + "." + constant.ZONE_FORCE_KEY
force := ctx.Value(key)

if force != nil {
switch value := force.(type) {
case bool:
if value {
invocation.SetAttachments(key, "true")
}
case string:
if "true" == value {
invocation.SetAttachments(key, "true")
}
default:
// ignore
}
}
}

func (invoker *zoneAwareClusterInvoker) AfterInvoker(ctx context.Context, invocation protocol.Invocation) {
}

func matchParam(target, key, def string, invoker protocol.Invoker) bool {
return target == invoker.GetURL().GetParam(key, def)
}
16 changes: 5 additions & 11 deletions cluster/cluster_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,9 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)

// ClusterInterceptor
// Extension - ClusterInterceptor
type ClusterInterceptor interface {
// Before DoInvoke method
BeforeInvoker(ctx context.Context, invocation protocol.Invocation)

// After DoInvoke method
AfterInvoker(ctx context.Context, invocation protocol.Invocation)

// Corresponding cluster invoke
DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result
// Interceptor
// Extension - Interceptor
type Interceptor interface {
// Invoke is the core function of a cluster interceptor, it determines the process of the interceptor
Invoke(context.Context, protocol.Invoker, protocol.Invocation) protocol.Result
}
60 changes: 60 additions & 0 deletions common/extension/cluster_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package extension

import (
"sync"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
)

var (
lock sync.RWMutex
interceptors = make(map[string]func() cluster.Interceptor)
)

// SetClusterInterceptor sets cluster interceptor so that user has chance to inject extra logics before and after
// cluster invoker
func SetClusterInterceptor(name string, fun func() cluster.Interceptor) {
lock.Lock()
defer lock.Unlock()
interceptors[name] = fun
}

// GetClusterInterceptor returns the cluster interceptor instance with the given name
func GetClusterInterceptor(name string) cluster.Interceptor {
lock.RLock()
defer lock.RUnlock()
if interceptors[name] == nil {
panic("cluster_interceptor for " + name + " doesn't exist, make sure the corresponding package is imported")
}
return interceptors[name]()
}

// GetClusterInterceptors returns all instances of registered cluster interceptors
func GetClusterInterceptors() []cluster.Interceptor {
lock.RLock()
defer lock.RUnlock()
ret := make([]cluster.Interceptor, 0, len(interceptors))
for _, f := range interceptors {
ret = append(ret, f())
}
return ret
}

0 comments on commit 0f83b1a

Please sign in to comment.