Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make cluster interceptor a chain #1211

Merged
merged 1 commit into from
May 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cluster/cluster_impl/available_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ func NewAvailableCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *availableCluster) Join(directory cluster.Directory) protocol.Invoker {
return NewAvailableClusterInvoker(directory)
return buildInterceptorChain(NewAvailableClusterInvoker(directory))
}
19 changes: 0 additions & 19 deletions cluster/cluster_impl/base_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package cluster_impl

import (
"context"
)

import (
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
Expand All @@ -40,7 +36,6 @@ type baseClusterInvoker struct {
availablecheck bool
destroyed *atomic.Bool
stickyInvoker protocol.Invoker
interceptor cluster.ClusterInterceptor
}

func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
Expand Down Expand Up @@ -165,20 +160,6 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
return nil
}

func (invoker *baseClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
if invoker.interceptor != nil {
invoker.interceptor.BeforeInvoker(ctx, invocation)

result := invoker.interceptor.DoInvoke(ctx, invocation)

invoker.interceptor.AfterInvoker(ctx, invocation)

return result
}

return nil
}

func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool {
for _, i := range invoked {
if i == selectedInvoker {
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster_impl/broadcast_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewBroadcastCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *broadcastCluster) Join(directory cluster.Directory) protocol.Invoker {
return newBroadcastClusterInvoker(directory)
return buildInterceptorChain(newBroadcastClusterInvoker(directory))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failback_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewFailbackCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailbackClusterInvoker(directory)
return buildInterceptorChain(newFailbackClusterInvoker(directory))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failfast_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewFailFastCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *failfastCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailFastClusterInvoker(directory)
return buildInterceptorChain(newFailFastClusterInvoker(directory))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failover_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewFailoverCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *failoverCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailoverClusterInvoker(directory)
return buildInterceptorChain(newFailoverClusterInvoker(directory))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failsafe_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewFailsafeCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *failsafeCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailsafeClusterInvoker(directory)
return buildInterceptorChain(newFailsafeClusterInvoker(directory))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/forking_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewForkingCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *forkingCluster) Join(directory cluster.Directory) protocol.Invoker {
return newForkingClusterInvoker(directory)
return buildInterceptorChain(newForkingClusterInvoker(directory))
}
76 changes: 76 additions & 0 deletions cluster/cluster_impl/interceptor_invoker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cluster_impl

import (
"context"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

// InterceptorInvoker mocks cluster interceptor as an invoker
type InterceptorInvoker struct {
next protocol.Invoker
interceptor cluster.Interceptor
}

// GetURL is used to get url from InterceptorInvoker
func (i *InterceptorInvoker) GetURL() *common.URL {
return i.next.GetURL()
}

// IsAvailable is used to get available status
func (i *InterceptorInvoker) IsAvailable() bool {
return i.next.IsAvailable()
}

// Invoke is used to call service method by invocation
func (i *InterceptorInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
return i.interceptor.Invoke(ctx, i.next, invocation)
}

// Destroy will destroy invoker
func (i *InterceptorInvoker) Destroy() {
i.next.Destroy()
}

func buildInterceptorChain(invoker protocol.Invoker, builtins ...cluster.Interceptor) protocol.Invoker {
// The order of interceptors is from left to right, so loading from right to left
next := invoker
interceptors := extension.GetClusterInterceptors()
if len(interceptors) != 0 {
for i := len(interceptors) - 1; i >= 0; i-- {
v := &InterceptorInvoker{next: next, interceptor: interceptors[i]}
next = v
}
}

if builtins != nil && len(builtins) > 0 {
for i := len(builtins) - 1; i >= 0; i-- {
v := &InterceptorInvoker{next: next, interceptor: builtins[i]}
next = v
}
}

return next
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/mock_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ func NewMockCluster() cluster.Cluster {

// nolint
func (cluster *mockCluster) Join(directory cluster.Directory) protocol.Invoker {
return protocol.NewBaseInvoker(directory.GetURL())
return buildInterceptorChain(protocol.NewBaseInvoker(directory.GetURL()))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/zone_aware_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ func NewZoneAwareCluster() cluster.Cluster {

// Join returns a zoneAwareClusterInvoker instance
func (cluster *zoneAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
return newZoneAwareClusterInvoker(directory)
return buildInterceptorChain(newZoneAwareClusterInvoker(directory), getZoneAwareInterceptor())
}
57 changes: 57 additions & 0 deletions cluster/cluster_impl/zone_aware_cluster_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cluster_impl

import (
"context"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type zoneAwareInterceptor struct {
}

func (z *zoneAwareInterceptor) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
key := constant.REGISTRY_KEY + "." + constant.ZONE_FORCE_KEY
force := ctx.Value(key)

if force != nil {
switch value := force.(type) {
case bool:
if value {
invocation.SetAttachments(key, "true")
}
case string:
if "true" == value {
invocation.SetAttachments(key, "true")
}
default:
// ignore
}
}

return invoker.Invoke(ctx, invocation)
}

func getZoneAwareInterceptor() cluster.Interceptor {
return &zoneAwareInterceptor{}
}
31 changes: 3 additions & 28 deletions cluster/cluster_impl/zone_aware_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,14 @@ type zoneAwareClusterInvoker struct {
}

func newZoneAwareClusterInvoker(directory cluster.Directory) protocol.Invoker {
invoke := &zoneAwareClusterInvoker{
invoker := &zoneAwareClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
// add local to interceptor
invoke.interceptor = invoke
return invoke
return invoker
}

// nolint
func (invoker *zoneAwareClusterInvoker) DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
func (invoker *zoneAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)

err := invoker.checkInvokers(invokers, invocation)
Expand Down Expand Up @@ -104,29 +102,6 @@ func (invoker *zoneAwareClusterInvoker) DoInvoke(ctx context.Context, invocation
}
}

func (invoker *zoneAwareClusterInvoker) BeforeInvoker(ctx context.Context, invocation protocol.Invocation) {
key := constant.REGISTRY_KEY + "." + constant.ZONE_FORCE_KEY
force := ctx.Value(key)

if force != nil {
switch value := force.(type) {
case bool:
if value {
invocation.SetAttachments(key, "true")
}
case string:
if "true" == value {
invocation.SetAttachments(key, "true")
}
default:
// ignore
}
}
}

func (invoker *zoneAwareClusterInvoker) AfterInvoker(ctx context.Context, invocation protocol.Invocation) {
}

func matchParam(target, key, def string, invoker protocol.Invoker) bool {
return target == invoker.GetURL().GetParam(key, def)
}
16 changes: 5 additions & 11 deletions cluster/cluster_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,9 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)

// ClusterInterceptor
// Extension - ClusterInterceptor
type ClusterInterceptor interface {
// Before DoInvoke method
BeforeInvoker(ctx context.Context, invocation protocol.Invocation)

// After DoInvoke method
AfterInvoker(ctx context.Context, invocation protocol.Invocation)

// Corresponding cluster invoke
DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result
// Interceptor
// Extension - Interceptor
type Interceptor interface {
// Invoke is the core function of a cluster interceptor, it determines the process of the interceptor
Invoke(context.Context, protocol.Invoker, protocol.Invocation) protocol.Result
}
60 changes: 60 additions & 0 deletions common/extension/cluster_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package extension

import (
"sync"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
)

var (
lock sync.RWMutex
interceptors = make(map[string]func() cluster.Interceptor)
)

// SetClusterInterceptor sets cluster interceptor so that user has chance to inject extra logics before and after
// cluster invoker
func SetClusterInterceptor(name string, fun func() cluster.Interceptor) {
lock.Lock()
defer lock.Unlock()
interceptors[name] = fun
}

// GetClusterInterceptor returns the cluster interceptor instance with the given name
func GetClusterInterceptor(name string) cluster.Interceptor {
lock.RLock()
defer lock.RUnlock()
if interceptors[name] == nil {
panic("cluster_interceptor for " + name + " doesn't exist, make sure the corresponding package is imported")
}
return interceptors[name]()
}

// GetClusterInterceptors returns all instances of registered cluster interceptors
func GetClusterInterceptors() []cluster.Interceptor {
lock.RLock()
defer lock.RUnlock()
ret := make([]cluster.Interceptor, 0, len(interceptors))
for _, f := range interceptors {
ret = append(ret, f())
}
return ret
}