-
Notifications
You must be signed in to change notification settings - Fork 43
ElasticAPM receiver: handle transactions and spans #294
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
receiver/elasticapmreceiver/internal/mappers/IntakeV2ToDerivedFields.go
Outdated
Show resolved
Hide resolved
receiver/elasticapmreceiver/internal/mappers/IntakeV2ToDerivedFields.go
Outdated
Show resolved
Hide resolved
receiver/elasticapmreceiver/internal/mappers/IntakeV2ToDerivedFields.go
Outdated
Show resolved
Hide resolved
receiver/elasticapmreceiver/internal/mappers/IntakeV2ToSemConv.go
Outdated
Show resolved
Hide resolved
receiver/elasticapmreceiver/internal/mappers/IntakeV2ToSemConv.go
Outdated
Show resolved
Hide resolved
baeb9ac to
9e5c118
Compare
| s := ss.Spans().AppendEmpty() | ||
|
|
||
| traceId, err := traceIDFromHex(event.Trace.Id) | ||
| if err == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would keep the receiver code very small, and extract all the logic into internal methods, so they can be more easily tested. But maybe that's my preference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe something like the upstream translator package: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/translator/jaeger/jaegerproto_to_traces.go#L27
An internal function that given an Elastic span/transaction event returns its corresponding ptrace.ResourceSpan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I agree. I moved out all these lines into separate functions. One nit here: https://github.com/elastic/opentelemetry-collector-components/pull/294/files#r1963186038 - but I feel it's ok.
| attributes.PutStr(semconv.AttributeTelemetrySDKLanguage, event.Service.Language.Name) | ||
| } | ||
| attributes.PutStr(semconv.AttributeTelemetrySDKName, "ElasticAPM") | ||
| attributes.PutStr(semconv.AttributeDeploymentEnvironmentName, event.Service.Environment) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the event.Service.Environment always populated? Should we set the optional semconv.AttributeDeploymentEnvironmentName if empty?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, thanks! I added a check here.
| func TranslateIntakeV2TransactionToOTelAttributes(event *modelpb.APMEvent, attributes pcommon.Map) { | ||
| if event.Http != nil && event.Http.Request != nil { | ||
| attributes.PutStr(semconv.AttributeHTTPRequestMethod, event.Http.Request.Method) | ||
| attributes.PutInt(semconv.AttributeHTTPResponseStatusCode, int64(event.Http.Response.StatusCode)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function below TranslateIntakeV2SpanToOTelAttributes checks for event.Http.Response being non-nil, wdyt about unifying the HTTP attributes setter in a separate function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, moved it.
| s := ss.Spans().AppendEmpty() | ||
|
|
||
| traceId, err := traceIDFromHex(event.Trace.Id) | ||
| if err == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe something like the upstream translator package: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/translator/jaeger/jaegerproto_to_traces.go#L27
An internal function that given an Elastic span/transaction event returns its corresponding ptrace.ResourceSpan
| nextTrace := new(consumertest.TracesSink) | ||
| rec, _ := factory.CreateTraces(context.Background(), set, cfg, nextTrace) | ||
|
|
||
| if err := rec.Start(context.Background(), componenttest.NewNopHost()); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wdyt of deferring the receiver shutdown just after starting it?
defer func() {
if err := rec.Shutdown(context.Background()); err != nil {
t.Errorf("Shutdown failed: %v", err)
}
}()There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Love it! Updated.
| receiver := extractElasticAPMReceiver(rec) | ||
| if receiver == nil { | ||
| t.Fatal("Failed to extract elasticAPMReceiver") | ||
| } | ||
|
|
||
| protocol := "https" | ||
| if receiver.cfg.TLSSetting == nil { | ||
| protocol = "http" | ||
| } | ||
| resp, err := http.Post(protocol+"://"+receiver.cfg.Endpoint+intakePath, "application/x-ndjson", bytes.NewBuffer(data)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about explicitly setting the receiver server endpoint so we don't require this type assertion?
Something like:
testEndpoint := testutil.GetAvailableLocalAddress(t)
rcvr, err := newElasticAPMReceiver(nil, &Config{
ServerConfig: confighttp.ServerConfig{
Endpoint: testEndpoint,
},
}, receivertest.NewNopSettings())then (always http)
resp, err := http.Post("http://"+testEndpoint+intakePath, "application/x-ndjson", bytes.NewBuffer(data))by using the GetAvailableLocalAddress helper, we ensure there are no port collisions during the tests execution
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, this is a good idea. I think this PR predates testutil.GetAvailableLocalAddress(t), so that's why my less nice solution here originally.
Updated the PR and now I use testutil.GetAvailableLocalAddress(t) - some things I adapted, since now http.Post(...) is in a different method (due to another point above), and I did not want to pass the endpoint string around, so I still rely on receiver.cfg.Endpoint - but the endpoint itself is now defined by testutil.GetAvailableLocalAddress(t).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not want to pass the endpoint string around, so I still rely on receiver.cfg.Endpoint - but the endpoint itself is now defined by testutil.GetAvailableLocalAddress(t).
But then you need to do a type assertion on the receiver interface, is there any reason why not to use the string endpoint?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My only thinking was to not pass too many parameters. On the other hand, passing the string is very lightweight compared to the type assertion.
Pushed an update to get rid of the type assertion and now we pass the string.
2500b94 to
b0da9e1
Compare
rogercoll
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, added a couple of nits
|
|
||
| actualTraces := nextTrace.AllTraces()[0] | ||
| expectedFile := filepath.Join(testData, expectedYamlFileName) | ||
| expectedTraces, _ := golden.ReadTraces(expectedFile) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would handle this error just in case we produce an invalid expectedFile at some point
| set := receivertest.NewNopSettings() | ||
| nextTrace := new(consumertest.TracesSink) | ||
| rec, _ := factory.CreateTraces(context.Background(), set, cfg, nextTrace) | ||
| receiver := rec.(*sharedcomponent.Component[*elasticAPMReceiver]).Unwrap() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this type assertion is needed, you could just use the testEndpoint variable instead of rec *elasticAPMReceiver in the runComparison function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, let's go with that #294 (comment)
| r.settings.Logger.Info("Starting HTTP server", zap.String("endpoint", r.cfg.ServerConfig.Endpoint)) | ||
|
|
||
| // TODO: Remove this once we have a proper way to configure TLS | ||
| r.cfg.TLSSetting = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could we set this in the factory.go createDefaultConfig default method instead? (to avoid mutating the config after being parsed)
| setHttpAttributes(event, attributes) | ||
|
|
||
| if event.Url != nil { | ||
| attributes.PutStr(semconv.AttributeURLFull, event.Url.Full) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if any of this inner fields can be nil, if so we might need to add a guard function for all the setters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually a good point - we need to guard these. For now I just added another check. This was already setting empty strings.
In the spec the required fields are listed: https://github.com/elastic/apm-server/tree/main/docs/spec/v2
If any of those are missing, apm server rejects those documents. But url.full isn't required.
This whole question makes me think... we should probably discuss how to handle invalid documents - like when a required field is missing.
In any case - this specific one is handled.
Updated: opened https://github.com/elastic/opentelemetry-dev/issues/553?issue=elastic%7Copentelemetry-dev%7C752
This PR adds mapping for transactions and spans from the Elastic APM data model to OTel.
It also adds tests based on golden. The idea for testing: we reuse ndjson files from the apm-server repo and send those to the receiver which we compare by using golden.
TODO (will bring these up in meetings) - done:
opentelemetry-lib- we do the reverse here what's the enricher does that - but we use the exact same Elastic specific attribute names. We could share all those constants across these 2 libs. PR: Create common package for reusing elastic specific attributes opentelemetry-lib#141 - The PR is merged, but opentelemetry-lib will need the next release. I have a todo for this in the PR, I'd merge as is and update later.Many things are still outside this PR, like errors, logs, metrics. I propose to handle those in follow-ups, as they are fairly independent.