Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[network] grpc client passing timeout context #324

Merged
merged 2 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion network/client/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,36 @@ type discoveryClient struct {
conn *rawGrpc.ClientConn

isClosed *atomic.Bool

metrics Metrics
}

const (
/**
from grpc.UnaryServerInfo

info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/v1.Discovery/FindPeers",
}
**/
methodNameFindPeers = "/v1.Discovery/FindPeers"
)

func (i *discoveryClient) FindPeers(ctx context.Context, in *proto.FindPeersReq) (*proto.FindPeersResp, error) {
return i.clt.FindPeers(ctx, in, rawGrpc.WaitForReady(false))
i.metrics.rpcMethodCallCountInc(methodNameFindPeers)

begin := i.metrics.rpcMethodCallBegin(methodNameFindPeers)
defer i.metrics.rpcMethodCallEnd(methodNameFindPeers, begin)

resp, err := i.clt.FindPeers(ctx, in, rawGrpc.WaitForReady(false))
if err != nil {
i.metrics.rpcMethodCallErrorCountInc(methodNameFindPeers)

return nil, err
}

return resp, nil
}

func (i *discoveryClient) Close() error {
Expand All @@ -42,13 +68,15 @@ func (i *discoveryClient) IsClose() bool {

func NewDiscoveryClient(
logger hclog.Logger,
metrics Metrics,
clt proto.DiscoveryClient,
conn *rawGrpc.ClientConn,
) DiscoveryClient {
wrapClt := &discoveryClient{
clt: clt,
conn: conn,
isClosed: atomic.NewBool(false),
metrics: metrics,
}

// print a error log if the client is not closed before GC
Expand Down
30 changes: 29 additions & 1 deletion network/client/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,36 @@ type identityClient struct {
conn *rawGrpc.ClientConn

isClosed *atomic.Bool

metrics Metrics
}

const (
/**
from grpc.UnaryServerInfo

info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/v1.Identity/Hello",
}
**/
methodNameHello = "/v1.Identity/Hello"
)

func (i *identityClient) Hello(ctx context.Context, in *proto.Status) (*proto.Status, error) {
return i.clt.Hello(ctx, in, rawGrpc.WaitForReady(false))
i.metrics.rpcMethodCallCountInc(methodNameHello)

begin := i.metrics.rpcMethodCallBegin(methodNameHello)
defer i.metrics.rpcMethodCallEnd(methodNameHello, begin)

status, err := i.clt.Hello(ctx, in, rawGrpc.WaitForReady(false))
if err != nil {
i.metrics.rpcMethodCallErrorCountInc(methodNameHello)

return nil, err
}

return status, nil
}

func (i *identityClient) Close() error {
Expand All @@ -42,13 +68,15 @@ func (i *identityClient) IsClose() bool {

func NewIdentityClient(
logger hclog.Logger,
metrics Metrics,
clt proto.IdentityClient,
conn *rawGrpc.ClientConn,
) IdentityClient {
wrapClt := &identityClient{
clt: clt,
conn: conn,
isClosed: atomic.NewBool(false),
metrics: metrics,
}

// print a error log if the client is not closed before GC
Expand Down
96 changes: 96 additions & 0 deletions network/client/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package client

import (
"time"

"github.com/prometheus/client_golang/prometheus"
)

type methodName string

// Metrics represents the grpc client metrics
type Metrics interface {
// rpcMethodCallBegin is the duration of a grpc client call
rpcMethodCallBegin(method methodName) time.Time

// rpcMethodCallEnd is the duration of a grpc client call
rpcMethodCallEnd(method methodName, begin time.Time)

// GrpcClientCallCount is the count of a grpc client call
rpcMethodCallCountInc(method methodName)

// GrpcClientCallErrorCount is the count of a grpc client call error
rpcMethodCallErrorCountInc(method methodName)
}

type metrics struct {
// GrpcClientCallDurations is the duration of a grpc client call
rpcMethodCallDurationVec *prometheus.HistogramVec

// GrpcClientCallCount is the count of a grpc client call
rpcMethodCallCount *prometheus.CounterVec

// GrpcClientCallErrorCount is the count of a grpc client call error
rpcMethodCallErrorCount *prometheus.CounterVec
}

func NewMetrics() Metrics {
m := &metrics{
rpcMethodCallDurationVec: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "grpc_client_call_durations_seconds",
Help: "The grpc client call latencies in seconds.",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 10),
DarianShawn marked this conversation as resolved.
Show resolved Hide resolved
},
[]string{"method"},
),
rpcMethodCallCount: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "grpc_client_call_count",
Help: "The grpc client call count.",
},
[]string{"method"},
),
rpcMethodCallErrorCount: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "grpc_client_call_error_count",
Help: "The grpc client call error count.",
},
[]string{"method"},
),
}

prometheus.MustRegister(
m.rpcMethodCallDurationVec,
m.rpcMethodCallCount,
m.rpcMethodCallErrorCount,
)

return m
}

func (m *metrics) rpcMethodCallBegin(method methodName) time.Time {
return time.Now()
}

func (m *metrics) rpcMethodCallEnd(method methodName, begin time.Time) {
if m.rpcMethodCallDurationVec != nil {
m.rpcMethodCallDurationVec.WithLabelValues(string(method)).Observe(time.Since(begin).Seconds())
}
}

func (m *metrics) rpcMethodCallCountInc(method methodName) {
if m.rpcMethodCallCount != nil {
m.rpcMethodCallCount.WithLabelValues(string(method)).Inc()
}
}

func (m *metrics) rpcMethodCallErrorCountInc(method methodName) {
if m.rpcMethodCallErrorCount != nil {
m.rpcMethodCallErrorCount.WithLabelValues(string(method)).Inc()
}
}

func NilMetrics() Metrics {
return &metrics{}
}
110 changes: 98 additions & 12 deletions network/client/syncerV1.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,114 @@ type syncerV1Client struct {
conn *rawGrpc.ClientConn

isClosed *atomic.Bool

metrics Metrics
}

const (
/**
from grpc.UnaryServerInfo

info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/v1.Syncer/GetCurrent",
}
**/

methodNameGetCurrent = "/v1.Syncer/GetCurrent"
methodNameGetObjectsByHash = "/v1.Syncer/GetObjectsByHash"
methodNameGetHeaders = "/v1.Syncer/GetHeaders"
methodNameNotify = "/v1.Syncer/Notify"
methodNameGetBlocks = "/v1.Syncer/GetBlocks"
methodNameGetStatus = "/v1.Syncer/GetStatus"
)

func (i *syncerV1Client) wrapMetricsCall(name methodName, call func() error) {
i.metrics.rpcMethodCallCountInc(name)

begin := i.metrics.rpcMethodCallBegin(name)
defer i.metrics.rpcMethodCallEnd(name, begin)

err := call()
if err != nil {
i.metrics.rpcMethodCallErrorCountInc(name)
}
}

func (i *syncerV1Client) GetCurrent(ctx context.Context, in *emptypb.Empty) (*proto.V1Status, error) {
return i.clt.GetCurrent(ctx, in, rawGrpc.WaitForReady(false))
func (i *syncerV1Client) GetCurrent(
ctx context.Context,
in *emptypb.Empty,
) (status *proto.V1Status, err error) {
i.wrapMetricsCall(methodNameGetCurrent, func() error {
status, err = i.clt.GetCurrent(ctx, in, rawGrpc.WaitForReady(false))

return err
})

return status, err
}

func (i *syncerV1Client) GetObjectsByHash(ctx context.Context, in *proto.HashRequest) (*proto.Response, error) {
return i.clt.GetObjectsByHash(ctx, in, rawGrpc.WaitForReady(false))
func (i *syncerV1Client) GetObjectsByHash(
ctx context.Context,
in *proto.HashRequest,
) (response *proto.Response, err error) {
i.wrapMetricsCall(methodNameGetObjectsByHash, func() error {
response, err = i.clt.GetObjectsByHash(
ctx,
in,
rawGrpc.WaitForReady(false),
)

return err
})

return response, err
}

func (i *syncerV1Client) GetHeaders(ctx context.Context, in *proto.GetHeadersRequest) (*proto.Response, error) {
return i.clt.GetHeaders(ctx, in, rawGrpc.WaitForReady(false))
func (i *syncerV1Client) GetHeaders(
ctx context.Context,
in *proto.GetHeadersRequest,
) (response *proto.Response, err error) {
i.wrapMetricsCall(methodNameGetHeaders, func() error {
response, err = i.clt.GetHeaders(ctx, in, rawGrpc.WaitForReady(false))

return err
})

return response, err
}

func (i *syncerV1Client) Notify(ctx context.Context, in *proto.NotifyReq) (*emptypb.Empty, error) {
return i.clt.Notify(ctx, in, rawGrpc.WaitForReady(false))
func (i *syncerV1Client) Notify(ctx context.Context, in *proto.NotifyReq) (response *emptypb.Empty, err error) {
i.wrapMetricsCall(methodNameNotify, func() error {
response, err = i.clt.Notify(ctx, in, rawGrpc.WaitForReady(false))

return err
})

return response, err
}

func (i *syncerV1Client) GetBlocks(ctx context.Context, in *proto.GetBlocksRequest) (*proto.GetBlocksResponse, error) {
return i.clt.GetBlocks(ctx, in, rawGrpc.WaitForReady(false))
func (i *syncerV1Client) GetBlocks(
ctx context.Context,
in *proto.GetBlocksRequest,
) (response *proto.GetBlocksResponse, err error) {
i.wrapMetricsCall(methodNameGetBlocks, func() error {
response, err = i.clt.GetBlocks(ctx, in, rawGrpc.WaitForReady(false))

return err
})

return response, err
}

func (i *syncerV1Client) GetStatus(ctx context.Context, in *emptypb.Empty) (*proto.SyncPeerStatus, error) {
return i.clt.GetStatus(ctx, in, rawGrpc.WaitForReady(false))
func (i *syncerV1Client) GetStatus(ctx context.Context, in *emptypb.Empty) (response *proto.SyncPeerStatus, err error) {
i.wrapMetricsCall(methodNameGetStatus, func() error {
response, err = i.clt.GetStatus(ctx, in, rawGrpc.WaitForReady(false))

return err
})

return response, err
}

func (i *syncerV1Client) Close() error {
Expand All @@ -69,13 +153,15 @@ func (i *syncerV1Client) IsClose() bool {

func NewSyncerV1Client(
logger hclog.Logger,
metrics Metrics,
clt proto.V1Client,
conn *rawGrpc.ClientConn,
) SyncerV1Client {
wrapClt := &syncerV1Client{
clt: clt,
conn: conn,
isClosed: atomic.NewBool(false),
metrics: metrics,
}

// print a error log if the client is not closed before GC
Expand Down
6 changes: 4 additions & 2 deletions network/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type networkingServer interface {
// PROTOCOL MANIPULATION //

// NewDiscoveryClient returns a discovery gRPC client connection
NewDiscoveryClient(peerID peer.ID) (client.DiscoveryClient, error)
NewDiscoveryClient(ctx context.Context, peerID peer.ID) (client.DiscoveryClient, error)

// PEER MANIPULATION //

Expand Down Expand Up @@ -400,11 +400,13 @@ func (d *DiscoveryService) findPeersCall(
ctx, cancel := context.WithTimeout(d.ctx, maxDiscoveryPeerReqTimeout)
defer cancel()

clt, clientErr := d.baseServer.NewDiscoveryClient(peerID)
clt, clientErr := d.baseServer.NewDiscoveryClient(ctx, peerID)
if clientErr != nil {
return nil, clientErr
}

defer clt.Close()

resp, err := clt.FindPeers(
ctx,
&proto.FindPeersReq{
Expand Down
2 changes: 1 addition & 1 deletion network/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestDiscoveryService_RegularPeerDiscoveryUnconnected(t *testing.T) {
})

// Define the new discovery client creation
server.HookNewDiscoveryClient(func(id peer.ID) (client.DiscoveryClient, error) {
server.HookNewDiscoveryClient(func(ctx context.Context, id peer.ID) (client.DiscoveryClient, error) {
return nil, errors.New("peer is not connected anymore")
})

Expand Down
Loading