Skip to content

Commit

Permalink
Output zero values at reset points (#190)
Browse files Browse the repository at this point in the history
* Output 0 values for cumulative resets; eliminate overlap testing

* Do not count cumulative resets

* Changelog

* Use zero-width points consistently

* one log cleanup
  • Loading branch information
jmacd authored Apr 14, 2021
1 parent 9fecd5d commit 0b96283
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 194 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Changed

- OTLP data points re-use Resource and InstrumentationLibrary (thus are smaller). (#182)
- Counter reset events output zero values at the reset timestamp, instead of skipping points. (#190)
- `sidecar.metrics.invalid` broadened to include non-validation failures, renamed `sidecar.metrics.failing`. (#188)

### Removed

- Removed counters `sidecar.samples.produced` & `sidecar.samples.processed`. (#187)
- Removed counter `sidecar.cumulative.missing_resets`. (#190)
- Removed overlap detection, this cannot happen without the MonitoredResource transform removed in #2. (#190)

## [0.21.1](https://github.com/lightstep/opentelemetry-prometheus-sidecar/releases/tag/v0.21.1) - 2021-04-06

Expand Down
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,8 @@ Metrics from the subordinate process can help identify issues once the first met
| sidecar.series.dropped | counter | number of series or metrics dropped | `key_reason`: various |
| sidecar.points.produced | counter | number of points read from the prometheus WAL | |
| sidecar.points.dropped | counter | number of points dropped due to errors | `key_reason`: various |
| sidecar.points.skipped | counter | number of points skipped by filters or cumulative resets | |
| sidecar.points.skipped | counter | number of points skipped due to filters | |
| sidecar.metadata.lookups | counter | number of calls to lookup metadata | `error`: true, false |
| sidecar.cumulative.missing_resets | counter | number of points skipped because cumulative reset time was not known | |
| sidecar.series.current | gauge | number of series refs in the series cache | `status`: live, filtered, invalid |
| sidecar.wal.size | gauge | size of the prometheus WAL | |
| sidecar.wal.offset | gauge | current offset in the prometheus WAL | |
Expand Down
4 changes: 2 additions & 2 deletions cmd/opentelemetry-prometheus-sidecar/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,9 @@ func TestE2E(t *testing.T) {
}

expect := map[string][]float64{
"some_counter": {1, 2, 3, 4, 5},
"some_counter": {0, 1, 2, 3, 4},
"some_gauge": {1, 2, 3, 4, 5},
"some_counter_relabel": {1, 2, 3, 4, 5},
"some_counter_relabel": {0, 1, 2, 3, 4},
"some_gauge_relabel": {1, 2, 3, 4, 5},
}

Expand Down
19 changes: 10 additions & 9 deletions cmd/opentelemetry-prometheus-sidecar/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestValidationErrorReporting(t *testing.T) {
}

// Create a WAL with 3 series, 5 points. Two of them are
// counters, so after resets we have 3 series, 3 points.
// counters, so after resets we have 3 series, 5 points.
dir, err := ioutil.TempDir("", "test_validation")
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -161,16 +161,16 @@ func TestValidationErrorReporting(t *testing.T) {
timer := time.NewTimer(time.Second * 10)
defer timer.Stop()

// Wait for 3 specific points, then 3 specific meta points.
var droppedPointsFound, droppedSeriesFound, skippedPointsFound int64
// Wait for 3 specific points, then 2 specific meta points.
var droppedPointsFound, droppedSeriesFound int64
var got = 0
outer:
for got < 3 || droppedPointsFound == 0 || droppedSeriesFound == 0 || skippedPointsFound == 0 {
for got < 5 || droppedPointsFound == 0 || droppedSeriesFound == 0 {
var data *metrics.ResourceMetrics
select {
case data = <-ms.metrics:
case <-timer.C:
t.Errorf("test timeout")
t.Error("test timeout: ", got, droppedPointsFound, droppedSeriesFound)
break outer
}

Expand All @@ -184,14 +184,16 @@ outer:
) error {
switch name {
case "counter", "gauge", "correct":
require.InEpsilon(t, 100, point.(*otlpmetrics.DoubleDataPoint).Value, 0.01)
if point.(*otlpmetrics.DoubleDataPoint).Value == 0 {
// OK!
} else {
require.InEpsilon(t, 100, point.(*otlpmetrics.DoubleDataPoint).Value, 0.01)
}
got++
case config.DroppedPointsMetric:
droppedPointsFound = point.(*otlpmetrics.IntDataPoint).Value
case config.DroppedSeriesMetric:
droppedSeriesFound = point.(*otlpmetrics.IntDataPoint).Value
case config.SkippedPointsMetric:
skippedPointsFound = point.(*otlpmetrics.IntDataPoint).Value
case config.FailingMetricsMetric:
labels := point.(*otlpmetrics.IntDataPoint).Labels

Expand Down Expand Up @@ -226,7 +228,6 @@ outer:
// Correct drop summary:
require.Equal(t, int64(2), droppedPointsFound) // from server response
require.Equal(t, int64(1), droppedSeriesFound) // from server response
require.Equal(t, int64(2), skippedPointsFound) // number of cumulative resets

for _, expect := range []string{
// We didn't start the trace service but received data.
Expand Down
16 changes: 9 additions & 7 deletions otlp/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (
promconfig "github.com/prometheus/prometheus/config"
"go.opentelemetry.io/otel/metric"
metricsService "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
metricspb "go.opentelemetry.io/proto/otlp/metrics/v1"
resourcepb "go.opentelemetry.io/proto/otlp/resource/v1"
commonpb "go.opentelemetry.io/proto/otlp/common/v1"

// gRPC Status protobuf types we may want to see. This type
// is not widely used, but is the most standard way to itemize
Expand Down Expand Up @@ -357,12 +357,14 @@ func (t *QueueManager) calculateDesiredShards() {
// to shardUpdateDuration.
select {
case t.reshardChan <- numShards:
level.Info(t.logger).Log(
"msg", "send queue resharding",
"from", t.numShards,
"to", numShards,
)
t.numShards = numShards
if numShards != t.numShards {
level.Info(t.logger).Log(
"msg", "send queue resharding",
"from", t.numShards,
"to", numShards,
)
t.numShards = numShards
}
default:
level.Warn(t.logger).Log(
"msg", "currently resharding, skipping",
Expand Down
69 changes: 7 additions & 62 deletions retrieval/series_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,7 @@ type seriesGetter interface {
get(ctx context.Context, ref uint64) (*seriesCacheEntry, error)

// Get the reset timestamp and adjusted value for the input sample.
// If false is returned, the sample should be skipped.
getResetAdjusted(ref uint64, t int64, v float64) (int64, float64, bool)

// Attempt to set the new most recent time range for the series with given hash.
// Returns false if it failed, in which case the sample must be discarded.
updateSampleInterval(hash uint64, start, end int64) bool
getResetAdjusted(entry *seriesCacheEntry, timestamp int64, value float64) (reset int64, adjusted float64)
}

// seriesCache holds a mapping from series reference to label set.
Expand All @@ -74,8 +69,6 @@ type seriesCache struct {
mtx sync.Mutex
// Map from series reference to various cached information about it.
entries map[uint64]*seriesCacheEntry
// Map from series hash to most recently written interval.
intervals map[uint64]sampleInterval
// Map for jobs where "instance" has been relabelled
jobInstanceMap map[string]string

Expand Down Expand Up @@ -133,10 +126,6 @@ var (
"sidecar.metadata.lookups",
"Number of Metric series lookups",
)
seriesCacheMissingResetCounter = telemetry.NewCounter(
"sidecar.cumulative.missing_resets",
"Number of Metric series resets that were missing start time, causing gaps a series",
)

errSeriesNotFound = fmt.Errorf("series ref not found")
errSeriesMissingMetadata = fmt.Errorf("series ref missing metadata")
Expand Down Expand Up @@ -169,7 +158,6 @@ func newSeriesCache(
filters: filters,
metaget: metaget,
entries: map[uint64]*seriesCacheEntry{},
intervals: map[uint64]sampleInterval{},
metricsPrefix: metricsPrefix,
renames: renames,
jobInstanceMap: jobInstanceMap,
Expand Down Expand Up @@ -308,69 +296,26 @@ func (c *seriesCache) get(ctx context.Context, ref uint64) (*seriesCacheEntry, e
return e, nil
}

// updateSampleInterval attempts to set the new most recent time range for the series with given hash.
// Returns false if it failed, in which case the sample must be discarded.
func (c *seriesCache) updateSampleInterval(hash uint64, start, end int64) bool {
iv, ok := c.intervals[hash]
if !ok || iv.accepts(start, end) {
c.intervals[hash] = sampleInterval{start, end}
return true
}
return false
}

type sampleInterval struct {
start, end int64
}

func (si *sampleInterval) accepts(start, end int64) bool {
return (start == si.start && end > si.end) || (start > si.start && start >= si.end)
}

// getResetAdjusted takes a sample for a referenced series and returns
// its reset timestamp and adjusted value.
// If the last return argument is false, the sample should be dropped.
func (c *seriesCache) getResetAdjusted(ref uint64, t int64, v float64) (int64, float64, bool) {
c.mtx.Lock()
e, ok := c.entries[ref]
c.mtx.Unlock()
if !ok {
// Note: This is an improbable branch. Every code path
// that reaches this point has already called get(), which
// checks the same error condition. If this could
// really happen, we'd be counting dropped data incorrectly.
doevery.TimePeriod(config.DefaultNoisyLogPeriod, func() {
level.Warn(c.logger).Log(
"msg", "timeseries missing ref",
"ref", ref,
)
})
return 0, 0, false
}
func (c *seriesCache) getResetAdjusted(e *seriesCacheEntry, t int64, v float64) (int64, float64) {
hasReset := e.hasReset
e.hasReset = true
if !hasReset {
e.resetTimestamp = t
e.resetValue = v
e.previousValue = v
// If we just initialized the reset timestamp, this sample should be skipped.
// We don't know the window over which the current cumulative value was built up over.
// The next sample for will be considered from this point onwards.

seriesCacheMissingResetCounter.Add(context.Background(), 1, nil)
return 0, 0, false
// If we just initialized the reset timestamp, record a zero (i.e., reset).
// The next sample will be considered relative to resetValue.
return t, 0
}
if v < e.previousValue {
// If the value has dropped, there's been a reset.
// If the series was reset, set the reset timestamp to be one millisecond
// before the timestamp of the current sample.
// We don't know the true reset time but this ensures the range is non-zero
// while unlikely to conflict with any previous sample.
e.resetValue = 0
e.resetTimestamp = t - 1
e.resetTimestamp = t
}
e.previousValue = v
return e.resetTimestamp, v - e.resetValue, true
return e.resetTimestamp, v - e.resetValue
}

// set the label set for the given reference.
Expand Down
30 changes: 12 additions & 18 deletions retrieval/series_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,40 +457,34 @@ func TestSeriesCache_ResetBehavior(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}

_, err := c.get(ctx, refID)
entry, err := c.get(ctx, refID)
require.NoError(t, err)

type kase struct {
ts int64
value float64
reset int64
cumulative float64
ok bool
}

const pad = 1
const pad = 0 // OTLP allows zero-width points

// Simulate two resets.
for i, k := range []kase{
{1, 10, 1, 0, false},
{2, 20, 1, 10, true},
{3, 30, 1, 20, true},
{4, 40, 1, 30, true},
{1, 10, 1, 0},
{2, 20, 1, 10},
{3, 30, 1, 20},
{4, 40, 1, 30},

{5, 5, 5 - pad, 5, true},
{6, 10, 5 - pad, 10, true},
{7, 15, 5 - pad, 15, true},
{5, 5, 5 - pad, 5},
{6, 10, 5 - pad, 10},
{7, 15, 5 - pad, 15},

{8, 0, 8 - pad, 0, true},
{9, 10, 8 - pad, 10, true},
{8, 0, 8 - pad, 0},
{9, 10, 8 - pad, 10},
} {
ts, val, ok := c.getResetAdjusted(refID, k.ts, k.value)

require.Equal(t, k.ok, ok, "%d", i)
ts, val := c.getResetAdjusted(entry, k.ts, k.value)

if !ok {
continue
}
require.Equal(t, k.reset, ts, "%d", i)
require.Equal(t, k.cumulative, val, "%d", i)
}
Expand Down
28 changes: 4 additions & 24 deletions retrieval/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,10 @@ func (b *sampleBuilder) next(ctx context.Context, samples []record.RefSample) (*
labels := protoStringLabels(entry.desc.Labels)

var resetTimestamp int64
var ok bool
switch entry.metadata.MetricType {
case textparse.MetricTypeCounter:
var value float64
resetTimestamp, value, ok = b.series.getResetAdjusted(sample.Ref, sample.T, sample.V)
if !ok {
return nil, 0, tailSamples, nil
}
resetTimestamp, value = b.series.getResetAdjusted(entry, sample.T, sample.V)

if entry.metadata.ValueType == config.INT64 {
point.Data = &metric_pb.Metric_IntSum{
Expand All @@ -120,19 +116,13 @@ func (b *sampleBuilder) next(ctx context.Context, samples []record.RefSample) (*
switch entry.suffix {
case metricSuffixSum:
var value float64
resetTimestamp, value, ok = b.series.getResetAdjusted(sample.Ref, sample.T, sample.V)
if !ok {
return nil, 0, tailSamples, nil
}
resetTimestamp, value = b.series.getResetAdjusted(entry, sample.T, sample.V)
point.Data = &metric_pb.Metric_DoubleSum{
DoubleSum: monotonicDoublePoint(labels, resetTimestamp, sample.T, value),
}
case metricSuffixCount:
var value float64
resetTimestamp, value, ok = b.series.getResetAdjusted(sample.Ref, sample.T, sample.V)
if !ok {
return nil, 0, tailSamples, nil
}
resetTimestamp, value = b.series.getResetAdjusted(entry, sample.T, sample.V)
point.Data = &metric_pb.Metric_IntSum{
IntSum: monotonicIntegerPoint(labels, resetTimestamp, sample.T, value),
}
Expand Down Expand Up @@ -173,16 +163,12 @@ func (b *sampleBuilder) next(ctx context.Context, samples []record.RefSample) (*
return nil, 0, tailSamples, errors.Errorf("unexpected metric type %s", entry.metadata.MetricType)
}

if !b.series.updateSampleInterval(entry.hash, resetTimestamp, sample.T) {
return nil, 0, tailSamples, nil
}
if b.maxPointAge > 0 {
when := time.Unix(sample.T/1000, int64(time.Duration(sample.T%1000)*time.Millisecond))
if time.Since(when) > b.maxPointAge {
return nil, 0, tailSamples, nil
}
}

return point, entry.hash, tailSamples, nil
}

Expand Down Expand Up @@ -339,7 +325,7 @@ Loop:
}
lastTimestamp = s.T

rt, v, ok := b.series.getResetAdjusted(s.Ref, s.T, s.V)
rt, v := b.series.getResetAdjusted(e, s.T, s.V)

switch name[len(baseName):] {
case metricSuffixSum:
Expand All @@ -361,12 +347,6 @@ Loop:
default:
break Loop
}
// If a series appeared for the first time, we won't get a valid reset timestamp yet.
// This may happen if the histogram is entirely new or if new series appeared through bucket changes.
// We skip the entire histogram sample in this case.
if !ok {
skip = true
}
consumed++
}
// Don't emit a sample if we explicitly skip it or no reset timestamp was set because the
Expand Down
Loading

0 comments on commit 0b96283

Please sign in to comment.