Skip to content

Commit

Permalink
fix: grpc connectTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
kezhan committed Mar 22, 2021
1 parent 2cc9f13 commit a0276f8
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 9 deletions.
18 changes: 14 additions & 4 deletions protocol/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package grpc
import (
"reflect"
"strconv"
"time"
)

import (
Expand Down Expand Up @@ -90,20 +91,29 @@ type Client struct {
}

// NewClient creates a new gRPC client.
func NewClient(url *common.URL) *Client {
func NewClient(url *common.URL) (*Client, error) {
// if global trace instance was set , it means trace function enabled. If not , will return Nooptracer
tracer := opentracing.GlobalTracer()
dialOpts := make([]grpc.DialOption, 0, 4)
maxMessageSize, _ := strconv.Atoi(url.GetParam(constant.MESSAGE_SIZE_KEY, "4"))
dialOpts = append(dialOpts, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(

//consumer config client connectTimeout
connectTimeout := config.GetConsumerConfig().ConnectTimeout

dialOpts = append(dialOpts, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(connectTimeout), grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer, otgrpc.LogPayloads())),
grpc.WithDefaultCallOptions(
grpc.CallContentSubtype(clientConf.ContentSubType),
grpc.MaxCallRecvMsgSize(1024*1024*maxMessageSize),
grpc.MaxCallSendMsgSize(1024*1024*maxMessageSize)))

logger.Infof("begin grpc dail:%s, begin time: %s ", url, time.Now().Format("2006-01-02 15:04:05.000"))
conn, err := grpc.Dial(url.Location, dialOpts...)
logger.Infof("end grpc dail: dail:%s, end time: %s", url, time.Now().Format("2006-01-02 15:04:05.000"))

if err != nil {
panic(err)
logger.Errorf("grpc dail error: %v", err)
return nil, err
}

key := url.GetParam(constant.BEAN_NAME_KEY, "")
Expand All @@ -113,7 +123,7 @@ func NewClient(url *common.URL) *Client {
return &Client{
ClientConn: conn,
invoker: reflect.ValueOf(invoker),
}
}, nil
}

func getInvoker(impl interface{}, conn *grpc.ClientConn) interface{} {
Expand Down
6 changes: 5 additions & 1 deletion protocol/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package grpc

import (
"github.com/apache/dubbo-go/common/logger"
"reflect"
"testing"
)
Expand Down Expand Up @@ -48,6 +49,9 @@ func TestNewClient(t *testing.T) {

url, err := common.NewURL("grpc://127.0.0.1:30000/GrpcGreeterImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=GrpcGreeterImpl&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown&registry.role=3&remote.timestamp=1576923717&retries=&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider&timestamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!")
assert.Nil(t, err)
cli := NewClient(url)
cli, err := NewClient(url)
if err != nil {
logger.Errorf("grpc new client error %v", err)
}
assert.NotNil(t, cli)
}
7 changes: 5 additions & 2 deletions protocol/grpc/grpc_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package grpc

import (
"context"
"github.com/apache/dubbo-go/common/logger"
"reflect"
"testing"
)
Expand Down Expand Up @@ -48,8 +49,10 @@ func TestInvoke(t *testing.T) {
url, err := common.NewURL(mockGrpcCommonUrl)
assert.Nil(t, err)

cli := NewClient(url)

cli, err := NewClient(url)
if err != nil {
logger.Errorf("grpc new client error %v", err)
}
invoker := NewGrpcInvoker(url, cli)

args := []reflect.Value{}
Expand Down
7 changes: 6 additions & 1 deletion protocol/grpc/grpc_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,12 @@ func (gp *GrpcProtocol) openServer(url *common.URL) {

// Refer a remote gRPC service
func (gp *GrpcProtocol) Refer(url *common.URL) protocol.Invoker {
invoker := NewGrpcInvoker(url, NewClient(url))
client, err := NewClient(url)
if err != nil {
logger.Warnf("can't dial the server: %s", url.Key())
return nil
}
invoker := NewGrpcInvoker(url, client)
gp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())
return invoker
Expand Down
6 changes: 5 additions & 1 deletion registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
if event == nil {
return
}
go dir.refreshInvokers(event)
dir.refreshInvokers(event)
}

// NotifyAll notify the events that are complete Service Event List.
Expand Down Expand Up @@ -370,6 +370,8 @@ func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL) (protocol.Invok
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(key, newInvoker)
} else {
logger.Warnf("service will be added in cache invokers fail, result is null, invokers url is %+v", newUrl.String())
}
} else {
// if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
Expand All @@ -383,6 +385,8 @@ func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL) (protocol.Invok
if newInvoker != nil {
dir.cacheInvokersMap.Store(key, newInvoker)
return cacheInvoker.(protocol.Invoker), true
} else {
logger.Warnf("service will be updated in cache invokers fail, result is null, invokers url is %+v", newUrl.String())
}
}
return nil, false
Expand Down
5 changes: 5 additions & 0 deletions test/integrate/dubbo/go-server/go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
module github.com/apache/dubbo-go/test/integrate/dubbo/go-server

go 1.13

require (
github.com/apache/dubbo-go v1.5.5
github.com/apache/dubbo-go-hessian2 v1.9.1
)

0 comments on commit a0276f8

Please sign in to comment.