Skip to content

Commit

Permalink
Merge branch 'develop' of https://github.com/apache/dubbo-go into fea…
Browse files Browse the repository at this point in the history
…ture/access_log_filter
  • Loading branch information
flycash committed Sep 25, 2019
2 parents 25c3bf9 + dfbdd01 commit 7d01558
Show file tree
Hide file tree
Showing 20 changed files with 75 additions and 289 deletions.
6 changes: 3 additions & 3 deletions cluster/cluster_impl/base_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package cluster_impl

import (
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
Expand All @@ -27,7 +28,6 @@ import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/protocol"
)

Expand Down Expand Up @@ -63,7 +63,7 @@ func (invoker *baseClusterInvoker) IsAvailable() bool {
//check invokers availables
func (invoker *baseClusterInvoker) checkInvokers(invokers []protocol.Invoker, invocation protocol.Invocation) error {
if len(invokers) == 0 {
ip, _ := utils.GetLocalIP()
ip, _ := gxnet.GetLocalIP()
return perrors.Errorf("Failed to invoke the method %v. No provider available for the service %v from "+
"registry %v on the consumer %v using the dubbo version %v .Please check if the providers have been started and registered.",
invocation.MethodName(), invoker.directory.GetUrl().SubURL.Key(), invoker.directory.GetUrl().String(), ip, constant.Version)
Expand All @@ -75,7 +75,7 @@ func (invoker *baseClusterInvoker) checkInvokers(invokers []protocol.Invoker, in
//check cluster invoker is destroyed or not
func (invoker *baseClusterInvoker) checkWhetherDestroyed() error {
if invoker.destroyed.Load() {
ip, _ := utils.GetLocalIP()
ip, _ := gxnet.GetLocalIP()
return perrors.Errorf("Rpc cluster invoker for %v on consumer %v use dubbo version %v is now destroyed! can not invoke any more. ",
invoker.directory.GetUrl().Service(), ip, constant.Version)
}
Expand Down
10 changes: 8 additions & 2 deletions cluster/cluster_impl/failover_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import (
)

import (
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/protocol"
)

Expand Down Expand Up @@ -72,6 +72,9 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr
invoked := []protocol.Invoker{}
providers := []string{}
var result protocol.Result
if retries > len(invokers) {
retries = len(invokers)
}
for i := 0; i <= retries; i++ {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
Expand All @@ -87,6 +90,9 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr
}
}
ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
if ivk == nil {
continue
}
invoked = append(invoked, ivk)
//DO INVOKE
result = ivk.Invoke(invocation)
Expand All @@ -97,7 +103,7 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr
return result
}
}
ip, _ := utils.GetLocalIP()
ip, _ := gxnet.GetLocalIP()
return &protocol.RPCResult{Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of "+
"the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.",
methodName, invoker.GetUrl().Service(), retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, constant.Version, result.Error().Error(),
Expand Down
20 changes: 10 additions & 10 deletions cluster/router/condition_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ import (
)

import (
"github.com/dubbogo/gost/container"
"github.com/dubbogo/gost/container/gxset"
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/protocol"
)

Expand Down Expand Up @@ -126,7 +126,7 @@ func (c *ConditionRouter) Route(invokers []protocol.Invoker, url common.URL, inv
if len(c.ThenCondition) == 0 {
return result
}
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
for _, invoker := range invokers {
isMatchThen, err := c.MatchThen(invoker.GetUrl(), url)
if err != nil {
Expand Down Expand Up @@ -157,7 +157,7 @@ func parseRule(rule string) (map[string]MatchPair, error) {
return condition, nil
}
var pair MatchPair
values := container.NewSet()
values := gxset.NewSet()
reg := regexp.MustCompile(`([&!=,]*)\s*([^&!=,\s]+)`)
var startIndex = 0
if indexTuple := reg.FindIndex([]byte(rule)); len(indexTuple) > 0 {
Expand All @@ -170,17 +170,17 @@ func parseRule(rule string) (map[string]MatchPair, error) {
switch separator {
case "":
pair = MatchPair{
Matches: container.NewSet(),
Mismatches: container.NewSet(),
Matches: gxset.NewSet(),
Mismatches: gxset.NewSet(),
}
condition[content] = pair
case "&":
if r, ok := condition[content]; ok {
pair = r
} else {
pair = MatchPair{
Matches: container.NewSet(),
Mismatches: container.NewSet(),
Matches: gxset.NewSet(),
Mismatches: gxset.NewSet(),
}
condition[content] = pair
}
Expand Down Expand Up @@ -257,8 +257,8 @@ func MatchCondition(pairs map[string]MatchPair, url *common.URL, param *common.U
}

type MatchPair struct {
Matches *container.HashSet
Mismatches *container.HashSet
Matches *gxset.HashSet
Mismatches *gxset.HashSet
}

func (pair MatchPair) isMatch(value string, param *common.URL) bool {
Expand Down
22 changes: 11 additions & 11 deletions cluster/router/condition_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ import (
)

import (
"github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestRoute_matchWhen(t *testing.T) {
}

func TestRoute_matchFilter(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService?default.serialization=fastjson")
url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestRoute_methodRoute(t *testing.T) {

func TestRoute_ReturnFalse(t *testing.T) {
url, _ := common.NewURL(context.TODO(), "")
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false"))
Expand All @@ -215,7 +215,7 @@ func TestRoute_ReturnFalse(t *testing.T) {
}

func TestRoute_ReturnEmpty(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
url, _ := common.NewURL(context.TODO(), "")
invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)}
inv := &invocation.RPCInvocation{}
Expand All @@ -227,7 +227,7 @@ func TestRoute_ReturnEmpty(t *testing.T) {
}

func TestRoute_ReturnAll(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
invokers := []protocol.Invoker{&MockInvoker{}, &MockInvoker{}, &MockInvoker{}}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
Expand All @@ -238,7 +238,7 @@ func TestRoute_ReturnAll(t *testing.T) {
}

func TestRoute_HostFilter(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
Expand All @@ -257,7 +257,7 @@ func TestRoute_HostFilter(t *testing.T) {
}

func TestRoute_Empty_HostFilter(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
Expand All @@ -276,7 +276,7 @@ func TestRoute_Empty_HostFilter(t *testing.T) {
}

func TestRoute_False_HostFilter(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
Expand All @@ -295,7 +295,7 @@ func TestRoute_False_HostFilter(t *testing.T) {
}

func TestRoute_Placeholder(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
Expand All @@ -314,7 +314,7 @@ func TestRoute_Placeholder(t *testing.T) {
}

func TestRoute_NoForce(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
Expand All @@ -331,7 +331,7 @@ func TestRoute_NoForce(t *testing.T) {
}

func TestRoute_Force(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
Expand Down
4 changes: 2 additions & 2 deletions common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

import (
"github.com/dubbogo/gost/container"
"github.com/dubbogo/gost/container/gxset"
"github.com/jinzhu/copier"
perrors "github.com/pkg/errors"
"github.com/satori/go.uuid"
Expand Down Expand Up @@ -447,7 +447,7 @@ func (c URL) GetMethodParam(method string, key string, d string) string {
return r
}

func (c *URL) RemoveParams(set *container.HashSet) {
func (c *URL) RemoveParams(set *gxset.HashSet) {
c.paramsLock.Lock()
defer c.paramsLock.Unlock()
for k := range set.Items {
Expand Down
124 changes: 0 additions & 124 deletions common/utils/net.go

This file was deleted.

Loading

0 comments on commit 7d01558

Please sign in to comment.