diff --git a/CHANGELOG.md b/CHANGELOG.md index 7784c200fa0e..696c5098f71a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,11 +16,11 @@ Main (unreleased) ### Features - New Grafana Agent Flow components: + - `loki.source.api` - receive Loki log entries over HTTP (e.g. from other agents). (@thampiotr) - `prometheus.operator.servicemonitors` discovers ServiceMonitor resources in your Kubernetes cluster and scrape the targets they reference. (@captncraig, @marctc, @jcreixell) + - `prometheus.source.api` - receive Prometheus metrics over HTTP (e.g. from other agents). (@thampiotr) -- Added new Grafana Agent Flow components: - - `loki.source.api` - receive Loki log entries over HTTP (e.g. from other agents). (@thampiotr) - Added coalesce function to river stdlib. (@jkroepke) ### Enhancements diff --git a/component/all/all.go b/component/all/all.go index d39db794cd12..9e6b6737e32c 100644 --- a/component/all/all.go +++ b/component/all/all.go @@ -78,6 +78,7 @@ import ( _ "github.com/grafana/agent/component/prometheus/relabel" // Import prometheus.relabel _ "github.com/grafana/agent/component/prometheus/remotewrite" // Import prometheus.remote_write _ "github.com/grafana/agent/component/prometheus/scrape" // Import prometheus.scrape + _ "github.com/grafana/agent/component/prometheus/source/api" // Import prometheus.source.api _ "github.com/grafana/agent/component/remote/http" // Import remote.http _ "github.com/grafana/agent/component/remote/s3" // Import remote.s3 ) diff --git a/component/prometheus/source/api/api.go b/component/prometheus/source/api/api.go new file mode 100644 index 000000000000..32d5e3554514 --- /dev/null +++ b/component/prometheus/source/api/api.go @@ -0,0 +1,144 @@ +package api + +import ( + "context" + "fmt" + "github.com/gorilla/mux" + fnet "github.com/grafana/agent/component/common/net" + "net/http" + "reflect" + "sync" + + "github.com/go-kit/log/level" + "github.com/grafana/agent/component" + agentprom "github.com/grafana/agent/component/prometheus" + "github.com/grafana/agent/pkg/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/remote" +) + +func init() { + component.Register(component.Registration{ + Name: "prometheus.source.api", + Args: Arguments{}, + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return New(opts, args.(Arguments)) + }, + }) +} + +type Arguments struct { + Server *fnet.ServerConfig `river:",squash"` + ForwardTo []storage.Appendable `river:"forward_to,attr"` +} + +type Component struct { + opts component.Options + handler http.Handler + fanout *agentprom.Fanout + uncheckedCollector *util.UncheckedCollector + + updateMut sync.RWMutex + args Arguments + server *fnet.TargetServer +} + +func New(opts component.Options, args Arguments) (component.Component, error) { + fanout := agentprom.NewFanout(args.ForwardTo, opts.ID, opts.Registerer) + uncheckedCollector := util.NewUncheckedCollector(nil) + c := &Component{ + opts: opts, + handler: remote.NewWriteHandler(opts.Logger, fanout), + fanout: fanout, + uncheckedCollector: uncheckedCollector, + } + opts.Registerer.MustRegister(uncheckedCollector) + + if err := c.Update(args); err != nil { + return nil, err + } + return c, nil +} + +// Run satisfies the Component interface. +func (c *Component) Run(ctx context.Context) error { + defer func() { + c.updateMut.Lock() + defer c.updateMut.Unlock() + c.shutdownServer() + }() + + for range ctx.Done() { + level.Info(c.opts.Logger).Log("msg", "terminating due to context done") + return nil + } + return nil +} + +// Update satisfies the Component interface. +func (c *Component) Update(args component.Arguments) error { + newArgs := args.(Arguments) + c.fanout.UpdateChildren(newArgs.ForwardTo) + + c.updateMut.Lock() + defer c.updateMut.Unlock() + + if !c.serverNeedsUpdate(newArgs) { + c.args = newArgs + return nil + } + c.shutdownServer() + + err, s := c.createNewServer(newArgs) + if err != nil { + return err + } + c.server = s + + err = c.server.MountAndRun(func(router *mux.Router) { + router.Path("/api/v1/metrics/write").Methods("POST").HandlerFunc(c.handler.ServeHTTP) + }) + if err != nil { + return err + } + + c.args = newArgs + return nil +} + +func (c *Component) createNewServer(args Arguments) (error, *fnet.TargetServer) { + // [server.Server] registers new metrics every time it is created. To + // avoid issues with re-registering metrics with the same name, we create a + // new registry for the server every time we create one, and pass it to an + // unchecked collector to bypass uniqueness checking. + serverRegistry := prometheus.NewRegistry() + c.uncheckedCollector.SetCollector(serverRegistry) + + s, err := fnet.NewTargetServer( + c.opts.Logger, + "prometheus_source_api", + serverRegistry, + args.Server, + ) + if err != nil { + return fmt.Errorf("failed to create server: %v", err), nil + } + + return nil, s +} + +// shutdownServer will shut down the currently used server. +// It is not goroutine-safe and an updateMut write lock must be held when it's called. +func (c *Component) shutdownServer() { + if c.server != nil { + c.server.StopAndShutdown() + c.server = nil + } +} + +func (c *Component) serverNeedsUpdate(args Arguments) bool { + oldConfig := c.args.Server + newConfig := args.Server + return !reflect.DeepEqual(newConfig, oldConfig) +} diff --git a/component/prometheus/source/api/api_test.go b/component/prometheus/source/api/api_test.go new file mode 100644 index 000000000000..311474a56c73 --- /dev/null +++ b/component/prometheus/source/api/api_test.go @@ -0,0 +1,393 @@ +package api + +import ( + "context" + "fmt" + "net/http" + "net/url" + "testing" + "time" + + fnet "github.com/grafana/agent/component/common/net" + + "github.com/golang/snappy" + "github.com/grafana/agent/component" + agentprom "github.com/grafana/agent/component/prometheus" + "github.com/grafana/agent/pkg/util" + "github.com/phayes/freeport" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/remote" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/protoadapt" +) + +func TestForwardsMetrics(t *testing.T) { + timestamp := time.Now().Add(time.Second).UnixMilli() + input := []prompb.TimeSeries{{ + Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "foo", Value: "bar"}}, + Samples: []prompb.Sample{ + {Timestamp: timestamp, Value: 12}, + {Timestamp: timestamp + 1, Value: 24}, + {Timestamp: timestamp + 2, Value: 48}, + }, + }, { + Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "fizz", Value: "buzz"}}, + Samples: []prompb.Sample{ + {Timestamp: timestamp, Value: 191}, + {Timestamp: timestamp + 1, Value: 1337}, + }, + }} + + expected := []testSample{ + {ts: timestamp, val: 12, l: labels.FromStrings("cluster", "local", "foo", "bar")}, + {ts: timestamp + 1, val: 24, l: labels.FromStrings("cluster", "local", "foo", "bar")}, + {ts: timestamp + 2, val: 48, l: labels.FromStrings("cluster", "local", "foo", "bar")}, + {ts: timestamp, val: 191, l: labels.FromStrings("cluster", "local", "fizz", "buzz")}, + {ts: timestamp + 1, val: 1337, l: labels.FromStrings("cluster", "local", "fizz", "buzz")}, + } + + actualSamples := make(chan testSample, 100) + + // Start the component + port, err := freeport.GetFreePort() + require.NoError(t, err) + args := Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: port, + }, + GRPC: testGRPCConfig(t), + }, + ForwardTo: testAppendable(actualSamples), + } + comp, err := New(testOptions(t), args) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go func() { + require.NoError(t, comp.Run(ctx)) + }() + + verifyExpectations(t, input, expected, actualSamples, args, ctx) +} + +func TestUpdate(t *testing.T) { + timestamp := time.Now().Add(time.Second).UnixMilli() + input01 := []prompb.TimeSeries{{ + Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "foo", Value: "bar"}}, + Samples: []prompb.Sample{ + {Timestamp: timestamp, Value: 12}, + }, + }, { + Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "fizz", Value: "buzz"}}, + Samples: []prompb.Sample{ + {Timestamp: timestamp, Value: 191}, + }, + }} + expected01 := []testSample{ + {ts: timestamp, val: 12, l: labels.FromStrings("cluster", "local", "foo", "bar")}, + {ts: timestamp, val: 191, l: labels.FromStrings("cluster", "local", "fizz", "buzz")}, + } + + input02 := []prompb.TimeSeries{{ + Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "foo", Value: "bar"}}, + Samples: []prompb.Sample{ + {Timestamp: timestamp + 1, Value: 24}, + {Timestamp: timestamp + 2, Value: 48}, + }, + }, { + Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "fizz", Value: "buzz"}}, + Samples: []prompb.Sample{ + {Timestamp: timestamp + 1, Value: 1337}, + }, + }} + expected02 := []testSample{ + {ts: timestamp + 1, val: 24, l: labels.FromStrings("cluster", "local", "foo", "bar")}, + {ts: timestamp + 2, val: 48, l: labels.FromStrings("cluster", "local", "foo", "bar")}, + {ts: timestamp + 1, val: 1337, l: labels.FromStrings("cluster", "local", "fizz", "buzz")}, + } + + actualSamples := make(chan testSample, 100) + + // Start the component + port, err := freeport.GetFreePort() + require.NoError(t, err) + args := Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: port, + }, + GRPC: testGRPCConfig(t), + }, + ForwardTo: testAppendable(actualSamples), + } + comp, err := New(testOptions(t), args) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go func() { + require.NoError(t, comp.Run(ctx)) + }() + + verifyExpectations(t, input01, expected01, actualSamples, args, ctx) + + otherPort, err := freeport.GetFreePort() + require.NoError(t, err) + args = Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: otherPort, + }, + GRPC: testGRPCConfig(t), + }, + ForwardTo: testAppendable(actualSamples), + } + err = comp.Update(args) + require.NoError(t, err) + + verifyExpectations(t, input02, expected02, actualSamples, args, ctx) +} + +func testGRPCConfig(t *testing.T) *fnet.GRPCConfig { + return &fnet.GRPCConfig{ListenAddress: "127.0.0.1", ListenPort: getFreePort(t)} +} + +func TestServerRestarts(t *testing.T) { + port, err := freeport.GetFreePort() + require.NoError(t, err) + + otherPort, err := freeport.GetFreePort() + require.NoError(t, err) + + testCases := []struct { + name string + initialArgs Arguments + newArgs Arguments + shouldRestart bool + }{ + { + name: "identical args require no restart", + initialArgs: Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port}, + }, + ForwardTo: []storage.Appendable{}, + }, + newArgs: Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port}, + }, + ForwardTo: []storage.Appendable{}, + }, + shouldRestart: false, + }, + { + name: "forward_to update does not require restart", + initialArgs: Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port}, + }, + ForwardTo: []storage.Appendable{}, + }, + newArgs: Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port}, + }, + ForwardTo: testAppendable(nil), + }, + shouldRestart: false, + }, + { + name: "hostname change requires restart", + initialArgs: Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port}, + }, + ForwardTo: []storage.Appendable{}, + }, + newArgs: Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ListenAddress: "127.0.0.1", ListenPort: port}, + }, + ForwardTo: testAppendable(nil), + }, + shouldRestart: true, + }, + { + name: "port change requires restart", + initialArgs: Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port}, + }, + ForwardTo: []storage.Appendable{}, + }, + newArgs: Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: otherPort}, + }, + ForwardTo: testAppendable(nil), + }, + shouldRestart: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + c, err := New(testOptions(t), tc.initialArgs) + require.NoError(t, err) + + serverExit := make(chan error) + go func() { + serverExit <- c.Run(ctx) + }() + + comp := c.(*Component) + waitForServerToBeReady(t, comp.args) + + initialServer := comp.server + require.NotNil(t, initialServer) + + err = c.Update(tc.newArgs) + require.NoError(t, err) + + waitForServerToBeReady(t, comp.args) + + require.NotNil(t, comp.server) + restarted := initialServer != comp.server + + require.Equal(t, tc.shouldRestart, restarted) + + // shut down cleanly to release ports for other tests + cancel() + select { + case err := <-serverExit: + require.NoError(t, err, "unexpected error on server exit") + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for server to shut down") + } + }) + } +} + +type testSample struct { + ts int64 + val float64 + l labels.Labels +} + +func waitForServerToBeReady(t *testing.T, args Arguments) { + require.Eventuallyf(t, func() bool { + resp, err := http.Get(fmt.Sprintf( + "http://%v:%d/wrong/path", + args.Server.HTTP.ListenAddress, + args.Server.HTTP.ListenPort, + )) + t.Logf("err: %v, resp: %v", err, resp) + return err == nil && resp.StatusCode == 404 + }, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout") +} + +func verifyExpectations( + t *testing.T, + input []prompb.TimeSeries, + expected []testSample, + actualSamples chan testSample, + args Arguments, + ctx context.Context, +) { + // In case server didn't start yet + waitForServerToBeReady(t, args) + + // Send the input time series to the component + endpoint := fmt.Sprintf( + "http://%s:%d/api/v1/metrics/write", + args.Server.HTTP.ListenAddress, + args.Server.HTTP.ListenPort, + ) + err := request(ctx, endpoint, &prompb.WriteRequest{Timeseries: input}) + require.NoError(t, err) + + // Verify we receive expected metrics + for _, exp := range expected { + select { + case actual := <-actualSamples: + require.Equal(t, exp, actual) + case <-ctx.Done(): + t.Fatalf("test timed out") + } + } + + select { + case unexpected := <-actualSamples: + t.Fatalf("unexpected extra sample received: %v", unexpected) + default: + } +} + +func testAppendable(actualSamples chan testSample) []storage.Appendable { + hookFn := func( + ref storage.SeriesRef, + l labels.Labels, + ts int64, + val float64, + next storage.Appender, + ) (storage.SeriesRef, error) { + + actualSamples <- testSample{ts: ts, val: val, l: l} + return ref, nil + } + + return []storage.Appendable{agentprom.NewInterceptor( + nil, + agentprom.WithAppendHook( + hookFn))} +} + +func request(ctx context.Context, rawRemoteWriteURL string, req *prompb.WriteRequest) error { + remoteWriteURL, err := url.Parse(rawRemoteWriteURL) + if err != nil { + return err + } + + client, err := remote.NewWriteClient("remote-write-client", &remote.ClientConfig{ + URL: &config.URL{URL: remoteWriteURL}, + Timeout: model.Duration(30 * time.Second), + }) + if err != nil { + return err + } + + buf, err := proto.Marshal(protoadapt.MessageV2Of(req)) + if err != nil { + return err + } + + compressed := snappy.Encode(buf, buf) + return client.Store(ctx, compressed) +} + +func testOptions(t *testing.T) component.Options { + return component.Options{ + ID: "loki.source.api.test", + Logger: util.TestFlowLogger(t), + Registerer: prometheus.NewRegistry(), + } +} + +func getFreePort(t *testing.T) int { + p, err := freeport.GetFreePort() + require.NoError(t, err) + return p +} diff --git a/docs/sources/flow/reference/components/loki.source.api.md b/docs/sources/flow/reference/components/loki.source.api.md index e10a24191def..eb71f948d2f8 100644 --- a/docs/sources/flow/reference/components/loki.source.api.md +++ b/docs/sources/flow/reference/components/loki.source.api.md @@ -6,7 +6,7 @@ title: loki.source.api `loki.source.api` receives log entries over HTTP and forwards them to other `loki.*` components. -The HTTP API exposed is compatible with [Loki push API][loki-push-api] and the `logproto` format. This means that other [`loki.write`][loki.write] components can be used as a client and send requests to `loki.source.api` and enables using the Agent as a proxy for logs. +The HTTP API exposed is compatible with [Loki push API][loki-push-api] and the `logproto` format. This means that other [`loki.write`][loki.write] components can be used as a client and send requests to `loki.source.api` which enables using the Agent as a proxy for logs. [loki.write]: {{< relref "./loki.write.md" >}} [loki-push-api]: https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki diff --git a/docs/sources/flow/reference/components/prometheus.source.api.md b/docs/sources/flow/reference/components/prometheus.source.api.md new file mode 100644 index 000000000000..40da3201b78e --- /dev/null +++ b/docs/sources/flow/reference/components/prometheus.source.api.md @@ -0,0 +1,111 @@ +--- +title: prometheus.source.api +--- + +# prometheus.source.api + +`prometheus.source.api` listens for HTTP requests containing Prometheus metric samples and forwards them to other components capable of receiving metrics. + +The HTTP API exposed is compatible with [Prometheus `remote_write` API][prometheus-remote-write-docs]. This means that other [`prometheus.remote_write`][prometheus.remote_write] components can be used as a client and send requests to `prometheus.source.api` which enables using the Agent as a proxy for prometheus metrics. + +[prometheus.remote_write]: {{< relref "./prometheus.remote_write.md" >}} +[prometheus-remote-write-docs]: https://prometheus.io/docs/prometheus/latest/querying/api/#remote-write-receiver + +## Usage + +```river +prometheus.source.api "LABEL" { + http { + listen_address = "LISTEN_ADDRESS" + listen_port = PORT + } + forward_to = RECEIVER_LIST +} +``` + +The component will start HTTP server supporting the following requests: + +- `POST /api/v1/metrics/write` - send metrics to the component, which in turn will be forwarded to the receivers as configured in `forward_to` argument. The request format must match that of [Prometheus `remote_write` API][prometheus-remote-write-docs]. One way to send valid requests to this component is to use another Grafana Agent with a [`prometheus.remote_write`][prometheus.remote_write] component. + +## Arguments + +`prometheus.source.api` supports the following arguments: + + Name | Type | Description | Default | Required +--------------|------------------|---------------------------------------|---------|---------- + `forward_to` | `list(receiver)` | List of receivers to send metrics to. | | yes + +## Blocks + +The following blocks are supported inside the definition of `prometheus.source.api`: + + Hierarchy | Name | Description | Required +-----------|----------|----------------------------------------------------|---------- + `http` | [http][] | Configures the HTTP server that receives requests. | no + +[http]: #http + +### http + +{{< docs/shared lookup="flow/reference/components/loki-server-http.md" source="agent" >}} + +## Exported fields + +`prometheus.source.api` does not export any fields. + +## Component health + +`prometheus.source.api` is reported as unhealthy if it is given an invalid configuration. + +## Debug metrics + +The following are some of the metrics that are exposed when this component is used. Note that the metrics include labels such as `status_code` where relevant, which can be used to measure request success rates. + +* `prometheus_source_api_request_duration_seconds` (histogram): Time (in seconds) spent serving HTTP requests. +* `prometheus_source_api_request_message_bytes` (histogram): Size (in bytes) of messages received in the request. +* `prometheus_source_api_response_message_bytes` (histogram): Size (in bytes) of messages sent in response. +* `prometheus_source_api_tcp_connections` (gauge): Current number of accepted TCP connections. +* `agent_prometheus_fanout_latency` (histogram): Write latency for sending metrics to other components. +* `agent_prometheus_forwarded_samples_total` (counter): Total number of samples sent to downstream components. + +## Example + +This example creates a `prometheus.source.api` component which starts an HTTP server on `0.0.0.0` address and port `9999`. The server receives metrics and forwards them to a `prometheus.remote_write` component which writes these metrics to a specified HTTP endpoint. + +```river +// Receives metrics over HTTP +prometheus.source.api "api" { + http { + listen_address = "0.0.0.0" + listen_port = 9999 + } + forward_to = [prometheus.remote_write.local.receiver] +} + +// Writes metrics to a specified address, e.g. cloud-hosted Prometheus instance +prometheus.remote_write "local" { + endpoint { + url = "http://my-cloud-prometheus-instance.com/api/prom/push" + } +} +``` + +In order to send metrics to the `prometheus.source.api` component defined above, another Grafana Agent can run with the following configuration: + +```river +// Collects metrics of localhost:12345 +prometheus.scrape "agent_self" { + targets = [ + {"__address__" = "localhost:12345", "job" = "agent"}, + ] + forward_to = [prometheus.remote_write.local.receiver] +} + +// Writes metrics to localhost:9999/api/v1/metrics/write - e.g. served by +// the prometheus.source.api component from the example above. +prometheus.remote_write "local" { + endpoint { + url = "http://localhost:9999/api/v1/metrics/write" + } +} +``` \ No newline at end of file