Skip to content

Commit

Permalink
Merge pull request #1066 from MakMukhi/peer_calloption
Browse files Browse the repository at this point in the history
Added calloption to retrieve peer information
  • Loading branch information
menghanl authored Feb 8, 2017
2 parents 2a6bf61 + dfb494c commit 9b791e0
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 0 deletions.
4 changes: 4 additions & 0 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
Expand Down Expand Up @@ -85,6 +86,9 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
dopts.copts.StatsHandler.HandleRPC(ctx, inPayload)
}
c.trailerMD = stream.Trailer()
if peer, ok := peer.FromContext(stream.Context()); ok {
c.peer = peer
}
return nil
}

Expand Down
10 changes: 10 additions & 0 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
Expand Down Expand Up @@ -140,6 +141,7 @@ type callInfo struct {
failFast bool
headerMD metadata.MD
trailerMD metadata.MD
peer *peer.Peer
traceInfo traceInfo // in trace.go
}

Expand Down Expand Up @@ -183,6 +185,14 @@ func Trailer(md *metadata.MD) CallOption {
})
}

// Peer returns a CallOption that retrieves peer information for a
// unary RPC.
func Peer(peer *peer.Peer) CallOption {
return afterCall(func(c *callInfo) {
*peer = *c.peer
})
}

// FailFast configures the action to take when an RPC is attempted on broken
// connections or unreachable servers. If failfast is true, the RPC will fail
// immediately. Otherwise, the RPC client will block the call until a
Expand Down
37 changes: 37 additions & 0 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,43 @@ func testExceedMsgLimit(t *testing.T, e env) {
}
}

func TestPeerClientSide(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
testPeerClientSide(t, e)
}
}

func testPeerClientSide(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.startServer(&testServer{security: e.security})
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
peer := new(peer.Peer)
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), grpc.FailFast(false)); err != nil {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
pa := peer.Addr.String()
if e.network == "unix" {
if pa != te.srvAddr {
t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
}
return
}
_, pp, err := net.SplitHostPort(pa)
if err != nil {
t.Fatalf("Failed to parse address from peer.")
}
_, sp, err := net.SplitHostPort(te.srvAddr)
if err != nil {
t.Fatalf("Failed to parse address of test server.")
}
if pp != sp {
t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
}
}

func TestMetadataUnaryRPC(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
Expand Down

0 comments on commit 9b791e0

Please sign in to comment.