Skip to content

Commit

Permalink
fix the component and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed May 5, 2023
1 parent e9015c6 commit d91b3b8
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 121 deletions.
74 changes: 14 additions & 60 deletions component/prometheus/source/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package api
import (
"context"
"fmt"
"github.com/gorilla/mux"
fnet "github.com/grafana/agent/component/common/net"
"net/http"
"reflect"
"sync"
"time"

"github.com/gorilla/mux"
fnet "github.com/grafana/agent/component/common/net"

"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
Expand Down Expand Up @@ -44,7 +42,6 @@ type Component struct {
updateMut sync.RWMutex
args Arguments
server *fnet.TargetServer
healthErr error
}

func New(opts component.Options, args Arguments) (component.Component, error) {
Expand All @@ -54,23 +51,18 @@ func New(opts component.Options, args Arguments) (component.Component, error) {
opts: opts,
handler: remote.NewWriteHandler(opts.Logger, fanout),
fanout: fanout,
args: args,
uncheckedCollector: uncheckedCollector,
}
opts.Registerer.MustRegister(uncheckedCollector)

// we do not need to hold the lock here, since `c` is not yet exposed
err := c.createNewServer(args)
if err != nil {
if err := c.Update(args); err != nil {
return nil, err
}

return c, nil
}

// Run satisfies the Component interface.
func (c *Component) Run(ctx context.Context) error {
go c.runServer()
defer func() {
c.updateMut.Lock()
defer c.updateMut.Unlock()
Expand All @@ -97,62 +89,25 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}
c.shutdownServer()
c.healthErr = nil

err := c.createNewServer(newArgs)
err, s := c.createNewServer(newArgs)
if err != nil {
err := fmt.Errorf("failed to create new server on update: %v", err)
c.healthErr = err
return err
}
go c.runServer()

c.args = newArgs
return nil
}

// CurrentHealth satisfies [component.HealthComponent] interface.
//
// It takes the updateMut read lock and reports the component as unhealthy
// when healthErr is set, embedding that error in the message; otherwise it
// reports the component as healthy. UpdateTime is always the time of the call.
func (c *Component) CurrentHealth() component.Health {
	c.updateMut.RLock()
	defer c.updateMut.RUnlock()

	// Start from the healthy report and override it if an error was recorded.
	status := component.Health{
		Health:  component.HealthTypeHealthy,
		Message: "the component is healthy",
	}
	if c.healthErr != nil {
		status.Health = component.HealthTypeUnhealthy
		status.Message = fmt.Sprintf("component error: %v", c.healthErr)
	}
	status.UpdateTime = time.Now()
	return status
}

func (c *Component) runServer() {
c.updateMut.RLock()
s := c.server
c.updateMut.RUnlock()

if s == nil { // already shut down and attempt to run may panic
return
}
c.server = s

// TODO: this was blocking before, now it's not... need to fix concurrency
err := s.MountAndRun(func(router *mux.Router) {
err = c.server.MountAndRun(func(router *mux.Router) {
router.Path("/api/v1/metrics/write").Methods("POST").HandlerFunc(c.handler.ServeHTTP)
})
level.Warn(c.opts.Logger).Log("msg", "server Run exited", "error", err)
if err != nil {
c.updateMut.Lock()
defer c.updateMut.Unlock()
c.healthErr = err
return err
}

c.args = newArgs
return nil
}

// createNewServer will create a new server.Server and assign it to the server field.
// It is not goroutine-safe and an updateMut write lock should be held when it's called.
func (c *Component) createNewServer(args Arguments) error {
func (c *Component) createNewServer(args Arguments) (error, *fnet.TargetServer) {
// [server.Server] registers new metrics every time it is created. To
// avoid issues with re-registering metrics with the same name, we create a
// new registry for the server every time we create one, and pass it to an
Expand All @@ -167,15 +122,14 @@ func (c *Component) createNewServer(args Arguments) error {
args.Server,
)
if err != nil {
return fmt.Errorf("failed to create server: %v", err)
return fmt.Errorf("failed to create server: %v", err), nil
}

c.server = s
return nil
return nil, s
}

// shutdownServer will shut down the currently used server.
// It is not goroutine-safe and an updateMut write lock should be held when it's called.
// It is not goroutine-safe and an updateMut write lock must be held when it's called.
func (c *Component) shutdownServer() {
if c.server != nil {
c.server.StopAndShutdown()
Expand Down
75 changes: 14 additions & 61 deletions component/prometheus/source/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestForwardsMetrics(t *testing.T) {
ListenAddress: "localhost",
ListenPort: port,
},
GRPC: testGRPCConfig(t),
},
ForwardTo: testAppendable(actualSamples),
}
Expand All @@ -77,67 +78,6 @@ func TestForwardsMetrics(t *testing.T) {
verifyExpectations(t, input, expected, actualSamples, args, ctx)
}

// TestHealthy creates the component, runs it in the background, and checks
// that CurrentHealth reports a healthy status both right after start-up and
// after a write request has been pushed through verifyExpectations.
func TestHealthy(t *testing.T) {
	timestamp := time.Now().Add(time.Second).UnixMilli()
	input := []prompb.TimeSeries{{
		Labels:  []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "foo", Value: "bar"}},
		Samples: []prompb.Sample{{Timestamp: timestamp, Value: 12}},
	}}
	expected := []testSample{
		{ts: timestamp, val: 12, l: labels.FromStrings("cluster", "local", "foo", "bar")},
	}
	actualSamples := make(chan testSample, 100)

	// Start the component
	port, err := freeport.GetFreePort()
	require.NoError(t, err)
	args := Arguments{
		Server: &fnet.ServerConfig{
			HTTP: &fnet.HTTPConfig{
				ListenAddress: "localhost",
				ListenPort:    port,
			},
		},
		ForwardTo: testAppendable(actualSamples),
	}
	comp, err := New(testOptions(t), args)
	require.NoError(t, err)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	go func() {
		// require.* finishes with t.FailNow, which must only be called from
		// the goroutine running the test function. Report the failure with
		// t.Errorf instead, which is safe to call from other goroutines.
		if runErr := comp.Run(ctx); runErr != nil {
			t.Errorf("component Run returned an error: %v", runErr)
		}
	}()

	// verify healthy after starting up
	hc := comp.(component.HealthComponent)
	require.Equal(t, component.HealthTypeHealthy, hc.CurrentHealth().Health)
	require.Contains(t, hc.CurrentHealth().Message, "the component is healthy")

	// send some data and verify still healthy
	verifyExpectations(t, input, expected, actualSamples, args, ctx)
	require.Equal(t, component.HealthTypeHealthy, hc.CurrentHealth().Health)
}

func TestUnhealthy(t *testing.T) {
args := Arguments{
Server: &fnet.ServerConfig{
HTTP: &fnet.HTTPConfig{
ListenAddress: "localhost",
ListenPort: 0,
},
},
ForwardTo: nil,
}
comp, err := New(testOptions(t), args)
require.NoError(t, err)

c := comp.(*Component)
c.healthErr = fmt.Errorf("test error")

require.Equal(t, component.HealthTypeUnhealthy, c.CurrentHealth().Health)
require.Contains(t, c.CurrentHealth().Message, "test error")
}

func TestUpdate(t *testing.T) {
timestamp := time.Now().Add(time.Second).UnixMilli()
input01 := []prompb.TimeSeries{{
Expand Down Expand Up @@ -185,6 +125,7 @@ func TestUpdate(t *testing.T) {
ListenAddress: "localhost",
ListenPort: port,
},
GRPC: testGRPCConfig(t),
},
ForwardTo: testAppendable(actualSamples),
}
Expand All @@ -206,6 +147,7 @@ func TestUpdate(t *testing.T) {
ListenAddress: "localhost",
ListenPort: otherPort,
},
GRPC: testGRPCConfig(t),
},
ForwardTo: testAppendable(actualSamples),
}
Expand All @@ -215,6 +157,10 @@ func TestUpdate(t *testing.T) {
verifyExpectations(t, input02, expected02, actualSamples, args, ctx)
}

// testGRPCConfig builds a gRPC listener configuration bound to 127.0.0.1 on
// a port obtained from getFreePort.
func testGRPCConfig(t *testing.T) *fnet.GRPCConfig {
	cfg := new(fnet.GRPCConfig)
	cfg.ListenAddress = "127.0.0.1"
	cfg.ListenPort = getFreePort(t)
	return cfg
}

func TestServerRestarts(t *testing.T) {
port, err := freeport.GetFreePort()
require.NoError(t, err)
Expand Down Expand Up @@ -348,6 +294,7 @@ func waitForServerToBeReady(t *testing.T, args Arguments) {
args.Server.HTTP.ListenAddress,
args.Server.HTTP.ListenPort,
))
t.Logf("err: %v, resp: %v", err, resp)
return err == nil && resp.StatusCode == 404
}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")
}
Expand Down Expand Up @@ -438,3 +385,9 @@ func testOptions(t *testing.T) component.Options {
Registerer: prometheus.NewRegistry(),
}
}

// getFreePort returns a port number obtained from freeport.GetFreePort and
// fails the test immediately if that call returns an error.
func getFreePort(t *testing.T) int {
	// Mark this function as a test helper so a failure is attributed to the
	// calling line rather than to this function.
	t.Helper()

	p, err := freeport.GetFreePort()
	require.NoError(t, err)
	return p
}

0 comments on commit d91b3b8

Please sign in to comment.