Skip to content

Commit c1482c7

Browse files
hairyhenderson authored and siavashs committed
feat: add distributed tracing support
Add tracing support using otel to the following components:
- api: extract trace and span IDs from request context
- provider: mem put
- dispatch: split logic and use better naming
- inhibit: source and target traces, mutes, etc. drop metrics
- silence: query, expire, mutes
- notify: add distributed tracing support to stages and all http requests

Note: inhibitor metrics are dropped since we have tracing now and they are not needed. We have not released any version with these metrics so we can drop them safely, this is not a breaking change.

This change borrows part of the implementation from #3673

Fixes #3670

Signed-off-by: Dave Henderson <dhenderson@gmail.com>
Signed-off-by: Siavash Safi <siavash@cloudflare.com>
1 parent 64ac47b commit c1482c7

File tree

33 files changed

+1370
-962
lines changed

33 files changed

+1370
-962
lines changed

api/api.go

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/prometheus/common/model"
2929
"github.com/prometheus/common/promslog"
3030
"github.com/prometheus/common/route"
31+
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
3132

3233
apiv2 "github.com/prometheus/alertmanager/api/v2"
3334
"github.com/prometheus/alertmanager/cluster"
@@ -201,7 +202,7 @@ func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux {
201202

202203
// Update config and resolve timeout of each API. APIv2 also needs
203204
// setAlertStatus to be updated.
204-
func (api *API) Update(cfg *config.Config, setAlertStatus func(model.LabelSet)) {
205+
func (api *API) Update(cfg *config.Config, setAlertStatus func(ctx context.Context, labels model.LabelSet)) {
205206
api.v2.Update(cfg, setAlertStatus)
206207
}
207208

@@ -242,7 +243,7 @@ func (api *API) instrumentHandler(prefix string, h http.Handler) http.Handler {
242243
}
243244
promhttp.InstrumentHandlerDuration(
244245
api.requestDuration.MustCurryWith(prometheus.Labels{"handler": path}),
245-
h,
246+
otelhttp.WithRouteTag(path, h),
246247
).ServeHTTP(w, r)
247248
})
248249
}

api/v2/api.go

Lines changed: 61 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,8 @@ import (
3333
prometheus_model "github.com/prometheus/common/model"
3434
"github.com/prometheus/common/version"
3535
"github.com/rs/cors"
36+
"go.opentelemetry.io/otel"
37+
"go.opentelemetry.io/otel/codes"
3638

3739
"github.com/prometheus/alertmanager/api/metrics"
3840
open_api_models "github.com/prometheus/alertmanager/api/v2/models"
@@ -54,6 +56,8 @@ import (
5456
"github.com/prometheus/alertmanager/types"
5557
)
5658

59+
var tracer = otel.Tracer("github.com/prometheus/alertmanager/api/v2")
60+
5761
// API represents an Alertmanager API v2.
5862
type API struct {
5963
peer cluster.ClusterPeer
@@ -82,7 +86,7 @@ type (
8286
groupsFn func(context.Context, func(*dispatch.Route) bool, func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[prometheus_model.Fingerprint][]string, error)
8387
groupMutedFunc func(routeID, groupKey string) ([]string, bool)
8488
getAlertStatusFn func(prometheus_model.Fingerprint) types.AlertStatus
85-
setAlertStatusFn func(prometheus_model.LabelSet)
89+
setAlertStatusFn func(ctx context.Context, labels prometheus_model.LabelSet)
8690
)
8791

8892
// NewAPI returns a new Alertmanager API v2.
@@ -173,6 +177,9 @@ func (api *API) getStatusHandler(params general_ops.GetStatusParams) middleware.
173177
api.mtx.RLock()
174178
defer api.mtx.RUnlock()
175179

180+
_, span := tracer.Start(params.HTTPRequest.Context(), "api.getStatusHandler")
181+
defer span.End()
182+
176183
original := api.alertmanagerConfig.String()
177184
uptime := strfmt.DateTime(api.uptime)
178185

@@ -229,6 +236,9 @@ func (api *API) getReceiversHandler(params receiver_ops.GetReceiversParams) midd
229236
api.mtx.RLock()
230237
defer api.mtx.RUnlock()
231238

239+
_, span := tracer.Start(params.HTTPRequest.Context(), "api.getReceiversHandler")
240+
defer span.End()
241+
232242
receivers := make([]*open_api_models.Receiver, 0, len(api.alertmanagerConfig.Receivers))
233243
for i := range api.alertmanagerConfig.Receivers {
234244
receivers = append(receivers, &open_api_models.Receiver{Name: &api.alertmanagerConfig.Receivers[i].Name})
@@ -243,11 +253,13 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
243253
// Initialize result slice to prevent api returning `null` when there
244254
// are no alerts present
245255
res = open_api_models.GettableAlerts{}
246-
ctx = params.HTTPRequest.Context()
247256

248257
logger = api.requestLogger(params.HTTPRequest)
249258
)
250259

260+
ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.getAlertsHandler")
261+
defer span.End()
262+
251263
matchers, err := parseFilter(params.Filter)
252264
if err != nil {
253265
logger.Debug("Failed to parse matchers", "err", err)
@@ -274,14 +286,15 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
274286

275287
api.mtx.RLock()
276288
for a := range alerts.Next() {
289+
alert := a.Data
277290
if err = alerts.Err(); err != nil {
278291
break
279292
}
280293
if err = ctx.Err(); err != nil {
281294
break
282295
}
283296

284-
routes := api.route.Match(a.Labels)
297+
routes := api.route.Match(alert.Labels)
285298
receivers := make([]string, 0, len(routes))
286299
for _, r := range routes {
287300
receivers = append(receivers, r.RouteOpts.Receiver)
@@ -291,13 +304,13 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
291304
continue
292305
}
293306

294-
if !alertFilter(a, now) {
307+
if !alertFilter(alert, now) {
295308
continue
296309
}
297310

298-
alert := AlertToOpenAPIAlert(a, api.getAlertStatus(a.Fingerprint()), receivers, nil)
311+
openAlert := AlertToOpenAPIAlert(alert, api.getAlertStatus(alert.Fingerprint()), receivers, nil)
299312

300-
res = append(res, alert)
313+
res = append(res, openAlert)
301314
}
302315
api.mtx.RUnlock()
303316

@@ -315,7 +328,11 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
315328
func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.Responder {
316329
logger := api.requestLogger(params.HTTPRequest)
317330

318-
alerts := OpenAPIAlertsToAlerts(params.Alerts)
331+
ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.postAlertsHandler")
332+
defer span.End()
333+
334+
alerts := OpenAPIAlertsToAlerts(ctx, params.Alerts)
335+
319336
now := time.Now()
320337

321338
api.mtx.RLock()
@@ -361,13 +378,19 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
361378
}
362379
validAlerts = append(validAlerts, a)
363380
}
364-
if err := api.alerts.Put(validAlerts...); err != nil {
365-
logger.Error("Failed to create alerts", "err", err)
381+
if err := api.alerts.Put(ctx, validAlerts...); err != nil {
382+
message := "Failed to create alerts"
383+
logger.Error(message, "err", err)
384+
span.SetStatus(codes.Error, message)
385+
span.RecordError(err)
366386
return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error())
367387
}
368388

369389
if validationErrs.Len() > 0 {
370-
logger.Error("Failed to validate alerts", "err", validationErrs.Error())
390+
message := "Failed to validate alerts"
391+
logger.Error(message, "err", validationErrs.Error())
392+
span.SetStatus(codes.Error, message)
393+
span.RecordError(validationErrs)
371394
return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error())
372395
}
373396

@@ -377,6 +400,9 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
377400
func (api *API) getAlertGroupsHandler(params alertgroup_ops.GetAlertGroupsParams) middleware.Responder {
378401
logger := api.requestLogger(params.HTTPRequest)
379402

403+
ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.getAlertGroupsHandler")
404+
defer span.End()
405+
380406
matchers, err := parseFilter(params.Filter)
381407
if err != nil {
382408
logger.Debug("Failed to parse matchers", "err", err)
@@ -407,8 +433,12 @@ func (api *API) getAlertGroupsHandler(params alertgroup_ops.GetAlertGroupsParams
407433
}(receiverFilter)
408434

409435
af := api.alertFilter(matchers, *params.Silenced, *params.Inhibited, *params.Active)
410-
alertGroups, allReceivers, err := api.alertGroups(params.HTTPRequest.Context(), rf, af)
436+
alertGroups, allReceivers, err := api.alertGroups(ctx, rf, af)
411437
if err != nil {
438+
message := "Failed to get alert groups"
439+
logger.Error(message, "err", err)
440+
span.SetStatus(codes.Error, message)
441+
span.RecordError(err)
412442
return alertgroup_ops.NewGetAlertGroupsInternalServerError()
413443
}
414444

@@ -441,12 +471,15 @@ func (api *API) getAlertGroupsHandler(params alertgroup_ops.GetAlertGroupsParams
441471

442472
func (api *API) alertFilter(matchers []*labels.Matcher, silenced, inhibited, active bool) func(a *types.Alert, now time.Time) bool {
443473
return func(a *types.Alert, now time.Time) bool {
474+
ctx, span := tracer.Start(context.Background(), "alertFilter")
475+
defer span.End()
476+
444477
if !a.EndsAt.IsZero() && a.EndsAt.Before(now) {
445478
return false
446479
}
447480

448481
// Set alert's current status based on its label set.
449-
api.setAlertStatus(a.Labels)
482+
api.setAlertStatus(ctx, a.Labels)
450483

451484
// Get alert's current status after seeing if it is suppressed.
452485
status := api.getAlertStatus(a.Fingerprint())
@@ -510,13 +543,16 @@ func matchFilterLabels(matchers []*labels.Matcher, sms map[string]string) bool {
510543
func (api *API) getSilencesHandler(params silence_ops.GetSilencesParams) middleware.Responder {
511544
logger := api.requestLogger(params.HTTPRequest)
512545

546+
ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.getSilencesHandler")
547+
defer span.End()
548+
513549
matchers, err := parseFilter(params.Filter)
514550
if err != nil {
515551
logger.Debug("Failed to parse matchers", "err", err)
516552
return silence_ops.NewGetSilencesBadRequest().WithPayload(err.Error())
517553
}
518554

519-
psils, _, err := api.silences.Query()
555+
psils, _, err := api.silences.Query(ctx)
520556
if err != nil {
521557
logger.Error("Failed to get silences", "err", err)
522558
return silence_ops.NewGetSilencesInternalServerError().WithPayload(err.Error())
@@ -606,7 +642,10 @@ func CheckSilenceMatchesFilterLabels(s *silencepb.Silence, matchers []*labels.Ma
606642
func (api *API) getSilenceHandler(params silence_ops.GetSilenceParams) middleware.Responder {
607643
logger := api.requestLogger(params.HTTPRequest)
608644

609-
sils, _, err := api.silences.Query(silence.QIDs(params.SilenceID.String()))
645+
ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.getSilenceHandler")
646+
defer span.End()
647+
648+
sils, _, err := api.silences.Query(ctx, silence.QIDs(params.SilenceID.String()))
610649
if err != nil {
611650
logger.Error("Failed to get silence by id", "err", err, "id", params.SilenceID.String())
612651
return silence_ops.NewGetSilenceInternalServerError().WithPayload(err.Error())
@@ -629,8 +668,11 @@ func (api *API) getSilenceHandler(params silence_ops.GetSilenceParams) middlewar
629668
func (api *API) deleteSilenceHandler(params silence_ops.DeleteSilenceParams) middleware.Responder {
630669
logger := api.requestLogger(params.HTTPRequest)
631670

671+
ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.deleteSilenceHandler")
672+
defer span.End()
673+
632674
sid := params.SilenceID.String()
633-
if err := api.silences.Expire(sid); err != nil {
675+
if err := api.silences.Expire(ctx, sid); err != nil {
634676
logger.Error("Failed to expire silence", "err", err)
635677
if errors.Is(err, silence.ErrNotFound) {
636678
return silence_ops.NewDeleteSilenceNotFound()
@@ -643,6 +685,9 @@ func (api *API) deleteSilenceHandler(params silence_ops.DeleteSilenceParams) mid
643685
func (api *API) postSilencesHandler(params silence_ops.PostSilencesParams) middleware.Responder {
644686
logger := api.requestLogger(params.HTTPRequest)
645687

688+
ctx, span := tracer.Start(params.HTTPRequest.Context(), "api.postSilencesHandler")
689+
defer span.End()
690+
646691
sil, err := PostableSilenceToProto(params.Silence)
647692
if err != nil {
648693
logger.Error("Failed to marshal silence to proto", "err", err)
@@ -663,7 +708,7 @@ func (api *API) postSilencesHandler(params silence_ops.PostSilencesParams) middl
663708
return silence_ops.NewPostSilencesBadRequest().WithPayload(msg)
664709
}
665710

666-
if err = api.silences.Set(sil); err != nil {
711+
if err = api.silences.Set(ctx, sil); err != nil {
667712
logger.Error("Failed to create silence", "err", err)
668713
if errors.Is(err, silence.ErrNotFound) {
669714
return silence_ops.NewPostSilencesNotFound().WithPayload(err.Error())

api/v2/api_test.go

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -160,16 +160,16 @@ func TestDeleteSilenceHandler(t *testing.T) {
160160
EndsAt: now.Add(time.Hour),
161161
UpdatedAt: now,
162162
}
163-
require.NoError(t, silences.Set(unexpiredSil))
163+
require.NoError(t, silences.Set(t.Context(), unexpiredSil))
164164

165165
expiredSil := &silencepb.Silence{
166166
Matchers: []*silencepb.Matcher{m},
167167
StartsAt: now.Add(-time.Hour),
168168
EndsAt: now.Add(time.Hour),
169169
UpdatedAt: now,
170170
}
171-
require.NoError(t, silences.Set(expiredSil))
172-
require.NoError(t, silences.Expire(expiredSil.Id))
171+
require.NoError(t, silences.Set(t.Context(), expiredSil))
172+
require.NoError(t, silences.Expire(t.Context(), expiredSil.Id))
173173

174174
for i, tc := range []struct {
175175
sid string
@@ -222,16 +222,16 @@ func TestPostSilencesHandler(t *testing.T) {
222222
EndsAt: now.Add(time.Hour),
223223
UpdatedAt: now,
224224
}
225-
require.NoError(t, silences.Set(unexpiredSil))
225+
require.NoError(t, silences.Set(t.Context(), unexpiredSil))
226226

227227
expiredSil := &silencepb.Silence{
228228
Matchers: []*silencepb.Matcher{m},
229229
StartsAt: now.Add(-time.Hour),
230230
EndsAt: now.Add(time.Hour),
231231
UpdatedAt: now,
232232
}
233-
require.NoError(t, silences.Set(expiredSil))
234-
require.NoError(t, silences.Expire(expiredSil.Id))
233+
require.NoError(t, silences.Set(t.Context(), expiredSil))
234+
require.NoError(t, silences.Expire(t.Context(), expiredSil.Id))
235235

236236
t.Run("Silences CRUD", func(t *testing.T) {
237237
for i, tc := range []struct {

api/v2/compat.go

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414
package v2
1515

1616
import (
17+
"context"
1718
"fmt"
1819
"time"
1920

@@ -170,7 +171,10 @@ func AlertToOpenAPIAlert(alert *types.Alert, status types.AlertStatus, receivers
170171
}
171172

172173
// OpenAPIAlertsToAlerts converts open_api_models.PostableAlerts to []*types.Alert.
173-
func OpenAPIAlertsToAlerts(apiAlerts open_api_models.PostableAlerts) []*types.Alert {
174+
func OpenAPIAlertsToAlerts(ctx context.Context, apiAlerts open_api_models.PostableAlerts) []*types.Alert {
175+
_, span := tracer.Start(ctx, "OpenAPIAlertsToAlerts")
176+
defer span.End()
177+
174178
alerts := []*types.Alert{}
175179
for _, apiAlert := range apiAlerts {
176180
alert := types.Alert{

0 commit comments

Comments
 (0)