Instrument ADX Mon components with more metrics #101

Closed
wants to merge 8 commits into from
5 changes: 4 additions & 1 deletion alerter/alert/client.go
@@ -9,6 +9,8 @@ import (
"io"
"net/http"
"time"

"github.com/Azure/adx-mon/metrics"
)

var (
@@ -31,7 +33,8 @@ func NewClient(timeout time.Duration) (*Client, error) {
t.IdleConnTimeout = time.Minute

httpClient := &http.Client{
- Timeout: timeout,
+ Timeout:   timeout,
+ Transport: metrics.NewRoundTripper(t),
}

return &Client{
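The metrics.NewRoundTripper helper used above is introduced by this PR, but its definition is not part of the hunks shown here. A minimal sketch of what an instrumented transport of this kind typically looks like, assuming a Prometheus counter labeled by status code (the metric name and labels below are illustrative, not the actual adx-mon implementation):

package metrics

import (
	"net/http"
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Hypothetical metric; the real adx-mon counter name and labels may differ.
var outboundRequests = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "adxmon_http_client_requests_total",
	Help: "Outbound HTTP requests by response status code.",
}, []string{"code"})

type roundTripper struct {
	next http.RoundTripper
}

// NewRoundTripper wraps a transport so every outbound request is counted,
// labeled with the response status code (or "error" on transport failure).
func NewRoundTripper(next http.RoundTripper) http.RoundTripper {
	if next == nil {
		next = http.DefaultTransport
	}
	return &roundTripper{next: next}
}

func (rt *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	resp, err := rt.next.RoundTrip(req)
	if err != nil {
		outboundRequests.WithLabelValues("error").Inc()
		return nil, err
	}
	outboundRequests.WithLabelValues(strconv.Itoa(resp.StatusCode)).Inc()
	return resp, nil
}

A wrapper in this style lets the PR instrument outbound calls by swapping in metrics.NewRoundTripper(transport) at client construction, as the alerter, collector, and ingestor changes below all do, without touching any call sites.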
1 change: 1 addition & 0 deletions alerter/engine/executor.go
@@ -176,6 +176,7 @@ func (e *Executor) HandlerFn(ctx context.Context, endpoint string, qc *QueryCont

logger.Errorf("Failed to create Notification: %s\n", err)
metrics.NotificationUnhealthy.WithLabelValues(qc.Rule.Namespace, qc.Rule.Name).Set(1)
+ metrics.AlerterErrors.WithLabelValues(qc.Rule.Name, metrics.CreateAlertNotificationError).Inc()
return nil
}
metrics.NotificationUnhealthy.WithLabelValues(qc.Rule.Namespace, qc.Rule.Name).Set(0)
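metrics.AlerterErrors and the error-reason constants (CreateAlertNotificationError and friends) are defined elsewhere in this PR. A plausible shape for that declaration, assuming prometheus/client_golang; the namespace, metric name, and reason values below are assumptions, while the label order (rule, then reason) follows the usage in the hunks above and below:

package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Hypothetical reason values; the real constants live in the adx-mon metrics package.
const (
	CreateQueryContextError      = "create_query_context"
	CreateAlertNotificationError = "create_alert_notification"
	AlerterQueryInternalError    = "query_internal_error"
	AlerterQueryUserError        = "query_user_error"
	AlerterReloadRulesError      = "reload_rules"
)

// AlerterErrors counts alerter failures, labeled by rule and reason, so errors
// can be broken down per rule and per failure mode on a dashboard.
var AlerterErrors = promauto.NewCounterVec(prometheus.CounterOpts{
	Namespace: "adxmon",
	Subsystem: "alerter",
	Name:      "errors_total",
	Help:      "Count of alerter errors by rule and reason.",
}, []string{"rule", "reason"})

Incrementing it then matches the diff, e.g. metrics.AlerterErrors.WithLabelValues(qc.Rule.Name, metrics.CreateAlertNotificationError).Inc().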
10 changes: 7 additions & 3 deletions alerter/engine/worker.go
@@ -88,6 +88,7 @@ func (e *worker) ExecuteQuery(ctx context.Context) {
queryContext, err := NewQueryContext(e.rule, start, e.Region)
if err != nil {
logger.Errorf("Failed to wrap query=%s/%s on %s/%s: %s", e.rule.Namespace, e.rule.Name, e.kustoClient.Endpoint(e.rule.Database), e.rule.Database, err)
+ metrics.AlerterErrors.WithLabelValues(e.rule.Name, metrics.CreateQueryContextError).Inc()
return
}

@@ -115,14 +116,16 @@ func (e *worker) ExecuteQuery(ctx context.Context) {

if !isUserError(err) {
metrics.QueryHealth.WithLabelValues(e.rule.Namespace, e.rule.Name).Set(0)
+ metrics.AlerterErrors.WithLabelValues(e.rule.Name, metrics.AlerterQueryInternalError).Inc()
return
}

+ metrics.AlerterErrors.WithLabelValues(e.rule.Name, metrics.AlerterQueryUserError).Inc()

summary, err := KustoQueryLinks(fmt.Sprintf("This query is failing to execute:<br/><br/><pre>%s</pre><br/><br/>", err.Error()), queryContext.Query, e.kustoClient.Endpoint(e.rule.Database), e.rule.Database)
if err != nil {
logger.Errorf("Failed to send failure alert for %s/%s: %s", e.rule.Namespace, e.rule.Name, err)
metrics.NotificationUnhealthy.WithLabelValues(e.rule.Namespace, e.rule.Name).Set(1)
return
logger.Error("Failed to generate kusto deep links for %s/%s: %s", e.rule.Namespace, e.rule.Name, err)
steeling marked this conversation as resolved.
Show resolved Hide resolved
summary = fmt.Sprintf("Rule %s is failing to execute against endpoint: %s, database: %s with error: %s", e.rule.Name, e.kustoClient.Endpoint(e.rule.Database), e.rule.Database, err.Error())
}

if err := e.AlertCli.Create(ctx, e.AlertAddr, alert.Alert{
@@ -135,6 +138,7 @@ func (e *worker) ExecuteQuery(ctx context.Context) {
}); err != nil {
logger.Errorf("Failed to send failure alert for %s/%s: %s", e.rule.Namespace, e.rule.Name, err)
// Only set the notification as failed if we are not able to send a failure alert directly.
+ metrics.AlerterErrors.WithLabelValues(e.rule.Name, metrics.CreateAlertNotificationError).Add(1)
metrics.NotificationUnhealthy.WithLabelValues(e.rule.Namespace, e.rule.Name).Set(1)
return
}
2 changes: 2 additions & 0 deletions alerter/rules/store.go
@@ -8,6 +8,7 @@ import (
"time"

alertrulev1 "github.com/Azure/adx-mon/api/v1"
"github.com/Azure/adx-mon/metrics"
"github.com/Azure/adx-mon/pkg/logger"

"github.com/Azure/azure-kusto-go/kusto"
@@ -130,6 +131,7 @@ func (s *Store) reloadPeriodically() {
rules, err := s.reloadRules()
if err != nil {
logger.Errorf("failed to reload rules: %s", err)
+ metrics.AlerterErrors.WithLabelValues("", metrics.AlerterReloadRulesError).Add(1)
continue
}

10 changes: 7 additions & 3 deletions cmd/ingestor/main.go
@@ -22,6 +22,7 @@ import (
"github.com/Azure/adx-mon/ingestor"
"github.com/Azure/adx-mon/ingestor/adx"
"github.com/Azure/adx-mon/ingestor/storage"
"github.com/Azure/adx-mon/metrics"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/tls"
"github.com/Azure/azure-kusto-go/kusto"
@@ -338,8 +339,8 @@ func realMain(ctx *cli.Context) error {

logger.Infof("Listening at %s", ":9090")
mux := http.NewServeMux()
mux.HandleFunc("/transfer", svc.HandleTransfer)
mux.HandleFunc("/receive", svc.HandleReceive)
mux.HandleFunc("/transfer", metrics.HandlerFuncRecorder(metrics.IngestorSubsystem, svc.HandleTransfer))
mux.HandleFunc("/receive", metrics.HandlerFuncRecorder(metrics.IngestorSubsystem, svc.HandleReceive))
mux.HandleFunc(logsv1connect.LogsServiceExportProcedure, svc.HandleLogs)

logger.Infof("Metrics Listening at %s", ":9091")
@@ -449,7 +450,10 @@ func newKustoClient(endpoint string) (ingest.QueryClient, error) {
kcsb := kusto.NewConnectionStringBuilder(endpoint)
kcsb.WithDefaultAzureCredential()

- return kusto.New(kcsb)
+ httpClient := &http.Client{
+ Transport: metrics.NewRoundTripper(http.DefaultTransport),
+ }
+ return kusto.New(kcsb, kusto.WithHttpClient(httpClient))
}

func newUploader(kustoCli ingest.QueryClient, database, storageDir string, concurrentUploads int, defaultMapping storage.SchemaMapping) (adx.Uploader, error) {
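metrics.HandlerFuncRecorder, used above to wrap the ingestor's /transfer and /receive handlers and later to replace the hand-rolled RequestsReceived counting in the OTLP logs proxy, is also defined outside these hunks. A rough sketch of such a recorder, assuming a counter labeled by subsystem and status code (the metric name and the subsystem constant values are assumptions):

package metrics

import (
	"net/http"
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Hypothetical subsystem values; the real constants live in the metrics package.
const (
	IngestorSubsystem  = "ingestor"
	CollectorSubsystem = "collector"
)

// Hypothetical metric; adx-mon's real name and labels may differ.
var requestsReceived = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "adxmon_http_requests_total",
	Help: "Inbound HTTP requests by subsystem and status code.",
}, []string{"subsystem", "code"})

// statusRecorder captures the status code written by the wrapped handler.
type statusRecorder struct {
	http.ResponseWriter
	status int
}

func (r *statusRecorder) WriteHeader(code int) {
	r.status = code
	r.ResponseWriter.WriteHeader(code)
}

// HandlerFuncRecorder wraps a handler so every request increments a counter
// labeled with the subsystem and the response status code.
func HandlerFuncRecorder(subsystem string, next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		rec := &statusRecorder{ResponseWriter: w, status: http.StatusOK}
		next(rec, r)
		requestsReceived.WithLabelValues(subsystem, strconv.Itoa(rec.status)).Inc()
	}
}

Centralizing the status-code bookkeeping in one wrapper is what allows the per-branch m.WithLabelValues(...) calls to be deleted from the logs proxy below.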
11 changes: 7 additions & 4 deletions collector/client.go
@@ -3,21 +3,24 @@ package collector
import (
"compress/gzip"
"fmt"
prom_model "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"net"
"net/http"
"time"

"github.com/Azure/adx-mon/metrics"
prom_model "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)

var dialer = &net.Dialer{
Timeout: 5 * time.Second,
}

- var transport = &http.Transport{
+ var transport = metrics.NewRoundTripper(&http.Transport{
DialContext: dialer.DialContext,
TLSHandshakeTimeout: 5 * time.Second,
- }
+ })

var httpClient = &http.Client{
Timeout: time.Second * 10,
Transport: transport,
17 changes: 3 additions & 14 deletions collector/otlp/logs.go
@@ -9,7 +9,6 @@ import (
"net/http"
"net/url"
"slices"
"strconv"
"sync"

"buf.build/gen/go/opentelemetry/opentelemetry/bufbuild/connect-go/opentelemetry/proto/collector/logs/v1/logsv1connect"
@@ -19,7 +18,6 @@ import (
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/pool"
connect_go "github.com/bufbuild/connect-go"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/http2"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
@@ -79,14 +77,14 @@ func LogsProxyHandler(ctx context.Context, endpoints []string, insecureSkipVerif
},
}
}
+ httpClient.Transport = metrics.NewRoundTripper(httpClient.Transport)

// Create our gRPC client used to upgrade from HTTP1 to HTTP2 via gRPC and proxy to the OTLP endpoint
rpcClients[endpoint] = logsv1connect.NewLogsServiceClient(httpClient, grpcEndpoint, connect_go.WithGRPC())
}
bufs := pool.NewBytes(1024 * 1024)

- return func(w http.ResponseWriter, r *http.Request) {
- m := metrics.RequestsReceived.MustCurryWith(prometheus.Labels{"path": "/logs"})
+ return metrics.HandlerFuncRecorder(metrics.CollectorSubsystem, func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

switch r.Header.Get("Content-Type") {
@@ -103,21 +101,18 @@ func LogsProxyHandler(ctx context.Context, endpoints []string, insecureSkipVerif
if err != nil {
logger.Errorf("Failed to read request body: %v", err)
w.WriteHeader(http.StatusInternalServerError)
- m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
return
}
if n < int(r.ContentLength) {
logger.Warnf("Short read %d < %d", n, r.ContentLength)
w.WriteHeader(http.StatusInternalServerError)
- m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
return
}

msg := &v1.ExportLogsServiceRequest{}
if err := proto.Unmarshal(b, msg); err != nil {
logger.Errorf("Failed to unmarshal request body: %v", err)
w.WriteHeader(http.StatusBadRequest)
- m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
return
}

@@ -174,7 +169,6 @@ func LogsProxyHandler(ctx context.Context, endpoints []string, insecureSkipVerif
if err != nil {
logger.Errorf("Failed to marshal response: %v", err)
w.WriteHeader(http.StatusInternalServerError)
- m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
return
}
} else {
@@ -184,7 +178,6 @@ func LogsProxyHandler(ctx context.Context, endpoints []string, insecureSkipVerif
if err != nil {
logger.Errorf("Failed to marshal response: %v", err)
w.WriteHeader(http.StatusInternalServerError)
- m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
return
}
}
@@ -193,21 +186,17 @@ func LogsProxyHandler(ctx context.Context, endpoints []string, insecureSkipVerif
w.WriteHeader(statusCode)
w.Write(respBodyBytes)

- m.WithLabelValues(strconv.Itoa(statusCode)).Inc()

case "application/json":
// We're receiving JSON, so we need to unmarshal the JSON
// into an OTLP protobuf, then use gRPC to send the OTLP
// protobuf to the OTLP endpoint
w.WriteHeader(http.StatusNotImplemented)
- m.WithLabelValues(strconv.Itoa(http.StatusNotImplemented)).Inc()

default:
logger.Errorf("Unsupported Content-Type: %s", r.Header.Get("Content-Type"))
w.WriteHeader(http.StatusBadRequest)
- m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
}
- }
+ })
}

func modifyAttributes(msg *v1.ExportLogsServiceRequest, add []*commonv1.KeyValue, lift map[string]struct{}) (*v1.ExportLogsServiceRequest, int) {
1 change: 1 addition & 0 deletions collector/service.go
@@ -243,6 +243,7 @@ func (s *Service) scrapeTargets() {
fams, err := FetchMetrics(target.Addr)
if err != nil {
logger.Errorf("Failed to scrape metrics for %s: %s", target, err.Error())
+ metrics.CollectorErrors.WithLabelValues(metrics.CollectorScrapeError).Add(1)
continue
}

5 changes: 5 additions & 0 deletions ingestor/adx/uploader.go
@@ -9,6 +9,7 @@

"github.com/Azure/adx-mon/ingestor/cluster"
"github.com/Azure/adx-mon/ingestor/storage"
"github.com/Azure/adx-mon/metrics"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/service"
"github.com/Azure/adx-mon/pkg/wal"
@@ -202,6 +203,7 @@ func (n *uploader) upload(ctx context.Context) error {
database, table, _, err = wal.ParseFilename(path)
if err != nil {
logger.Errorf("Failed to parse file: %s", err.Error())
+ metrics.IngestorWalErrors.WithLabelValues(metrics.ParseFilenameError).Inc()
continue
}

@@ -211,6 +213,7 @@
continue
} else if err != nil {
logger.Errorf("Failed to open file: %s", err.Error())
+ metrics.IngestorWalErrors.WithLabelValues(metrics.OpenFileError).Inc()
continue
}

@@ -240,12 +243,14 @@
now := time.Now()
if err := n.uploadReader(mr, database, table); err != nil {
logger.Errorf("Failed to upload file: %s", err.Error())
+ metrics.IngestorUploadErrors.Inc()
return
}
logger.Infof("Uploaded %v duration=%s", paths, time.Since(now).String())
for _, f := range paths {
if err := os.RemoveAll(f); err != nil {
logger.Errorf("Failed to remove file: %s", err.Error())
+ metrics.IngestorWalErrors.WithLabelValues(metrics.DeleteFileError).Inc()
}
}
}()