Skip to content

Commit

Permalink
Enable gRPC reflection service on collector/query (#3526)
Browse files Browse the repository at this point in the history
  • Loading branch information
yurishkuro authored Feb 13, 2022
1 parent 0b700b8 commit b3534f3
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 28 deletions.
19 changes: 12 additions & 7 deletions cmd/collector/app/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling"
Expand All @@ -42,14 +43,19 @@ type GRPCServerParams struct {
MaxReceiveMessageLength int
MaxConnectionAge time.Duration
MaxConnectionAgeGrace time.Duration

// Set by the server to indicate the actual host:port of the server.
HostPortActual string
}

// StartGRPCServer based on the given parameters
func StartGRPCServer(params *GRPCServerParams) (*grpc.Server, error) {
var server *grpc.Server
var grpcOpts []grpc.ServerOption

grpcOpts = append(grpcOpts, grpc.MaxRecvMsgSize(params.MaxReceiveMessageLength))
if params.MaxReceiveMessageLength > 0 {
grpcOpts = append(grpcOpts, grpc.MaxRecvMsgSize(params.MaxReceiveMessageLength))
}
grpcOpts = append(grpcOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionAge: params.MaxConnectionAge,
MaxConnectionAgeGrace: params.MaxConnectionAgeGrace,
Expand All @@ -64,17 +70,16 @@ func StartGRPCServer(params *GRPCServerParams) (*grpc.Server, error) {

creds := credentials.NewTLS(tlsCfg)
grpcOpts = append(grpcOpts, grpc.Creds(creds))

server = grpc.NewServer(grpcOpts...)
} else {
// server without TLS
server = grpc.NewServer(grpcOpts...)
}

server = grpc.NewServer(grpcOpts...)
reflection.Register(server)

listener, err := net.Listen("tcp", params.HostPort)
if err != nil {
return nil, fmt.Errorf("failed to listen on gRPC port: %w", err)
}
params.HostPortActual = listener.Addr().String()

if err := serveGRPC(server, listener, params); err != nil {
return nil, err
Expand All @@ -87,7 +92,7 @@ func serveGRPC(server *grpc.Server, listener net.Listener, params *GRPCServerPar
api_v2.RegisterCollectorServiceServer(server, params.Handler)
api_v2.RegisterSamplingManagerServer(server, sampling.NewGRPCHandler(params.SamplingStore))

params.Logger.Info("Starting jaeger-collector gRPC server", zap.String("grpc.host-port", params.HostPort))
params.Logger.Info("Starting jaeger-collector gRPC server", zap.String("grpc.host-port", params.HostPortActual))
go func() {
if err := server.Serve(listener); err != nil {
params.Logger.Error("Could not launch gRPC service", zap.Error(err))
Expand Down
51 changes: 35 additions & 16 deletions cmd/collector/app/server/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package server

import (
"context"
"net"
"sync"
"testing"

Expand All @@ -30,6 +29,7 @@ import (
"google.golang.org/grpc/test/bufconn"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/internal/grpctest"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)
Expand Down Expand Up @@ -71,21 +71,19 @@ func TestFailServe(t *testing.T) {
func TestSpanCollector(t *testing.T) {
logger, _ := zap.NewDevelopment()
params := &GRPCServerParams{
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}),
SamplingStore: &mockSamplingStore{},
Logger: logger,
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}),
SamplingStore: &mockSamplingStore{},
Logger: logger,
MaxReceiveMessageLength: 1024 * 1024,
}

server := grpc.NewServer()
defer server.Stop()

listener, err := net.Listen("tcp", ":0")
server, err := StartGRPCServer(params)
require.NoError(t, err)
defer listener.Close()

serveGRPC(server, listener, params)
defer server.Stop()

conn, err := grpc.Dial(listener.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.Dial(
params.HostPortActual,
grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()

Expand All @@ -98,10 +96,9 @@ func TestSpanCollector(t *testing.T) {
func TestCollectorStartWithTLS(t *testing.T) {
logger, _ := zap.NewDevelopment()
params := &GRPCServerParams{
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}),
SamplingStore: &mockSamplingStore{},
Logger: logger,
MaxReceiveMessageLength: 8 * 1024 * 1024,
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}),
SamplingStore: &mockSamplingStore{},
Logger: logger,
TLSConfig: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
Expand All @@ -114,3 +111,25 @@ func TestCollectorStartWithTLS(t *testing.T) {
require.NoError(t, err)
defer server.Stop()
}

func TestCollectorReflection(t *testing.T) {
logger, _ := zap.NewDevelopment()
params := &GRPCServerParams{
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}),
SamplingStore: &mockSamplingStore{},
Logger: logger,
}

server, err := StartGRPCServer(params)
require.NoError(t, err)
defer server.Stop()

grpctest.ReflectionServiceValidator{
HostPort: params.HostPortActual,
Server: server,
ExpectedServices: []string{
"jaeger.api_v2.CollectorService",
"jaeger.api_v2.SamplingManager",
},
}.Execute(t)
}
8 changes: 7 additions & 1 deletion cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection"

"github.com/jaegertracing/jaeger/cmd/query/app/apiv3"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
Expand Down Expand Up @@ -119,6 +120,7 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.
}

server := grpc.NewServer(grpcOpts...)
reflection.Register(server)

handler := &GRPCHandler{
queryService: querySvc,
Expand Down Expand Up @@ -195,7 +197,11 @@ func (s *Server) initListener() (cmux.CMux, error) {
if err != nil {
return nil, err
}
s.logger.Info("Query server started")
s.logger.Info(
"Query server started",
zap.String("http_addr", s.httpConn.Addr().String()),
zap.String("grpc_addr", s.grpcConn.Addr().String()),
)
return nil, nil
}

Expand Down
21 changes: 17 additions & 4 deletions cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/internal/grpctest"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
Expand Down Expand Up @@ -688,15 +689,27 @@ func TestServerHandlesPortZero(t *testing.T) {

querySvc := &querysvc.QueryService{}
tracer := opentracing.NoopTracer{}
server, err := NewServer(flagsSvc.Logger, querySvc, nil, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, tracer)
server, err := NewServer(flagsSvc.Logger, querySvc, nil,
&QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"},
tracer)
assert.Nil(t, err)
assert.NoError(t, server.Start())
server.Close()
defer server.Close()

message := logs.FilterMessage("Query server started")
assert.Equal(t, 1, message.Len(), "Expected query started log message.")
assert.Equal(t, 1, message.Len(), "Expected 'Query server started' log message.")

onlyEntry := message.All()[0]
port := onlyEntry.ContextMap()["port"]
port := onlyEntry.ContextMap()["port"].(int64)
assert.Greater(t, port, int64(0))

grpctest.ReflectionServiceValidator{
HostPort: fmt.Sprintf(":%v", port),
Server: server.grpcServer,
ExpectedServices: []string{
"jaeger.api_v2.QueryService",
"jaeger.api_v3.QueryService",
"jaeger.api_v2.metrics.MetricsQueryService",
},
}.Execute(t)
}
71 changes: 71 additions & 0 deletions internal/grpctest/reflection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2022 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpctest

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
)

// ReflectionServiceValidator verifies that a gRPC service at a given address
// supports reflection service. Called must invoke Execute func.
type ReflectionServiceValidator struct {
Server *grpc.Server
HostPort string
ExpectedServices []string
}

// Execute performs validation.
func (v ReflectionServiceValidator) Execute(t *testing.T) {
conn, err := grpc.Dial(
v.HostPort,
grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()

client := grpc_reflection_v1alpha.NewServerReflectionClient(conn)
r, err := client.ServerReflectionInfo(context.Background())
require.NoError(t, err)
require.NotNil(t, r)

err = r.Send(&grpc_reflection_v1alpha.ServerReflectionRequest{
MessageRequest: &grpc_reflection_v1alpha.ServerReflectionRequest_ListServices{},
})
require.NoError(t, err)
m, err := r.Recv()
require.NoError(t, err)
require.IsType(t,
new(grpc_reflection_v1alpha.ServerReflectionResponse_ListServicesResponse),
m.MessageResponse)

resp := m.MessageResponse.(*grpc_reflection_v1alpha.ServerReflectionResponse_ListServicesResponse)
for _, svc := range v.ExpectedServices {
var found string
for _, s := range resp.ListServicesResponse.Service {
if svc == s.Name {
found = s.Name
break
}
}
require.Equalf(t, svc, found,
"service not found, got '%+v'",
resp.ListServicesResponse.Service)
}
}
45 changes: 45 additions & 0 deletions internal/grpctest/reflection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) 2022 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpctest

import (
"net"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

func TestReflectionServiceValidator(t *testing.T) {
server := grpc.NewServer()
reflection.Register(server)

listener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
defer listener.Close()

go func() {
err := server.Serve(listener)
require.NoError(t, err)
}()
defer server.Stop()

ReflectionServiceValidator{
HostPort: listener.Addr().String(),
Server: server,
ExpectedServices: []string{"grpc.reflection.v1alpha.ServerReflection"},
}.Execute(t)
}

0 comments on commit b3534f3

Please sign in to comment.