Use staleness markers generated by prometheus, rather than making our own #5062
Conversation
…disappearing metrics (open-telemetry#3423)" This reverts commit 8b79380.
…elemetry#3414)" This reverts commit cdc1634.
@dashpole thank you for working on this fix! What do you mean by Prometheus already emitting staleness markers? AFAIK Prometheus wasn't issuing staleness markers, and only the server handles that. A revert of this might bring us back to square 0, but to alleviate the current problems, sure, it'll work.
Verified that this passes the prometheus compliance suite:
By "emit", I mean that it calls Append with |
Awesome news, thank you @dashpole, LGTM!
Thank you and LGTM @dashpole!
@dashpole I've crafted for you another end-to-end test that checks that multiple targets won't have staleness markers spuriously generated, as was reported in #4748, which we should also tag as fixed if it works:
package repro
import (
    "context"
    "fmt"
    "io/ioutil"
    "net/http"
    "net/http/httptest"
    "net/url"
    "strings"
    "sync/atomic"
    "testing"

    "github.com/gogo/protobuf/proto"
    "github.com/golang/snappy"
    "github.com/prometheus/prometheus/pkg/value"
    "github.com/prometheus/prometheus/prompb"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/processor/batchprocessor"
    "go.opentelemetry.io/collector/service"
    "go.opentelemetry.io/collector/service/parserprovider"
    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"

    "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter"
    "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"
)

// Test that staleness markers are NOT emitted for replicated services whose scrapes might alternate
// between intervals. This runs the entire collector end to end: it scrapes two identical targets and
// then checks, via the Prometheus remote write exporter, that NO staleness markers were emitted.
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/4748
func TestStalenessMarkersReplicatedServicesDoNotProduceStalenessMarkers(t *testing.T) {
    if testing.Short() {
        t.Skip("This test can take a long time")
    }

    ctx, cancel := context.WithCancel(context.Background())

    // 1. Set up the scrape targets. Each handler serves a monotonically increasing gauge value.
    createStalenessHandler := func() http.HandlerFunc {
        var i int64
        return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
            select {
            case <-ctx.Done():
                return
            default:
            }
            n := atomic.AddInt64(&i, 1)
            fmt.Fprintf(rw, `
# HELP jvm_memory_bytes_used Used bytes of a given JVM memory area.
# TYPE jvm_memory_bytes_used gauge
jvm_memory_bytes_used{area="heap"} %.1f`, float64(n))
        })
    }

    scrapeServer1 := httptest.NewServer(createStalenessHandler())
    defer scrapeServer1.Close()
    scrapeServer2 := httptest.NewServer(createStalenessHandler())
    defer scrapeServer2.Close()

    serverURL1, err := url.Parse(scrapeServer1.URL)
    require.Nil(t, err)
    serverURL2, err := url.Parse(scrapeServer2.URL)
    require.Nil(t, err)

    // 2. Set up the Prometheus RemoteWrite endpoint.
    prweUploads := make(chan *prompb.WriteRequest)
    prweServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
        // Snappy decode the uploads.
        payload, rerr := ioutil.ReadAll(req.Body)
        if rerr != nil {
            panic(rerr)
        }
        recv := make([]byte, len(payload))
        decoded, derr := snappy.Decode(recv, payload)
        if derr != nil {
            panic(derr)
        }
        writeReq := new(prompb.WriteRequest)
        if uerr := proto.Unmarshal(decoded, writeReq); uerr != nil {
            panic(uerr)
        }
        select {
        case <-ctx.Done():
            return
        case prweUploads <- writeReq:
        }
    }))
    defer prweServer.Close()

    // 3. Set up the OpenTelemetry Prometheus receiver.
    config := fmt.Sprintf(`
receivers:
  prometheus:
    config:
      scrape_configs:
        - job_name: 'test'
          scrape_interval: 2ms
          static_configs:
            - targets: [%q,%q]

processors:
  batch:

exporters:
  prometheusremotewrite:
    endpoint: %q
    insecure: true

service:
  pipelines:
    metrics:
      receivers: [prometheus]
      processors: [batch]
      exporters: [prometheusremotewrite]`, serverURL1.Host, serverURL2.Host, prweServer.URL)

    // 4. Run the OpenTelemetry Collector.
    receivers, err := component.MakeReceiverFactoryMap(prometheusreceiver.NewFactory())
    require.Nil(t, err)
    exporters, err := component.MakeExporterFactoryMap(prometheusremotewriteexporter.NewFactory())
    require.Nil(t, err)
    processors, err := component.MakeProcessorFactoryMap(batchprocessor.NewFactory())
    require.Nil(t, err)

    factories := component.Factories{
        Receivers:  receivers,
        Exporters:  exporters,
        Processors: processors,
    }

    appSettings := service.CollectorSettings{
        Factories:      factories,
        ParserProvider: parserprovider.NewInMemory(strings.NewReader(config)),
        BuildInfo: component.BuildInfo{
            Command:     "otelcol",
            Description: "OpenTelemetry Collector",
            Version:     "tests",
        },
        LoggingOptions: []zap.Option{
            // Turn off the verbose logging from the collector.
            zap.WrapCore(func(zapcore.Core) zapcore.Core {
                return zapcore.NewNopCore()
            }),
        },
    }

    app, err := service.New(appSettings)
    require.Nil(t, err)

    go func() {
        if err := app.Run(); err != nil {
            t.Error(err)
        }
    }()

    // Wait until the collector has actually started.
    stateChannel := app.GetStateChannel()
    for notYetStarted := true; notYetStarted; {
        switch state := <-stateChannel; state {
        case service.Running, service.Closed, service.Closing:
            notYetStarted = false
        }
    }

    // The OpenTelemetry collector has a data race because it closes
    // a channel while it can still be written to, so we deliberately
    // skip calling Shutdown here.
    if false {
        defer app.Shutdown()
    }

    // 5. Let's wait on 10 fetches.
    var wReqL []*prompb.WriteRequest
    for i := 0; i < 10; i++ {
        wReqL = append(wReqL, <-prweUploads)
    }
    defer cancel()

    // 6. Assert that we do NOT encounter any stale markers, i.e. the special NaNs, in the scraped time series.
    staleMarkerCount := 0
    totalSamples := 0
    for i, wReq := range wReqL {
        name := fmt.Sprintf("WriteRequest#%d", i)
        require.True(t, len(wReq.Timeseries) > 0, "Expecting at least 1 timeSeries for:: "+name)
        for j, ts := range wReq.Timeseries {
            fullName := fmt.Sprintf("%s/TimeSeries#%d", name, j)
            assert.True(t, len(ts.Samples) > 0, "Expected at least 1 Sample in:: "+fullName)

            // We are strictly counting series directly included in the scrapes, and not
            // internal timeseries like "up" or "scrape_duration_seconds".
            metricName := ""
            for _, label := range ts.Labels {
                if label.Name == "__name__" {
                    metricName = label.Value
                }
            }
            if !strings.HasPrefix(metricName, "jvm") {
                continue
            }

            for _, sample := range ts.Samples {
                totalSamples++
                if value.IsStaleNaN(sample.Value) {
                    staleMarkerCount++
                }
            }
        }
    }
    require.True(t, totalSamples > 0, "Expected at least 1 sample")
    require.False(t, staleMarkerCount > 0, fmt.Sprintf("Expected no stale markers, got: %d", staleMarkerCount))
}
@Aneurysm9 @rakyll please review
Actually already done :)
@odeke-em feel free to open a PR with the additional test. I'll gladly provide a review. @bogdandrutu would you accept cherry-picks of this to previous release branches (I realize that for previous releases I'd have to make the changes by hand in core)? My preference would be for 0.30.0 and on (excl. 0.32, which was retracted).
The plan for cherrypicks (discussed in the 9/8 sig meeting) is:
… own (#5062) (#5170)
* Revert "receiver/prometheus: glue and complete staleness marking for disappearing metrics (#3423)" This reverts commit 8b79380.
* Revert "receiver/prometheus: add store to track stale metrics (#3414)" This reverts commit cdc1634.
* stop dropping staleness markers from prometheus, and fix tests
* add staleness end to end test from #3423
Description:
Commits 1 and 2 revert most of open-telemetry/opentelemetry-collector#3423 and open-telemetry/opentelemetry-collector#3414. I didn't revert the start timestamp changes because we had already used them for other purposes.
Commit 3 removes the check in our implementation of `Append` where we drop metrics if they include NaN values. This effectively "enables" collection of staleness markers that had previously been disabled. I re-introduced some of the test changes required for handling staleness markers in our e2e tests from open-telemetry/opentelemetry-collector#3423. Commit 4 reintroduces the same staleness end-to-end test used in the reverted PRs to demonstrate that this PR does, indeed, add staleness markers.
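For illustration only, here is a minimal sketch of the kind of check described above, assuming a hypothetical metricAppender type with made-up field and method names (not the collector's actual implementation): instead of dropping every NaN sample, the appender keeps the samples that carry Prometheus's StaleNaN marker so they can flow through to the exporter.

package example

import (
    "math"

    "github.com/prometheus/prometheus/pkg/value"
)

// metricAppender is a hypothetical stand-in for the receiver's appender; the
// type and method names here are illustrative, not the collector's real API.
type metricAppender struct {
    samples []sample
}

type sample struct {
    series string // e.g. `jvm_memory_bytes_used{area="heap"}`
    ts     int64
    val    float64
}

// Add sketches the shape of the change: the old behavior dropped every NaN
// sample, which also discarded the StaleNaN markers Prometheus appends for
// vanished series. Keeping samples where value.IsStaleNaN(v) is true lets
// Prometheus's own staleness markers reach the exporter.
func (ma *metricAppender) Add(series string, ts int64, v float64) error {
    if math.IsNaN(v) && !value.IsStaleNaN(v) {
        // An ordinary (non-marker) NaN; handle or skip as the receiver sees fit.
        return nil
    }
    ma.samples = append(ma.samples, sample{series: series, ts: ts, val: v})
    return nil
}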
Overall, this replaces our custom implementation of staleness markers with prometheus's implementation. This results in less duplication of work (since prometheus is already tracking staleness), is more likely to stay compliant, and is less likely to have correctness problems.
Link to tracking Issue:
This fixes an issue I've been debugging with GKE's use of the opentelemetry-collector. I believe it is the same underlying issue as #4907, but I haven't confirmed that.
Testing:
This passes the prometheus remote write compliance test suite.
I kept the same staleness e2e test that was previously used. I've also tested this with GKE's use of the collector. In my testing with GKE's use-case, this fixed a problem that I wasn't able to fully root cause. I was able to show that GKE's issue was caused by open-telemetry/opentelemetry-collector#3423, and then realized that we didn't even need to implement staleness markers ourselves.
Documentation:
Changes apply only to internal implementation details.
cc @jmacd @Aneurysm9 @odeke-em