Skip to content

Commit

Permalink
Fix gRPC query service cmux
Browse files Browse the repository at this point in the history
breaking change: grpc/grpc-go#2406
workaround described in:
- soheilhy/cmux#64
- https://github.com/soheilhy/cmux#limitations

Signed-off-by: Christian Weichel <christian.weichel@typefox.io>
  • Loading branch information
Christian Weichel committed Aug 25, 2019
1 parent 11d7631 commit 37b2e54
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 6 deletions.
4 changes: 1 addition & 3 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,7 @@ func (s *Server) Start() error {
// cmux server acts as a reverse-proxy between HTTP and GRPC backends.
cmuxServer := cmux.New(s.conn)

grpcListener := cmuxServer.Match(
cmux.HTTP2HeaderField("content-type", "application/grpc"),
cmux.HTTP2HeaderField("content-type", "application/grpc+proto"))
grpcListener := cmuxServer.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
httpListener := cmuxServer.Match(cmux.Any())

go func() {
Expand Down
59 changes: 56 additions & 3 deletions cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,26 @@
package app

import (
"context"
"fmt"
"github.com/stretchr/testify/mock"
"net/http"
"testing"
"time"

opentracing "github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)

func TestServerError(t *testing.T) {
Expand All @@ -43,16 +50,51 @@ func TestServer(t *testing.T) {
flagsSvc := flags.NewService(ports.AgentAdminHTTP)
flagsSvc.Logger = zap.NewNop()

querySvc := &querysvc.QueryService{}
spanReader := &spanstoremocks.Reader{}
dependencyReader := &depsmocks.Reader{}
querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})

tracer := opentracing.NoopTracer{}

server := NewServer(flagsSvc, querySvc, &QueryOptions{Port: ports.QueryAdminHTTP,
BearerTokenPropagation: true}, tracer)
assert.NoError(t, server.Start())

// TODO wait for servers to come up and test http and grpc endpoints
// wait for the server to come up
time.Sleep(1 * time.Second)

expectedServices := []string{"demo"}

// test gRPC endpoint
conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", ports.QueryAdminHTTP), grpc.WithInsecure(), grpc.WithTimeout((1 * time.Second)))
if err != nil {
t.Errorf("cannot connect to gRPC query service: %v", err)
} else {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)

spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil).Once()

client := api_v2.NewQueryServiceClient(conn)
resp, err := client.GetServices(ctx, &api_v2.GetServicesRequest{})
assert.NoError(t, err)
assert.NotNil(t, resp)
assert.Equal(t, resp.Services, expectedServices)

cancel()

assert.NoError(t, conn.Close())
}

// test http endpoint
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil).Once()
req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost:%d/api/services", ports.QueryAdminHTTP), nil)
assert.NoError(t, err)
req.Header.Add("Accept", "application/json")
httpClient = &http.Client{ Timeout: 2 * time.Second }
resp, err := httpClient.Do(req)
assert.NoError(t, err)
assert.Equal(t, resp.StatusCode, http.StatusOK)

server.Close()
for i := 0; i < 10; i++ {
if server.svc.HC().Get() == healthcheck.Unavailable {
Expand All @@ -78,7 +120,18 @@ func TestServerGracefulExit(t *testing.T) {

// Wait for servers to come up before we can call .Close()
time.Sleep(1 * time.Second)
server.Close()

closed := make(chan struct{})
go func() {
server.Close()
close(closed)
}()
select {
case <-closed:
// all is well
case <-time.After(1 * time.Second):
t.Errorf("timeout while stopping server")
}

for _, logEntry := range logs.All() {
assert.True(t, logEntry.Level != zap.ErrorLevel,
Expand Down

0 comments on commit 37b2e54

Please sign in to comment.