Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dedupe process in grpc reporter #1181

Merged
merged 6 commits into from
Nov 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/agent/app/reporter/grpc/collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func TestMultipleCollectors(t *testing.T) {
assert.NotNil(t, proxy.GetManager())

var bothServers = false
r := proxy.GetReporter()
// TODO do not iterate, just create two batches
for i := 0; i < 10; i++ {
r := proxy.GetReporter()
for i := 0; i < 100; i++ {
err := r.EmitBatch(&jaeger.Batch{Spans: []*jaeger.Span{{OperationName: "op"}}, Process: &jaeger.Process{ServiceName: "service"}})
require.NoError(t, err)
if len(spanHandler1.getRequests()) > 0 && len(spanHandler2.getRequests()) > 0 {
Expand Down
20 changes: 10 additions & 10 deletions cmd/agent/app/reporter/grpc/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"

zipkin2 "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/model"
jConverter "github.com/jaegertracing/jaeger/model/converter/thrift/jaeger"
"github.com/jaegertracing/jaeger/model/converter/thrift/zipkin"
Expand All @@ -32,42 +33,41 @@ import (
type Reporter struct {
collector api_v2.CollectorServiceClient
logger *zap.Logger
sanitizer zipkin2.Sanitizer
}

// NewReporter creates gRPC reporter.
func NewReporter(conn *grpc.ClientConn, logger *zap.Logger) *Reporter {
return &Reporter{
collector: api_v2.NewCollectorServiceClient(conn),
logger: logger,
sanitizer: zipkin2.NewChainedSanitizer(zipkin2.StandardSanitizers...),
}
}

// EmitBatch implements EmitBatch() of Reporter
func (r *Reporter) EmitBatch(b *thrift.Batch) error {
// TODO pass process to r.send() - do not convert it for every span
spans := jConverter.ToDomain(b.Spans, b.Process)
return r.send(spans)
return r.send(jConverter.ToDomain(b.Spans, nil), jConverter.ToDomainProcess(b.Process))
}

// EmitZipkinBatch implements EmitZipkinBatch() of Reporter
func (r *Reporter) EmitZipkinBatch(zSpans []*zipkincore.Span) error {
for _, zSpan := range zSpans {
zSpan = r.sanitizer.Sanitize(zSpan)
}
trace, err := zipkin.ToDomain(zSpans)
if err != nil {
return err
}
return r.send(trace.Spans)
return r.send(trace.Spans, nil)
}

func (r *Reporter) send(spans []*model.Span) error {
var process model.Process
if len(spans) > 0 {
process = *spans[0].Process
}
func (r *Reporter) send(spans []*model.Span, process *model.Process) error {
batch := model.Batch{Spans: spans, Process: process}
req := &api_v2.PostSpansRequest{Batch: batch}
_, err := r.collector.PostSpans(context.Background(), req)
if err != nil {
r.logger.Error("Could not send spans over gRPC", zap.Error(err), zap.String("service", batch.Process.ServiceName))
r.logger.Error("Could not send spans over gRPC", zap.Error(err))
}
return err
}
8 changes: 4 additions & 4 deletions cmd/agent/app/reporter/grpc/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func TestReporter_EmitZipkinBatch(t *testing.T) {
}{
{in: &zipkincore.Span{}, err: "Cannot find service name in Zipkin span [traceID=0, spanID=0]"},
{in: &zipkincore.Span{Name: "jonatan", TraceID: 1, ID: 2, Timestamp: &a, Annotations: []*zipkincore.Annotation{{Value: zipkincore.CLIENT_SEND, Host: &zipkincore.Endpoint{ServiceName: "spring"}}}},
expected: model.Batch{Process: model.Process{ServiceName: "spring"},
Spans: []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan",
expected: model.Batch{
Spans: []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan", Duration: time.Microsecond * 1,
Tags: model.KeyValues{{Key: "span.kind", VStr: "client", VType: model.StringType}}, Process: &model.Process{ServiceName: "spring"}, StartTime: tm.UTC()}}}},
}
for _, test := range tests {
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestReporter_EmitBatch(t *testing.T) {
err string
}{
{in: &jThrift.Batch{Process: &jThrift.Process{ServiceName: "node"}, Spans: []*jThrift.Span{{OperationName: "foo", StartTime: int64(model.TimeAsEpochMicroseconds(tm))}}},
expected: model.Batch{Process: model.Process{ServiceName: "node"}, Spans: []*model.Span{{OperationName: "foo", StartTime: tm.UTC(), Process: &model.Process{ServiceName: "node"}}}}},
expected: model.Batch{Process: &model.Process{ServiceName: "node"}, Spans: []*model.Span{{OperationName: "foo", StartTime: tm.UTC()}}}},
}
for _, test := range tests {
err = rep.EmitBatch(test.in)
Expand All @@ -119,6 +119,6 @@ func TestReporter_SendFailure(t *testing.T) {
conn, err := grpc.Dial("", grpc.WithInsecure())
require.NoError(t, err)
rep := NewReporter(conn, zap.NewNop())
err = rep.send(nil)
err = rep.send(nil, nil)
assert.EqualError(t, err, "rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = \"transport: Error while dialing dial tcp: missing address\"")
}
9 changes: 1 addition & 8 deletions cmd/collector/app/builder/span_handler_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,6 @@ func (spanHb *SpanHandlerBuilder) BuildHandlers() (
hostname, _ := os.Hostname()
hostMetrics := spanHb.metricsFactory.Namespace("", map[string]string{"host": hostname})

zSanitizer := zs.NewChainedSanitizer(
zs.NewSpanDurationSanitizer(),
zs.NewSpanStartTimeSanitizer(),
zs.NewParentIDSanitizer(),
zs.NewErrorTagSanitizer(),
)

spanProcessor := app.NewSpanProcessor(
spanHb.spanWriter,
app.Options.ServiceMetrics(spanHb.metricsFactory),
Expand All @@ -82,7 +75,7 @@ func (spanHb *SpanHandlerBuilder) BuildHandlers() (
app.Options.QueueSize(spanHb.collectorOpts.QueueSize),
)

return app.NewZipkinSpanHandler(spanHb.logger, spanProcessor, zSanitizer),
return app.NewZipkinSpanHandler(spanHb.logger, spanProcessor, zs.NewChainedSanitizer(zs.StandardSanitizers...)),
app.NewJaegerSpanHandler(spanHb.logger, spanProcessor),
app.NewGRPCHandler(spanHb.logger, spanProcessor)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewGRPCHandler(logger *zap.Logger, spanProcessor SpanProcessor) *GRPCHandle
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
for _, span := range r.GetBatch().Spans {
if span.GetProcess() == nil {
span.Process = &r.Batch.Process
span.Process = r.Batch.Process
}
}
_, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, JaegerFormatType)
Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ func TestPostSpans(t *testing.T) {
batch model.Batch
expected []*model.Span
}{
{batch: model.Batch{Process: model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}},
{batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}},
expected: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}},
{batch: model.Batch{Process: model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op"}}},
{batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op"}}},
expected: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "batch-process"}}}},
}
for _, test := range tests {
Expand Down
2 changes: 2 additions & 0 deletions cmd/collector/app/sanitizer/zipkin/span_sanitizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (

var (
defaultDuration = int64(1)
// StandardSanitizers is a list of standard zipkin sanitizers.
StandardSanitizers = []Sanitizer{NewSpanStartTimeSanitizer(), NewSpanDurationSanitizer(), NewParentIDSanitizer(), NewErrorTagSanitizer()}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: one per line

)

// Sanitizer interface for sanitizing spans. Any business logic that needs to be applied to normalize the contents of a
Expand Down
8 changes: 8 additions & 0 deletions model/converter/thrift/jaeger/to_domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ func ToDomainSpan(jSpan *jaeger.Span, jProcess *jaeger.Process) *model.Span {
return toDomain{}.ToDomainSpan(jSpan, jProcess)
}

// ToDomainProcess transforms a process in jaeger.thrift format to model.Span.
func ToDomainProcess(jProcess *jaeger.Process) *model.Process {
return toDomain{}.getProcess(jProcess)
}

type toDomain struct{}

func (td toDomain) ToDomain(jSpans []*jaeger.Span, jProcess *jaeger.Process) []*model.Span {
Expand Down Expand Up @@ -96,6 +101,9 @@ func (td toDomain) getReferences(jRefs []*jaeger.SpanRef) []model.SpanRef {
// getProcess takes a jaeger.thrift process and produces a model.Process.
// Any errors are presented as tags
func (td toDomain) getProcess(jProcess *jaeger.Process) *model.Process {
if jProcess == nil {
return nil
}
tags := td.getTags(jProcess.Tags)
return &model.Process{
Tags: tags,
Expand Down
12 changes: 12 additions & 0 deletions model/converter/thrift/jaeger/to_domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"os"
"testing"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -102,3 +103,14 @@ func TestUnknownJaegerType(t *testing.T) {
expected := model.String("sneh", "Unknown VType: Tag({Key:sneh VType:<UNSET> VStr:<nil> VDouble:<nil> VBool:<nil> VLong:<nil> VBinary:[]})")
assert.Equal(t, mkv, expected)
}

func TestToDomain_ToDomainProcess(t *testing.T) {
p := ToDomainProcess(&jaeger.Process{ServiceName: "foo", Tags: []*jaeger.Tag{{Key: "foo", VType: jaeger.TagType_BOOL}}})
assert.Equal(t, &model.Process{ServiceName: "foo", Tags: []model.KeyValue{{Key: "foo", VType: model.BoolType}}}, p)
}

func TestToDomain_ToDomainSpanProcessNull(t *testing.T) {
tm := time.Unix(158, 0)
s := ToDomainSpan(&jaeger.Span{OperationName: "foo", StartTime: int64(model.TimeAsEpochMicroseconds(tm))}, nil)
assert.Equal(t, &model.Span{OperationName: "foo", StartTime: tm.UTC()}, s)
}
Loading