Skip to content

Commit 9635a33

Browse files
authored
Add gRPC communication between agent and collector (#1165)
* Add gRPC communication between agent and collector Signed-off-by: Pavol Loffay <ploffay@redhat.com> * uncomment make and fix comment Signed-off-by: Pavol Loffay <ploffay@redhat.com> * Fix grpc handler initialization Signed-off-by: Pavol Loffay <ploffay@redhat.com> * Copy process if missing Signed-off-by: Pavol Loffay <ploffay@redhat.com> * log only warn and error from grpc Signed-off-by: Pavol Loffay <ploffay@redhat.com> * Fix review comments Signed-off-by: Pavol Loffay <ploffay@redhat.com> * Add todo Signed-off-by: Pavol Loffay <ploffay@redhat.com> * Add Roundrobin grpc load balancer Signed-off-by: Pavol Loffay <ploffay@redhat.com> * Fix comment Signed-off-by: Pavol Loffay <ploffay@redhat.com> * Fix review comments Signed-off-by: Pavol Loffay <ploffay@redhat.com> * Change grpc target name Signed-off-by: Pavol Loffay <ploffay@redhat.com> * Defaul return error Signed-off-by: Pavol Loffay <ploffay@redhat.com> * Fix back tchannel collector addr Signed-off-by: Pavol Loffay <ploffay@redhat.com> * Remove collector from grpc reporter flag Signed-off-by: Pavol Loffay <ploffay@redhat.com>
1 parent f2eb7d1 commit 9635a33

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+3967
-1259
lines changed

Makefile

+27-17
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ TOP_PKGS := $(shell glide novendor | \
66
grep -v \
77
-e ./thrift-gen/... \
88
-e ./swagger-gen/... \
9+
-e ./proto-gen/... \
910
-e ./examples/... \
1011
-e ./scripts/...\
1112
)
@@ -17,6 +18,7 @@ ALL_SRC := $(shell find . -name "*.go" | \
1718
-e vendor \
1819
-e /thrift-gen/ \
1920
-e /swagger-gen/ \
21+
-e /proto-gen/ \
2022
-e /examples/ \
2123
-e doc.go \
2224
-e model.pb.go \
@@ -335,6 +337,20 @@ generate-mocks: install-mockery
335337
echo-version:
336338
@echo $(GIT_CLOSEST_TAG)
337339

340+
PROTO_INCLUDES := \
341+
-I model/proto \
342+
-I vendor/github.com/grpc-ecosystem/grpc-gateway \
343+
-I vendor/github.com/gogo/googleapis \
344+
-I vendor/github.com/gogo/protobuf
345+
# Remapping of std types to gogo types (must not contain spaces)
346+
PROTO_GOGO_MAPPINGS := $(shell echo \
347+
Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types, \
348+
Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types, \
349+
Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types, \
350+
Mgoogle/api/annotations.proto=github.com/gogo/googleapis/google/api, \
351+
Mmodel.proto=github.com/jaegertracing/jaeger/model \
352+
| sed 's/ //g')
353+
338354
.PHONY: proto
339355
proto:
340356
# Generate gogo, gRPC-Gateway, swagger, go-validators output.
@@ -362,31 +378,25 @@ proto:
362378
# (https://medium.com/@linchenon/generate-grpc-and-protobuf-libraries-with-containers-c15ba4e4f3ad)
363379
#
364380
protoc \
365-
-I model/proto \
366-
-I vendor/github.com/grpc-ecosystem/grpc-gateway/ \
367-
-I vendor/github.com/gogo/googleapis/ \
368-
-I vendor/ \
369-
--gogo_out=plugins=grpc,\
370-
Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,\
371-
Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types,\
372-
Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types,\
373-
Mgoogle/api/annotations.proto=github.com/gogo/googleapis/google/api:\
374-
$$GOPATH/src/github.com/jaegertracing/jaeger/model/ \
375-
--grpc-gateway_out=\
376-
Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,\
377-
Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types,\
378-
Mgoogle/api/annotations.proto=github.com/gogo/googleapis/google/api:\
379-
$$GOPATH/src/github.com/jaegertracing/jaeger/model \
380-
--swagger_out=model/proto/openapi/ \
381+
$(PROTO_INCLUDES) \
382+
--gogo_out=plugins=grpc,$(PROTO_GOGO_MAPPINGS):$(PWD)/model/ \
381383
model/proto/model.proto
382384

385+
protoc \
386+
$(PROTO_INCLUDES) \
387+
--gogo_out=plugins=grpc,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/api_v2/ \
388+
--grpc-gateway_out=$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/api_v2/ \
389+
--swagger_out=$(PWD)/proto-gen/openapi/ \
390+
model/proto/api_v2.proto
391+
383392
protoc \
384393
-I model/proto \
385-
--go_out=$$GOPATH/src/github.com/jaegertracing/jaeger/model/prototest/ \
394+
--go_out=$(PWD)/model/prototest/ \
386395
model/proto/model_test.proto
387396

388397
.PHONY: proto-install
389398
proto-install:
399+
go get -u github.com/golang/glog
390400
go install \
391401
./vendor/github.com/golang/protobuf/protoc-gen-go \
392402
./vendor/github.com/gogo/protobuf/protoc-gen-gogo \

cmd/agent/app/agent_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func TestAgentSamplingEndpoint(t *testing.T) {
6565
require.NoError(t, err)
6666
body, err := ioutil.ReadAll(resp.Body)
6767
assert.NoError(t, err)
68-
assert.Equal(t, "tcollector error: no peers available\n", string(body))
68+
assert.Equal(t, "collector error: no peers available\n", string(body))
6969
})
7070
}
7171

cmd/agent/app/flags.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func AddFlags(flags *flag.FlagSet) {
5151
flags.String(
5252
httpServerHostPort,
5353
defaultHTTPServerHostPort,
54-
"host:port of the http server (e.g. for /sampling point and /baggage endpoint)")
54+
"host:port of the http server (e.g. for /sampling point and /baggageRestrictions endpoint)")
5555
}
5656

5757
// InitFromViper initializes Builder with properties retrieved from Viper.

cmd/agent/app/httpserver/server.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (h *httpHandler) serveSamplingHTTP(w http.ResponseWriter, r *http.Request,
108108
resp, err := h.manager.GetSamplingStrategy(service)
109109
if err != nil {
110110
h.metrics.TCollectorProxyFailures.Inc(1)
111-
http.Error(w, fmt.Sprintf("tcollector error: %+v", err), http.StatusInternalServerError)
111+
http.Error(w, fmt.Sprintf("collector error: %+v", err), http.StatusInternalServerError)
112112
return
113113
}
114114
jsonBytes, err := json.Marshal(resp)
@@ -138,7 +138,7 @@ func (h *httpHandler) serveBaggageHTTP(w http.ResponseWriter, r *http.Request) {
138138
resp, err := h.manager.GetBaggageRestrictions(service)
139139
if err != nil {
140140
h.metrics.TCollectorProxyFailures.Inc(1)
141-
http.Error(w, fmt.Sprintf("tcollector error: %+v", err), http.StatusInternalServerError)
141+
http.Error(w, fmt.Sprintf("collector error: %+v", err), http.StatusInternalServerError)
142142
return
143143
}
144144
// NB. it's literally impossible for this Marshal to fail

cmd/agent/app/httpserver/server_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,10 @@ func TestHTTPHandlerErrors(t *testing.T) {
146146
},
147147
},
148148
{
149-
description: "sampler tcollector error",
149+
description: "sampler collector error",
150150
url: "?service=Y",
151151
statusCode: http.StatusInternalServerError,
152-
body: "tcollector error: no mock response provided\n",
152+
body: "collector error: no mock response provided\n",
153153
metrics: []mTestutils.ExpectedMetric{
154154
{Name: "http-server.errors", Tags: map[string]string{"source": "tcollector-proxy", "status": "5xx"}, Value: 1},
155155
},
@@ -158,7 +158,7 @@ func TestHTTPHandlerErrors(t *testing.T) {
158158
description: "baggage tcollector error",
159159
url: "/baggageRestrictions?service=Y",
160160
statusCode: http.StatusInternalServerError,
161-
body: "tcollector error: no mock response provided\n",
161+
body: "collector error: no mock response provided\n",
162162
metrics: []mTestutils.ExpectedMetric{
163163
{Name: "http-server.errors", Tags: map[string]string{"source": "tcollector-proxy", "status": "5xx"}, Value: 1},
164164
},

cmd/agent/app/reporter/flags.go

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright (c) 2018 The Jaeger Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package reporter
16+
17+
import (
18+
"flag"
19+
"fmt"
20+
21+
"github.com/spf13/viper"
22+
)
23+
24+
const (
25+
reporterType = "reporter.type"
26+
// TCHANNEL is name of tchannel reporter.
27+
TCHANNEL Type = "tchannel"
28+
// GRPC is name of gRPC reporter.
29+
GRPC Type = "grpc"
30+
)
31+
32+
// Type defines type of reporter.
33+
type Type string
34+
35+
// Options holds generic reporter configuration.
36+
type Options struct {
37+
ReporterType Type
38+
}
39+
40+
// AddFlags adds flags for Options.
41+
func AddFlags(flags *flag.FlagSet) {
42+
flags.String(reporterType, string(TCHANNEL), fmt.Sprintf("Reporter type to use e.g. %s, %s", string(TCHANNEL), string(GRPC)))
43+
}
44+
45+
// InitFromViper initializes Options with properties retrieved from Viper.
46+
func (b *Options) InitFromViper(v *viper.Viper) *Options {
47+
b.ReporterType = Type(v.GetString(reporterType))
48+
return b
49+
}

cmd/agent/app/reporter/flags_test.go

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright (c) 2018 The Jaeger Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package reporter
16+
17+
import (
18+
"flag"
19+
"testing"
20+
21+
"github.com/spf13/cobra"
22+
"github.com/spf13/viper"
23+
"github.com/stretchr/testify/assert"
24+
"github.com/stretchr/testify/require"
25+
)
26+
27+
func TestBingFlags(t *testing.T) {
28+
v := viper.New()
29+
command := cobra.Command{}
30+
flags := &flag.FlagSet{}
31+
AddFlags(flags)
32+
command.PersistentFlags().AddGoFlagSet(flags)
33+
v.BindPFlags(command.PersistentFlags())
34+
35+
err := command.ParseFlags([]string{
36+
"--reporter.type=grpc",
37+
})
38+
require.NoError(t, err)
39+
40+
b := &Options{}
41+
b.InitFromViper(v)
42+
assert.Equal(t, Type("grpc"), b.ReporterType)
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright (c) 2018 The Jaeger Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package grpc
16+
17+
import (
18+
"go.uber.org/zap"
19+
"google.golang.org/grpc"
20+
"google.golang.org/grpc/balancer/roundrobin"
21+
"google.golang.org/grpc/resolver"
22+
"google.golang.org/grpc/resolver/manual"
23+
24+
"github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
25+
aReporter "github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
26+
)
27+
28+
// ProxyBuilder holds objects communicating with collector
29+
type ProxyBuilder struct {
30+
reporter aReporter.Reporter
31+
manager httpserver.ClientConfigManager
32+
}
33+
34+
// NewCollectorProxy creates ProxyBuilder
35+
func NewCollectorProxy(o *Options, logger *zap.Logger) *ProxyBuilder {
36+
// It does not return error if the collector is not running
37+
// a way to fail immediately is to call WithBlock and WithTimeout
38+
var conn *grpc.ClientConn
39+
if len(o.CollectorHostPort) > 1 {
40+
r, _ := manual.GenerateAndRegisterManualResolver()
41+
var resolvedAddrs []resolver.Address
42+
for _, addr := range o.CollectorHostPort {
43+
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: addr})
44+
}
45+
r.InitialAddrs(resolvedAddrs)
46+
conn, _ = grpc.Dial(r.Scheme()+":///round_robin", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
47+
} else {
48+
conn, _ = grpc.Dial(o.CollectorHostPort[0], grpc.WithInsecure())
49+
}
50+
return &ProxyBuilder{
51+
reporter: NewReporter(conn, logger),
52+
manager: NewSamplingManager(conn)}
53+
}
54+
55+
// GetReporter returns Reporter
56+
func (b ProxyBuilder) GetReporter() aReporter.Reporter {
57+
return b.reporter
58+
}
59+
60+
// GetManager returns manager
61+
func (b ProxyBuilder) GetManager() httpserver.ClientConfigManager {
62+
return b.manager
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright (c) 2018 The Jaeger Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package grpc
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
22+
"go.uber.org/zap"
23+
"google.golang.org/grpc"
24+
25+
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
26+
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
27+
)
28+
29+
func TestProxyBuilder(t *testing.T) {
30+
proxy := NewCollectorProxy(&Options{CollectorHostPort: []string{"localhost:0000"}}, zap.NewNop())
31+
require.NotNil(t, proxy)
32+
assert.NotNil(t, proxy.GetReporter())
33+
assert.NotNil(t, proxy.GetManager())
34+
}
35+
36+
func TestMultipleCollectors(t *testing.T) {
37+
spanHandler1 := &mockSpanHandler{}
38+
s1, addr1 := initializeGRPCTestServer(t, func(s *grpc.Server) {
39+
api_v2.RegisterCollectorServiceServer(s, spanHandler1)
40+
})
41+
defer s1.Stop()
42+
spanHandler2 := &mockSpanHandler{}
43+
s2, addr2 := initializeGRPCTestServer(t, func(s *grpc.Server) {
44+
api_v2.RegisterCollectorServiceServer(s, spanHandler2)
45+
})
46+
defer s2.Stop()
47+
48+
proxy := NewCollectorProxy(&Options{CollectorHostPort: []string{addr1.String(), addr2.String()}}, zap.NewNop())
49+
require.NotNil(t, proxy)
50+
assert.NotNil(t, proxy.GetReporter())
51+
assert.NotNil(t, proxy.GetManager())
52+
53+
var bothServers = false
54+
// TODO do not iterate, just create two batches
55+
for i := 0; i < 10; i++ {
56+
r := proxy.GetReporter()
57+
err := r.EmitBatch(&jaeger.Batch{Spans: []*jaeger.Span{{OperationName: "op"}}, Process: &jaeger.Process{ServiceName: "service"}})
58+
require.NoError(t, err)
59+
if len(spanHandler1.getRequests()) > 0 && len(spanHandler2.getRequests()) > 0 {
60+
bothServers = true
61+
break
62+
}
63+
}
64+
assert.Equal(t, true, bothServers)
65+
}

0 commit comments

Comments
 (0)