Skip to content

Commit

Permalink
Add bearer token forward to the GRPC storage plugin
Browse files Browse the repository at this point in the history
Signed-off-by: radekg <radek@gruchalski.com>
  • Loading branch information
radekg committed Sep 28, 2019
1 parent 757f2b9 commit 9ce0549
Show file tree
Hide file tree
Showing 7 changed files with 383 additions and 81 deletions.
7 changes: 4 additions & 3 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"os/exec"
"runtime"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"

"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
)
Expand All @@ -29,6 +29,7 @@ import (
type Configuration struct {
PluginBinary string `yaml:"binary"`
PluginConfigurationFile string `yaml:"configuration-file"`
AllowTokenFromContext bool `yaml:"allow-token-from-context"`
}

// Build instantiates a StoragePlugin
Expand All @@ -39,7 +40,7 @@ func (c *Configuration) Build() (shared.StoragePlugin, error) {
client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: shared.Handshake,
VersionedPlugins: map[int]plugin.PluginSet{
1: shared.PluginMap,
1: shared.GetPluginMap(c.AllowTokenFromContext),
},
Cmd: cmd,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
Expand Down
3 changes: 3 additions & 0 deletions plugin/storage/grpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package grpc
import (
"flag"

"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
Expand All @@ -41,4 +42,6 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
func (opt *Options) InitFromViper(v *viper.Viper) {
opt.Configuration.PluginBinary = v.GetString(pluginBinary)
opt.Configuration.PluginConfigurationFile = v.GetString(pluginConfigurationFile)
// TODO: Need to figure out a better way for do this. (same as ElasticSearch storage)
opt.Configuration.AllowTokenFromContext = v.GetBool(spanstore.StoragePropagationKey)
}
8 changes: 7 additions & 1 deletion plugin/storage/grpc/proto/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,20 @@ message GetTraceRequest {
(gogoproto.customtype) = "github.com/jaegertracing/jaeger/model.TraceID",
(gogoproto.customname) = "TraceID"
];
string bearerToken = 2;
}

message GetServicesRequest {}
message GetServicesRequest {
string bearerToken = 1;
}

message GetServicesResponse {
repeated string services = 1;
}

message GetOperationsRequest {
string service = 1;
string bearerToken = 2;
}

message GetOperationsResponse {
Expand Down Expand Up @@ -107,6 +111,7 @@ message TraceQueryParameters {

message FindTracesRequest {
TraceQueryParameters query = 1;
string bearerToken = 2;
}

message SpansResponseChunk {
Expand All @@ -117,6 +122,7 @@ message SpansResponseChunk {

message FindTraceIDsRequest {
TraceQueryParameters query = 1;
string bearerToken = 2;
}

message FindTraceIDsResponse {
Expand Down
30 changes: 24 additions & 6 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,20 @@ import (

// grpcClient implements shared.StoragePlugin and reads/writes spans and dependencies
type grpcClient struct {
readerClient storage_v1.SpanReaderPluginClient
writerClient storage_v1.SpanWriterPluginClient
depsReaderClient storage_v1.DependenciesReaderPluginClient
allowTokenFromContext bool
readerClient storage_v1.SpanReaderPluginClient
writerClient storage_v1.SpanWriterPluginClient
depsReaderClient storage_v1.DependenciesReaderPluginClient
}

// getBearerTokenToForward determines is there is a bearer token to forward
// and returns it only when it, only when bearer token forward is allowed.
func (c *grpcClient) getBearerTokenToForward(ctx context.Context) string {
ctxBearerToken, hasToken := spanstore.GetBearerToken(ctx)
if hasToken && c.allowTokenFromContext {
return ctxBearerToken
}
return ""
}

// DependencyReader implements shared.StoragePlugin.
Expand All @@ -51,8 +62,10 @@ func (c *grpcClient) SpanWriter() spanstore.Writer {

// GetTrace takes a traceID and returns a Trace associated with that traceID
func (c *grpcClient) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {

stream, err := c.readerClient.GetTrace(ctx, &storage_v1.GetTraceRequest{
TraceID: traceID,
TraceID: traceID,
BearerToken: c.getBearerTokenToForward(ctx),
})
if err != nil {
return nil, errors.Wrap(err, "plugin error")
Expand All @@ -74,7 +87,9 @@ func (c *grpcClient) GetTrace(ctx context.Context, traceID model.TraceID) (*mode

// GetServices returns a list of all known services
func (c *grpcClient) GetServices(ctx context.Context) ([]string, error) {
resp, err := c.readerClient.GetServices(ctx, &storage_v1.GetServicesRequest{})
resp, err := c.readerClient.GetServices(ctx, &storage_v1.GetServicesRequest{
BearerToken: c.getBearerTokenToForward(ctx),
})
if err != nil {
return nil, errors.Wrap(err, "plugin error")
}
Expand All @@ -85,7 +100,8 @@ func (c *grpcClient) GetServices(ctx context.Context) ([]string, error) {
// GetOperations returns the operations of a given service
func (c *grpcClient) GetOperations(ctx context.Context, service string) ([]string, error) {
resp, err := c.readerClient.GetOperations(ctx, &storage_v1.GetOperationsRequest{
Service: service,
Service: service,
BearerToken: c.getBearerTokenToForward(ctx),
})
if err != nil {
return nil, errors.Wrap(err, "plugin error")
Expand All @@ -107,6 +123,7 @@ func (c *grpcClient) FindTraces(ctx context.Context, query *spanstore.TraceQuery
DurationMax: query.DurationMax,
NumTraces: int32(query.NumTraces),
},
BearerToken: c.getBearerTokenToForward(ctx),
})
if err != nil {
return nil, errors.Wrap(err, "plugin error")
Expand Down Expand Up @@ -145,6 +162,7 @@ func (c *grpcClient) FindTraceIDs(ctx context.Context, query *spanstore.TraceQue
DurationMax: query.DurationMax,
NumTraces: int32(query.NumTraces),
},
BearerToken: c.getBearerTokenToForward(ctx),
})
if err != nil {
return nil, errors.Wrap(err, "plugin error")
Expand Down
26 changes: 21 additions & 5 deletions plugin/storage/grpc/shared/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ type grpcServer struct {
Impl StoragePlugin
}

// getMaybeBearerTokenContext returns a context with bearer token, if token string is not empty
// the assumption is that if the token has arrived on the wire, the grpcClient
// verified that bearer token forward was enabled.
// If token is empty, returns original context.
func (s *grpcServer) getMaybeBearerTokenContext(ctx context.Context, token string) context.Context {
if token != "" {
return spanstore.ContextWithBearerToken(ctx, token)
}
return ctx
}

// GetDependencies returns all interservice dependencies
func (s *grpcServer) GetDependencies(ctx context.Context, r *storage_v1.GetDependenciesRequest) (*storage_v1.GetDependenciesResponse, error) {
deps, err := s.Impl.DependencyReader().GetDependencies(r.EndTime, r.EndTime.Sub(r.StartTime))
Expand All @@ -53,7 +64,8 @@ func (s *grpcServer) WriteSpan(ctx context.Context, r *storage_v1.WriteSpanReque

// GetTrace takes a traceID and streams a Trace associated with that traceID
func (s *grpcServer) GetTrace(r *storage_v1.GetTraceRequest, stream storage_v1.SpanReaderPlugin_GetTraceServer) error {
trace, err := s.Impl.SpanReader().GetTrace(stream.Context(), r.TraceID)
opCtx := s.getMaybeBearerTokenContext(stream.Context(), r.BearerToken)
trace, err := s.Impl.SpanReader().GetTrace(opCtx, r.TraceID)
if err != nil {
return err
}
Expand All @@ -68,7 +80,8 @@ func (s *grpcServer) GetTrace(r *storage_v1.GetTraceRequest, stream storage_v1.S

// GetServices returns a list of all known services
func (s *grpcServer) GetServices(ctx context.Context, r *storage_v1.GetServicesRequest) (*storage_v1.GetServicesResponse, error) {
services, err := s.Impl.SpanReader().GetServices(ctx)
opCtx := s.getMaybeBearerTokenContext(ctx, r.BearerToken)
services, err := s.Impl.SpanReader().GetServices(opCtx)
if err != nil {
return nil, err
}
Expand All @@ -79,7 +92,8 @@ func (s *grpcServer) GetServices(ctx context.Context, r *storage_v1.GetServicesR

// GetOperations returns the operations of a given service
func (s *grpcServer) GetOperations(ctx context.Context, r *storage_v1.GetOperationsRequest) (*storage_v1.GetOperationsResponse, error) {
operations, err := s.Impl.SpanReader().GetOperations(ctx, r.Service)
opCtx := s.getMaybeBearerTokenContext(ctx, r.BearerToken)
operations, err := s.Impl.SpanReader().GetOperations(opCtx, r.Service)
if err != nil {
return nil, err
}
Expand All @@ -90,7 +104,8 @@ func (s *grpcServer) GetOperations(ctx context.Context, r *storage_v1.GetOperati

// FindTraces streams traces that match the traceQuery
func (s *grpcServer) FindTraces(r *storage_v1.FindTracesRequest, stream storage_v1.SpanReaderPlugin_FindTracesServer) error {
traces, err := s.Impl.SpanReader().FindTraces(stream.Context(), &spanstore.TraceQueryParameters{
opCtx := s.getMaybeBearerTokenContext(stream.Context(), r.BearerToken)
traces, err := s.Impl.SpanReader().FindTraces(opCtx, &spanstore.TraceQueryParameters{
ServiceName: r.Query.ServiceName,
OperationName: r.Query.OperationName,
Tags: r.Query.Tags,
Expand All @@ -116,7 +131,8 @@ func (s *grpcServer) FindTraces(r *storage_v1.FindTracesRequest, stream storage_

// FindTraceIDs retrieves traceIDs that match the traceQuery
func (s *grpcServer) FindTraceIDs(ctx context.Context, r *storage_v1.FindTraceIDsRequest) (*storage_v1.FindTraceIDsResponse, error) {
traceIDs, err := s.Impl.SpanReader().FindTraceIDs(ctx, &spanstore.TraceQueryParameters{
opCtx := s.getMaybeBearerTokenContext(ctx, r.BearerToken)
traceIDs, err := s.Impl.SpanReader().FindTraceIDs(opCtx, &spanstore.TraceQueryParameters{
ServiceName: r.Query.ServiceName,
OperationName: r.Query.OperationName,
Tags: r.Query.Tags,
Expand Down
22 changes: 14 additions & 8 deletions plugin/storage/grpc/shared/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package shared
import (
"context"

"github.com/hashicorp/go-plugin"
plugin "github.com/hashicorp/go-plugin"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
Expand All @@ -34,9 +34,13 @@ var Handshake = plugin.HandshakeConfig{
MagicCookieValue: "jaeger",
}

// PluginMap is the map of plugins we can dispense.
var PluginMap = map[string]plugin.Plugin{
StoragePluginIdentifier: &StorageGRPCPlugin{},
// GetPluginMap returns a plugin map.
func GetPluginMap(allowTokenFromContext bool) map[string]plugin.Plugin {
return map[string]plugin.Plugin{
StoragePluginIdentifier: &StorageGRPCPlugin{
allowTokenFromContext: allowTokenFromContext,
},
}
}

// StoragePlugin is the interface we're exposing as a plugin.
Expand All @@ -48,6 +52,7 @@ type StoragePlugin interface {

// StorageGRPCPlugin is the implementation of plugin.GRPCPlugin so we can serve/consume this.
type StorageGRPCPlugin struct {
allowTokenFromContext bool
plugin.Plugin
// Concrete implementation, written in Go. This is only used for plugins
// that are written in Go.
Expand All @@ -64,10 +69,11 @@ func (p *StorageGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server
}

// GRPCClient is used by go-plugin to create a grpc plugin client
func (*StorageGRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
func (p *StorageGRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &grpcClient{
readerClient: storage_v1.NewSpanReaderPluginClient(c),
writerClient: storage_v1.NewSpanWriterPluginClient(c),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c),
allowTokenFromContext: p.allowTokenFromContext,
readerClient: storage_v1.NewSpanReaderPluginClient(c),
writerClient: storage_v1.NewSpanWriterPluginClient(c),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c),
}, nil
}
Loading

0 comments on commit 9ce0549

Please sign in to comment.