-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adding new component: Schema Processor
This is the initial commit into the collector to add in the component to help enforce a standard semantic convention for all signals.
- Loading branch information
1 parent
814c3a9
commit 5f83e82
Showing
20 changed files
with
2,028 additions
and
0 deletions.
There are no files selected for viewing
Validating CODEOWNERS rules …
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../Makefile.Common |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
# Schema Transformer Processor | ||
|
||
🚧 _Currently under development, subject to change_ 🚧 | ||
|
||
Supported Pipelines: traces, metrics, logs | ||
|
||
The _Schema Processor_ is used to convert existing telemetry data or signals to a version of the semantic convention defined as part of the configuration. | ||
The processor works by using a set of target schema URLs that are used to match incoming signals. | ||
On a match, the processor will fetch the schema translation file (if not cached) set by the incoming signal and apply the transformations | ||
required to export as the target semantic convention version. | ||
|
||
Furthermore, it is also possible for organisations and vendors to publish their own semantic conventions and be used by this processor, | ||
be sure to follow [schema overview](https://opentelemetry.io/docs/reference/specification/schemas/overview/) for all the details. | ||
|
||
## Caching Schema Translation Files | ||
|
||
In order to improve efficiency of the processor, the `prefetch` option allows the processor to start downloading and preparing | ||
the translations needed for signals that match the schema URL. | ||
|
||
## Schema Formats | ||
|
||
A schema URl is made up in two parts, _Schema Family_ and _Schema Version_, the schema URL is broken down like so: | ||
|
||
```text | ||
| Schema URL | | ||
| https://example.com/telemetry/schemas/ | | 1.0.1 | | ||
| Schema Family | | Schema Version | | ||
``` | ||
|
||
The final path in the schema URL _MUST_ be the schema version and the preceding portion of the URL is the _Schema Family_. | ||
To read about schema formats, please read more [here](https://opentelemetry.io/docs/reference/specification/schemas/overview/#schema-url) | ||
|
||
## Targets Schemas | ||
|
||
Targets define a set of schema URLs with a schema identifier that will be used to translate any schema URL that matches the target URL to that version. | ||
In the event that the processor matches a signal to a target, the processor will translate the signal from the published one to the defined identifier; | ||
for example using the configuration below, a signal published with the `https://opentelemetry.io/schemas/1.8.0` schema will be translated | ||
by the collector to the `https//opentelemetry.io/schemas/1.6.1` schema. | ||
Within the schema targets, no duplicate schema families are allowed and will report an error if detected. | ||
|
||
|
||
# Example | ||
|
||
```yaml | ||
processors: | ||
schema: | ||
prefetch: | ||
- https://opentelemetry.io/schemas/1.9.0 | ||
targets: | ||
- https://opentelemetry.io/schemas/1.6.1 | ||
- http://example.com/telemetry/schemas/1.0.1 | ||
``` | ||
For more complete examples, please refer to [config.yml](./testdata/config.yml). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package schemaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor" | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
|
||
"go.opentelemetry.io/collector/config" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/schema" | ||
) | ||
|
||
var ( | ||
errRequiresTargets = errors.New("requires schema targets") | ||
errDuplicateTargets = errors.New("duplicate targets detected") | ||
) | ||
|
||
// Config defines the user provided values for the Schema Processor | ||
type Config struct { | ||
config.ProcessorSettings `mapstructure:",squash"` | ||
|
||
// PreCache is a list of schema URLs that are downloaded | ||
// and cached at the start of the collector runtime | ||
// in order to avoid fetching data that later on could | ||
// block processing of signals. (Optional field) | ||
Prefetch []string `mapstructure:"prefetch"` | ||
|
||
// Targets define what schema families should be | ||
// translated to, allowing older and newer formats | ||
// to conform to the target schema identifier. | ||
Targets []string `mapstructure:"targets"` | ||
} | ||
|
||
func (c *Config) Validate() error { | ||
for _, schemaURL := range c.Prefetch { | ||
_, _, err := schema.GetFamilyAndIdentifier(schemaURL) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
// Not strictly needed since it would just pass on | ||
// any data that doesn't match targets, however defining | ||
// this processor with no targets is wasteful. | ||
if len(c.Targets) == 0 { | ||
return fmt.Errorf("no schema targets defined: %w", errRequiresTargets) | ||
} | ||
|
||
families := make(map[string]struct{}) | ||
for _, target := range c.Targets { | ||
family, _, err := schema.GetFamilyAndIdentifier(target) | ||
if err != nil { | ||
return err | ||
} | ||
if _, exist := families[family]; exist { | ||
return errDuplicateTargets | ||
} | ||
families[family] = struct{}{} | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package schemaprocessor | ||
|
||
import ( | ||
"path" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/collector/component/componenttest" | ||
"go.opentelemetry.io/collector/config" | ||
"go.opentelemetry.io/collector/service/servicetest" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/schema" | ||
) | ||
|
||
func TestLoadConfig(t *testing.T) { | ||
t.Parallel() | ||
|
||
factories, err := componenttest.NopFactories() | ||
require.NoError(t, err, "Must not error on creating Nop factories") | ||
|
||
factory := NewFactory() | ||
factories.Processors[typeStr] = factory | ||
|
||
cfg, err := servicetest.LoadConfigAndValidate(path.Join("testdata", "config.yml"), factories) | ||
require.NoError(t, err, "Must not error when loading configuration") | ||
|
||
pcfg := cfg.Processors[config.NewComponentIDWithName(typeStr, "with-all-options")] | ||
assert.Equal(t, pcfg, &Config{ | ||
ProcessorSettings: config.NewProcessorSettings(config.NewComponentIDWithName(typeStr, "with-all-options")), | ||
Prefetch: []string{ | ||
"https://opentelemetry.io/schemas/1.9.0", | ||
}, | ||
Targets: []string{ | ||
"https://opentelemetry.io/schemas/1.4.2", | ||
"https://example.com/otel/schemas/1.2.0", | ||
}, | ||
}) | ||
} | ||
|
||
func TestConfigurationValidation(t *testing.T) { | ||
t.Parallel() | ||
|
||
tests := []struct { | ||
scenario string | ||
target []string | ||
expectError error | ||
}{ | ||
{scenario: "No targets", target: nil, expectError: errRequiresTargets}, | ||
{ | ||
scenario: "One target of incomplete schema family", | ||
target: []string{"opentelemetry.io/schemas/1.0.0"}, | ||
expectError: schema.ErrInvalidFamily, | ||
}, | ||
{ | ||
scenario: "One target of incomplete schema identifier", | ||
target: []string{"https://opentelemetry.io/schemas/1"}, | ||
expectError: schema.ErrInvalidIdentifier, | ||
}, | ||
{ | ||
scenario: "Valid target(s)", | ||
target: []string{ | ||
"https://opentelemetry.io/schemas/1.9.0", | ||
}, | ||
expectError: nil, | ||
}, | ||
{ | ||
scenario: "Duplicate targets", | ||
target: []string{ | ||
"https://opentelemetry.io/schemas/1.9.0", | ||
"https://opentelemetry.io/schemas/1.0.0", | ||
}, | ||
expectError: errDuplicateTargets, | ||
}, | ||
} | ||
|
||
for _, tc := range tests { | ||
cfg := &Config{ | ||
Targets: tc.target, | ||
} | ||
|
||
assert.ErrorIs(t, cfg.Validate(), tc.expectError, tc.scenario) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package schemaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor" | ||
|
||
import ( | ||
"context" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/config" | ||
"go.opentelemetry.io/collector/consumer" | ||
"go.opentelemetry.io/collector/processor/processorhelper" | ||
) | ||
|
||
const typeStr = "schema" | ||
|
||
var processorCapabilities = consumer.Capabilities{MutatesData: true} | ||
|
||
// factory will store any of the precompiled schemas in future | ||
type factory struct{} | ||
|
||
// newDefaultConfiguration returns the configuration for schema transformer processor | ||
// with the default values being used throughout it | ||
func newDefaultConfiguration() config.Processor { | ||
return &Config{ | ||
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), | ||
} | ||
} | ||
|
||
func NewFactory() component.ProcessorFactory { | ||
f := &factory{} | ||
return component.NewProcessorFactory( | ||
typeStr, | ||
newDefaultConfiguration, | ||
component.WithLogsProcessor(f.createLogsProcessor), | ||
component.WithMetricsProcessor(f.createMetricsProcessor), | ||
component.WithTracesProcessor(f.createTracesProcessor), | ||
) | ||
} | ||
|
||
func (f factory) createLogsProcessor( | ||
ctx context.Context, | ||
set component.ProcessorCreateSettings, | ||
cfg config.Processor, | ||
next consumer.Logs, | ||
) (component.LogsProcessor, error) { | ||
transformer, err := newTransformer(ctx, cfg, set) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return processorhelper.NewLogsProcessor( | ||
cfg, | ||
next, | ||
transformer.processLogs, | ||
processorhelper.WithCapabilities(processorCapabilities), | ||
processorhelper.WithStart(transformer.start), | ||
) | ||
} | ||
|
||
func (f factory) createMetricsProcessor( | ||
ctx context.Context, | ||
set component.ProcessorCreateSettings, | ||
cfg config.Processor, | ||
next consumer.Metrics, | ||
) (component.MetricsProcessor, error) { | ||
transformer, err := newTransformer(ctx, cfg, set) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return processorhelper.NewMetricsProcessor( | ||
cfg, | ||
next, | ||
transformer.processMetrics, | ||
processorhelper.WithCapabilities(processorCapabilities), | ||
processorhelper.WithStart(transformer.start), | ||
) | ||
} | ||
|
||
func (f factory) createTracesProcessor( | ||
ctx context.Context, | ||
set component.ProcessorCreateSettings, | ||
cfg config.Processor, | ||
next consumer.Traces, | ||
) (component.TracesProcessor, error) { | ||
transformer, err := newTransformer(ctx, cfg, set) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return processorhelper.NewTracesProcessor( | ||
cfg, | ||
next, | ||
transformer.processTraces, | ||
processorhelper.WithCapabilities(processorCapabilities), | ||
processorhelper.WithStart(transformer.start), | ||
) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package schemaprocessor_test |
Oops, something went wrong.