diff --git a/component/prometheus/source/api/api.go b/component/prometheus/source/api/api.go index 9b30eb5d4138..090aa16e900d 100644 --- a/component/prometheus/source/api/api.go +++ b/component/prometheus/source/api/api.go @@ -3,13 +3,11 @@ package api import ( "context" "fmt" + "github.com/gorilla/mux" + fnet "github.com/grafana/agent/component/common/net" "net/http" "reflect" "sync" - "time" - - "github.com/gorilla/mux" - fnet "github.com/grafana/agent/component/common/net" "github.com/go-kit/log/level" "github.com/grafana/agent/component" @@ -44,7 +42,6 @@ type Component struct { updateMut sync.RWMutex args Arguments server *fnet.TargetServer - healthErr error } func New(opts component.Options, args Arguments) (component.Component, error) { @@ -54,23 +51,18 @@ func New(opts component.Options, args Arguments) (component.Component, error) { opts: opts, handler: remote.NewWriteHandler(opts.Logger, fanout), fanout: fanout, - args: args, uncheckedCollector: uncheckedCollector, } opts.Registerer.MustRegister(uncheckedCollector) - // we do not need to hold the lock here, since `c` is not yet exposed - err := c.createNewServer(args) - if err != nil { + if err := c.Update(args); err != nil { return nil, err } - return c, nil } // Run satisfies the Component interface. func (c *Component) Run(ctx context.Context) error { - go c.runServer() defer func() { c.updateMut.Lock() defer c.updateMut.Unlock() @@ -97,62 +89,25 @@ func (c *Component) Update(args component.Arguments) error { return nil } c.shutdownServer() - c.healthErr = nil - err := c.createNewServer(newArgs) + err, s := c.createNewServer(newArgs) if err != nil { - err := fmt.Errorf("failed to create new server on update: %v", err) - c.healthErr = err return err } - go c.runServer() - - c.args = newArgs - return nil -} - -// CurrentHealth satisfies [component.HealthComponent] interface. -func (c *Component) CurrentHealth() component.Health { - c.updateMut.RLock() - defer c.updateMut.RUnlock() - if c.healthErr != nil { - return component.Health{ - Health: component.HealthTypeUnhealthy, - Message: fmt.Sprintf("component error: %v", c.healthErr), - UpdateTime: time.Now(), - } - } - return component.Health{ - Health: component.HealthTypeHealthy, - Message: "the component is healthy", - UpdateTime: time.Now(), - } -} - -func (c *Component) runServer() { - c.updateMut.RLock() - s := c.server - c.updateMut.RUnlock() - - if s == nil { // already shut down and attempt to run may panic - return - } + c.server = s - // TODO: this was blocking before, now it's not... need to fix concurrency - err := s.MountAndRun(func(router *mux.Router) { + err = c.server.MountAndRun(func(router *mux.Router) { router.Path("/api/v1/metrics/write").Methods("POST").HandlerFunc(c.handler.ServeHTTP) }) - level.Warn(c.opts.Logger).Log("msg", "server Run exited", "error", err) if err != nil { - c.updateMut.Lock() - defer c.updateMut.Unlock() - c.healthErr = err + return err } + + c.args = newArgs + return nil } -// createNewServer will create a new server.Server and assign it to the server field. -// It is not goroutine-safe and an updateMut write lock should be held when it's called. -func (c *Component) createNewServer(args Arguments) error { +func (c *Component) createNewServer(args Arguments) (error, *fnet.TargetServer) { // [server.Server] registers new metrics every time it is created. To // avoid issues with re-registering metrics with the same name, we create a // new registry for the server every time we create one, and pass it to an @@ -167,15 +122,14 @@ func (c *Component) createNewServer(args Arguments) error { args.Server, ) if err != nil { - return fmt.Errorf("failed to create server: %v", err) + return fmt.Errorf("failed to create server: %v", err), nil } - c.server = s - return nil + return nil, s } // shutdownServer will shut down the currently used server. -// It is not goroutine-safe and an updateMut write lock should be held when it's called. +// It is not goroutine-safe and an updateMut write lock must be held when it's called. func (c *Component) shutdownServer() { if c.server != nil { c.server.StopAndShutdown() diff --git a/component/prometheus/source/api/api_test.go b/component/prometheus/source/api/api_test.go index 27721b7908b7..311474a56c73 100644 --- a/component/prometheus/source/api/api_test.go +++ b/component/prometheus/source/api/api_test.go @@ -63,6 +63,7 @@ func TestForwardsMetrics(t *testing.T) { ListenAddress: "localhost", ListenPort: port, }, + GRPC: testGRPCConfig(t), }, ForwardTo: testAppendable(actualSamples), } @@ -77,67 +78,6 @@ func TestForwardsMetrics(t *testing.T) { verifyExpectations(t, input, expected, actualSamples, args, ctx) } -func TestHealthy(t *testing.T) { - timestamp := time.Now().Add(time.Second).UnixMilli() - input := []prompb.TimeSeries{{ - Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "foo", Value: "bar"}}, - Samples: []prompb.Sample{{Timestamp: timestamp, Value: 12}}, - }} - expected := []testSample{ - {ts: timestamp, val: 12, l: labels.FromStrings("cluster", "local", "foo", "bar")}, - } - actualSamples := make(chan testSample, 100) - - // Start the component - port, err := freeport.GetFreePort() - require.NoError(t, err) - args := Arguments{ - Server: &fnet.ServerConfig{ - HTTP: &fnet.HTTPConfig{ - ListenAddress: "localhost", - ListenPort: port, - }, - }, - ForwardTo: testAppendable(actualSamples), - } - comp, err := New(testOptions(t), args) - require.NoError(t, err) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - go func() { - require.NoError(t, comp.Run(ctx)) - }() - - // verify healthy after starting up - hc := comp.(component.HealthComponent) - require.Equal(t, component.HealthTypeHealthy, hc.CurrentHealth().Health) - require.Contains(t, hc.CurrentHealth().Message, "the component is healthy") - - // send some data and verify still healthy - verifyExpectations(t, input, expected, actualSamples, args, ctx) - require.Equal(t, component.HealthTypeHealthy, hc.CurrentHealth().Health) -} - -func TestUnhealthy(t *testing.T) { - args := Arguments{ - Server: &fnet.ServerConfig{ - HTTP: &fnet.HTTPConfig{ - ListenAddress: "localhost", - ListenPort: 0, - }, - }, - ForwardTo: nil, - } - comp, err := New(testOptions(t), args) - require.NoError(t, err) - - c := comp.(*Component) - c.healthErr = fmt.Errorf("test error") - - require.Equal(t, component.HealthTypeUnhealthy, c.CurrentHealth().Health) - require.Contains(t, c.CurrentHealth().Message, "test error") -} - func TestUpdate(t *testing.T) { timestamp := time.Now().Add(time.Second).UnixMilli() input01 := []prompb.TimeSeries{{ @@ -185,6 +125,7 @@ func TestUpdate(t *testing.T) { ListenAddress: "localhost", ListenPort: port, }, + GRPC: testGRPCConfig(t), }, ForwardTo: testAppendable(actualSamples), } @@ -206,6 +147,7 @@ func TestUpdate(t *testing.T) { ListenAddress: "localhost", ListenPort: otherPort, }, + GRPC: testGRPCConfig(t), }, ForwardTo: testAppendable(actualSamples), } @@ -215,6 +157,10 @@ func TestUpdate(t *testing.T) { verifyExpectations(t, input02, expected02, actualSamples, args, ctx) } +func testGRPCConfig(t *testing.T) *fnet.GRPCConfig { + return &fnet.GRPCConfig{ListenAddress: "127.0.0.1", ListenPort: getFreePort(t)} +} + func TestServerRestarts(t *testing.T) { port, err := freeport.GetFreePort() require.NoError(t, err) @@ -348,6 +294,7 @@ func waitForServerToBeReady(t *testing.T, args Arguments) { args.Server.HTTP.ListenAddress, args.Server.HTTP.ListenPort, )) + t.Logf("err: %v, resp: %v", err, resp) return err == nil && resp.StatusCode == 404 }, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout") } @@ -438,3 +385,9 @@ func testOptions(t *testing.T) component.Options { Registerer: prometheus.NewRegistry(), } } + +func getFreePort(t *testing.T) int { + p, err := freeport.GetFreePort() + require.NoError(t, err) + return p +}