[processor/deltatocumulative]: observe accumulation metrics #31363

Merged · 23 commits · Apr 10, 2024
27 changes: 27 additions & 0 deletions .chloggen/deltatocumulative-metrics.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: deltatocumulativeprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: self-instrumentation to observe key metrics of the stream accumulation

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30705]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
17 changes: 17 additions & 0 deletions processor/deltatocumulativeprocessor/README.md
@@ -29,4 +29,21 @@ processors:
# upper limit of streams to track. new streams exceeding this limit
# will be dropped
[ max_streams: <int> | default = 0 (off) ]

```

There is no further configuration required. All delta samples are converted to cumulative.

## Troubleshooting

The following metrics are recorded when [telemetry is
enabled](https://opentelemetry.io/docs/collector/configuration/#telemetry):

| Name | Description | Unit |
|------------------------------------------|---------------------------------------------------------------------------------------|---------------|
| `deltatocumulative.streams.tracked` | Number of streams currently tracked by the aggregation state | `{stream}` |
| `deltatocumulative.streams.limit` | Upper limit of tracked streams | `{stream}` |
| `deltatocumulative.streams.evicted` | Number of streams removed from tracking to ingest newer streams | `{stream}` |
| `deltatocumulative.datapoints.processed` | Total number of datapoints processed, whether successful or not | `{datapoint}` |
| `deltatocumulative.datapoints.dropped`   | Faulty datapoints that were dropped, with the cause given in the `reason` attribute        | `{datapoint}` |
| `deltatocumulative.gaps.length`          | Total length of all gaps in the streams, which occur e.g. when samples are lost in transit | `second`      |
3 changes: 2 additions & 1 deletion processor/deltatocumulativeprocessor/factory.go
@@ -28,5 +28,6 @@ func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg
return nil, fmt.Errorf("configuration parsing error")
}

return newProcessor(pcfg, set.Logger, next), nil
meter := metadata.Meter(set.TelemetrySettings)
return newProcessor(pcfg, set.Logger, meter, next), nil
}
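For orientation, here is a minimal sketch of how a meter obtained this way is typically turned into the instruments listed in the README. The instrument names follow the table above, but the `telemetry` struct and `newTelemetry` constructor are assumptions for illustration, not this PR's actual code:

```go
package deltatocumulativeprocessor

import "go.opentelemetry.io/otel/metric"

// telemetry bundles the self-observability instruments.
// Field names and this constructor are illustrative only.
type telemetry struct {
	tracked metric.Int64UpDownCounter
	dropped metric.Int64Counter
}

func newTelemetry(meter metric.Meter) (*telemetry, error) {
	tracked, err := meter.Int64UpDownCounter(
		"deltatocumulative.streams.tracked",
		metric.WithDescription("number of streams tracked by the aggregation state"),
		metric.WithUnit("{stream}"),
	)
	if err != nil {
		return nil, err
	}
	dropped, err := meter.Int64Counter(
		"deltatocumulative.datapoints.dropped",
		metric.WithDescription("faulty datapoints that were dropped"),
		metric.WithUnit("{datapoint}"),
	)
	if err != nil {
		return nil, err
	}
	return &telemetry{tracked: tracked, dropped: dropped}, nil
}
```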
3 changes: 2 additions & 1 deletion processor/deltatocumulativeprocessor/go.mod
@@ -10,6 +10,7 @@ require (
go.opentelemetry.io/collector/consumer v0.97.1-0.20240409140257-792fac1b62d4
go.opentelemetry.io/collector/pdata v1.4.1-0.20240409140257-792fac1b62d4
go.opentelemetry.io/collector/processor v0.97.1-0.20240409140257-792fac1b62d4
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/metric v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
go.uber.org/zap v1.27.0
@@ -39,9 +40,9 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.opentelemetry.io/collector v0.97.1-0.20240409140257-792fac1b62d4 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240409140257-792fac1b62d4 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.0.0-20240408153657-fc289290613a // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect
2 changes: 2 additions & 0 deletions processor/deltatocumulativeprocessor/go.sum


19 changes: 18 additions & 1 deletion processor/deltatocumulativeprocessor/internal/delta/delta.go
@@ -44,8 +44,17 @@ func (a Accumulator[D]) Store(id streams.Ident, dp D) error {
return ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()}
}

// detect gaps
var gap error
if dp.StartTimestamp() > aggr.Timestamp() {
gap = ErrGap{From: aggr.Timestamp(), To: dp.StartTimestamp()}
}

res := aggr.Add(dp)
return a.Map.Store(id, res)
if err := a.Map.Store(id, res); err != nil {
return err
}
return gap
}

type ErrOlderStart struct {
@@ -65,3 +74,11 @@ type ErrOutOfOrder struct {
func (e ErrOutOfOrder) Error() string {
return fmt.Sprintf("out of order: dropped sample from time=%s, because series is already at time=%s", e.Sample, e.Last)
}

type ErrGap struct {
From, To pcommon.Timestamp
}

func (e ErrGap) Error() string {
return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To)
}
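As a hedged illustration of how a caller could consume the new, non-fatal `ErrGap`: the datapoint was already stored, so the error can be translated into the `deltatocumulative.gaps.length` metric and then dropped. The `store` helper, its signature, and the counter wiring are assumptions; only `ErrGap` and its fields come from the diff above:

```go
package deltatocumulativeprocessor

import (
	"context"
	"errors"

	"go.opentelemetry.io/otel/metric"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta"
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
)

// store accumulates dp and, if a gap was detected, records its length on
// gapLen (assumed to back deltatocumulative.gaps.length). The sample itself
// was still accepted, so the gap is reported but not treated as fatal.
func store(ctx context.Context, acc delta.Accumulator[data.Number], id streams.Ident, dp data.Number, gapLen metric.Int64Counter) error {
	err := acc.Store(id, dp)

	var gap delta.ErrGap
	if errors.As(err, &gap) {
		length := gap.To.AsTime().Sub(gap.From.AsTime())
		gapLen.Add(ctx, int64(length.Seconds()))
		return nil
	}
	return err
}
```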
52 changes: 52 additions & 0 deletions processor/deltatocumulativeprocessor/internal/maybe/ptr.go
@@ -0,0 +1,52 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// maybe provides utilities for representing data that may or may not exist at
// runtime in a safe way.
//
// A typical approach to this is plain pointers, but they suffer from two issues:
// - Unsafety: permitting nil pointers requires careful checking on each use,
// which is easily forgotten
// - Blindness: nil itself cannot differentiate between "set to nil" and
// "not set at all", leading to unexpected edge cases
//
// The [Ptr] type of this package provides a safe alternative with a clear
// distinction between "not set" and "set to nil".
package maybe // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe"

// Ptr references some value of type T that is not guaranteed to exist.
// Callers must use [Ptr.Try] to access the underlying value, checking the
// ok return value too.
// This provides a clear distinction between "not set" and "set to nil".
//
// Use [Some] and [None] to create Ptrs.
type Ptr[T any] struct {
to *T
ok bool
}

// None returns a Ptr that represents "not-set".
// This is equal to a zero-value Ptr.
func None[T any]() Ptr[T] {
return Ptr[T]{to: nil, ok: false}
}

// Some returns a pointer to the passed T.
//
// The ptr argument may be nil, in which case this represents "explicitly set to
// nil".
func Some[T any](ptr *T) Ptr[T] {
return Ptr[T]{to: ptr, ok: true}
}

// Try attempts to de-reference the Ptr, giving one of three results:
//
// - nil, false: not-set
// - nil, true: explicitly set to nil
// - non-nil, true: set to some value
//
// This provides extra safety over bare pointers, because callers are forced by
// the compiler to either check or explicitly ignore the ok value.
func (ptr Ptr[T]) Try() (_ *T, ok bool) {
return ptr.to, ptr.ok
}
64 changes: 64 additions & 0 deletions processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go
@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package maybe_test

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe"
)

func TestMaybe(t *testing.T) {
t.Run("zero-not-ok", func(t *testing.T) {
var ptr maybe.Ptr[int]
_, ok := ptr.Try()
require.False(t, ok)
})
t.Run("none-not-ok", func(t *testing.T) {
ptr := maybe.None[int]()
_, ok := ptr.Try()
require.False(t, ok)
})
t.Run("explicit-nil", func(t *testing.T) {
ptr := maybe.Some[int](nil)
v, ok := ptr.Try()
require.Nil(t, v)
require.True(t, ok)
})
t.Run("value", func(t *testing.T) {
num := 42
ptr := maybe.Some(&num)
v, ok := ptr.Try()
require.True(t, ok)
require.Equal(t, num, *v)
})
}

func ExamplePtr() {
var unset maybe.Ptr[int] // = maybe.None()
if v, ok := unset.Try(); ok {
fmt.Println("unset:", v)
} else {
fmt.Println("unset: !ok")
}

var xnil maybe.Ptr[int] = maybe.Some[int](nil)
if v, ok := xnil.Try(); ok {
fmt.Println("explicit nil:", v)
}

num := 42
var set maybe.Ptr[int] = maybe.Some(&num)
if v, ok := set.Try(); ok {
fmt.Println("set:", *v)
}

// Output:
// unset: !ok
// explicit nil: <nil>
// set: 42
}
14 changes: 6 additions & 8 deletions processor/deltatocumulativeprocessor/internal/streams/limit.go
@@ -23,7 +23,9 @@ type LimitMap[T any] struct {
}

func (m LimitMap[T]) Store(id identity.Stream, v T) error {
if m.Map.Len() < m.Max {
_, ok := m.Map.Load(id)
avail := m.Map.Len() < m.Max
if ok || avail {
return m.Map.Store(id, v)
}

@@ -33,7 +35,7 @@ if err := m.Map.Store(id, v); err != nil {
if err := m.Map.Store(id, v); err != nil {
return err
}
return ErrEvicted{ErrLimit: errl, id: gone}
return ErrEvicted{ErrLimit: errl, Ident: gone}
}
return errl
}
@@ -51,13 +53,9 @@ func AtLimit(err error) bool {

type ErrEvicted struct {
ErrLimit
id Ident
Ident Ident
}

func (e ErrEvicted) Error() string {
return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.id)
}

func (e ErrEvicted) Unwrap() error {
return e.ErrLimit
return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.Ident)
}
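A hypothetical sketch of how a caller might tell the three outcomes of `LimitMap.Store` apart. Only `ErrEvicted`, its `Ident` field, and `AtLimit` come from the diff above; the `handleStore` helper and its logging are illustrative:

```go
package streams_test

import (
	"errors"
	"log"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
)

// handleStore stores dp and reports what happened: stored, stored with an
// eviction, or dropped because the stream limit was reached.
func handleStore(lim streams.LimitMap[data.Number], id streams.Ident, dp data.Number) {
	err := lim.Store(id, dp)

	var evicted streams.ErrEvicted
	switch {
	case err == nil:
		// stored, nothing to report
	case errors.As(err, &evicted):
		// stored, but an older stream was evicted to make room
		log.Printf("evicted stream %s", evicted.Ident)
	case streams.AtLimit(err):
		// not stored: limit reached and nothing could be evicted
		log.Printf("dropped sample, at limit: %v", err)
	default:
		log.Printf("store failed: %v", err)
	}
}
```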
@@ -22,11 +22,13 @@ func TestLimit(t *testing.T) {
lim := streams.Limit(items, 10)

ids := make([]identity.Stream, 10)
dps := make([]data.Number, 10)

// write until limit must work
for i := 0; i < 10; i++ {
id, dp := sum.Stream()
ids[i] = id
dps[i] = dp
err := lim.Store(id, dp)
require.NoError(t, err)
}
@@ -40,6 +42,12 @@ require.True(t, streams.AtLimit(err))
require.True(t, streams.AtLimit(err))
}

// write to existing must work
{
err := lim.Store(ids[3], dps[3])
require.NoError(t, err)
}

// after removing one, must be accepted again
{
lim.Delete(ids[0])