diff --git a/.chloggen/deprecate-pipelieneprofiles.yaml b/.chloggen/deprecate-pipelieneprofiles.yaml new file mode 100644 index 00000000000..680f8dbaf07 --- /dev/null +++ b/.chloggen/deprecate-pipelieneprofiles.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: pipeline + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate pipelineprofiles module in favor of xpipeline to allow adding more experimental data types. + +# One or more tracking issues or pull requests related to the change +issues: [11778] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/cmd/builder/internal/builder/main_test.go b/cmd/builder/internal/builder/main_test.go index 16e035bdfe7..6887e5f2707 100644 --- a/cmd/builder/internal/builder/main_test.go +++ b/cmd/builder/internal/builder/main_test.go @@ -88,7 +88,7 @@ var replaceModules = []string{ "/internal/sharedcomponent", "/otelcol", "/pipeline", - "/pipeline/pipelineprofiles", + "/pipeline/xpipeline", "/processor", "/processor/processortest", "/processor/batchprocessor", diff --git a/cmd/otelcorecol/builder-config.yaml b/cmd/otelcorecol/builder-config.yaml index 3f86dacde3a..058876d4148 100644 --- a/cmd/otelcorecol/builder-config.yaml +++ b/cmd/otelcorecol/builder-config.yaml @@ -92,7 +92,7 @@ replaces: - go.opentelemetry.io/collector/pdata/testdata => ../../pdata/testdata - go.opentelemetry.io/collector/pdata/pprofile => ../../pdata/pprofile - go.opentelemetry.io/collector/pipeline => ../../pipeline - - go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../../pipeline/pipelineprofiles + - go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline - go.opentelemetry.io/collector/processor => ../../processor - go.opentelemetry.io/collector/processor/processortest => ../../processor/processortest - go.opentelemetry.io/collector/processor/batchprocessor => ../../processor/batchprocessor diff --git a/cmd/otelcorecol/go.mod b/cmd/otelcorecol/go.mod index 678a37c604d..cb4c2192086 100644 --- a/cmd/otelcorecol/go.mod +++ b/cmd/otelcorecol/go.mod @@ -115,7 +115,7 @@ require ( go.opentelemetry.io/collector/pdata/pprofile v0.115.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.115.0 // indirect go.opentelemetry.io/collector/pipeline v0.115.0 // indirect - go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.115.0 // indirect + go.opentelemetry.io/collector/pipeline/xpipeline v0.115.0 // indirect go.opentelemetry.io/collector/processor/processortest v0.115.0 // indirect go.opentelemetry.io/collector/processor/xprocessor v0.115.0 // indirect go.opentelemetry.io/collector/receiver/receivertest v0.115.0 // indirect @@ -269,7 +269,7 @@ replace go.opentelemetry.io/collector/pdata/pprofile => ../../pdata/pprofile replace go.opentelemetry.io/collector/pipeline => ../../pipeline -replace go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../../pipeline/pipelineprofiles +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline replace go.opentelemetry.io/collector/processor => ../../processor diff --git a/connector/connectorprofiles/connector.go b/connector/connectorprofiles/connector.go index bd66013824d..064c0509c32 100644 --- a/connector/connectorprofiles/connector.go +++ b/connector/connectorprofiles/connector.go @@ -12,7 +12,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" ) type Factory interface { @@ -60,7 +60,7 @@ type CreateTracesToProfilesFunc func(context.Context, connector.Settings, compon // CreateTracesToProfiles implements Factory.CreateTracesToProfiles(). func (f CreateTracesToProfilesFunc) CreateTracesToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (connector.Traces, error) { if f == nil { - return nil, internal.ErrDataTypes(set.ID, pipeline.SignalTraces, pipelineprofiles.SignalProfiles) + return nil, internal.ErrDataTypes(set.ID, pipeline.SignalTraces, xpipeline.SignalProfiles) } return f(ctx, set, cfg, next) } @@ -71,7 +71,7 @@ type CreateMetricsToProfilesFunc func(context.Context, connector.Settings, compo // CreateMetricsToProfiles implements Factory.CreateMetricsToProfiles(). func (f CreateMetricsToProfilesFunc) CreateMetricsToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (connector.Metrics, error) { if f == nil { - return nil, internal.ErrDataTypes(set.ID, pipeline.SignalMetrics, pipelineprofiles.SignalProfiles) + return nil, internal.ErrDataTypes(set.ID, pipeline.SignalMetrics, xpipeline.SignalProfiles) } return f(ctx, set, cfg, next) } @@ -82,7 +82,7 @@ type CreateLogsToProfilesFunc func(context.Context, connector.Settings, componen // CreateLogsToProfiles implements Factory.CreateLogsToProfiles(). func (f CreateLogsToProfilesFunc) CreateLogsToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (connector.Logs, error) { if f == nil { - return nil, internal.ErrDataTypes(set.ID, pipeline.SignalLogs, pipelineprofiles.SignalProfiles) + return nil, internal.ErrDataTypes(set.ID, pipeline.SignalLogs, xpipeline.SignalProfiles) } return f(ctx, set, cfg, next) } @@ -93,7 +93,7 @@ type CreateProfilesToProfilesFunc func(context.Context, connector.Settings, comp // CreateProfilesToProfiles implements Factory.CreateProfilesToProfiles(). func (f CreateProfilesToProfilesFunc) CreateProfilesToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (Profiles, error) { if f == nil { - return nil, internal.ErrDataTypes(set.ID, pipelineprofiles.SignalProfiles, pipelineprofiles.SignalProfiles) + return nil, internal.ErrDataTypes(set.ID, xpipeline.SignalProfiles, xpipeline.SignalProfiles) } return f(ctx, set, cfg, next) } @@ -104,7 +104,7 @@ type CreateProfilesToTracesFunc func(context.Context, connector.Settings, compon // CreateProfilesToTraces implements Factory.CreateProfilesToTraces(). func (f CreateProfilesToTracesFunc) CreateProfilesToTraces(ctx context.Context, set connector.Settings, cfg component.Config, next consumer.Traces) (Profiles, error) { if f == nil { - return nil, internal.ErrDataTypes(set.ID, pipelineprofiles.SignalProfiles, pipeline.SignalTraces) + return nil, internal.ErrDataTypes(set.ID, xpipeline.SignalProfiles, pipeline.SignalTraces) } return f(ctx, set, cfg, next) } @@ -115,7 +115,7 @@ type CreateProfilesToMetricsFunc func(context.Context, connector.Settings, compo // CreateProfilesToMetrics implements Factory.CreateProfilesToMetrics(). func (f CreateProfilesToMetricsFunc) CreateProfilesToMetrics(ctx context.Context, set connector.Settings, cfg component.Config, next consumer.Metrics) (Profiles, error) { if f == nil { - return nil, internal.ErrDataTypes(set.ID, pipelineprofiles.SignalProfiles, pipeline.SignalMetrics) + return nil, internal.ErrDataTypes(set.ID, xpipeline.SignalProfiles, pipeline.SignalMetrics) } return f(ctx, set, cfg, next) } @@ -126,7 +126,7 @@ type CreateProfilesToLogsFunc func(context.Context, connector.Settings, componen // CreateProfilesToLogs implements Factory.CreateProfilesToLogs(). func (f CreateProfilesToLogsFunc) CreateProfilesToLogs(ctx context.Context, set connector.Settings, cfg component.Config, next consumer.Logs) (Profiles, error) { if f == nil { - return nil, internal.ErrDataTypes(set.ID, pipelineprofiles.SignalProfiles, pipeline.SignalLogs) + return nil, internal.ErrDataTypes(set.ID, xpipeline.SignalProfiles, pipeline.SignalLogs) } return f(ctx, set, cfg, next) } diff --git a/connector/connectorprofiles/connector_test.go b/connector/connectorprofiles/connector_test.go index e3cbd0615c4..6c27e593b27 100644 --- a/connector/connectorprofiles/connector_test.go +++ b/connector/connectorprofiles/connector_test.go @@ -17,7 +17,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" ) var ( @@ -32,18 +32,18 @@ func TestNewFactoryNoOptions(t *testing.T) { assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) _, err := factory.CreateTracesToProfiles(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalTraces, pipelineprofiles.SignalProfiles)) + assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalTraces, xpipeline.SignalProfiles)) _, err = factory.CreateMetricsToProfiles(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalMetrics, pipelineprofiles.SignalProfiles)) + assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalMetrics, xpipeline.SignalProfiles)) _, err = factory.CreateLogsToProfiles(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalLogs, pipelineprofiles.SignalProfiles)) + assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalLogs, xpipeline.SignalProfiles)) _, err = factory.CreateProfilesToTraces(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, internal.ErrDataTypes(testID, pipelineprofiles.SignalProfiles, pipeline.SignalTraces)) + assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalTraces)) _, err = factory.CreateProfilesToMetrics(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, internal.ErrDataTypes(testID, pipelineprofiles.SignalProfiles, pipeline.SignalMetrics)) + assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalMetrics)) _, err = factory.CreateProfilesToLogs(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, internal.ErrDataTypes(testID, pipelineprofiles.SignalProfiles, pipeline.SignalLogs)) + assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalLogs)) } func TestNewFactoryWithSameTypes(t *testing.T) { @@ -59,11 +59,11 @@ func TestNewFactoryWithSameTypes(t *testing.T) { require.NoError(t, err) _, err = factory.CreateProfilesToTraces(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, internal.ErrDataTypes(testID, pipelineprofiles.SignalProfiles, pipeline.SignalTraces)) + assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalTraces)) _, err = factory.CreateProfilesToMetrics(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, internal.ErrDataTypes(testID, pipelineprofiles.SignalProfiles, pipeline.SignalMetrics)) + assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalMetrics)) _, err = factory.CreateProfilesToLogs(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, internal.ErrDataTypes(testID, pipelineprofiles.SignalProfiles, pipeline.SignalLogs)) + assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalLogs)) } func TestNewFactoryWithTranslateTypes(t *testing.T) { @@ -81,7 +81,7 @@ func TestNewFactoryWithTranslateTypes(t *testing.T) { assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) _, err := factory.CreateProfilesToProfiles(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, internal.ErrDataTypes(testID, pipelineprofiles.SignalProfiles, pipelineprofiles.SignalProfiles)) + assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, xpipeline.SignalProfiles)) assert.Equal(t, component.StabilityLevelBeta, factory.TracesToProfilesStability()) _, err = factory.CreateTracesToProfiles(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) diff --git a/connector/connectorprofiles/go.mod b/connector/connectorprofiles/go.mod index 17c4d009256..52e276d537d 100644 --- a/connector/connectorprofiles/go.mod +++ b/connector/connectorprofiles/go.mod @@ -13,7 +13,7 @@ require ( go.opentelemetry.io/collector/pdata/pprofile v0.115.0 go.opentelemetry.io/collector/pdata/testdata v0.115.0 go.opentelemetry.io/collector/pipeline v0.115.0 - go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.115.0 + go.opentelemetry.io/collector/pipeline/xpipeline v0.115.0 ) require ( @@ -59,6 +59,6 @@ replace go.opentelemetry.io/collector/pdata/testdata => ../../pdata/testdata replace go.opentelemetry.io/collector/pipeline => ../../pipeline -replace go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../../pipeline/pipelineprofiles +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline replace go.opentelemetry.io/collector/internal/fanoutconsumer => ../../internal/fanoutconsumer diff --git a/connector/connectorprofiles/profiles_router_test.go b/connector/connectorprofiles/profiles_router_test.go index 5ff72b68aef..dc5b2bfe658 100644 --- a/connector/connectorprofiles/profiles_router_test.go +++ b/connector/connectorprofiles/profiles_router_test.go @@ -18,7 +18,7 @@ import ( "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/testdata" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" ) type mutatingProfilesSink struct { @@ -51,7 +51,7 @@ func fuzzProfiles(numIDs, numCons, numProfiles int) func(*testing.T) { // If any consumer is mutating, the router must report mutating for i := 0; i < numCons; i++ { - allIDs = append(allIDs, pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "sink_"+strconv.Itoa(numCons))) + allIDs = append(allIDs, pipeline.NewIDWithName(xpipeline.SignalProfiles, "sink_"+strconv.Itoa(numCons))) // Random chance for each consumer to be mutating if (numCons+numProfiles+i)%4 == 0 { allCons = append(allCons, &mutatingProfilesSink{ProfilesSink: new(consumertest.ProfilesSink)}) @@ -111,8 +111,8 @@ func TestProfilessRouterConsumer(t *testing.T) { ctx := context.Background() td := testdata.GenerateProfiles(1) - fooID := pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "foo") - barID := pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "bar") + fooID := pipeline.NewIDWithName(xpipeline.SignalProfiles, "foo") + barID := pipeline.NewIDWithName(xpipeline.SignalProfiles, "bar") foo := new(consumertest.ProfilesSink) bar := new(consumertest.ProfilesSink) @@ -153,7 +153,7 @@ func TestProfilessRouterConsumer(t *testing.T) { assert.Nil(t, none) require.Error(t, err) - fake, err := r.Consumer(pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "fake")) + fake, err := r.Consumer(pipeline.NewIDWithName(xpipeline.SignalProfiles, "fake")) assert.Nil(t, fake) assert.Error(t, err) } diff --git a/connector/connectortest/go.mod b/connector/connectortest/go.mod index a35561e9804..73f0bfa22c9 100644 --- a/connector/connectortest/go.mod +++ b/connector/connectortest/go.mod @@ -29,7 +29,7 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect go.opentelemetry.io/collector/internal/fanoutconsumer v0.115.0 // indirect go.opentelemetry.io/collector/pipeline v0.115.0 // indirect - go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.115.0 // indirect + go.opentelemetry.io/collector/pipeline/xpipeline v0.115.0 // indirect go.opentelemetry.io/otel v1.32.0 // indirect go.opentelemetry.io/otel/metric v1.32.0 // indirect go.opentelemetry.io/otel/sdk v1.32.0 // indirect @@ -70,6 +70,6 @@ replace go.opentelemetry.io/collector/pdata/testdata => ../../pdata/testdata replace go.opentelemetry.io/collector/pipeline => ../../pipeline -replace go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../../pipeline/pipelineprofiles +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline replace go.opentelemetry.io/collector/internal/fanoutconsumer => ../../internal/fanoutconsumer diff --git a/connector/forwardconnector/go.mod b/connector/forwardconnector/go.mod index 0dc2d34743f..e3b2229ffd6 100644 --- a/connector/forwardconnector/go.mod +++ b/connector/forwardconnector/go.mod @@ -37,7 +37,7 @@ require ( go.opentelemetry.io/collector/consumer/xconsumer v0.115.1-0.20241213185000-4593ba7de234 // indirect go.opentelemetry.io/collector/internal/fanoutconsumer v0.115.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.115.0 // indirect - go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.115.0 // indirect + go.opentelemetry.io/collector/pipeline/xpipeline v0.115.0 // indirect go.opentelemetry.io/otel v1.32.0 // indirect go.opentelemetry.io/otel/metric v1.32.0 // indirect go.opentelemetry.io/otel/sdk v1.32.0 // indirect @@ -87,6 +87,6 @@ replace go.opentelemetry.io/collector/connector/connectorprofiles => ../connecto replace go.opentelemetry.io/collector/pipeline => ../../pipeline -replace go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../../pipeline/pipelineprofiles +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline replace go.opentelemetry.io/collector/internal/fanoutconsumer => ../../internal/fanoutconsumer diff --git a/exporter/debugexporter/go.mod b/exporter/debugexporter/go.mod index 822db02e825..715191610ad 100644 --- a/exporter/debugexporter/go.mod +++ b/exporter/debugexporter/go.mod @@ -48,7 +48,7 @@ require ( go.opentelemetry.io/collector/extension/experimental/storage v0.115.0 // indirect go.opentelemetry.io/collector/featuregate v1.21.0 // indirect go.opentelemetry.io/collector/pipeline v0.115.0 // indirect - go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.115.0 // indirect + go.opentelemetry.io/collector/pipeline/xpipeline v0.115.0 // indirect go.opentelemetry.io/collector/receiver v0.115.0 // indirect go.opentelemetry.io/collector/receiver/receivertest v0.115.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.115.0 // indirect @@ -108,7 +108,7 @@ replace go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprof replace go.opentelemetry.io/collector/pipeline => ../../pipeline -replace go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../../pipeline/pipelineprofiles +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline replace go.opentelemetry.io/collector/exporter/exportertest => ../exportertest diff --git a/exporter/exporterhelper/exporterhelperprofiles/go.mod b/exporter/exporterhelper/exporterhelperprofiles/go.mod index 50b809fea0c..b80e4863d83 100644 --- a/exporter/exporterhelper/exporterhelperprofiles/go.mod +++ b/exporter/exporterhelper/exporterhelperprofiles/go.mod @@ -17,7 +17,7 @@ require ( go.opentelemetry.io/collector/exporter/xexporter v0.115.0 go.opentelemetry.io/collector/pdata/pprofile v0.115.0 go.opentelemetry.io/collector/pdata/testdata v0.115.0 - go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.115.0 + go.opentelemetry.io/collector/pipeline/xpipeline v0.115.0 go.opentelemetry.io/otel v1.32.0 go.opentelemetry.io/otel/sdk v1.32.0 go.opentelemetry.io/otel/trace v1.32.0 @@ -91,7 +91,7 @@ replace go.opentelemetry.io/collector/config/configtelemetry => ../../../config/ replace go.opentelemetry.io/collector/config/configretry => ../../../config/configretry -replace go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../../../pipeline/pipelineprofiles +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../../pipeline/xpipeline replace go.opentelemetry.io/collector/extension/experimental/storage => ../../../extension/experimental/storage diff --git a/exporter/exporterhelper/exporterhelperprofiles/profiles.go b/exporter/exporterhelper/exporterhelperprofiles/profiles.go index 8aa3e88d355..267bc05b3ca 100644 --- a/exporter/exporterhelper/exporterhelperprofiles/profiles.go +++ b/exporter/exporterhelper/exporterhelperprofiles/profiles.go @@ -19,7 +19,7 @@ import ( "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/xexporter" "go.opentelemetry.io/collector/pdata/pprofile" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" ) var ( @@ -123,7 +123,7 @@ func NewProfilesRequestExporter( return nil, errNilProfilesConverter } - be, err := internal.NewBaseExporter(set, pipelineprofiles.SignalProfiles, newProfilesExporterWithObservability, options...) + be, err := internal.NewBaseExporter(set, xpipeline.SignalProfiles, newProfilesExporterWithObservability, options...) if err != nil { return nil, err } diff --git a/exporter/otlpexporter/go.mod b/exporter/otlpexporter/go.mod index 12d844f20ef..5edcfe91aa4 100644 --- a/exporter/otlpexporter/go.mod +++ b/exporter/otlpexporter/go.mod @@ -64,7 +64,7 @@ require ( go.opentelemetry.io/collector/extension/experimental/storage v0.115.0 // indirect go.opentelemetry.io/collector/featuregate v1.21.0 // indirect go.opentelemetry.io/collector/pipeline v0.115.0 // indirect - go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.115.0 // indirect + go.opentelemetry.io/collector/pipeline/xpipeline v0.115.0 // indirect go.opentelemetry.io/collector/receiver v0.115.0 // indirect go.opentelemetry.io/collector/receiver/receivertest v0.115.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.115.0 // indirect @@ -143,7 +143,7 @@ replace go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprof replace go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles => ../../consumer/consumererror/consumererrorprofiles -replace go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../../pipeline/pipelineprofiles +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline replace go.opentelemetry.io/collector/consumer/consumererror => ../../consumer/consumererror diff --git a/exporter/otlphttpexporter/go.mod b/exporter/otlphttpexporter/go.mod index 710ae1ea5e8..90884a5f036 100644 --- a/exporter/otlphttpexporter/go.mod +++ b/exporter/otlphttpexporter/go.mod @@ -64,7 +64,7 @@ require ( go.opentelemetry.io/collector/extension/experimental/storage v0.115.0 // indirect go.opentelemetry.io/collector/featuregate v1.21.0 // indirect go.opentelemetry.io/collector/pipeline v0.115.0 // indirect - go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.115.0 // indirect + go.opentelemetry.io/collector/pipeline/xpipeline v0.115.0 // indirect go.opentelemetry.io/collector/receiver v0.115.0 // indirect go.opentelemetry.io/collector/receiver/receivertest v0.115.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.115.0 // indirect @@ -133,7 +133,7 @@ replace go.opentelemetry.io/collector/receiver/xreceiver => ../../receiver/xrece replace go.opentelemetry.io/collector/receiver/receivertest => ../../receiver/receivertest -replace go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../../pipeline/pipelineprofiles +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline replace go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles => ../../consumer/consumererror/consumererrorprofiles diff --git a/internal/e2e/go.mod b/internal/e2e/go.mod index 885ae14a738..bfb1b16124b 100644 --- a/internal/e2e/go.mod +++ b/internal/e2e/go.mod @@ -95,7 +95,7 @@ require ( go.opentelemetry.io/collector/featuregate v1.21.0 // indirect go.opentelemetry.io/collector/internal/fanoutconsumer v0.115.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.115.0 // indirect - go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.115.0 // indirect + go.opentelemetry.io/collector/pipeline/xpipeline v0.115.0 // indirect go.opentelemetry.io/collector/processor v0.115.0 // indirect go.opentelemetry.io/collector/processor/processortest v0.115.0 // indirect go.opentelemetry.io/collector/processor/xprocessor v0.115.0 // indirect @@ -224,7 +224,7 @@ replace go.opentelemetry.io/collector/exporter/xexporter => ../../exporter/xexpo replace go.opentelemetry.io/collector/pipeline => ../../pipeline -replace go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../../pipeline/pipelineprofiles +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline replace go.opentelemetry.io/collector/exporter/exportertest => ../../exporter/exportertest diff --git a/otelcol/go.mod b/otelcol/go.mod index 482f41d6845..6adff91d533 100644 --- a/otelcol/go.mod +++ b/otelcol/go.mod @@ -81,7 +81,7 @@ require ( go.opentelemetry.io/collector/pdata v1.21.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.115.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.115.0 // indirect - go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.115.0 // indirect + go.opentelemetry.io/collector/pipeline/xpipeline v0.115.0 // indirect go.opentelemetry.io/collector/processor/xprocessor v0.115.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.115.0 // indirect go.opentelemetry.io/collector/semconv v0.115.0 // indirect @@ -191,7 +191,7 @@ replace go.opentelemetry.io/collector/exporter/xexporter => ../exporter/xexporte replace go.opentelemetry.io/collector/pipeline => ../pipeline -replace go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../pipeline/pipelineprofiles +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../pipeline/xpipeline replace go.opentelemetry.io/collector/exporter/exportertest => ../exporter/exportertest diff --git a/otelcol/otelcoltest/go.mod b/otelcol/otelcoltest/go.mod index 36acfe7603f..b5603a5c3f9 100644 --- a/otelcol/otelcoltest/go.mod +++ b/otelcol/otelcoltest/go.mod @@ -79,7 +79,7 @@ require ( go.opentelemetry.io/collector/pdata v1.21.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.115.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.115.0 // indirect - go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.115.0 // indirect + go.opentelemetry.io/collector/pipeline/xpipeline v0.115.0 // indirect go.opentelemetry.io/collector/processor/xprocessor v0.115.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.115.0 // indirect go.opentelemetry.io/collector/semconv v0.115.0 // indirect @@ -186,7 +186,7 @@ replace go.opentelemetry.io/collector/exporter/xexporter => ../../exporter/xexpo replace go.opentelemetry.io/collector/pipeline => ../../pipeline -replace go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../../pipeline/pipelineprofiles +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline replace go.opentelemetry.io/collector/exporter/exportertest => ../../exporter/exportertest diff --git a/pipeline/pipelineprofiles/config.go b/pipeline/pipelineprofiles/config.go index 6802cf15c95..a81887086b7 100644 --- a/pipeline/pipelineprofiles/config.go +++ b/pipeline/pipelineprofiles/config.go @@ -1,10 +1,12 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +// Deprecated: [0.116.0] Use go.opentelemetry.io/collector/pipeline/xpipeline instead. package pipelineprofiles // import "go.opentelemetry.io/collector/pipeline/pipelineprofiles" import ( "go.opentelemetry.io/collector/pipeline/internal/globalsignal" ) +// Deprecated: [0.116.0] Use xpipeline.SignalProfiles instead. var SignalProfiles = globalsignal.MustNewSignal("profiles") diff --git a/pipeline/xpipeline/Makefile b/pipeline/xpipeline/Makefile new file mode 100644 index 00000000000..ded7a36092d --- /dev/null +++ b/pipeline/xpipeline/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/pipeline/xpipeline/config.go b/pipeline/xpipeline/config.go new file mode 100644 index 00000000000..f37a7ea479d --- /dev/null +++ b/pipeline/xpipeline/config.go @@ -0,0 +1,8 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package xpipeline // import "go.opentelemetry.io/collector/pipeline/xpipeline" + +import "go.opentelemetry.io/collector/pipeline/internal/globalsignal" + +var SignalProfiles = globalsignal.MustNewSignal("profiles") diff --git a/pipeline/xpipeline/go.mod b/pipeline/xpipeline/go.mod new file mode 100644 index 00000000000..b4181ba8b68 --- /dev/null +++ b/pipeline/xpipeline/go.mod @@ -0,0 +1,7 @@ +module go.opentelemetry.io/collector/pipeline/xpipeline + +go 1.22.0 + +require go.opentelemetry.io/collector/pipeline v0.115.0 + +replace go.opentelemetry.io/collector/pipeline => ../ diff --git a/pipeline/xpipeline/go.sum b/pipeline/xpipeline/go.sum new file mode 100644 index 00000000000..67458fab477 --- /dev/null +++ b/pipeline/xpipeline/go.sum @@ -0,0 +1,8 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/service/go.mod b/service/go.mod index 264a8df6860..5f6dc414f5e 100644 --- a/service/go.mod +++ b/service/go.mod @@ -35,7 +35,7 @@ require ( go.opentelemetry.io/collector/pdata/pprofile v0.115.0 go.opentelemetry.io/collector/pdata/testdata v0.115.0 go.opentelemetry.io/collector/pipeline v0.115.0 - go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.115.0 + go.opentelemetry.io/collector/pipeline/xpipeline v0.115.0 go.opentelemetry.io/collector/processor v0.115.0 go.opentelemetry.io/collector/processor/processortest v0.115.0 go.opentelemetry.io/collector/processor/xprocessor v0.115.0 @@ -201,7 +201,7 @@ replace go.opentelemetry.io/collector/processor/xprocessor => ../processor/xproc replace go.opentelemetry.io/collector/exporter/xexporter => ../exporter/xexporter -replace go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../pipeline/pipelineprofiles +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../pipeline/xpipeline replace go.opentelemetry.io/collector/exporter/exportertest => ../exporter/exportertest diff --git a/service/internal/builders/connector.go b/service/internal/builders/connector.go index 9edf0172fd6..c813bf4a620 100644 --- a/service/internal/builders/connector.go +++ b/service/internal/builders/connector.go @@ -14,7 +14,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" ) func errDataTypes(id component.ID, from, to pipeline.Signal) error { @@ -106,7 +106,7 @@ func (b *ConnectorBuilder) CreateTracesToProfiles(ctx context.Context, set conne f, ok := connFact.(connectorprofiles.Factory) if !ok { - return nil, errDataTypes(set.ID, pipeline.SignalTraces, pipelineprofiles.SignalProfiles) + return nil, errDataTypes(set.ID, pipeline.SignalTraces, xpipeline.SignalProfiles) } logStabilityLevel(set.Logger, f.TracesToProfilesStability()) @@ -187,7 +187,7 @@ func (b *ConnectorBuilder) CreateMetricsToProfiles(ctx context.Context, set conn f, ok := connFact.(connectorprofiles.Factory) if !ok { - return nil, errDataTypes(set.ID, pipeline.SignalMetrics, pipelineprofiles.SignalProfiles) + return nil, errDataTypes(set.ID, pipeline.SignalMetrics, xpipeline.SignalProfiles) } logStabilityLevel(set.Logger, f.MetricsToProfilesStability()) @@ -268,7 +268,7 @@ func (b *ConnectorBuilder) CreateLogsToProfiles(ctx context.Context, set connect f, ok := connFact.(connectorprofiles.Factory) if !ok { - return nil, errDataTypes(set.ID, pipeline.SignalLogs, pipelineprofiles.SignalProfiles) + return nil, errDataTypes(set.ID, pipeline.SignalLogs, xpipeline.SignalProfiles) } logStabilityLevel(set.Logger, f.LogsToProfilesStability()) @@ -292,7 +292,7 @@ func (b *ConnectorBuilder) CreateProfilesToTraces(ctx context.Context, set conne f, ok := connFact.(connectorprofiles.Factory) if !ok { - return nil, errDataTypes(set.ID, pipelineprofiles.SignalProfiles, pipeline.SignalTraces) + return nil, errDataTypes(set.ID, xpipeline.SignalProfiles, pipeline.SignalTraces) } logStabilityLevel(set.Logger, f.ProfilesToTracesStability()) @@ -316,7 +316,7 @@ func (b *ConnectorBuilder) CreateProfilesToMetrics(ctx context.Context, set conn f, ok := connFact.(connectorprofiles.Factory) if !ok { - return nil, errDataTypes(set.ID, pipelineprofiles.SignalProfiles, pipeline.SignalMetrics) + return nil, errDataTypes(set.ID, xpipeline.SignalProfiles, pipeline.SignalMetrics) } logStabilityLevel(set.Logger, f.ProfilesToMetricsStability()) @@ -340,7 +340,7 @@ func (b *ConnectorBuilder) CreateProfilesToLogs(ctx context.Context, set connect f, ok := connFact.(connectorprofiles.Factory) if !ok { - return nil, errDataTypes(set.ID, pipelineprofiles.SignalProfiles, pipeline.SignalLogs) + return nil, errDataTypes(set.ID, xpipeline.SignalProfiles, pipeline.SignalLogs) } logStabilityLevel(set.Logger, f.ProfilesToLogsStability()) @@ -364,7 +364,7 @@ func (b *ConnectorBuilder) CreateProfilesToProfiles(ctx context.Context, set con f, ok := connFact.(connectorprofiles.Factory) if !ok { - return nil, errDataTypes(set.ID, pipelineprofiles.SignalProfiles, pipelineprofiles.SignalProfiles) + return nil, errDataTypes(set.ID, xpipeline.SignalProfiles, xpipeline.SignalProfiles) } logStabilityLevel(set.Logger, f.ProfilesToProfilesStability()) diff --git a/service/internal/builders/connector_test.go b/service/internal/builders/connector_test.go index 7de975e0ca0..cd7b1fad893 100644 --- a/service/internal/builders/connector_test.go +++ b/service/internal/builders/connector_test.go @@ -20,7 +20,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" ) func TestConnectorBuilder(t *testing.T) { @@ -146,7 +146,7 @@ func TestConnectorBuilder(t *testing.T) { assert.Equal(t, nopConnectorInstance, t2l) } t2p, err := b.CreateTracesToProfiles(context.Background(), createConnectorSettings(tt.id), tt.nextProfiles) - if expectedErr := tt.err(pipeline.SignalTraces, pipelineprofiles.SignalProfiles); expectedErr != "" { + if expectedErr := tt.err(pipeline.SignalTraces, xpipeline.SignalProfiles); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, t2p) } else { @@ -181,7 +181,7 @@ func TestConnectorBuilder(t *testing.T) { assert.Equal(t, nopConnectorInstance, m2l) } m2p, err := b.CreateMetricsToProfiles(context.Background(), createConnectorSettings(tt.id), tt.nextProfiles) - if expectedErr := tt.err(pipeline.SignalMetrics, pipelineprofiles.SignalProfiles); expectedErr != "" { + if expectedErr := tt.err(pipeline.SignalMetrics, xpipeline.SignalProfiles); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, m2p) } else { @@ -216,7 +216,7 @@ func TestConnectorBuilder(t *testing.T) { assert.Equal(t, nopConnectorInstance, l2l) } l2p, err := b.CreateLogsToProfiles(context.Background(), createConnectorSettings(tt.id), tt.nextProfiles) - if expectedErr := tt.err(pipeline.SignalLogs, pipelineprofiles.SignalProfiles); expectedErr != "" { + if expectedErr := tt.err(pipeline.SignalLogs, xpipeline.SignalProfiles); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, l2p) } else { @@ -225,7 +225,7 @@ func TestConnectorBuilder(t *testing.T) { } p2t, err := b.CreateProfilesToTraces(context.Background(), createConnectorSettings(tt.id), tt.nextTraces) - if expectedErr := tt.err(pipelineprofiles.SignalProfiles, pipeline.SignalTraces); expectedErr != "" { + if expectedErr := tt.err(xpipeline.SignalProfiles, pipeline.SignalTraces); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, p2t) } else { @@ -233,7 +233,7 @@ func TestConnectorBuilder(t *testing.T) { assert.Equal(t, nopConnectorInstance, p2t) } p2m, err := b.CreateProfilesToMetrics(context.Background(), createConnectorSettings(tt.id), tt.nextMetrics) - if expectedErr := tt.err(pipelineprofiles.SignalProfiles, pipeline.SignalMetrics); expectedErr != "" { + if expectedErr := tt.err(xpipeline.SignalProfiles, pipeline.SignalMetrics); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, p2m) } else { @@ -241,7 +241,7 @@ func TestConnectorBuilder(t *testing.T) { assert.Equal(t, nopConnectorInstance, p2m) } p2l, err := b.CreateProfilesToLogs(context.Background(), createConnectorSettings(tt.id), tt.nextLogs) - if expectedErr := tt.err(pipelineprofiles.SignalProfiles, pipeline.SignalLogs); expectedErr != "" { + if expectedErr := tt.err(xpipeline.SignalProfiles, pipeline.SignalLogs); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, p2l) } else { @@ -249,7 +249,7 @@ func TestConnectorBuilder(t *testing.T) { assert.Equal(t, nopConnectorInstance, p2l) } p2p, err := b.CreateProfilesToProfiles(context.Background(), createConnectorSettings(tt.id), tt.nextProfiles) - if expectedErr := tt.err(pipelineprofiles.SignalProfiles, pipelineprofiles.SignalProfiles); expectedErr != "" { + if expectedErr := tt.err(xpipeline.SignalProfiles, xpipeline.SignalProfiles); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, p2p) } else { diff --git a/service/internal/graph/connector.go b/service/internal/graph/connector.go index ce965d7417d..b2313860d87 100644 --- a/service/internal/graph/connector.go +++ b/service/internal/graph/connector.go @@ -12,7 +12,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/capabilityconsumer" "go.opentelemetry.io/collector/service/internal/components" @@ -61,7 +61,7 @@ func (n *connectorNode) buildComponent( return n.buildMetrics(ctx, set, builder, nexts) case pipeline.SignalLogs: return n.buildLogs(ctx, set, builder, nexts) - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: return n.buildProfiles(ctx, set, builder, nexts) } return nil @@ -96,7 +96,7 @@ func (n *connectorNode) buildTraces( n.Component, err = builder.CreateMetricsToTraces(ctx, set, next) case pipeline.SignalLogs: n.Component, err = builder.CreateLogsToTraces(ctx, set, next) - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: n.Component, err = builder.CreateProfilesToTraces(ctx, set, next) } return err @@ -131,7 +131,7 @@ func (n *connectorNode) buildMetrics( n.Component, err = builder.CreateTracesToMetrics(ctx, set, next) case pipeline.SignalLogs: n.Component, err = builder.CreateLogsToMetrics(ctx, set, next) - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: n.Component, err = builder.CreateProfilesToMetrics(ctx, set, next) } return err @@ -166,7 +166,7 @@ func (n *connectorNode) buildLogs( n.Component, err = builder.CreateTracesToLogs(ctx, set, next) case pipeline.SignalMetrics: n.Component, err = builder.CreateMetricsToLogs(ctx, set, next) - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: n.Component, err = builder.CreateProfilesToLogs(ctx, set, next) } return err @@ -186,7 +186,7 @@ func (n *connectorNode) buildProfiles( var err error switch n.exprPipelineType { - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: var conn connectorprofiles.Profiles conn, err = builder.CreateProfilesToProfiles(ctx, set, next) if err != nil { diff --git a/service/internal/graph/exporter.go b/service/internal/graph/exporter.go index 948da2f759a..ab7d0f6392b 100644 --- a/service/internal/graph/exporter.go +++ b/service/internal/graph/exporter.go @@ -10,7 +10,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/components" ) @@ -56,7 +56,7 @@ func (n *exporterNode) buildComponent( n.Component, err = builder.CreateMetrics(ctx, set) case pipeline.SignalLogs: n.Component, err = builder.CreateLogs(ctx, set) - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: n.Component, err = builder.CreateProfiles(ctx, set) default: return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, n.pipelineType) diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 284e7800915..33fccd30b42 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -32,7 +32,7 @@ import ( "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/internal/fanoutconsumer" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/capabilityconsumer" "go.opentelemetry.io/collector/service/internal/status" @@ -318,7 +318,7 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { cc := capabilityconsumer.NewLogs(next.(consumer.Logs), capability) n.baseConsumer = cc n.ConsumeLogsFunc = cc.ConsumeLogs - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: cc := capabilityconsumer.NewProfiles(next.(xconsumer.Profiles), capability) n.baseConsumer = cc n.ConsumeProfilesFunc = cc.ConsumeProfiles @@ -344,7 +344,7 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { consumers = append(consumers, next.(consumer.Logs)) } n.baseConsumer = fanoutconsumer.NewLogs(consumers) - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: consumers := make([]xconsumer.Profiles, 0, len(nexts)) for _, next := range nexts { consumers = append(consumers, next.(xconsumer.Profiles)) @@ -484,7 +484,7 @@ func (g *Graph) GetExporters() map[pipeline.Signal]map[component.ID]component.Co exportersMap[pipeline.SignalTraces] = make(map[component.ID]component.Component) exportersMap[pipeline.SignalMetrics] = make(map[component.ID]component.Component) exportersMap[pipeline.SignalLogs] = make(map[component.ID]component.Component) - exportersMap[pipelineprofiles.SignalProfiles] = make(map[component.ID]component.Component) + exportersMap[xpipeline.SignalProfiles] = make(map[component.ID]component.Component) for _, pg := range g.pipelines { for _, expNode := range pg.exporters { @@ -547,7 +547,7 @@ func connectorStability(f connector.Factory, expType, recType pipeline.Signal) c return f.TracesToMetricsStability() case pipeline.SignalLogs: return f.TracesToLogsStability() - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: fprof, ok := f.(connectorprofiles.Factory) if !ok { return component.StabilityLevelUndefined @@ -562,7 +562,7 @@ func connectorStability(f connector.Factory, expType, recType pipeline.Signal) c return f.MetricsToMetricsStability() case pipeline.SignalLogs: return f.MetricsToLogsStability() - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: fprof, ok := f.(connectorprofiles.Factory) if !ok { return component.StabilityLevelUndefined @@ -577,14 +577,14 @@ func connectorStability(f connector.Factory, expType, recType pipeline.Signal) c return f.LogsToMetricsStability() case pipeline.SignalLogs: return f.LogsToLogsStability() - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: fprof, ok := f.(connectorprofiles.Factory) if !ok { return component.StabilityLevelUndefined } return fprof.LogsToProfilesStability() } - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: fprof, ok := f.(connectorprofiles.Factory) if !ok { return component.StabilityLevelUndefined @@ -596,7 +596,7 @@ func connectorStability(f connector.Factory, expType, recType pipeline.Signal) c return fprof.ProfilesToMetricsStability() case pipeline.SignalLogs: return fprof.ProfilesToLogsStability() - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: return fprof.ProfilesToProfilesStability() } } diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index d5f4216f561..4c4cbedbaa3 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -21,7 +21,7 @@ import ( "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/testdata" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processortest" "go.opentelemetry.io/collector/receiver" @@ -57,7 +57,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.MustNewID("examplereceiver")}, Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, @@ -83,7 +83,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { Processors: []component.ID{component.MustNewIDWithName("exampleprocessor", "mutate")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.MustNewID("examplereceiver")}, Processors: []component.ID{component.MustNewIDWithName("exampleprocessor", "mutate")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, @@ -109,7 +109,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { Processors: []component.ID{component.MustNewID("exampleprocessor"), component.MustNewIDWithName("exampleprocessor", "mutate")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.MustNewID("examplereceiver")}, Processors: []component.ID{component.MustNewID("exampleprocessor"), component.MustNewIDWithName("exampleprocessor", "mutate")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, @@ -132,7 +132,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { Receivers: []component.ID{component.MustNewID("examplereceiver")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.MustNewID("examplereceiver")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, @@ -157,7 +157,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { Processors: []component.ID{component.MustNewIDWithName("exampleprocessor", "mutate"), component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter"), component.MustNewIDWithName("exampleexporter", "1")}, }, - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.MustNewID("examplereceiver"), component.MustNewIDWithName("examplereceiver", "1")}, Processors: []component.ID{component.MustNewIDWithName("exampleprocessor", "mutate"), component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter"), component.MustNewIDWithName("exampleexporter", "1")}, @@ -180,7 +180,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { Receivers: []component.ID{component.MustNewID("examplereceiver"), component.MustNewIDWithName("examplereceiver", "1")}, Exporters: []component.ID{component.MustNewID("exampleexporter"), component.MustNewIDWithName("exampleexporter", "1")}, }, - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.MustNewID("examplereceiver"), component.MustNewIDWithName("examplereceiver", "1")}, Exporters: []component.ID{component.MustNewID("exampleexporter"), component.MustNewIDWithName("exampleexporter", "1")}, }, @@ -217,12 +217,12 @@ func TestConnectorPipelinesGraph(t *testing.T) { Receivers: []component.ID{component.MustNewID("examplereceiver")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.MustNewID("examplereceiver")}, Processors: []component.ID{component.MustNewIDWithName("exampleprocessor", "mutate")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "1"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "1"): { Receivers: []component.ID{component.MustNewID("examplereceiver")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, @@ -280,12 +280,12 @@ func TestConnectorPipelinesGraph(t *testing.T) { { name: "pipelines_conn_simple_profiles.yaml", pipelineConfigs: pipelines.Config{ - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "in"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "in"): { Receivers: []component.ID{component.MustNewID("examplereceiver")}, Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewIDWithName("exampleconnector", "inherit_mutate")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "out"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "out"): { Receivers: []component.ID{component.MustNewIDWithName("exampleconnector", "inherit_mutate")}, Processors: []component.ID{component.MustNewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector Exporters: []component.ID{component.MustNewID("exampleexporter")}, @@ -374,22 +374,22 @@ func TestConnectorPipelinesGraph(t *testing.T) { { name: "pipelines_conn_fork_merge_profiles.yaml", pipelineConfigs: pipelines.Config{ - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "in"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "in"): { Receivers: []component.ID{component.MustNewID("examplereceiver")}, Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewIDWithName("exampleconnector", "inherit_mutate")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "type0"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "type0"): { Receivers: []component.ID{component.MustNewIDWithName("exampleconnector", "inherit_mutate")}, Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewIDWithName("exampleconnector", "merge")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "type1"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "type1"): { Receivers: []component.ID{component.MustNewIDWithName("exampleconnector", "inherit_mutate")}, Processors: []component.ID{component.MustNewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector Exporters: []component.ID{component.MustNewIDWithName("exampleconnector", "merge")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "out"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "out"): { Receivers: []component.ID{component.MustNewIDWithName("exampleconnector", "merge")}, Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, @@ -415,7 +415,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.MustNewID("exampleconnector")}, Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, @@ -441,7 +441,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.MustNewID("exampleconnector")}, Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, @@ -467,7 +467,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.MustNewID("exampleconnector")}, Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, @@ -493,7 +493,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.MustNewID("exampleconnector")}, Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, @@ -519,7 +519,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleconnector")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "in"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "in"): { Receivers: []component.ID{component.MustNewID("examplereceiver")}, Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleconnector")}, @@ -539,7 +539,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "out"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "out"): { Receivers: []component.ID{component.MustNewID("exampleconnector")}, Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, @@ -565,7 +565,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { Processors: []component.ID{component.MustNewIDWithName("exampleprocessor", "mutate")}, Exporters: []component.ID{component.MustNewIDWithName("exampleconnector", "inherit_mutate")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "in"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "in"): { Receivers: []component.ID{component.MustNewID("examplereceiver")}, Processors: []component.ID{component.MustNewIDWithName("exampleprocessor", "mutate")}, Exporters: []component.ID{component.MustNewIDWithName("exampleconnector", "inherit_mutate")}, @@ -585,7 +585,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { Processors: []component.ID{component.MustNewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "out"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "out"): { Receivers: []component.ID{component.MustNewIDWithName("exampleconnector", "inherit_mutate")}, Processors: []component.ID{component.MustNewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector Exporters: []component.ID{component.MustNewID("exampleexporter")}, @@ -620,11 +620,11 @@ func TestConnectorPipelinesGraph(t *testing.T) { Receivers: []component.ID{component.MustNewID("mockforward")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "in"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "in"): { Receivers: []component.ID{component.MustNewID("examplereceiver")}, Exporters: []component.ID{component.MustNewID("mockforward")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "out"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "out"): { Receivers: []component.ID{component.MustNewID("mockforward")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, @@ -712,22 +712,22 @@ func TestConnectorPipelinesGraph(t *testing.T) { { name: "pipelines_conn_mutate_profiles.yaml", pipelineConfigs: pipelines.Config{ - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "in"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "in"): { Receivers: []component.ID{component.MustNewID("examplereceiver")}, Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewIDWithName("exampleconnector", "inherit_mutate")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "out0"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "out0"): { Receivers: []component.ID{component.MustNewIDWithName("exampleconnector", "inherit_mutate")}, Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "middle"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "middle"): { Receivers: []component.ID{component.MustNewIDWithName("exampleconnector", "inherit_mutate")}, Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewIDWithName("exampleconnector", "mutate")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "out1"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "out1"): { Receivers: []component.ID{component.MustNewIDWithName("exampleconnector", "mutate")}, Processors: []component.ID{component.MustNewID("exampleprocessor")}, Exporters: []component.ID{component.MustNewID("exampleexporter")}, @@ -902,7 +902,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { logsReceiver := c.(*testcomponents.ExampleReceiver) require.NoError(t, logsReceiver.ConsumeLogs(context.Background(), testdata.GenerateLogs(1))) } - for _, c := range allReceivers[pipelineprofiles.SignalProfiles] { + for _, c := range allReceivers[xpipeline.SignalProfiles] { profilesReceiver := c.(*testcomponents.ExampleReceiver) require.NoError(t, profilesReceiver.ConsumeProfiles(context.Background(), testdata.GenerateProfiles(1))) } @@ -992,7 +992,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { } } } - for _, e := range allExporters[pipelineprofiles.SignalProfiles] { + for _, e := range allExporters[xpipeline.SignalProfiles] { profilesExporter := e.(*testcomponents.ExampleExporter) assert.Len(t, profilesExporter.Profiles, tt.expectedPerExporter) expectedMutable := testdata.GenerateProfiles(1) @@ -1031,9 +1031,9 @@ func TestConnectorRouter(t *testing.T) { logsRightID := pipeline.NewIDWithName(pipeline.SignalLogs, "right") logsLeftID := pipeline.NewIDWithName(pipeline.SignalLogs, "left") - profilesInID := pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "in") - profilesRightID := pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "right") - profilesLeftID := pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "left") + profilesInID := pipeline.NewIDWithName(xpipeline.SignalProfiles, "in") + profilesRightID := pipeline.NewIDWithName(xpipeline.SignalProfiles, "right") + profilesLeftID := pipeline.NewIDWithName(xpipeline.SignalProfiles, "left") ctx := context.Background() set := Settings{ @@ -1214,9 +1214,9 @@ func TestConnectorRouter(t *testing.T) { assert.Len(t, logsLeft.Logs, 2) // Get a handle for the profiles receiver and both Exporters - profilesReceiver := allReceivers[pipelineprofiles.SignalProfiles][rcvrID].(*testcomponents.ExampleReceiver) - profilesRight := allExporters[pipelineprofiles.SignalProfiles][expRightID].(*testcomponents.ExampleExporter) - profilesLeft := allExporters[pipelineprofiles.SignalProfiles][expLeftID].(*testcomponents.ExampleExporter) + profilesReceiver := allReceivers[xpipeline.SignalProfiles][rcvrID].(*testcomponents.ExampleReceiver) + profilesRight := allExporters[xpipeline.SignalProfiles][expRightID].(*testcomponents.ExampleExporter) + profilesLeft := allExporters[xpipeline.SignalProfiles][expLeftID].(*testcomponents.ExampleExporter) // Consume 1, validate it went right require.NoError(t, profilesReceiver.ConsumeProfiles(ctx, testdata.GenerateProfiles(1))) @@ -1313,7 +1313,7 @@ func TestGraphBuildErrors(t *testing.T) { component.MustNewID("bf"): badExporterFactory.CreateDefaultConfig(), }, pipelineCfgs: pipelines.Config{ - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewID("bf")}, }, @@ -1392,7 +1392,7 @@ func TestGraphBuildErrors(t *testing.T) { component.MustNewID("nop"): nopReceiverFactory.CreateDefaultConfig(), }, pipelineCfgs: pipelines.Config{ - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.MustNewID("nop")}, Processors: []component.ID{component.MustNewID("bf")}, Exporters: []component.ID{component.MustNewID("nop")}, @@ -1457,7 +1457,7 @@ func TestGraphBuildErrors(t *testing.T) { component.MustNewID("nop"): nopReceiverFactory.CreateDefaultConfig(), }, pipelineCfgs: pipelines.Config{ - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.MustNewID("bf")}, Exporters: []component.ID{component.MustNewID("nop")}, }, @@ -1549,7 +1549,7 @@ func TestGraphBuildErrors(t *testing.T) { Receivers: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewID("bf")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "out"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "out"): { Receivers: []component.ID{component.MustNewID("bf")}, Exporters: []component.ID{component.MustNewID("nop")}, }, @@ -1641,7 +1641,7 @@ func TestGraphBuildErrors(t *testing.T) { Receivers: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewID("bf")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "out"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "out"): { Receivers: []component.ID{component.MustNewID("bf")}, Exporters: []component.ID{component.MustNewID("nop")}, }, @@ -1733,7 +1733,7 @@ func TestGraphBuildErrors(t *testing.T) { Receivers: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewID("bf")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "out"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "out"): { Receivers: []component.ID{component.MustNewID("bf")}, Exporters: []component.ID{component.MustNewID("nop")}, }, @@ -1752,7 +1752,7 @@ func TestGraphBuildErrors(t *testing.T) { component.MustNewID("bf"): nopConnectorFactory.CreateDefaultConfig(), }, pipelineCfgs: pipelines.Config{ - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "in"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "in"): { Receivers: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewID("bf")}, }, @@ -1775,7 +1775,7 @@ func TestGraphBuildErrors(t *testing.T) { component.MustNewID("bf"): nopConnectorFactory.CreateDefaultConfig(), }, pipelineCfgs: pipelines.Config{ - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "in"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "in"): { Receivers: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewID("bf")}, }, @@ -1798,7 +1798,7 @@ func TestGraphBuildErrors(t *testing.T) { component.MustNewID("bf"): nopConnectorFactory.CreateDefaultConfig(), }, pipelineCfgs: pipelines.Config{ - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "in"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "in"): { Receivers: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewID("bf")}, }, @@ -1821,11 +1821,11 @@ func TestGraphBuildErrors(t *testing.T) { component.MustNewID("bf"): nopConnectorFactory.CreateDefaultConfig(), }, pipelineCfgs: pipelines.Config{ - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "in"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "in"): { Receivers: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewID("bf")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "out"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "out"): { Receivers: []component.ID{component.MustNewID("bf")}, Exporters: []component.ID{component.MustNewID("nop")}, }, @@ -2017,7 +2017,7 @@ func TestGraphBuildErrors(t *testing.T) { component.MustNewIDWithName("nop", "conn"): nopConnectorFactory.CreateDefaultConfig(), }, pipelineCfgs: pipelines.Config{ - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.MustNewIDWithName("nop", "conn")}, Processors: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewIDWithName("nop", "conn")}, @@ -2180,22 +2180,22 @@ func TestGraphBuildErrors(t *testing.T) { component.MustNewIDWithName("nop", "conn2"): nopConnectorFactory.CreateDefaultConfig(), }, pipelineCfgs: pipelines.Config{ - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "in"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "in"): { Receivers: []component.ID{component.MustNewID("nop")}, Processors: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewIDWithName("nop", "conn")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "1"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "1"): { Receivers: []component.ID{component.MustNewIDWithName("nop", "conn")}, Processors: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewIDWithName("nop", "conn1")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "2"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "2"): { Receivers: []component.ID{component.MustNewIDWithName("nop", "conn1")}, Processors: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewIDWithName("nop", "conn2"), component.MustNewIDWithName("nop", "conn")}, }, - pipeline.NewIDWithName(pipelineprofiles.SignalProfiles, "out"): { + pipeline.NewIDWithName(xpipeline.SignalProfiles, "out"): { Receivers: []component.ID{component.MustNewIDWithName("nop", "conn2")}, Processors: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewID("nop")}, diff --git a/service/internal/graph/processor.go b/service/internal/graph/processor.go index f52316cf958..3288a505d80 100644 --- a/service/internal/graph/processor.go +++ b/service/internal/graph/processor.go @@ -11,7 +11,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/components" @@ -58,7 +58,7 @@ func (n *processorNode) buildComponent(ctx context.Context, n.Component, err = builder.CreateMetrics(ctx, set, next.(consumer.Metrics)) case pipeline.SignalLogs: n.Component, err = builder.CreateLogs(ctx, set, next.(consumer.Logs)) - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: n.Component, err = builder.CreateProfiles(ctx, set, next.(xconsumer.Profiles)) default: return fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", set.ID, n.pipelineID.String(), n.pipelineID.Signal()) diff --git a/service/internal/graph/receiver.go b/service/internal/graph/receiver.go index 3d88d4bfe56..48f7a36d148 100644 --- a/service/internal/graph/receiver.go +++ b/service/internal/graph/receiver.go @@ -12,7 +12,7 @@ import ( "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/internal/fanoutconsumer" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/components" @@ -65,7 +65,7 @@ func (n *receiverNode) buildComponent(ctx context.Context, consumers = append(consumers, next.(consumer.Logs)) } n.Component, err = builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers)) - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: var consumers []xconsumer.Profiles for _, next := range nexts { consumers = append(consumers, next.(xconsumer.Profiles)) diff --git a/service/internal/graph/util_test.go b/service/internal/graph/util_test.go index 9f0f39f3dcd..389a351d781 100644 --- a/service/internal/graph/util_test.go +++ b/service/internal/graph/util_test.go @@ -17,7 +17,7 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/xexporter" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/xprocessor" "go.opentelemetry.io/collector/receiver" @@ -79,7 +79,7 @@ func (g *Graph) getReceivers() map[pipeline.Signal]map[component.ID]component.Co receiversMap[pipeline.SignalTraces] = make(map[component.ID]component.Component) receiversMap[pipeline.SignalMetrics] = make(map[component.ID]component.Component) receiversMap[pipeline.SignalLogs] = make(map[component.ID]component.Component) - receiversMap[pipelineprofiles.SignalProfiles] = make(map[component.ID]component.Component) + receiversMap[xpipeline.SignalProfiles] = make(map[component.ID]component.Component) for _, pg := range g.pipelines { for _, rcvrNode := range pg.receivers { diff --git a/service/pipelines/config.go b/service/pipelines/config.go index 76a444127a6..025d3c8395a 100644 --- a/service/pipelines/config.go +++ b/service/pipelines/config.go @@ -10,7 +10,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" ) var ( @@ -42,7 +42,7 @@ func (cfg Config) Validate() error { switch pipelineID.Signal() { case pipeline.SignalTraces, pipeline.SignalMetrics, pipeline.SignalLogs: // Continue - case pipelineprofiles.SignalProfiles: + case xpipeline.SignalProfiles: if !serviceProfileSupportGate.IsEnabled() { return fmt.Errorf( "pipeline %q: profiling signal support is at alpha level, gated under the %q feature gate", diff --git a/service/pipelines/config_test.go b/service/pipelines/config_test.go index c42af8da603..eba5fa0cb6b 100644 --- a/service/pipelines/config_test.go +++ b/service/pipelines/config_test.go @@ -14,7 +14,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" ) func TestConfigValidate(t *testing.T) { @@ -80,7 +80,7 @@ func TestConfigValidate(t *testing.T) { name: "disabled-featuregate-profiles", cfgFn: func(*testing.T) Config { cfg := generateConfig(t) - cfg[pipeline.NewID(pipelineprofiles.SignalProfiles)] = &PipelineConfig{ + cfg[pipeline.NewID(xpipeline.SignalProfiles)] = &PipelineConfig{ Receivers: []component.ID{component.MustNewID("nop")}, Processors: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewID("nop")}, @@ -95,7 +95,7 @@ func TestConfigValidate(t *testing.T) { require.NoError(t, featuregate.GlobalRegistry().Set(serviceProfileSupportGateID, true)) cfg := generateConfig(t) - cfg[pipeline.NewID(pipelineprofiles.SignalProfiles)] = &PipelineConfig{ + cfg[pipeline.NewID(xpipeline.SignalProfiles)] = &PipelineConfig{ Receivers: []component.ID{component.MustNewID("nop")}, Processors: []component.ID{component.MustNewID("nop")}, Exporters: []component.ID{component.MustNewID("nop")}, diff --git a/service/service_test.go b/service/service_test.go index fb095f8a4a9..e36c75ab20f 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -31,7 +31,7 @@ import ( "go.opentelemetry.io/collector/internal/testutil" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" + "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/promtest" @@ -248,8 +248,8 @@ func TestServiceGetExporters(t *testing.T) { assert.Contains(t, expMap[pipeline.SignalMetrics], component.NewID(nopType)) assert.Len(t, expMap[pipeline.SignalLogs], 1) assert.Contains(t, expMap[pipeline.SignalLogs], component.NewID(nopType)) - assert.Len(t, expMap[pipelineprofiles.SignalProfiles], 1) - assert.Contains(t, expMap[pipelineprofiles.SignalProfiles], component.NewID(nopType)) + assert.Len(t, expMap[xpipeline.SignalProfiles], 1) + assert.Contains(t, expMap[xpipeline.SignalProfiles], component.NewID(nopType)) } // TestServiceTelemetryCleanupOnError tests that if newService errors due to an invalid config telemetry is cleaned up @@ -658,7 +658,7 @@ func newNopConfig() Config { Processors: []component.ID{component.NewID(nopType)}, Exporters: []component.ID{component.NewID(nopType)}, }, - pipeline.NewID(pipelineprofiles.SignalProfiles): { + pipeline.NewID(xpipeline.SignalProfiles): { Receivers: []component.ID{component.NewID(nopType)}, Processors: []component.ID{component.NewID(nopType)}, Exporters: []component.ID{component.NewID(nopType)}, diff --git a/versions.yaml b/versions.yaml index 02972a652c6..655f3db8887 100644 --- a/versions.yaml +++ b/versions.yaml @@ -70,6 +70,7 @@ module-sets: - go.opentelemetry.io/collector/pdata/testdata - go.opentelemetry.io/collector/pipeline - go.opentelemetry.io/collector/pipeline/pipelineprofiles + - go.opentelemetry.io/collector/pipeline/xpipeline - go.opentelemetry.io/collector/processor - go.opentelemetry.io/collector/processor/processortest - go.opentelemetry.io/collector/processor/batchprocessor