Skip to content

Commit

Permalink
Add pipeline module
Browse files Browse the repository at this point in the history
  • Loading branch information
TylerHelmuth committed Sep 18, 2024
1 parent 59c083f commit e7a8c7d
Show file tree
Hide file tree
Showing 15 changed files with 482 additions and 0 deletions.
25 changes: 25 additions & 0 deletions .chloggen/add-pipeline-module.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: pipeline

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds new `pipeline` module to house the concept of pipeline ID and Signal.

# One or more tracking issues or pull requests related to the change
issues: [11209]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,13 @@ check-contrib:
-replace go.opentelemetry.io/collector/extension/zpagesextension=$(CURDIR)/extension/zpagesextension \
-replace go.opentelemetry.io/collector/featuregate=$(CURDIR)/featuregate \
-replace go.opentelemetry.io/collector/internal/globalgates=$(CURDIR)/internal/globalgates \
-replace go.opentelemetry.io/collector/internal/globalsignal=$(CURDIR)/internal/globalsignal \
-replace go.opentelemetry.io/collector/otelcol=$(CURDIR)/otelcol \
-replace go.opentelemetry.io/collector/otelcol/otelcoltest=$(CURDIR)/otelcol/otelcoltest \
-replace go.opentelemetry.io/collector/pdata=$(CURDIR)/pdata \
-replace go.opentelemetry.io/collector/pdata/testdata=$(CURDIR)/pdata/testdata \
-replace go.opentelemetry.io/collector/pdata/pprofile=$(CURDIR)/pdata/pprofile \
-replace go.opentelemetry.io/collector/pipeline=$(CURDIR)/pipeline \
-replace go.opentelemetry.io/collector/processor=$(CURDIR)/processor \
-replace go.opentelemetry.io/collector/processor/batchprocessor=$(CURDIR)/processor/batchprocessor \
-replace go.opentelemetry.io/collector/processor/memorylimiterprocessor=$(CURDIR)/processor/memorylimiterprocessor \
Expand Down Expand Up @@ -369,11 +371,13 @@ restore-contrib:
-dropreplace go.opentelemetry.io/collector/extension/zpagesextension \
-dropreplace go.opentelemetry.io/collector/featuregate \
-dropreplace go.opentelemetry.io/collector/internal/globalgates \
-dropreplace go.opentelemetry.io/collector/internal/globalsignal \
-dropreplace go.opentelemetry.io/collector/otelcol \
-dropreplace go.opentelemetry.io/collector/otelcol/otelcoltest \
-dropreplace go.opentelemetry.io/collector/pdata \
-dropreplace go.opentelemetry.io/collector/pdata/testdata \
-dropreplace go.opentelemetry.io/collector/pdata/pprofile \
-dropreplace go.opentelemetry.io/collector/pipeline \
-dropreplace go.opentelemetry.io/collector/processor \
-dropreplace go.opentelemetry.io/collector/processor/batchprocessor \
-dropreplace go.opentelemetry.io/collector/processor/memorylimiterprocessor \
Expand Down
1 change: 1 addition & 0 deletions internal/globalsignal/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
11 changes: 11 additions & 0 deletions internal/globalsignal/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module go.opentelemetry.io/collector/internal/globalsignal

go 1.22.0

require github.com/stretchr/testify v1.9.0

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions internal/globalsignal/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 50 additions & 0 deletions internal/globalsignal/signal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package globalsignal // import "go.opentelemetry.io/collector/internal/globalsignal"

import (
"fmt"
"regexp"
)

// Signal represents the signals supported by the collector.
type Signal struct {
name string
}

// String returns the string representation of the signal.
func (s Signal) String() string {
return s.name
}

// MarshalText marshals the Signal.
func (s Signal) MarshalText() (text []byte, err error) {
return []byte(s.name), nil
}

// signalRegex is used to validate the signal.
// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters.
var signalRegex = regexp.MustCompile(`^[a-z]{1,62}$`)

// NewSignal creates a Signal. It returns an error if the Signal is invalid.
// A Signal must consist of 1 to 62 lowercase ASCII alphabetic characters.
func NewSignal(signal string) (Signal, error) {
if len(signal) == 0 {
return Signal{}, fmt.Errorf("signal must not be empty")
}
if !signalRegex.MatchString(signal) {
return Signal{}, fmt.Errorf("invalid character(s) in type %q", signal)
}
return Signal{name: signal}, nil
}

// MustNewSignal creates a Signal. It panics if the Signal is invalid.
// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters.
func MustNewSignal(signal string) Signal {
s, err := NewSignal(signal)
if err != nil {
panic(err)

Check warning on line 47 in internal/globalsignal/signal.go

View check run for this annotation

Codecov / codecov/patch

internal/globalsignal/signal.go#L47

Added line #L47 was not covered by tests
}
return s
}
41 changes: 41 additions & 0 deletions internal/globalsignal/signal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package globalsignal

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_NewSignal(t *testing.T) {
s, err := NewSignal("traces")
require.NoError(t, err)
assert.Equal(t, Signal{name: "traces"}, s)
}

func Test_NewSignal_Invalid(t *testing.T) {
_, err := NewSignal("")
require.Error(t, err)
_, err = NewSignal("TRACES")
require.Error(t, err)
}

func Test_MustNewSignal(t *testing.T) {
s := MustNewSignal("traces")
assert.Equal(t, Signal{name: "traces"}, s)
}

func Test_Signal_String(t *testing.T) {
s := MustNewSignal("traces")
assert.Equal(t, "traces", s.String())
}

func Test_Signal_MarshalText(t *testing.T) {
s := MustNewSignal("traces")
b, err := s.MarshalText()
require.NoError(t, err)
assert.Equal(t, []byte("traces"), b)
}
1 change: 1 addition & 0 deletions pipeline/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../Makefile.Common
16 changes: 16 additions & 0 deletions pipeline/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module go.opentelemetry.io/collector/pipeline

go 1.22.0

require (
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/internal/globalsignal v0.109.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace go.opentelemetry.io/collector/internal/globalsignal => ../internal/globalsignal
10 changes: 10 additions & 0 deletions pipeline/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

131 changes: 131 additions & 0 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package pipeline // import "go.opentelemetry.io/collector/pipeline"
import (
"errors"
"fmt"
"regexp"
"strings"

"go.opentelemetry.io/collector/internal/globalsignal"
)

// typeAndNameSeparator is the separator that is used between type and name in type/name composite keys.
const typeAndNameSeparator = "/"

// ID represents the identity for a pipeline. It combines two values:
// * signal - the Signal of the pipeline.
// * name - the name of that pipeline.
type ID struct {
signal Signal `mapstructure:"-"`
name string `mapstructure:"-"`
}

// NewID returns a new ID with the given Signal and empty name.
func NewID(signal Signal) ID {
return ID{signal: signal}
}

// MustNewID builds a Signal and returns a new ID with the given Signal and empty name.
// It panics if the Signal is invalid.
// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters.
func MustNewID(signal string) ID {
return ID{signal: globalsignal.MustNewSignal(signal)}
}

// NewIDWithName returns a new ID with the given Signal and name.
func NewIDWithName(signal Signal, name string) ID {
return ID{signal: signal, name: name}
}

// MustNewIDWithName builds a Signal and returns a new ID with the given Signal and name.
// It panics if the Signal is invalid or name is invalid.
// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters.
// A name must consist of 1 to 1024 unicode characters excluding whitespace, control characters, and symbols.
func MustNewIDWithName(signal string, name string) ID {
id := ID{signal: globalsignal.MustNewSignal(signal)}
err := validateName(name)
if err != nil {
panic(err)

Check warning on line 50 in pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

pipeline/pipeline.go#L50

Added line #L50 was not covered by tests
}
id.name = name
return id
}

// Signal returns the Signal of the ID.
func (i ID) Signal() Signal {
return i.signal
}

// Name returns the name of the ID.
func (i ID) Name() string {
return i.name
}

// MarshalText implements the encoding.TextMarshaler interface.
// This marshals the Signal and name as one string in the config.
func (i ID) MarshalText() (text []byte, err error) {
return []byte(i.String()), nil
}

// UnmarshalText implements the encoding.TextUnmarshaler interface.
func (i *ID) UnmarshalText(text []byte) error {
idStr := string(text)
items := strings.SplitN(idStr, typeAndNameSeparator, 2)
var signalStr, nameStr string
if len(items) >= 1 {
signalStr = strings.TrimSpace(items[0])
}

if len(items) == 1 && signalStr == "" {
return errors.New("id must not be empty")
}

if signalStr == "" {
return fmt.Errorf("in %q id: the part before %s should not be empty", idStr, typeAndNameSeparator)
}

if len(items) > 1 {
// "name" part is present.
nameStr = strings.TrimSpace(items[1])
if nameStr == "" {
return fmt.Errorf("in %q id: the part after %s should not be empty", idStr, typeAndNameSeparator)
}
if err := validateName(nameStr); err != nil {
return fmt.Errorf("in %q id: %w", nameStr, err)
}
}

var err error
if i.signal, err = globalsignal.NewSignal(signalStr); err != nil {
return fmt.Errorf("in %q id: %w", idStr, err)
}
i.name = nameStr

return nil
}

// String returns the ID string representation as "signal[/name]" format.
func (i ID) String() string {
if i.name == "" {
return i.signal.String()
}

return i.signal.String() + typeAndNameSeparator + i.name
}

// nameRegexp is used to validate the name of an ID. A name can consist of
// 1 to 1024 unicode characters excluding whitespace, control characters, and
// symbols.
var nameRegexp = regexp.MustCompile(`^[^\pZ\pC\pS]+$`)

func validateName(nameStr string) error {
if len(nameStr) > 1024 {
return fmt.Errorf("name %q is longer than 1024 characters (%d characters)", nameStr, len(nameStr))
}
if !nameRegexp.MatchString(nameStr) {
return fmt.Errorf("invalid character(s) in name %q", nameStr)
}
return nil
}
Loading

0 comments on commit e7a8c7d

Please sign in to comment.