From 37b2e54b1044bcc091cd1bab66dd1a19b219f36f Mon Sep 17 00:00:00 2001 From: Christian Weichel Date: Sun, 25 Aug 2019 10:29:36 +0000 Subject: [PATCH] Fix gRPC query service cmux breaking change: https://github.com/grpc/grpc-go/issues/2406 workaround described in: - https://github.com/soheilhy/cmux/issues/64 - https://github.com/soheilhy/cmux#limitations Signed-off-by: Christian Weichel --- cmd/query/app/server.go | 4 +-- cmd/query/app/server_test.go | 59 ++++++++++++++++++++++++++++++++++-- 2 files changed, 57 insertions(+), 6 deletions(-) diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index 5d2841aac74..2d81ba9f0cf 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -102,9 +102,7 @@ func (s *Server) Start() error { // cmux server acts as a reverse-proxy between HTTP and GRPC backends. cmuxServer := cmux.New(s.conn) - grpcListener := cmuxServer.Match( - cmux.HTTP2HeaderField("content-type", "application/grpc"), - cmux.HTTP2HeaderField("content-type", "application/grpc+proto")) + grpcListener := cmuxServer.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) httpListener := cmuxServer.Match(cmux.Any()) go func() { diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 3cde1d21bbc..3b30fbc16a4 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -15,7 +15,10 @@ package app import ( + "context" "fmt" + "github.com/stretchr/testify/mock" + "net/http" "testing" "time" @@ -23,11 +26,15 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" + "google.golang.org/grpc" "github.com/jaegertracing/jaeger/cmd/flags" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/ports" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" + spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" ) func TestServerError(t *testing.T) { @@ -43,16 +50,51 @@ func TestServer(t *testing.T) { flagsSvc := flags.NewService(ports.AgentAdminHTTP) flagsSvc.Logger = zap.NewNop() - querySvc := &querysvc.QueryService{} + spanReader := &spanstoremocks.Reader{} + dependencyReader := &depsmocks.Reader{} + querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) + tracer := opentracing.NoopTracer{} server := NewServer(flagsSvc, querySvc, &QueryOptions{Port: ports.QueryAdminHTTP, BearerTokenPropagation: true}, tracer) assert.NoError(t, server.Start()) - // TODO wait for servers to come up and test http and grpc endpoints + // wait for the server to come up time.Sleep(1 * time.Second) + expectedServices := []string{"demo"} + + // test gRPC endpoint + conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", ports.QueryAdminHTTP), grpc.WithInsecure(), grpc.WithTimeout((1 * time.Second))) + if err != nil { + t.Errorf("cannot connect to gRPC query service: %v", err) + } else { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + + spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil).Once() + + client := api_v2.NewQueryServiceClient(conn) + resp, err := client.GetServices(ctx, &api_v2.GetServicesRequest{}) + assert.NoError(t, err) + assert.NotNil(t, resp) + assert.Equal(t, resp.Services, expectedServices) + + cancel() + + assert.NoError(t, conn.Close()) + } + + // test http endpoint + spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil).Once() + req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost:%d/api/services", ports.QueryAdminHTTP), nil) + assert.NoError(t, err) + req.Header.Add("Accept", "application/json") + httpClient = &http.Client{ Timeout: 2 * time.Second } + resp, err := httpClient.Do(req) + assert.NoError(t, err) + assert.Equal(t, resp.StatusCode, http.StatusOK) + server.Close() for i := 0; i < 10; i++ { if server.svc.HC().Get() == healthcheck.Unavailable { @@ -78,7 +120,18 @@ func TestServerGracefulExit(t *testing.T) { // Wait for servers to come up before we can call .Close() time.Sleep(1 * time.Second) - server.Close() + + closed := make(chan struct{}) + go func() { + server.Close() + close(closed) + }() + select { + case <-closed: + // all is well + case <-time.After(1 * time.Second): + t.Errorf("timeout while stopping server") + } for _, logEntry := range logs.All() { assert.True(t, logEntry.Level != zap.ErrorLevel,