Skip to content

Commit

Permalink
[elastic-agent] proxy requests to subprocesses to their metrics endpo…
Browse files Browse the repository at this point in the history
…ints (#28165)
  • Loading branch information
stuartnelson3 authored Oct 6, 2021
1 parent d339f89 commit 8ce047e
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 17 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- Remove the `--kibana-url` from `install` and `enroll` command. {pull}25529[25529]
- Default to port 80 and 443 for Kibana and Fleet Server connections. {pull}25723[25723]
- Remove deprecated/undocumented IncludeCreatorMetadata setting from kubernetes metadata config options {pull}28006[28006]
- The `/processes/<subprocess>` endpoint proxies to the subprocess's monitoring endpoint, instead of querying its `/stats` endpoint {pull}28165[28165]

==== Bugfixes
- Fix rename *ConfigChange to *PolicyChange to align on changes in the UI. {pull}20779[20779]
Expand Down
52 changes: 35 additions & 17 deletions x-pack/elastic-agent/pkg/core/monitoring/server/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,16 @@ func processHandler(statsHandler func(http.ResponseWriter, *http.Request) error)
return statsHandler(w, r)
}

metricsBytes, statusCode, metricsErr := processMetrics(r.Context(), id)
beatsPath := vars["beatsPath"]
if _, ok := beatsPathAllowlist[beatsPath]; !ok {
return errorfWithStatus(http.StatusNotFound, "endpoint not found")
}

endpoint, err := generateEndpoint(id)
if err != nil {
return err
}
metricsBytes, statusCode, metricsErr := processMetrics(r.Context(), endpoint, beatsPath)
if metricsErr != nil {
return metricsErr
}
Expand All @@ -82,23 +91,14 @@ func processHandler(statsHandler func(http.ResponseWriter, *http.Request) error)
}
}

func processMetrics(ctx context.Context, id string) ([]byte, int, error) {
detail, err := parseID(id)
if err != nil {
return nil, 0, err
}

endpoint := beats.MonitoringEndpoint(detail.spec, artifact.DefaultConfig().OS(), detail.output)
if !strings.HasPrefix(endpoint, httpPlusPrefix) && !strings.HasPrefix(endpoint, "http") {
// add prefix for npipe and unix
endpoint = httpPlusPrefix + endpoint
}

if detail.isMonitoring {
endpoint += "_monitor"
}
var beatsPathAllowlist = map[string]struct{}{
"": struct{}{},
"stats": struct{}{},
"state": struct{}{},
}

hostData, err := parse.ParseURL(endpoint, "http", "", "", "stats", "")
func processMetrics(ctx context.Context, endpoint, path string) ([]byte, int, error) {
hostData, err := parse.ParseURL(endpoint, "http", "", "", path, "")
if err != nil {
return nil, 0, errorWithStatus(http.StatusInternalServerError, err)
}
Expand Down Expand Up @@ -145,6 +145,24 @@ func processMetrics(ctx context.Context, id string) ([]byte, int, error) {
return rb, resp.StatusCode, nil
}

func generateEndpoint(id string) (string, error) {
detail, err := parseID(id)
if err != nil {
return "", err
}

endpoint := beats.MonitoringEndpoint(detail.spec, artifact.DefaultConfig().OS(), detail.output)
if !strings.HasPrefix(endpoint, httpPlusPrefix) && !strings.HasPrefix(endpoint, "http") {
// add prefix for npipe and unix
endpoint = httpPlusPrefix + endpoint
}

if detail.isMonitoring {
endpoint += "_monitor"
}
return endpoint, nil
}

func writeResponse(w http.ResponseWriter, c interface{}) {
bytes, err := json.Marshal(c)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build linux
// +build linux

package server

import (
"context"
"net"
"net/http"
"net/http/httptest"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestProcessProxyRequest(t *testing.T) {
sock := "/tmp/elastic-agent-test.sock"
defer os.Remove(sock)

endpoint := "http+unix://" + sock
server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Write the path to the client so they can verify the request
// was correct
w.Write([]byte(r.URL.Path))
}))

// Mimic subprocesses and listen on a unix socket
l, err := net.Listen("unix", sock)
require.NoError(t, err)
server.Listener = l
server.Start()
defer server.Close()

for _, path := range []string{"stats", "", "state"} {
respBytes, _, err := processMetrics(context.Background(), endpoint, path)
require.NoError(t, err)
// Verify that the server saw the path we tried to request
assert.Equal(t, "/"+path, string(respBytes))
}
}
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/pkg/core/monitoring/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func exposeMetricsEndpoint(log *logger.Logger, config *common.Config, ns func(st
if enableProcessStats {
r.HandleFunc("/processes", processesHandler(routesFetchFn))
r.Handle("/processes/{processID}", createHandler(processHandler(statsHandler)))
r.Handle("/processes/{processID}/", createHandler(processHandler(statsHandler)))
r.Handle("/processes/{processID}/{beatsPath}", createHandler(processHandler(statsHandler)))
}

mux := http.NewServeMux()
Expand Down

0 comments on commit 8ce047e

Please sign in to comment.