Skip to content

Commit

Permalink
Create prometheus.source.api component
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed May 5, 2023
1 parent af52a4e commit f01609a
Show file tree
Hide file tree
Showing 6 changed files with 652 additions and 3 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ Main (unreleased)

### Features
- New Grafana Agent Flow components:
- `loki.source.api` - receive Loki log entries over HTTP (e.g. from other agents). (@thampiotr)
- `prometheus.operator.servicemonitors` discovers ServiceMonitor resources in your Kubernetes cluster and scrape
the targets they reference. (@captncraig, @marctc, @jcreixell)
- `prometheus.source.api` - receive Prometheus metrics over HTTP (e.g. from other agents). (@thampiotr)

- Added new Grafana Agent Flow components:
- `loki.source.api` - receive Loki log entries over HTTP (e.g. from other agents). (@thampiotr)
- Added coalesce function to river stdlib. (@jkroepke)

### Enhancements
Expand Down
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ import (
_ "github.com/grafana/agent/component/prometheus/relabel" // Import prometheus.relabel
_ "github.com/grafana/agent/component/prometheus/remotewrite" // Import prometheus.remote_write
_ "github.com/grafana/agent/component/prometheus/scrape" // Import prometheus.scrape
_ "github.com/grafana/agent/component/prometheus/source/api" // Import prometheus.source.api
_ "github.com/grafana/agent/component/remote/http" // Import remote.http
_ "github.com/grafana/agent/component/remote/s3" // Import remote.s3
)
144 changes: 144 additions & 0 deletions component/prometheus/source/api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package api

import (
"context"
"fmt"
"github.com/gorilla/mux"
fnet "github.com/grafana/agent/component/common/net"
"net/http"
"reflect"
"sync"

"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
agentprom "github.com/grafana/agent/component/prometheus"
"github.com/grafana/agent/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
)

func init() {
component.Register(component.Registration{
Name: "prometheus.source.api",
Args: Arguments{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
},
})
}

type Arguments struct {
Server *fnet.ServerConfig `river:",squash"`
ForwardTo []storage.Appendable `river:"forward_to,attr"`
}

type Component struct {
opts component.Options
handler http.Handler
fanout *agentprom.Fanout
uncheckedCollector *util.UncheckedCollector

updateMut sync.RWMutex
args Arguments
server *fnet.TargetServer
}

func New(opts component.Options, args Arguments) (component.Component, error) {
fanout := agentprom.NewFanout(args.ForwardTo, opts.ID, opts.Registerer)
uncheckedCollector := util.NewUncheckedCollector(nil)
c := &Component{
opts: opts,
handler: remote.NewWriteHandler(opts.Logger, fanout),
fanout: fanout,
uncheckedCollector: uncheckedCollector,
}
opts.Registerer.MustRegister(uncheckedCollector)

if err := c.Update(args); err != nil {
return nil, err
}
return c, nil
}

// Run satisfies the Component interface.
func (c *Component) Run(ctx context.Context) error {
defer func() {
c.updateMut.Lock()
defer c.updateMut.Unlock()
c.shutdownServer()
}()

for range ctx.Done() {
level.Info(c.opts.Logger).Log("msg", "terminating due to context done")
return nil
}
return nil
}

// Update satisfies the Component interface.
func (c *Component) Update(args component.Arguments) error {
newArgs := args.(Arguments)
c.fanout.UpdateChildren(newArgs.ForwardTo)

c.updateMut.Lock()
defer c.updateMut.Unlock()

if !c.serverNeedsUpdate(newArgs) {
c.args = newArgs
return nil
}
c.shutdownServer()

err, s := c.createNewServer(newArgs)
if err != nil {
return err
}
c.server = s

err = c.server.MountAndRun(func(router *mux.Router) {
router.Path("/api/v1/metrics/write").Methods("POST").HandlerFunc(c.handler.ServeHTTP)
})
if err != nil {
return err
}

c.args = newArgs
return nil
}

func (c *Component) createNewServer(args Arguments) (error, *fnet.TargetServer) {
// [server.Server] registers new metrics every time it is created. To
// avoid issues with re-registering metrics with the same name, we create a
// new registry for the server every time we create one, and pass it to an
// unchecked collector to bypass uniqueness checking.
serverRegistry := prometheus.NewRegistry()
c.uncheckedCollector.SetCollector(serverRegistry)

s, err := fnet.NewTargetServer(
c.opts.Logger,
"prometheus_source_api",
serverRegistry,
args.Server,
)
if err != nil {
return fmt.Errorf("failed to create server: %v", err), nil
}

return nil, s
}

// shutdownServer will shut down the currently used server.
// It is not goroutine-safe and an updateMut write lock must be held when it's called.
func (c *Component) shutdownServer() {
if c.server != nil {
c.server.StopAndShutdown()
c.server = nil
}
}

func (c *Component) serverNeedsUpdate(args Arguments) bool {
oldConfig := c.args.Server
newConfig := args.Server
return !reflect.DeepEqual(newConfig, oldConfig)
}
Loading

0 comments on commit f01609a

Please sign in to comment.