-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
use pipelineprofiles instead of componentprofiles
- Loading branch information
Showing
12 changed files
with
1,044 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../../Makefile.Common |
19 changes: 19 additions & 0 deletions
19
exporter/exporterhelper/exporterhelperprofiles/constants.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package exporterhelperprofiles // import "go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprofiles" | ||
|
||
import ( | ||
"errors" | ||
) | ||
|
||
var ( | ||
// errNilConfig is returned when an empty name is given. | ||
errNilConfig = errors.New("nil config") | ||
// errNilLogger is returned when a logger is nil | ||
errNilLogger = errors.New("nil logger") | ||
// errNilPushProfileData is returned when a nil PushProfiles is given. | ||
errNilPushProfileData = errors.New("nil PushProfiles") | ||
// errNilProfilesConverter is returned when a nil RequestFromProfilesFunc is given. | ||
errNilProfilesConverter = errors.New("nil RequestFromProfilesFunc") | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
module go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprofiles | ||
|
||
go 1.22.0 | ||
|
||
require ( | ||
github.com/stretchr/testify v1.9.0 | ||
go.opentelemetry.io/collector/component v0.111.0 | ||
go.opentelemetry.io/collector/config/configretry v1.17.0 | ||
go.opentelemetry.io/collector/consumer v0.111.0 | ||
go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles v0.0.0-00010101000000-000000000000 | ||
go.opentelemetry.io/collector/consumer/consumerprofiles v0.111.0 | ||
go.opentelemetry.io/collector/consumer/consumertest v0.111.0 | ||
go.opentelemetry.io/collector/exporter v0.111.0 | ||
go.opentelemetry.io/collector/exporter/exporterprofiles v0.111.0 | ||
go.opentelemetry.io/collector/pdata v1.17.0 | ||
go.opentelemetry.io/collector/pdata/pprofile v0.111.0 | ||
go.opentelemetry.io/collector/pdata/testdata v0.111.0 | ||
go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.0.0-00010101000000-000000000000 | ||
go.opentelemetry.io/otel v1.31.0 | ||
go.opentelemetry.io/otel/sdk v1.31.0 | ||
go.opentelemetry.io/otel/trace v1.31.0 | ||
go.uber.org/zap v1.27.0 | ||
) | ||
|
||
require ( | ||
github.com/cenkalti/backoff/v4 v4.3.0 // indirect | ||
github.com/davecgh/go-spew v1.1.1 // indirect | ||
github.com/go-logr/logr v1.4.2 // indirect | ||
github.com/go-logr/stdr v1.2.2 // indirect | ||
github.com/gogo/protobuf v1.3.2 // indirect | ||
github.com/google/uuid v1.6.0 // indirect | ||
github.com/json-iterator/go v1.1.12 // indirect | ||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect | ||
github.com/modern-go/reflect2 v1.0.2 // indirect | ||
github.com/pmezard/go-difflib v1.0.0 // indirect | ||
go.opentelemetry.io/collector/config/configtelemetry v0.111.0 // indirect | ||
go.opentelemetry.io/collector/extension v0.111.0 // indirect | ||
go.opentelemetry.io/collector/extension/experimental/storage v0.111.0 // indirect | ||
go.opentelemetry.io/collector/internal/globalsignal v0.111.0 // indirect | ||
go.opentelemetry.io/collector/pipeline v0.111.0 // indirect | ||
go.opentelemetry.io/collector/receiver v0.111.0 // indirect | ||
go.opentelemetry.io/collector/receiver/receiverprofiles v0.111.0 // indirect | ||
go.opentelemetry.io/otel/metric v1.31.0 // indirect | ||
go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect | ||
go.uber.org/multierr v1.11.0 // indirect | ||
golang.org/x/net v0.28.0 // indirect | ||
golang.org/x/sys v0.26.0 // indirect | ||
golang.org/x/text v0.17.0 // indirect | ||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect | ||
google.golang.org/grpc v1.67.1 // indirect | ||
google.golang.org/protobuf v1.35.1 // indirect | ||
gopkg.in/yaml.v3 v3.0.1 // indirect | ||
) | ||
|
||
replace go.opentelemetry.io/collector/consumer/consumertest => ../../../consumer/consumertest | ||
|
||
replace go.opentelemetry.io/collector/pdata/pprofile => ../../../pdata/pprofile | ||
|
||
replace go.opentelemetry.io/collector/pdata/testdata => ../../../pdata/testdata | ||
|
||
replace go.opentelemetry.io/collector/exporter => ../../ | ||
|
||
replace go.opentelemetry.io/collector/consumer => ../../../consumer | ||
|
||
replace go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles => ../../../consumer/consumererror/consumererrorprofiles | ||
|
||
replace go.opentelemetry.io/collector/receiver => ../../../receiver | ||
|
||
replace go.opentelemetry.io/collector/consumer/consumerprofiles => ../../../consumer/consumerprofiles | ||
|
||
replace go.opentelemetry.io/collector/component => ../../../component | ||
|
||
replace go.opentelemetry.io/collector/receiver/receiverprofiles => ../../../receiver/receiverprofiles | ||
|
||
replace go.opentelemetry.io/collector/extension => ../../../extension | ||
|
||
replace go.opentelemetry.io/collector/pdata => ../../../pdata | ||
|
||
replace go.opentelemetry.io/collector/exporter/exporterprofiles => ../../exporterprofiles | ||
|
||
replace go.opentelemetry.io/collector/config/configtelemetry => ../../../config/configtelemetry | ||
|
||
replace go.opentelemetry.io/collector/config/configretry => ../../../config/configretry | ||
|
||
replace go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../../../pipeline/pipelineprofiles | ||
|
||
replace go.opentelemetry.io/collector/extension/experimental/storage => ../../../extension/experimental/storage | ||
|
||
replace go.opentelemetry.io/collector/pipeline => ../../../pipeline | ||
|
||
replace go.opentelemetry.io/collector/internal/globalsignal => ../../../internal/globalsignal |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
163 changes: 163 additions & 0 deletions
163
exporter/exporterhelper/exporterhelperprofiles/profiles.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package exporterhelperprofiles // import "go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprofiles" | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
|
||
"go.uber.org/zap" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/consumer/consumererror" | ||
"go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles" | ||
"go.opentelemetry.io/collector/consumer/consumerprofiles" | ||
"go.opentelemetry.io/collector/exporter" | ||
"go.opentelemetry.io/collector/exporter/exporterhelper" | ||
"go.opentelemetry.io/collector/exporter/exporterhelper/internal" | ||
"go.opentelemetry.io/collector/exporter/exporterprofiles" | ||
"go.opentelemetry.io/collector/exporter/exporterqueue" | ||
"go.opentelemetry.io/collector/pdata/pprofile" | ||
"go.opentelemetry.io/collector/pipeline/pipelineprofiles" | ||
) | ||
|
||
var profilesMarshaler = &pprofile.ProtoMarshaler{} | ||
var profilesUnmarshaler = &pprofile.ProtoUnmarshaler{} | ||
|
||
type profilesRequest struct { | ||
pd pprofile.Profiles | ||
pusher consumerprofiles.ConsumeProfilesFunc | ||
} | ||
|
||
func newProfilesRequest(pd pprofile.Profiles, pusher consumerprofiles.ConsumeProfilesFunc) exporterhelper.Request { | ||
return &profilesRequest{ | ||
pd: pd, | ||
pusher: pusher, | ||
} | ||
} | ||
|
||
func newProfileRequestUnmarshalerFunc(pusher consumerprofiles.ConsumeProfilesFunc) exporterqueue.Unmarshaler[exporterhelper.Request] { | ||
return func(bytes []byte) (exporterhelper.Request, error) { | ||
profiles, err := profilesUnmarshaler.UnmarshalProfiles(bytes) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return newProfilesRequest(profiles, pusher), nil | ||
} | ||
} | ||
|
||
func profilesRequestMarshaler(req exporterhelper.Request) ([]byte, error) { | ||
return profilesMarshaler.MarshalProfiles(req.(*profilesRequest).pd) | ||
} | ||
|
||
func (req *profilesRequest) OnError(err error) exporterhelper.Request { | ||
var profileError consumererrorprofiles.Profiles | ||
if errors.As(err, &profileError) { | ||
return newProfilesRequest(profileError.Data(), req.pusher) | ||
} | ||
return req | ||
} | ||
|
||
func (req *profilesRequest) Export(ctx context.Context) error { | ||
return req.pusher(ctx, req.pd) | ||
} | ||
|
||
func (req *profilesRequest) ItemsCount() int { | ||
return req.pd.SampleCount() | ||
} | ||
|
||
type profileExporter struct { | ||
*internal.BaseExporter | ||
consumerprofiles.Profiles | ||
} | ||
|
||
// NewProfilesExporter creates an exporterprofiles.Profiless that records observability metrics and wraps every request with a Span. | ||
func NewProfilesExporter( | ||
ctx context.Context, | ||
set exporter.Settings, | ||
cfg component.Config, | ||
pusher consumerprofiles.ConsumeProfilesFunc, | ||
options ...exporterhelper.Option, | ||
) (exporterprofiles.Profiles, error) { | ||
if cfg == nil { | ||
return nil, errNilConfig | ||
} | ||
if pusher == nil { | ||
return nil, errNilPushProfileData | ||
} | ||
profilesOpts := []exporterhelper.Option{ | ||
internal.WithMarshaler(profilesRequestMarshaler), internal.WithUnmarshaler(newProfileRequestUnmarshalerFunc(pusher)), | ||
internal.WithBatchFuncs(mergeProfiles, mergeSplitProfiles), | ||
} | ||
return NewProfilesRequestExporter(ctx, set, requestFromProfiles(pusher), append(profilesOpts, options...)...) | ||
} | ||
|
||
// RequestFromProfilesFunc converts pprofile.Profiles into a user-defined Request. | ||
// Experimental: This API is at the early stage of development and may change without backward compatibility | ||
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. | ||
type RequestFromProfilesFunc func(context.Context, pprofile.Profiles) (exporterhelper.Request, error) | ||
|
||
// requestFromProfiles returns a RequestFromProfilesFunc that converts pprofile.Profiles into a Request. | ||
func requestFromProfiles(pusher consumerprofiles.ConsumeProfilesFunc) RequestFromProfilesFunc { | ||
return func(_ context.Context, profiles pprofile.Profiles) (exporterhelper.Request, error) { | ||
return newProfilesRequest(profiles, pusher), nil | ||
} | ||
} | ||
|
||
// NewProfilesRequestExporter creates a new profiles exporter based on a custom ProfilesConverter and RequestSender. | ||
// Experimental: This API is at the early stage of development and may change without backward compatibility | ||
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. | ||
func NewProfilesRequestExporter( | ||
_ context.Context, | ||
set exporter.Settings, | ||
converter RequestFromProfilesFunc, | ||
options ...exporterhelper.Option, | ||
) (exporterprofiles.Profiles, error) { | ||
if set.Logger == nil { | ||
return nil, errNilLogger | ||
} | ||
|
||
if converter == nil { | ||
return nil, errNilProfilesConverter | ||
} | ||
|
||
be, err := internal.NewBaseExporter(set, pipelineprofiles.SignalProfiles, newProfilesExporterWithObservability, options...) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
tc, err := consumerprofiles.NewProfiles(func(ctx context.Context, pd pprofile.Profiles) error { | ||
req, cErr := converter(ctx, pd) | ||
if cErr != nil { | ||
set.Logger.Error("Failed to convert profiles. Dropping data.", | ||
zap.Int("dropped_samples", pd.SampleCount()), | ||
zap.Error(err)) | ||
return consumererror.NewPermanent(cErr) | ||
} | ||
return be.Send(ctx, req) | ||
}, be.ConsumerOptions...) | ||
|
||
return &profileExporter{ | ||
BaseExporter: be, | ||
Profiles: tc, | ||
}, err | ||
} | ||
|
||
type profilesExporterWithObservability struct { | ||
internal.BaseRequestSender | ||
obsrep *internal.ObsReport | ||
} | ||
|
||
func newProfilesExporterWithObservability(obsrep *internal.ObsReport) internal.RequestSender { | ||
return &profilesExporterWithObservability{obsrep: obsrep} | ||
} | ||
|
||
func (tewo *profilesExporterWithObservability) Send(ctx context.Context, req exporterhelper.Request) error { | ||
c := tewo.obsrep.StartProfilesOp(ctx) | ||
numSamples := req.ItemsCount() | ||
// Forward the data to the next consumer (this pusher is the next). | ||
err := tewo.NextSender.Send(c, req) | ||
tewo.obsrep.EndProfilesOp(c, numSamples, err) | ||
return err | ||
} |
Oops, something went wrong.