Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Receive from obsreport.Receiver funcs #3326

Merged
merged 1 commit into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions obsreport/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,26 @@
// Receivers should use the respective start and end according to the data type
// being received, ie.:
//
// * TraceData receive operations should use the pair:
// StartTraceDataReceiveOp/EndTraceDataReceiveOp
// * Traces receive operations should use the pair:
// StartTracesOp/EndTracesOp
//
// * Metrics receive operations should use the pair:
// StartMetricsReceiveOp/EndMetricsReceiveOp
// StartMetricsOp/EndMetricsOp
//
// * Logs receive operations should use the pair:
// StartLogsOp/EndLogsOp
//
// Similar for exporters:
//
// * TraceData export operations should use the pair:
// StartTraceDataExportOp/EndTraceDataExportOp
// * Traces export operations should use the pair:
// StartTracesExportOp/EndTracesExportOp
//
// * Metrics export operations should use the pair:
// StartMetricsExportOp/EndMetricsExportOp
//
// * Metrics export operations should use the pair:
// StartLogsExportOp/EndLogsExportOp
//
// The package is capable of generating legacy metrics by using the
// observability package allowing a controlled transition from legacy to the
// new metrics. The goal is to eventually remove the legacy metrics and use only
Expand Down
92 changes: 28 additions & 64 deletions obsreport/obsreport_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type StartReceiveOption func(*StartReceiveOptions)
// for {
// // Since the context outlives the individual receive operations call obsreport using
// // WithLongLivedCtx().
// ctx := obsreport.StartTraceDataReceiveOp(
// ctx := obsreport.StartTracesOp(
// longLivedCtx,
// r.config.Name(),
// r.transport,
Expand All @@ -62,7 +62,7 @@ type StartReceiveOption func(*StartReceiveOptions)
// if ok {
// err = r.nextConsumer.ConsumeTraces(ctx, td)
// }
// obsreport.EndTraceDataReceiveOp(
// obsreport.EndTracesOp(
// ctx,
// r.format,
// len(td.Spans),
Expand Down Expand Up @@ -99,94 +99,58 @@ func NewReceiver(cfg ReceiverSettings) *Receiver {
}
}

// StartTraceDataReceiveOp is called when a request is received from a client.
// StartTracesOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func (rec *Receiver) StartTraceDataReceiveOp(
operationCtx context.Context,
opt ...StartReceiveOption,
) context.Context {
return rec.traceReceiveOp(
operationCtx,
obsmetrics.ReceiveTraceDataOperationSuffix,
opt...)
func (rec *Receiver) StartTracesOp(operationCtx context.Context, opt ...StartReceiveOption) context.Context {
return rec.startOp(operationCtx, obsmetrics.ReceiveTraceDataOperationSuffix, opt...)
}

// EndTraceDataReceiveOp completes the receive operation that was started with
// StartTraceDataReceiveOp.
func (rec *Receiver) EndTraceDataReceiveOp(
// EndTracesOp completes the receive operation that was started with
// StartTracesOp.
func (rec *Receiver) EndTracesOp(
receiverCtx context.Context,
format string,
numReceivedSpans int,
err error,
) {
rec.endReceiveOp(
receiverCtx,
format,
numReceivedSpans,
err,
config.TracesDataType,
)
rec.endOp(receiverCtx, format, numReceivedSpans, err, config.TracesDataType)
}

// StartLogsReceiveOp is called when a request is received from a client.
// StartLogsOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func (rec *Receiver) StartLogsReceiveOp(
operationCtx context.Context,
opt ...StartReceiveOption,
) context.Context {
return rec.traceReceiveOp(
operationCtx,
obsmetrics.ReceiverLogsOperationSuffix,
opt...)
func (rec *Receiver) StartLogsOp(operationCtx context.Context, opt ...StartReceiveOption) context.Context {
return rec.startOp(operationCtx, obsmetrics.ReceiverLogsOperationSuffix, opt...)
}

// EndLogsReceiveOp completes the receive operation that was started with
// StartLogsReceiveOp.
func (rec *Receiver) EndLogsReceiveOp(
// EndLogsOp completes the receive operation that was started with
// StartLogsOp.
func (rec *Receiver) EndLogsOp(
receiverCtx context.Context,
format string,
numReceivedLogRecords int,
err error,
) {
rec.endReceiveOp(
receiverCtx,
format,
numReceivedLogRecords,
err,
config.LogsDataType,
)
rec.endOp(receiverCtx, format, numReceivedLogRecords, err, config.LogsDataType)
}

// StartMetricsReceiveOp is called when a request is received from a client.
// StartMetricsOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func (rec *Receiver) StartMetricsReceiveOp(
operationCtx context.Context,
opt ...StartReceiveOption,
) context.Context {
return rec.traceReceiveOp(
operationCtx,
obsmetrics.ReceiverMetricsOperationSuffix,
opt...)
func (rec *Receiver) StartMetricsOp(operationCtx context.Context, opt ...StartReceiveOption) context.Context {
return rec.startOp(operationCtx, obsmetrics.ReceiverMetricsOperationSuffix, opt...)
}

// EndMetricsReceiveOp completes the receive operation that was started with
// StartMetricsReceiveOp.
func (rec *Receiver) EndMetricsReceiveOp(
// EndMetricsOp completes the receive operation that was started with
// StartMetricsOp.
func (rec *Receiver) EndMetricsOp(
receiverCtx context.Context,
format string,
numReceivedPoints int,
err error,
) {
rec.endReceiveOp(
receiverCtx,
format,
numReceivedPoints,
err,
config.MetricsDataType,
)
rec.endOp(receiverCtx, format, numReceivedPoints, err, config.MetricsDataType)
}

// ReceiverContext adds the keys used when recording observability metrics to
Expand All @@ -205,9 +169,9 @@ func ReceiverContext(
return ctx
}

// traceReceiveOp creates the span used to trace the operation. Returning
// startOp creates the span used to trace the operation. Returning
// the updated context with the created span.
func (rec *Receiver) traceReceiveOp(
func (rec *Receiver) startOp(
receiverCtx context.Context,
operationSuffix string,
opt ...StartReceiveOption,
Expand All @@ -224,7 +188,7 @@ func (rec *Receiver) traceReceiveOp(
ctx, span = trace.StartSpan(receiverCtx, spanName)
} else {
// Since the receiverCtx is long lived do not use it to start the span.
// This way this trace ends when the EndTraceDataReceiveOp is called.
// This way this trace ends when the EndTracesOp is called.
// Here is safe to ignore the returned context since it is not used below.
_, span = trace.StartSpan(context.Background(), spanName)

Expand All @@ -240,8 +204,8 @@ func (rec *Receiver) traceReceiveOp(
return ctx
}

// endReceiveOp records the observability signals at the end of an operation.
func (rec *Receiver) endReceiveOp(
// endOp records the observability signals at the end of an operation.
func (rec *Receiver) endOp(
receiverCtx context.Context,
format string,
numReceivedItems int,
Expand Down
16 changes: 8 additions & 8 deletions obsreport/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ func TestReceiveTraceDataOp(t *testing.T) {
rcvdSpans := []int{13, 42}
for i, param := range params {
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartTraceDataReceiveOp(receiverCtx)
ctx := rec.StartTracesOp(receiverCtx)
assert.NotNil(t, ctx)

rec.EndTraceDataReceiveOp(
rec.EndTracesOp(
ctx,
format,
rcvdSpans[i],
Expand Down Expand Up @@ -133,10 +133,10 @@ func TestReceiveLogsOp(t *testing.T) {
rcvdLogRecords := []int{13, 42}
for i, param := range params {
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartLogsReceiveOp(receiverCtx)
ctx := rec.StartLogsOp(receiverCtx)
assert.NotNil(t, ctx)

rec.EndLogsReceiveOp(
rec.EndLogsOp(
ctx,
format,
rcvdLogRecords[i],
Expand Down Expand Up @@ -194,10 +194,10 @@ func TestReceiveMetricsOp(t *testing.T) {
rcvdMetricPts := []int{23, 29}
for i, param := range params {
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartMetricsReceiveOp(receiverCtx)
ctx := rec.StartMetricsOp(receiverCtx)
assert.NotNil(t, ctx)

rec.EndMetricsReceiveOp(
rec.EndMetricsOp(
ctx,
format,
rcvdMetricPts[i],
Expand Down Expand Up @@ -465,12 +465,12 @@ func TestReceiveWithLongLivedCtx(t *testing.T) {
// Use a new context on each operation to simulate distinct operations
// under the same long lived context.
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartTraceDataReceiveOp(
ctx := rec.StartTracesOp(
longLivedCtx,
WithLongLivedCtx())
assert.NotNil(t, ctx)

rec.EndTraceDataReceiveOp(
rec.EndTracesOp(
ctx,
format,
op.numSpans,
Expand Down
12 changes: 6 additions & 6 deletions obsreport/obsreporttest/obsreporttest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ func TestCheckReceiverTracesViews(t *testing.T) {

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartTraceDataReceiveOp(receiverCtx)
ctx := rec.StartTracesOp(receiverCtx)
assert.NotNil(t, ctx)
rec.EndTraceDataReceiveOp(
rec.EndTracesOp(
ctx,
format,
7,
Expand All @@ -62,9 +62,9 @@ func TestCheckReceiverMetricsViews(t *testing.T) {

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartMetricsReceiveOp(receiverCtx)
ctx := rec.StartMetricsOp(receiverCtx)
assert.NotNil(t, ctx)
rec.EndMetricsReceiveOp(ctx, format, 7, nil)
rec.EndMetricsOp(ctx, format, 7, nil)

obsreporttest.CheckReceiverMetrics(t, receiver, transport, 7, 0)
}
Expand All @@ -76,9 +76,9 @@ func TestCheckReceiverLogsViews(t *testing.T) {

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartLogsReceiveOp(receiverCtx)
ctx := rec.StartLogsOp(receiverCtx)
assert.NotNil(t, ctx)
rec.EndLogsReceiveOp(ctx, format, 7, nil)
rec.EndLogsOp(ctx, format, 7, nil)

obsreporttest.CheckReceiverLogs(t, receiver, transport, 7, 0)
}
Expand Down
14 changes: 7 additions & 7 deletions receiver/jaegerreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,10 @@ func (h *agentHandler) EmitZipkinBatch(context.Context, []*zipkincore.Span) (err
// Jaeger spans received by the Jaeger agent processor.
func (h *agentHandler) EmitBatch(ctx context.Context, batch *jaeger.Batch) error {
ctx = obsreport.ReceiverContext(ctx, h.id, h.transport)
ctx = h.obsrecv.StartTraceDataReceiveOp(ctx)
ctx = h.obsrecv.StartTracesOp(ctx)

numSpans, err := consumeTraces(ctx, batch, h.nextConsumer)
h.obsrecv.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)
h.obsrecv.EndTracesOp(ctx, thriftFormat, numSpans, err)
return err
}

Expand All @@ -273,12 +273,12 @@ func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
}

ctx = obsreport.ReceiverContext(ctx, jr.id, grpcTransport)
ctx = jr.grpcObsrecv.StartTraceDataReceiveOp(ctx)
ctx = jr.grpcObsrecv.StartTracesOp(ctx)

td := jaegertranslator.ProtoBatchToInternalTraces(r.GetBatch())

err := jr.nextConsumer.ConsumeTraces(ctx, td)
jr.grpcObsrecv.EndTraceDataReceiveOp(ctx, protobufFormat, len(r.GetBatch().Spans), err)
jr.grpcObsrecv.EndTracesOp(ctx, protobufFormat, len(r.GetBatch().Spans), err)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -423,12 +423,12 @@ func (jr *jReceiver) HandleThriftHTTPBatch(w http.ResponseWriter, r *http.Reques
}

ctx = obsreport.ReceiverContext(ctx, jr.id, collectorHTTPTransport)
ctx = jr.httpObsrecv.StartTraceDataReceiveOp(ctx)
ctx = jr.httpObsrecv.StartTracesOp(ctx)

batch, hErr := jr.decodeThriftHTTPBody(r)
if hErr != nil {
http.Error(w, html.EscapeString(hErr.msg), hErr.statusCode)
jr.httpObsrecv.EndTraceDataReceiveOp(ctx, thriftFormat, 0, hErr)
jr.httpObsrecv.EndTracesOp(ctx, thriftFormat, 0, hErr)
return
}

Expand All @@ -438,7 +438,7 @@ func (jr *jReceiver) HandleThriftHTTPBatch(w http.ResponseWriter, r *http.Reques
} else {
w.WriteHeader(http.StatusAccepted)
}
jr.httpObsrecv.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)
jr.httpObsrecv.EndTracesOp(ctx, thriftFormat, numSpans, err)
}

func (jr *jReceiver) startCollector(host component.Host) error {
Expand Down
8 changes: 4 additions & 4 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
session.MarkMessage(message, "")

ctx := obsreport.ReceiverContext(session.Context(), c.id, transport)
ctx = c.obsrecv.StartTraceDataReceiveOp(ctx)
ctx = c.obsrecv.StartTracesOp(ctx)
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.id.String())}
_ = stats.RecordWithTags(ctx, statsTags,
statMessageCount.M(1),
Expand All @@ -275,7 +275,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe

spanCount := traces.SpanCount()
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
c.obsrecv.EndTraceDataReceiveOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
if err != nil {
return err
}
Expand Down Expand Up @@ -312,7 +312,7 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
session.MarkMessage(message, "")

ctx := obsreport.ReceiverContext(session.Context(), c.id, transport)
ctx = c.obsrecv.StartTraceDataReceiveOp(ctx)
ctx = c.obsrecv.StartTracesOp(ctx)
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Insert(tagInstanceName, c.id.String())},
Expand All @@ -328,7 +328,7 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess

err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
// TODO
c.obsrecv.EndTraceDataReceiveOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions receiver/opencensusreceiver/ocmetrics/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (ocr *Receiver) processReceivedMsg(
}

func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, node *commonpb.Node, resource *resourcepb.Resource, metrics []*ocmetrics.Metric) error {
ctx := ocr.obsrecv.StartMetricsReceiveOp(
ctx := ocr.obsrecv.StartMetricsOp(
longLivedRPCCtx,
obsreport.WithLongLivedCtx())

Expand All @@ -141,7 +141,7 @@ func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, node *c
consumerErr = ocr.nextConsumer.ConsumeMetrics(ctx, internaldata.OCToMetrics(node, resource, metrics))
}

ocr.obsrecv.EndMetricsReceiveOp(
ocr.obsrecv.EndMetricsOp(
ctx,
receiverDataFormat,
numPoints,
Expand Down
Loading