Skip to content

Commit

Permalink
[processor/geoip] Add context config option (open-telemetry#34214)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

- Adds a `source.from` configuration option to let the user define where
the IP address will be looked for.
- Adds the `attribute` source.from option. IP address can be located in
the inner signal's attributes, thus adding the geographical attributes
accordingly.
- Decouples the signals processing in `geoip_processor_metrics.go`,
`geoip_processor_traces.go` and `geoip_processor_logs.go`. Polymorphism
could not be applied due to their different types and iteration
mechanism, with Go `1.23` we could undo this changes in favor of a
`pcommon.Map` iterator.


**Link to tracking Issue:** <Issue number if applicable>
open-telemetry#34036

**Testing:** <Describe what testing was performed and which tests were
added.>
- Removes the manual generation of testing signals in favor of
`testdata` + golden checks. The same test cases are applied for unit and
integration tests.

**Documentation:** <Describe the documentation added.> README.md updated

---------

Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com>
Co-authored-by: Andrzej Stencel <andrzej.stencel@elastic.co>
  • Loading branch information
3 people authored and f7o committed Sep 12, 2024
1 parent ee23b6a commit 37e1b01
Show file tree
Hide file tree
Showing 49 changed files with 1,916 additions and 303 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add_source_config_option.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: geoipprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add a context configuration option to specify the IP address attribute telemetry location.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34036]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
6 changes: 5 additions & 1 deletion processor/geoipprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

## Description

The geoIP processor `geoipprocessor` enhances resource attributes by appending information about the geographical location of an IP address. To add geographical information, the IP address must be included in the resource attributes using the [`source.address` semantic conventions key attribute](https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/docs/general/attributes.md#source).
The geoIP processor `geoipprocessor` enhances the attributes of a span, log, or metric by appending information about the geographical location of an IP address. To add geographical information, the IP address must be included in the attributes using the [`source.address` semantic conventions key attribute](https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/docs/general/attributes.md#source). By default, only the resource attributes will be modified. Please refer to [config.go](./config.go) for the config spec.

### Geographical location metadata

Expand All @@ -39,13 +39,17 @@ The following settings must be configured:

- `providers`: A map containing geographical location information providers. These providers are used to search for the geographical location attributes associated with an IP. Supported providers:
- [maxmind](./internal/provider/maxmindprovider/README.md)
- `context`: Allows specifying the underlying telemetry context the processor will work with. Available values:
- `resource`(default): Resource attributes.
- `record`: Attributes within a data point, log record or a span.

## Examples

```yaml
processors:
# processor name: geoip
geoip:
context: resource
providers:
maxmind:
database_path: /tmp/mygeodb
Expand Down
23 changes: 23 additions & 0 deletions processor/geoipprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package geoipprocessor // import "github.com/open-telemetry/opentelemetry-collec
import (
"errors"
"fmt"
"strings"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
Expand All @@ -17,10 +18,31 @@ const (
providersKey = "providers"
)

type ContextID string

const (
resource ContextID = "resource"
record ContextID = "record"
)

func (c *ContextID) UnmarshalText(text []byte) error {
str := ContextID(strings.ToLower(string(text)))
switch str {
case resource, record:
*c = str
return nil
default:
return fmt.Errorf("unknown context %s, available values: %s, %s", str, resource, record)
}
}

// Config holds the configuration for the GeoIP processor.
type Config struct {
// Providers specifies the sources to extract geographical information about a given IP.
Providers map[string]provider.Config `mapstructure:"-"`

// Context section allows specifying the source type to look for the IP. Available options: resource or record.
Context ContextID `mapstructure:"context"`
}

var (
Expand All @@ -39,6 +61,7 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("error validating provider %s: %w", providerID, err)
}
}

return nil
}

Expand Down
16 changes: 15 additions & 1 deletion processor/geoipprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, "maxmind"),
expected: &Config{
Context: resource,
Providers: map[string]provider.Config{
"maxmind": &maxmind.Config{DatabasePath: "/tmp/db"},
},
},
},
{
id: component.NewIDWithName(metadata.Type, "maxmind_record_context"),
expected: &Config{
Context: record,
Providers: map[string]provider.Config{
"maxmind": &maxmind.Config{DatabasePath: "/tmp/db"},
},
Expand All @@ -44,6 +54,10 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "invalid_providers_config"),
unmarshalErrorMessage: "unexpected sub-config value kind for key:providers value:this should be a map kind:string",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_source"),
unmarshalErrorMessage: "unknown context not.an.otlp.context, available values: resource, record",
},
}

for _, tt := range tests {
Expand All @@ -58,7 +72,7 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)

if tt.unmarshalErrorMessage != "" {
assert.EqualError(t, sub.Unmarshal(cfg), tt.unmarshalErrorMessage)
assert.ErrorContains(t, sub.Unmarshal(cfg), tt.unmarshalErrorMessage)
return
}
require.NoError(t, sub.Unmarshal(cfg))
Expand Down
10 changes: 6 additions & 4 deletions processor/geoipprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ func getProviderFactory(key string) (provider.GeoIPProviderFactory, bool) {

// createDefaultConfig returns a default configuration for the processor.
func createDefaultConfig() component.Config {
return &Config{}
return &Config{
Context: resource,
}
}

// createGeoIPProviders creates a list of GeoIPProvider instances based on the provided configuration and providers factories.
Expand Down Expand Up @@ -87,7 +89,7 @@ func createMetricsProcessor(ctx context.Context, set processor.Settings, cfg com
if err != nil {
return nil, err
}
return processorhelper.NewMetricsProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor(defaultResourceAttributes, providers).processMetrics, processorhelper.WithCapabilities(processorCapabilities))
return processorhelper.NewMetricsProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, defaultResourceAttributes, providers).processMetrics, processorhelper.WithCapabilities(processorCapabilities))
}

func createTracesProcessor(ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) {
Expand All @@ -96,7 +98,7 @@ func createTracesProcessor(ctx context.Context, set processor.Settings, cfg comp
if err != nil {
return nil, err
}
return processorhelper.NewTracesProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor(defaultResourceAttributes, providers).processTraces, processorhelper.WithCapabilities(processorCapabilities))
return processorhelper.NewTracesProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, defaultResourceAttributes, providers).processTraces, processorhelper.WithCapabilities(processorCapabilities))
}

func createLogsProcessor(ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Logs) (processor.Logs, error) {
Expand All @@ -105,5 +107,5 @@ func createLogsProcessor(ctx context.Context, set processor.Settings, cfg compon
if err != nil {
return nil, err
}
return processorhelper.NewLogsProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor(defaultResourceAttributes, providers).processLogs, processorhelper.WithCapabilities(processorCapabilities))
return processorhelper.NewLogsProcessor(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, defaultResourceAttributes, providers).processLogs, processorhelper.WithCapabilities(processorCapabilities))
}
67 changes: 20 additions & 47 deletions processor/geoipprocessor/geoip_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,31 @@ import (
"net"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/attribute"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/provider"
)

var (
errIPNotFound = errors.New("no IP address found in the resource attributes")
errParseIP = errors.New("could not parse IP address")
errUnspecifiedIP = errors.New("unspecified address")
errIPNotFound = errors.New("no IP address found in the resource attributes")
errParseIP = errors.New("could not parse IP address")
errUnspecifiedIP = errors.New("unspecified address")
errUnspecifiedSource = errors.New("no source attributes defined")
)

// newGeoIPProcessor creates a new instance of geoIPProcessor with the specified fields.
type geoIPProcessor struct {
providers []provider.GeoIPProvider
resourceAttributes []attribute.Key

cfg *Config
}

func newGeoIPProcessor(resourceAttributes []attribute.Key, providers []provider.GeoIPProvider) *geoIPProcessor {
func newGeoIPProcessor(processorConfig *Config, resourceAttributes []attribute.Key, providers []provider.GeoIPProvider) *geoIPProcessor {
return &geoIPProcessor{
resourceAttributes: resourceAttributes,
providers: providers,
cfg: processorConfig,
}
}

Expand All @@ -48,11 +49,11 @@ func parseIP(strIP string) (net.IP, error) {
return ip, nil
}

// ipFromResourceAttributes extracts an IP address from the given resource's attributes based on the specified fields.
// ipFromAttributes extracts an IP address from the given attributes based on the specified fields.
// It returns the first IP address if found, or an error if no valid IP address is found.
func ipFromResourceAttributes(attributes []attribute.Key, resource pcommon.Resource) (net.IP, error) {
func ipFromAttributes(attributes []attribute.Key, resource pcommon.Map) (net.IP, error) {
for _, attr := range attributes {
if ipField, found := resource.Attributes().Get(string(attr)); found {
if ipField, found := resource.Get(string(attr)); found {
// The attribute might contain a domain name. Skip any net.ParseIP error until we have a fine-grained error propagation strategy.
// TODO: propagate an error once error_mode configuration option is available (e.g. transformprocessor)
ipAttribute, err := parseIP(ipField.AsString())
Expand Down Expand Up @@ -80,9 +81,9 @@ func (g *geoIPProcessor) geoLocation(ctx context.Context, ip net.IP) (attribute.
return *allAttributes, nil
}

// processResource processes a single resource by adding geolocation attributes based on the found IP address.
func (g *geoIPProcessor) processResource(ctx context.Context, resource pcommon.Resource) error {
ipAddr, err := ipFromResourceAttributes(g.resourceAttributes, resource)
// processAttributes processes a pcommon.Map by adding geolocation attributes based on the found IP address.
func (g *geoIPProcessor) processAttributes(ctx context.Context, metadata pcommon.Map) error {
ipAddr, err := ipFromAttributes(g.resourceAttributes, metadata)
if err != nil {
// TODO: log IP error not found
if errors.Is(err, errIPNotFound) {
Expand All @@ -97,41 +98,13 @@ func (g *geoIPProcessor) processResource(ctx context.Context, resource pcommon.R
}

for _, geoAttr := range attributes.ToSlice() {
resource.Attributes().PutStr(string(geoAttr.Key), geoAttr.Value.AsString())
}

return nil
}

func (g *geoIPProcessor) processMetrics(ctx context.Context, ms pmetric.Metrics) (pmetric.Metrics, error) {
rm := ms.ResourceMetrics()
for i := 0; i < rm.Len(); i++ {
err := g.processResource(ctx, rm.At(i).Resource())
if err != nil {
return ms, err
switch geoAttr.Value.Type() {
case attribute.FLOAT64:
metadata.PutDouble(string(geoAttr.Key), geoAttr.Value.AsFloat64())
case attribute.STRING:
metadata.PutStr(string(geoAttr.Key), geoAttr.Value.AsString())
}
}
return ms, nil
}

func (g *geoIPProcessor) processTraces(ctx context.Context, ts ptrace.Traces) (ptrace.Traces, error) {
rt := ts.ResourceSpans()
for i := 0; i < rt.Len(); i++ {
err := g.processResource(ctx, rt.At(i).Resource())
if err != nil {
return ts, err
}
}
return ts, nil
}

func (g *geoIPProcessor) processLogs(ctx context.Context, ls plog.Logs) (plog.Logs, error) {
rl := ls.ResourceLogs()
for i := 0; i < rl.Len(); i++ {
err := g.processResource(ctx, rl.At(i).Resource())
if err != nil {
return ls, err
}
}
return ls, nil
return nil
}
35 changes: 35 additions & 0 deletions processor/geoipprocessor/geoip_processor_logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package geoipprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor"

import (
"context"

"go.opentelemetry.io/collector/pdata/plog"
)

func (g *geoIPProcessor) processLogs(ctx context.Context, ls plog.Logs) (plog.Logs, error) {
rl := ls.ResourceLogs()
for i := 0; i < rl.Len(); i++ {
switch g.cfg.Context {
case resource:
err := g.processAttributes(ctx, rl.At(i).Resource().Attributes())
if err != nil {
return ls, err
}
case record:
for j := 0; j < rl.At(i).ScopeLogs().Len(); j++ {
for k := 0; k < rl.At(i).ScopeLogs().At(j).LogRecords().Len(); k++ {
err := g.processAttributes(ctx, rl.At(i).ScopeLogs().At(j).LogRecords().At(k).Attributes())
if err != nil {
return ls, err
}
}
}
default:
return ls, errUnspecifiedSource
}
}
return ls, nil
}
Loading

0 comments on commit 37e1b01

Please sign in to comment.