Skip to content

Commit

Permalink
Include remote address, port in Request, Call
Browse files Browse the repository at this point in the history
This allows the application to log the remote address for a connection.
It is primarily aimed at debugging and audit use cases. One can imagine
it can be injected to a logging context, for example.

e.g.
```
peerAddrPort := yarpc.CallFromContext(ctx).CallerPeerAddrPort().String()
```
  • Loading branch information
jquirke committed Apr 10, 2024
1 parent d33ff85 commit 7154419
Show file tree
Hide file tree
Showing 20 changed files with 633 additions and 324 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

=======
## [Unreleased]
- No changes yet.
- api: expose `CallerPeerAddrPort` field in the transport request. Middleware and the server handler
can then utilise this to implement audit/logging/debugging.

## [1.72.1] - 2024-03-14
- tchannel: Renamed caller-procedure header from `$rpc$-caller-procedure` to `rpc-caller-procedure`.
Expand Down
9 changes: 9 additions & 0 deletions api/encoding/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package encoding

import (
"context"
"net/netip"
"sort"

"go.uber.org/yarpc/api/transport"
Expand Down Expand Up @@ -172,3 +173,11 @@ func (c *Call) CallerProcedure() string {
}
return c.md.CallerProcedure()
}

// CallerProcedure returns the remote network address making the request
func (c *Call) CallerPeerAddrPort() netip.AddrPort {
if c == nil {
return netip.AddrPort{}
}
return c.md.CallerPeerAddrPort()
}
64 changes: 36 additions & 28 deletions api/encoding/call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package encoding

import (
"context"
"net/netip"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -42,6 +43,7 @@ func TestNilCall(t *testing.T) {
assert.Equal(t, "", call.RoutingKey())
assert.Equal(t, "", call.RoutingDelegate())
assert.Equal(t, "", call.CallerProcedure())
assert.Zero(t, call.CallerPeerAddrPort().Compare(netip.AddrPort{}))
assert.Equal(t, "", call.Header("foo"))
assert.Empty(t, call.HeaderNames())
assert.Nil(t, call.OriginalHeaders())
Expand All @@ -52,15 +54,16 @@ func TestNilCall(t *testing.T) {
func TestReadFromRequest(t *testing.T) {
ctx, icall := NewInboundCall(context.Background())
icall.ReadFromRequest(&transport.Request{
Service: "service",
Transport: "transport",
Caller: "caller",
Encoding: transport.Encoding("raw"),
Procedure: "proc",
ShardKey: "sk",
RoutingKey: "rk",
RoutingDelegate: "rd",
CallerProcedure: "cp",
Service: "service",
Transport: "transport",
Caller: "caller",
Encoding: transport.Encoding("raw"),
Procedure: "proc",
ShardKey: "sk",
RoutingKey: "rk",
RoutingDelegate: "rd",
CallerProcedure: "cp",
CallerPeerAddrPort: netip.MustParseAddrPort("1.2.3.4:1234"),
// later header's key/value takes precedence
Headers: transport.NewHeaders().With("Foo", "Bar").With("foo", "bar"),
})
Expand All @@ -78,6 +81,7 @@ func TestReadFromRequest(t *testing.T) {
assert.Equal(t, "bar", call.Header("foo"))
assert.Equal(t, map[string]string{"Foo": "Bar", "foo": "bar"}, call.OriginalHeaders())
assert.Equal(t, "cp", call.CallerProcedure())
assert.Zero(t, netip.MustParseAddrPort("1.2.3.4:1234").Compare(call.CallerPeerAddrPort()))
assert.Len(t, call.HeaderNames(), 1)

assert.NoError(t, call.WriteResponseHeader("foo2", "bar2"))
Expand All @@ -88,15 +92,16 @@ func TestReadFromRequest(t *testing.T) {
func TestReadFromRequestMeta(t *testing.T) {
ctx, icall := NewInboundCall(context.Background())
icall.ReadFromRequestMeta(&transport.RequestMeta{
Service: "service",
Caller: "caller",
Transport: "transport",
Encoding: transport.Encoding("raw"),
Procedure: "proc",
ShardKey: "sk",
RoutingKey: "rk",
RoutingDelegate: "rd",
CallerProcedure: "cp",
Service: "service",
Caller: "caller",
Transport: "transport",
Encoding: transport.Encoding("raw"),
Procedure: "proc",
ShardKey: "sk",
RoutingKey: "rk",
RoutingDelegate: "rd",
CallerProcedure: "cp",
CallerPeerAddrPort: netip.MustParseAddrPort("1.2.3.4:1234"),
// later header's key/value takes precedence
Headers: transport.NewHeaders().With("Foo", "Bar").With("foo", "bar"),
})
Expand All @@ -112,6 +117,7 @@ func TestReadFromRequestMeta(t *testing.T) {
assert.Equal(t, "rk", call.RoutingKey())
assert.Equal(t, "rd", call.RoutingDelegate())
assert.Equal(t, "cp", call.CallerProcedure())
assert.Zero(t, netip.MustParseAddrPort("1.2.3.4:1234").Compare(call.CallerPeerAddrPort()))
assert.Equal(t, "bar", call.Header("foo"))
assert.Equal(t, map[string]string{"Foo": "Bar", "foo": "bar"}, call.OriginalHeaders())
assert.Len(t, call.HeaderNames(), 1)
Expand All @@ -124,16 +130,17 @@ func TestReadFromRequestMeta(t *testing.T) {
func TestDisabledResponseHeaders(t *testing.T) {
ctx, icall := NewInboundCallWithOptions(context.Background(), DisableResponseHeaders())
icall.ReadFromRequest(&transport.Request{
Service: "service",
Transport: "transport",
Caller: "caller",
Encoding: transport.Encoding("raw"),
Procedure: "proc",
ShardKey: "sk",
RoutingKey: "rk",
RoutingDelegate: "rd",
CallerProcedure: "cp",
Headers: transport.NewHeaders().With("foo", "bar"),
Service: "service",
Transport: "transport",
Caller: "caller",
Encoding: transport.Encoding("raw"),
Procedure: "proc",
ShardKey: "sk",
RoutingKey: "rk",
RoutingDelegate: "rd",
CallerProcedure: "cp",
CallerPeerAddrPort: netip.MustParseAddrPort("1.2.3.4:1234"),
Headers: transport.NewHeaders().With("foo", "bar"),
})
call := CallFromContext(ctx)
require.NotNil(t, call)
Expand All @@ -147,6 +154,7 @@ func TestDisabledResponseHeaders(t *testing.T) {
assert.Equal(t, "rk", call.RoutingKey())
assert.Equal(t, "rd", call.RoutingDelegate())
assert.Equal(t, "cp", call.CallerProcedure())
assert.Zero(t, netip.MustParseAddrPort("1.2.3.4:1234").Compare(call.CallerPeerAddrPort()))
assert.Equal(t, "bar", call.Header("foo"))
assert.Len(t, call.HeaderNames(), 1)

Expand Down
5 changes: 5 additions & 0 deletions api/encoding/inbound_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package encoding

import (
"context"
"net/netip"

"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/internal/inboundcall"
Expand Down Expand Up @@ -157,6 +158,10 @@ func (ic *inboundCallMetadata) CallerProcedure() string {
return ic.req.CallerProcedure
}

func (ic *inboundCallMetadata) CallerPeerAddrPort() netip.AddrPort {
return ic.req.CallerPeerAddrPort
}

func (ic *inboundCallMetadata) WriteResponseHeader(k, v string) error {
if ic.disableResponseHeaders {
return yarpcerrors.InvalidArgumentErrorf("call does not support setting response headers")
Expand Down
11 changes: 7 additions & 4 deletions api/encoding/inbound_call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package encoding

import (
"context"
"net/netip"
"sort"
"testing"

Expand All @@ -43,10 +44,11 @@ func TestInboundCallReadFromRequest(t *testing.T) {
"Foo": "bar",
"success": "true",
}),
ShardKey: "shardKey",
RoutingKey: "routingKey",
RoutingDelegate: "routingDelegate",
CallerProcedure: "callerProcedure",
ShardKey: "shardKey",
RoutingKey: "routingKey",
RoutingDelegate: "routingDelegate",
CallerProcedure: "callerProcedure",
CallerPeerAddrPort: netip.MustParseAddrPort("1.2.3.4:1234"),
})
require.NoError(t, err)

Expand All @@ -59,6 +61,7 @@ func TestInboundCallReadFromRequest(t *testing.T) {
assert.Equal(t, "routingKey", call.RoutingKey())
assert.Equal(t, "routingDelegate", call.RoutingDelegate())
assert.Equal(t, "callerProcedure", call.CallerProcedure())
assert.Zero(t, netip.MustParseAddrPort("1.2.3.4:1234").Compare(call.CallerPeerAddrPort()))
assert.Equal(t, "World", call.Header("Hello"))
assert.Equal(t, "bar", call.Header("FOO"))
assert.Equal(t, "true", call.Header("success"))
Expand Down
55 changes: 35 additions & 20 deletions api/transport/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package transport
import (
"context"
"io"
"net/netip"
"strings"

"go.uber.org/yarpc/yarpcerrors"
Expand Down Expand Up @@ -75,21 +76,27 @@ type Request struct {
// content-length. It should be noted that this value is set manually and
// will not be updated automatically if the body is being modified
BodySize int

// The network address of the remote peer. There is no guarantee that this
// information will be available. Consumers should check the result of
// calling IsValid() before taking any action.
CallerPeerAddrPort netip.AddrPort
}

// ToRequestMeta converts a Request into a RequestMeta.
func (r *Request) ToRequestMeta() *RequestMeta {
return &RequestMeta{
Caller: r.Caller,
Service: r.Service,
Transport: r.Transport,
Encoding: r.Encoding,
Procedure: r.Procedure,
Headers: r.Headers,
ShardKey: r.ShardKey,
RoutingKey: r.RoutingKey,
RoutingDelegate: r.RoutingDelegate,
CallerProcedure: r.CallerProcedure,
Caller: r.Caller,
Service: r.Service,
Transport: r.Transport,
Encoding: r.Encoding,
Procedure: r.Procedure,
Headers: r.Headers,
ShardKey: r.ShardKey,
RoutingKey: r.RoutingKey,
RoutingDelegate: r.RoutingDelegate,
CallerProcedure: r.CallerProcedure,
CallerPeerAddrPort: r.CallerPeerAddrPort,
}
}

Expand All @@ -105,6 +112,8 @@ func (r *Request) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("routingKey", r.RoutingKey)
enc.AddString("routingDelegate", r.RoutingDelegate)
enc.AddString("callerProcedure", r.CallerProcedure)
enc.AddString("callerPeerAddrPort", r.CallerPeerAddrPort.String())

return nil
}

Expand Down Expand Up @@ -198,6 +207,11 @@ type RequestMeta struct {

// CallerProcedure refers to the name of the rpc procedure of the service making this request.
CallerProcedure string

// CalerPeerAddrPort refers to network address of the remote peer.
// There is no guarantee that this information will be available.
// Consumers should check the result of calling IsValid() before taking any action.
CallerPeerAddrPort netip.AddrPort
}

// ToRequest converts a RequestMeta into a Request.
Expand All @@ -206,15 +220,16 @@ func (r *RequestMeta) ToRequest() *Request {
return &Request{}
}
return &Request{
Caller: r.Caller,
Service: r.Service,
Transport: r.Transport,
Encoding: r.Encoding,
Procedure: r.Procedure,
Headers: r.Headers,
ShardKey: r.ShardKey,
RoutingKey: r.RoutingKey,
RoutingDelegate: r.RoutingDelegate,
CallerProcedure: r.CallerProcedure,
Caller: r.Caller,
Service: r.Service,
Transport: r.Transport,
Encoding: r.Encoding,
Procedure: r.Procedure,
Headers: r.Headers,
ShardKey: r.ShardKey,
RoutingKey: r.RoutingKey,
RoutingDelegate: r.RoutingDelegate,
CallerProcedure: r.CallerProcedure,
CallerPeerAddrPort: r.CallerPeerAddrPort,
}
}
64 changes: 34 additions & 30 deletions api/transport/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package transport_test

import (
"context"
"net/netip"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -127,45 +128,48 @@ func TestValidator(t *testing.T) {

func TestRequestLogMarshaling(t *testing.T) {
r := &transport.Request{
Caller: "caller",
Service: "service",
Transport: "transport",
Encoding: "raw",
Procedure: "procedure",
Headers: transport.NewHeaders().With("password", "super-secret"),
ShardKey: "shard01",
RoutingKey: "routing-key",
RoutingDelegate: "routing-delegate",
CallerProcedure: "caller-procedure",
Body: strings.NewReader("body"),
Caller: "caller",
Service: "service",
Transport: "transport",
Encoding: "raw",
Procedure: "procedure",
Headers: transport.NewHeaders().With("password", "super-secret"),
ShardKey: "shard01",
RoutingKey: "routing-key",
RoutingDelegate: "routing-delegate",
CallerProcedure: "caller-procedure",
Body: strings.NewReader("body"),
CallerPeerAddrPort: netip.MustParseAddrPort("1.2.3.4:1234"),
}
enc := zapcore.NewMapObjectEncoder()
assert.NoError(t, r.MarshalLogObject(enc), "Unexpected error marshaling request.")
assert.Equal(t, map[string]interface{}{
"caller": "caller",
"service": "service",
"transport": "transport",
"encoding": "raw",
"procedure": "procedure",
"shardKey": "shard01",
"routingKey": "routing-key",
"routingDelegate": "routing-delegate",
"callerProcedure": "caller-procedure",
"caller": "caller",
"service": "service",
"transport": "transport",
"encoding": "raw",
"procedure": "procedure",
"shardKey": "shard01",
"routingKey": "routing-key",
"routingDelegate": "routing-delegate",
"callerProcedure": "caller-procedure",
"callerPeerAddrPort": "1.2.3.4:1234",
}, enc.Fields, "Unexpected output after marshaling request.")
}

func TestRequestMetaToRequestConversionAndBack(t *testing.T) {
reqMeta := &transport.RequestMeta{
Caller: "caller",
Service: "service",
Transport: "transport",
Encoding: "raw",
Procedure: "hello",
Headers: transport.NewHeaders().With("key", "val"),
ShardKey: "shard",
RoutingKey: "rk",
RoutingDelegate: "rd",
CallerProcedure: "cp",
Caller: "caller",
Service: "service",
Transport: "transport",
Encoding: "raw",
Procedure: "hello",
Headers: transport.NewHeaders().With("key", "val"),
ShardKey: "shard",
RoutingKey: "rk",
RoutingDelegate: "rd",
CallerProcedure: "cp",
CallerPeerAddrPort: netip.MustParseAddrPort("1.2.3.4:1234"),
}

req := reqMeta.ToRequest()
Expand Down
Loading

0 comments on commit 7154419

Please sign in to comment.