From e02e579941dce22b5af1ed67046e6affe3a3d74e Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sat, 24 Aug 2019 16:07:03 -0400 Subject: [PATCH 1/6] Add unit test for gRPC over cmux Signed-off-by: Yuri Shkuro --- Gopkg.lock | 2 ++ cmd/query/app/grpc_handler_test.go | 9 +++---- cmd/query/app/server.go | 7 +++++- cmd/query/app/server_test.go | 39 +++++++++++++++++++++++++++--- 4 files changed, 47 insertions(+), 10 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index cbdf41f0478..8515273a9bc 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1281,6 +1281,8 @@ "github.com/bsm/sarama-cluster", "github.com/crossdock/crossdock-go", "github.com/dgraph-io/badger", + "github.com/dgraph-io/badger/options", + "github.com/fsnotify/fsnotify", "github.com/go-openapi/errors", "github.com/go-openapi/loads", "github.com/go-openapi/runtime", diff --git a/cmd/query/app/grpc_handler_test.go b/cmd/query/app/grpc_handler_test.go index 0ffc1944b53..3235a028005 100644 --- a/cmd/query/app/grpc_handler_test.go +++ b/cmd/query/app/grpc_handler_test.go @@ -40,7 +40,6 @@ import ( ) var ( - grpcServerPort = ":0" errStorageMsgGRPC = "Storage error" errStorageGRPC = errors.New(errStorageMsgGRPC) errStatusStorageGRPC = status.Error(2, errStorageMsgGRPC) @@ -138,7 +137,7 @@ type grpcClient struct { } func newGRPCServer(t *testing.T, q *querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, net.Addr) { - lis, _ := net.Listen("tcp", grpcServerPort) + lis, _ := net.Listen("tcp", ":0") grpcServer := grpc.NewServer() grpcHandler := NewGRPCHandler(q, logger, tracer) api_v2.RegisterQueryServiceServer(grpcServer, grpcHandler) @@ -151,8 +150,8 @@ func newGRPCServer(t *testing.T, q *querysvc.QueryService, logger *zap.Logger, t return grpcServer, lis.Addr() } -func newGRPCClient(t *testing.T, addr net.Addr) *grpcClient { - conn, err := grpc.Dial(addr.String(), grpc.WithInsecure()) +func newGRPCClient(t *testing.T, addr string) *grpcClient { + conn, err := grpc.Dial(addr, grpc.WithInsecure()) require.NoError(t, err) return &grpcClient{ @@ -192,7 +191,7 @@ func initializeTestServerGRPCWithOptions(t *testing.T) *grpcServer { func withServerAndClient(t *testing.T, actualTest func(server *grpcServer, client *grpcClient)) { server := initializeTestServerGRPCWithOptions(t) - client := newGRPCClient(t, server.lisAddr) + client := newGRPCClient(t, server.lisAddr.String()) defer server.server.Stop() defer client.conn.Close() diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index 5d2841aac74..4a007f431b4 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -104,7 +104,12 @@ func (s *Server) Start() error { grpcListener := cmuxServer.Match( cmux.HTTP2HeaderField("content-type", "application/grpc"), - cmux.HTTP2HeaderField("content-type", "application/grpc+proto")) + cmux.HTTP2HeaderField("content-type", "application/grpc+proto"), + ) + // grpcListener := cmuxServer.MatchWithWriters( + // cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"), + // cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"), + // ) httpListener := cmuxServer.Match(cmux.Any()) go func() { diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 3cde1d21bbc..d609a9f48f7 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -15,6 +15,7 @@ package app import ( + "context" "fmt" "testing" "time" @@ -28,6 +29,9 @@ import ( "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/ports" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" + spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" ) func TestServerError(t *testing.T) { @@ -43,13 +47,40 @@ func TestServer(t *testing.T) { flagsSvc := flags.NewService(ports.AgentAdminHTTP) flagsSvc.Logger = zap.NewNop() - querySvc := &querysvc.QueryService{} + spanReader := &spanstoremocks.Reader{} + dependencyReader := &depsmocks.Reader{} + + querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) + tracer := opentracing.NoopTracer{} - server := NewServer(flagsSvc, querySvc, &QueryOptions{Port: ports.QueryAdminHTTP, - BearerTokenPropagation: true}, tracer) + server := NewServer(flagsSvc, querySvc, + &QueryOptions{Port: ports.QueryAdminHTTP, BearerTokenPropagation: true}, + tracer) assert.NoError(t, server.Start()) + client := newGRPCClient(t, fmt.Sprintf(":%d", ports.QueryHTTP)) + defer client.conn.Close() + + var queryErr error + for i := 0; i < 10; i++ { + queryErr = func() error { + ctx, cancel := context.WithTimeout(context.Background(), 2000*time.Millisecond) + defer cancel() + + _, err := client.GetTrace(ctx, &api_v2.GetTraceRequest{}) + if err != nil { + t.Log("cannot GetTrace", err) + } + return err + }() + if queryErr == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + assert.NoError(t, queryErr, "Connection test did not succeed") + // TODO wait for servers to come up and test http and grpc endpoints time.Sleep(1 * time.Second) @@ -82,6 +113,6 @@ func TestServerGracefulExit(t *testing.T) { for _, logEntry := range logs.All() { assert.True(t, logEntry.Level != zap.ErrorLevel, - fmt.Sprintf("Error log found on server exit: %v", logEntry)) + "Error log found on server exit: %v", logEntry) } } From c740205b9470d7b3adc419e04cad0180e5ea657c Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sun, 25 Aug 2019 15:04:25 -0400 Subject: [PATCH 2/6] Fix tests Signed-off-by: Yuri Shkuro --- cmd/query/app/grpc_handler_test.go | 2 +- cmd/query/app/server.go | 14 +++++++------- cmd/query/app/server_test.go | 21 ++++++++++++--------- 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/cmd/query/app/grpc_handler_test.go b/cmd/query/app/grpc_handler_test.go index 3235a028005..0bb5dc40eed 100644 --- a/cmd/query/app/grpc_handler_test.go +++ b/cmd/query/app/grpc_handler_test.go @@ -151,7 +151,7 @@ func newGRPCServer(t *testing.T, q *querysvc.QueryService, logger *zap.Logger, t } func newGRPCClient(t *testing.T, addr string) *grpcClient { - conn, err := grpc.Dial(addr, grpc.WithInsecure()) + conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(1*time.Second)) require.NoError(t, err) return &grpcClient{ diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index 4a007f431b4..2baee28d4fe 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -102,14 +102,14 @@ func (s *Server) Start() error { // cmux server acts as a reverse-proxy between HTTP and GRPC backends. cmuxServer := cmux.New(s.conn) - grpcListener := cmuxServer.Match( - cmux.HTTP2HeaderField("content-type", "application/grpc"), - cmux.HTTP2HeaderField("content-type", "application/grpc+proto"), - ) - // grpcListener := cmuxServer.MatchWithWriters( - // cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"), - // cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"), + // grpcListener := cmuxServer.Match( + // cmux.HTTP2HeaderField("content-type", "application/grpc"), + // cmux.HTTP2HeaderField("content-type", "application/grpc+proto"), // ) + grpcListener := cmuxServer.MatchWithWriters( + cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"), + // cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"), + ) httpListener := cmuxServer.Match(cmux.Any()) go func() { diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index d609a9f48f7..0ed85386483 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -22,6 +22,7 @@ import ( opentracing "github.com/opentracing/opentracing-go" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" @@ -44,33 +45,35 @@ func TestServerError(t *testing.T) { } func TestServer(t *testing.T) { - flagsSvc := flags.NewService(ports.AgentAdminHTTP) + flagsSvc := flags.NewService(ports.QueryAdminHTTP) flagsSvc.Logger = zap.NewNop() spanReader := &spanstoremocks.Reader{} dependencyReader := &depsmocks.Reader{} + expectedServices := []string{"test"} + spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil) querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) - tracer := opentracing.NoopTracer{} - server := NewServer(flagsSvc, querySvc, - &QueryOptions{Port: ports.QueryAdminHTTP, BearerTokenPropagation: true}, - tracer) + &QueryOptions{Port: ports.QueryHTTP, BearerTokenPropagation: true}, + opentracing.NoopTracer{}) assert.NoError(t, server.Start()) + time.Sleep(1 * time.Second) + client := newGRPCClient(t, fmt.Sprintf(":%d", ports.QueryHTTP)) defer client.conn.Close() var queryErr error - for i := 0; i < 10; i++ { + for i := 0; i < 1; i++ { queryErr = func() error { ctx, cancel := context.WithTimeout(context.Background(), 2000*time.Millisecond) defer cancel() - _, err := client.GetTrace(ctx, &api_v2.GetTraceRequest{}) + _, err := client.GetServices(ctx, &api_v2.GetServicesRequest{}) if err != nil { - t.Log("cannot GetTrace", err) + t.Log("cannot GetServices", err) } return err }() @@ -95,7 +98,7 @@ func TestServer(t *testing.T) { } func TestServerGracefulExit(t *testing.T) { - flagsSvc := flags.NewService(ports.AgentAdminHTTP) + flagsSvc := flags.NewService(ports.QueryAdminHTTP) zapCore, logs := observer.New(zap.ErrorLevel) assert.Equal(t, 0, logs.Len(), "Expected initial ObservedLogs to have zero length.") From 8c195177abb92a4c342c163df66293cb9f0cd602 Mon Sep 17 00:00:00 2001 From: Christian Weichel Date: Fri, 23 Aug 2019 11:24:55 +0000 Subject: [PATCH 3/6] Fix gRPC query service cmux breaking change: https://github.com/grpc/grpc-go/issues/2406 workaround described in: - https://github.com/soheilhy/cmux/issues/64 - https://github.com/soheilhy/cmux#limitations Signed-off-by: Christian Weichel --- cmd/query/app/server.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index 5d2841aac74..2d81ba9f0cf 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -102,9 +102,7 @@ func (s *Server) Start() error { // cmux server acts as a reverse-proxy between HTTP and GRPC backends. cmuxServer := cmux.New(s.conn) - grpcListener := cmuxServer.Match( - cmux.HTTP2HeaderField("content-type", "application/grpc"), - cmux.HTTP2HeaderField("content-type", "application/grpc+proto")) + grpcListener := cmuxServer.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) httpListener := cmuxServer.Match(cmux.Any()) go func() { From e2d474a40b0f05a6814ce4ff2129aad462cfc772 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sun, 25 Aug 2019 15:12:08 -0400 Subject: [PATCH 4/6] Fix asertions Signed-off-by: Yuri Shkuro --- cmd/query/app/server.go | 6 +----- cmd/query/app/server_test.go | 27 +++++++-------------------- 2 files changed, 8 insertions(+), 25 deletions(-) diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index 2baee28d4fe..17759e6f879 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -102,13 +102,9 @@ func (s *Server) Start() error { // cmux server acts as a reverse-proxy between HTTP and GRPC backends. cmuxServer := cmux.New(s.conn) - // grpcListener := cmuxServer.Match( - // cmux.HTTP2HeaderField("content-type", "application/grpc"), - // cmux.HTTP2HeaderField("content-type", "application/grpc+proto"), - // ) grpcListener := cmuxServer.MatchWithWriters( cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"), - // cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"), + cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"), ) httpListener := cmuxServer.Match(cmux.Any()) diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 0ed85386483..de3e71ed170 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -60,32 +60,19 @@ func TestServer(t *testing.T) { opentracing.NoopTracer{}) assert.NoError(t, server.Start()) + // wait for the server to come up + // TODO find a way to not wait full 1s, only as long as needed time.Sleep(1 * time.Second) client := newGRPCClient(t, fmt.Sprintf(":%d", ports.QueryHTTP)) defer client.conn.Close() - var queryErr error - for i := 0; i < 1; i++ { - queryErr = func() error { - ctx, cancel := context.WithTimeout(context.Background(), 2000*time.Millisecond) - defer cancel() - - _, err := client.GetServices(ctx, &api_v2.GetServicesRequest{}) - if err != nil { - t.Log("cannot GetServices", err) - } - return err - }() - if queryErr == nil { - break - } - time.Sleep(100 * time.Millisecond) - } - assert.NoError(t, queryErr, "Connection test did not succeed") + ctx, cancel := context.WithTimeout(context.Background(), 2000*time.Millisecond) + defer cancel() - // TODO wait for servers to come up and test http and grpc endpoints - time.Sleep(1 * time.Second) + res, err := client.GetServices(ctx, &api_v2.GetServicesRequest{}) + assert.NoError(t, err) + assert.Equal(t, expectedServices, res.Services) server.Close() for i := 0; i < 10; i++ { From 03cee53bac0d74a496445ebdd4dabb99dbb7618e Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sun, 25 Aug 2019 15:17:40 -0400 Subject: [PATCH 5/6] Use DialContext Signed-off-by: Yuri Shkuro --- cmd/query/app/grpc_handler_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/query/app/grpc_handler_test.go b/cmd/query/app/grpc_handler_test.go index 0bb5dc40eed..63a2bd3d11c 100644 --- a/cmd/query/app/grpc_handler_test.go +++ b/cmd/query/app/grpc_handler_test.go @@ -151,7 +151,9 @@ func newGRPCServer(t *testing.T, q *querysvc.QueryService, logger *zap.Logger, t } func newGRPCClient(t *testing.T, addr string) *grpcClient { - conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(1*time.Second)) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure()) require.NoError(t, err) return &grpcClient{ From fecc9e9772da966c24be5a86df51c5c726ca4057 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sun, 25 Aug 2019 15:27:53 -0400 Subject: [PATCH 6/6] Clean-up timeouts Signed-off-by: Yuri Shkuro --- cmd/query/app/server_test.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index de3e71ed170..c32aeb43160 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -60,14 +60,10 @@ func TestServer(t *testing.T) { opentracing.NoopTracer{}) assert.NoError(t, server.Start()) - // wait for the server to come up - // TODO find a way to not wait full 1s, only as long as needed - time.Sleep(1 * time.Second) - client := newGRPCClient(t, fmt.Sprintf(":%d", ports.QueryHTTP)) defer client.conn.Close() - ctx, cancel := context.WithTimeout(context.Background(), 2000*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() res, err := client.GetServices(ctx, &api_v2.GetServicesRequest{}) @@ -98,6 +94,7 @@ func TestServerGracefulExit(t *testing.T) { assert.NoError(t, server.Start()) // Wait for servers to come up before we can call .Close() + // TODO Find a way to wait only as long as necessary. Unconditional sleep slows down the tests. time.Sleep(1 * time.Second) server.Close()