Skip to content

Commit

Permalink
telemetry: TestFaults
Browse files Browse the repository at this point in the history
  • Loading branch information
sh0rez committed Apr 8, 2024
1 parent ecc88a8 commit fb77807
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 7 deletions.
6 changes: 3 additions & 3 deletions processor/deltatocumulativeprocessor/internal/maybe/ptr.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func None[T any]() Ptr[T] {

// Some returns a pointer to the passed T.
//
// The ptr argument may be nil, in which case this represents "explictely set to
// The ptr argument may be nil, in which case this represents "explicitly set to
// nil".
func Some[T any](ptr *T) Ptr[T] {
return Ptr[T]{to: ptr, ok: true}
Expand All @@ -42,11 +42,11 @@ func Some[T any](ptr *T) Ptr[T] {
// Try attempts to de-reference the Ptr, giving one of three results:
//
// - nil, false: not-set
// - nil, true: explicitely set to nil
// - nil, true: explicitly set to nil
// - non-nil, true: set to some value
//
// This provides extra safety over bare pointers, because callers are forced by
// the compiler to either check or explicitely ignore the ok value.
// the compiler to either check or explicitly ignore the ok value.
func (ptr Ptr[T]) Try() (_ *T, ok bool) {
return ptr.to, ptr.ok
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package maybe_test

import (
"fmt"
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe"
)

func TestMaybe(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (m LimitMap[T]) Store(id identity.Stream, v T) error {
if err := m.Map.Store(id, v); err != nil {
return err
}
return ErrEvicted{ErrLimit: errl, id: gone}
return ErrEvicted{ErrLimit: errl, Ident: gone}
}
return errl
}
Expand All @@ -53,9 +53,9 @@ func AtLimit(err error) bool {

type ErrEvicted struct {
ErrLimit
id Ident
Ident Ident
}

func (e ErrEvicted) Error() string {
return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.id)
return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.Ident)
}
150 changes: 150 additions & 0 deletions processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package telemetry_test

import (
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/otel/metric/noop"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random"
)

// TestFaults verifies certain non-fatal errors are actually caused and
// subsequently dropped. It does so by writing bad samples to the actual
// implementation instead of fabricating errors manually.
func TestFaults(t *testing.T) {
type Map = streams.Map[data.Number]
type Case struct {
Name string
Map Map
Pre func(Map, identity.Stream, data.Number) error
Bad func(Map, identity.Stream, data.Number) error
Err error
}

sum := random.Sum()
evid, evdp := sum.Stream()

cases := []Case{
{
Name: "older-start",
Pre: func(dps Map, id identity.Stream, dp data.Number) error {
dp.SetStartTimestamp(ts(20))
dp.SetTimestamp(ts(30))
return dps.Store(id, dp)
},
Bad: func(dps Map, id identity.Stream, dp data.Number) error {
dp.SetStartTimestamp(ts(10))
dp.SetTimestamp(ts(40))
return dps.Store(id, dp)
},
Err: delta.ErrOlderStart{Start: ts(20), Sample: ts(10)},
},
{
Name: "out-of-order",
Pre: func(dps Map, id identity.Stream, dp data.Number) error {
dp.SetTimestamp(ts(20))
return dps.Store(id, dp)
},
Bad: func(dps Map, id identity.Stream, dp data.Number) error {
dp.SetTimestamp(ts(10))
return dps.Store(id, dp)
},
Err: delta.ErrOutOfOrder{Last: ts(20), Sample: ts(10)},
},
{
Name: "gap",
Pre: func(dps Map, id identity.Stream, dp data.Number) error {
dp.SetStartTimestamp(ts(10))
dp.SetTimestamp(ts(20))
return dps.Store(id, dp)
},
Bad: func(dps Map, id identity.Stream, dp data.Number) error {
dp.SetStartTimestamp(ts(30))
dp.SetTimestamp(ts(40))
return dps.Store(id, dp)
},
Err: delta.ErrGap{From: ts(20), To: ts(30)},
},
{
Name: "limit",
Map: streams.Limit(delta.New[data.Number](), 1),
Pre: func(dps Map, id identity.Stream, dp data.Number) error {
dp.SetTimestamp(ts(10))
return dps.Store(id, dp)
},
Bad: func(dps Map, _ identity.Stream, _ data.Number) error {
id, dp := sum.Stream()
dp.SetTimestamp(ts(20))
return dps.Store(id, dp)
},
Err: streams.ErrLimit(1),
},
{
Name: "evict",
Map: func() Map {
ev := HeadEvictor[data.Number]{Map: delta.New[data.Number]()}
lim := streams.Limit(ev, 1)
lim.Evictor = ev
return lim
}(),
Pre: func(dps Map, _ identity.Stream, _ data.Number) error {
evdp.SetTimestamp(ts(10))
return dps.Store(evid, evdp)
},
Bad: func(dps Map, _ identity.Stream, _ data.Number) error {
id, dp := sum.Stream()
dp.SetTimestamp(ts(20))
return dps.Store(id, dp)
},
Err: streams.ErrEvicted{Ident: evid, ErrLimit: streams.ErrLimit(1)},
},
}

for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
id, dp := sum.Stream()
tel := telemetry.New(noop.Meter{})

dps := c.Map
if dps == nil {
dps = delta.New[data.Number]()
}
onf := telemetry.ObserveNonFatal(dps, &tel.Metrics)

if c.Pre != nil {
err := c.Pre(onf, id, dp.Clone())
require.NoError(t, err)
}

err := c.Bad(dps, id, dp.Clone())
require.Equal(t, c.Err, err)

err = c.Bad(onf, id, dp.Clone())
require.NoError(t, err)
})
}
}

type ts = pcommon.Timestamp

// HeadEvictor drops the first stream on Evict()
type HeadEvictor[T any] struct{ streams.Map[T] }

func (e HeadEvictor[T]) Evict() (evicted identity.Stream) {
e.Items()(func(id identity.Stream, _ T) bool {
e.Delete(id)
evicted = id
return false
})
return evicted
}

1 comment on commit fb77807

@RichieSams
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

Please sign in to comment.