Skip to content

Commit

Permalink
Fix heroku component update not picking up all the changes (#3833)
Browse files Browse the repository at this point in the history
* Fix heroku component update not picking up all the changes

* add changelog
  • Loading branch information
thampiotr authored and clayton-cornell committed Aug 14, 2023
1 parent 8e31cef commit c5641e9
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 42 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,13 @@ v0.33.2 (2023-05-11)
- Fix an issue with the grafana/agent windows docker image entrypoint
not targeting the right location for the config. (@erikbaranowski)

- Fix issue where the the `node_exporter` integration and
- Fix issue where the `node_exporter` integration and
`prometheus.exporter.unix` `diskstat_device_include` component could not set
the allowlist field for the diskstat collector. (@tpaschalis)

- Fix an issue in `loki.source.heroku` where updating the `labels` or `use_incoming_timestamp`
would not take effect. (@thampiotr)

- Flow: Fix an issue within S3 Module where the S3 path was not parsed correctly when the
path consists of a parent directory. (@jastisriradheshyam)

Expand Down
20 changes: 10 additions & 10 deletions component/loki/source/heroku/heroku.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ import (
"sync"

"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
fnet "github.com/grafana/agent/component/common/net"
flow_relabel "github.com/grafana/agent/component/common/relabel"
ht "github.com/grafana/agent/component/loki/source/heroku/internal/herokutarget"
"github.com/grafana/agent/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
)

func init() {
Expand Down Expand Up @@ -118,7 +117,11 @@ func (c *Component) Update(args component.Arguments) error {
rcs = flow_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)
}

if listenerChanged(c.args.Server, newArgs.Server) || relabelRulesChanged(c.args.RelabelRules, newArgs.RelabelRules) {
restartRequired := changed(c.args.Server, newArgs.Server) ||
changed(c.args.RelabelRules, newArgs.RelabelRules) ||
changed(c.args.Labels, newArgs.Labels) ||
c.args.UseIncomingTimestamp != newArgs.UseIncomingTimestamp
if restartRequired {
if c.target != nil {
err := c.target.Stop()
if err != nil {
Expand Down Expand Up @@ -166,7 +169,7 @@ func (c *Component) DebugInfo() interface{} {
c.mut.RLock()
defer c.mut.RUnlock()

var res readerDebugInfo = readerDebugInfo{
var res = readerDebugInfo{
Ready: c.target.Ready(),
Address: c.target.HTTPListenAddress(),
}
Expand All @@ -179,9 +182,6 @@ type readerDebugInfo struct {
Address string `river:"address,attr"`
}

func listenerChanged(prev, next *fnet.ServerConfig) bool {
return !reflect.DeepEqual(prev, next)
}
func relabelRulesChanged(prev, next flow_relabel.Rules) bool {
func changed(prev, next any) bool {
return !reflect.DeepEqual(prev, next)
}
188 changes: 163 additions & 25 deletions component/loki/source/heroku/heroku_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,27 @@ import (
"github.com/grafana/agent/component/loki/source/heroku/internal/herokutarget"
"github.com/grafana/agent/pkg/util"
"github.com/grafana/regexp"
"github.com/phayes/freeport"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)

func TestPush(t *testing.T) {
opts := component.Options{
Logger: util.TestFlowLogger(t),
Registerer: prometheus.NewRegistry(),
OnStateChange: func(e component.Exports) {},
}
opts := defaultOptions(t)

ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry)
args := Arguments{
Server: &fnet.ServerConfig{
HTTP: &fnet.HTTPConfig{
ListenAddress: address,
ListenPort: port,
},
// assign random grpc port
GRPC: &fnet.GRPCConfig{ListenPort: 0},
},
UseIncomingTimestamp: false,
Labels: map[string]string{"foo": "bar"},
ForwardTo: []loki.LogsReceiver{ch1, ch2},
RelabelRules: rulesExport,
}

args := testArgsWith(t, func(args *Arguments) {
args.ForwardTo = []loki.LogsReceiver{ch1, ch2}
args.RelabelRules = rulesExport
args.Labels = map[string]string{"foo": "bar"}
})
// Create and run the component.
c, err := New(opts, args)
require.NoError(t, err)

go c.Run(context.Background())
time.Sleep(200 * time.Millisecond)
go func() { require.NoError(t, c.Run(context.Background())) }()
waitForServerToBeReady(t, c)

// Create a Heroku Drain Request and send it to the launched server.
req, err := http.NewRequest(http.MethodPost, getEndpoint(c.target), strings.NewReader(testPayload))
Expand Down Expand Up @@ -78,8 +65,104 @@ func TestPush(t *testing.T) {
}
}

const address = "localhost"
const port = 42421
func TestUpdate_detectsWhenTargetRequiresARestart(t *testing.T) {
httpPort := getFreePort(t)
grpcPort := getFreePort(t)
tests := []struct {
name string
args Arguments
newArgs Arguments
restartRequired bool
}{
{
name: "identical args don't require server restart",
args: testArgsWithPorts(httpPort, grpcPort),
newArgs: testArgsWithPorts(httpPort, grpcPort),
restartRequired: false,
},
{
name: "change in address requires server restart",
args: testArgsWithPorts(httpPort, grpcPort),
newArgs: testArgsWith(t, func(args *Arguments) {
args.Server.HTTP.ListenAddress = "127.0.0.1"
args.Server.HTTP.ListenPort = httpPort
args.Server.GRPC.ListenPort = grpcPort
}),
restartRequired: true,
},
{
name: "change in port requires server restart",
args: testArgsWithPorts(httpPort, grpcPort),
newArgs: testArgsWithPorts(getFreePort(t), grpcPort),
restartRequired: true,
},
{
name: "change in forwardTo does not require server restart",
args: testArgsWithPorts(httpPort, grpcPort),
newArgs: testArgsWith(t, func(args *Arguments) {
args.ForwardTo = []loki.LogsReceiver{}
args.Server.HTTP.ListenPort = httpPort
args.Server.GRPC.ListenPort = grpcPort
}),
restartRequired: false,
},
{
name: "change in labels requires server restart",
args: testArgsWithPorts(httpPort, grpcPort),
newArgs: testArgsWith(t, func(args *Arguments) {
args.Labels = map[string]string{"some": "label"}
args.Server.HTTP.ListenPort = httpPort
args.Server.GRPC.ListenPort = grpcPort
}),
restartRequired: true,
},
{
name: "change in relabel rules requires server restart",
args: testArgsWithPorts(httpPort, grpcPort),
newArgs: testArgsWith(t, func(args *Arguments) {
args.RelabelRules = flow_relabel.Rules{}
args.Server.HTTP.ListenPort = httpPort
args.Server.GRPC.ListenPort = grpcPort
}),
restartRequired: true,
},
{
name: "change in use incoming timestamp requires server restart",
args: testArgsWithPorts(httpPort, grpcPort),
newArgs: testArgsWith(t, func(args *Arguments) {
args.UseIncomingTimestamp = !args.UseIncomingTimestamp
args.Server.HTTP.ListenPort = httpPort
args.Server.GRPC.ListenPort = grpcPort
}),
restartRequired: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
comp, err := New(
defaultOptions(t),
tc.args,
)
require.NoError(t, err)
defer func() {
// in order to cleanly shutdown, we want to make sure the server is running first.
waitForServerToBeReady(t, comp)
require.NoError(t, comp.target.Stop())
}()

// in order to cleanly update, we want to make sure the server is running first.
waitForServerToBeReady(t, comp)

targetBefore := comp.target
err = comp.Update(tc.newArgs)
require.NoError(t, err)

restarted := targetBefore != comp.target
require.Equal(t, restarted, tc.restartRequired)
})
}
}

const testPayload = `270 <158>1 2022-06-13T14:52:23.622778+00:00 host heroku router - at=info method=GET path="/" host=cryptic-cliffs-27764.herokuapp.com request_id=59da6323-2bc4-4143-8677-cc66ccfb115f fwd="181.167.87.140" dyno=web.1 connect=0ms service=3ms status=200 bytes=6979 protocol=https
`

Expand Down Expand Up @@ -114,6 +197,61 @@ var rulesExport = flow_relabel.Rules{
},
}

func defaultOptions(t *testing.T) component.Options {
return component.Options{
Logger: util.TestFlowLogger(t),
Registerer: prometheus.NewRegistry(),
OnStateChange: func(e component.Exports) {},
}
}

func testArgsWithPorts(httpPort int, grpcPort int) Arguments {
return Arguments{
Server: &fnet.ServerConfig{
HTTP: &fnet.HTTPConfig{
ListenAddress: "localhost",
ListenPort: httpPort,
},
GRPC: &fnet.GRPCConfig{
ListenAddress: "localhost",
ListenPort: grpcPort,
},
},
ForwardTo: []loki.LogsReceiver{make(chan loki.Entry), make(chan loki.Entry)},
Labels: map[string]string{"foo": "bar", "fizz": "buzz"},
RelabelRules: flow_relabel.Rules{
{
SourceLabels: []string{"tag"},
Regex: flow_relabel.Regexp{Regexp: regexp.MustCompile("ignore")},
Action: flow_relabel.Drop,
},
},
UseIncomingTimestamp: false,
}
}

func testArgsWith(t *testing.T, mutator func(arguments *Arguments)) Arguments {
a := testArgsWithPorts(getFreePort(t), getFreePort(t))
mutator(&a)
return a
}

func waitForServerToBeReady(t *testing.T, comp *Component) {
require.Eventuallyf(t, func() bool {
resp, err := http.Get(fmt.Sprintf(
"http://%v/wrong/url",
comp.target.HTTPListenAddress(),
))
return err == nil && resp.StatusCode == 404
}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")
}

func getFreePort(t *testing.T) int {
port, err := freeport.GetFreePort()
require.NoError(t, err)
return port
}

func newRegexp() flow_relabel.Regexp {
re, err := regexp.Compile("^(?:(.*))$")
if err != nil {
Expand All @@ -123,5 +261,5 @@ func newRegexp() flow_relabel.Regexp {
}

func getEndpoint(target *herokutarget.HerokuTarget) string {
return fmt.Sprintf("http://%s:%d%s", address, port, target.DrainEndpoint())
return fmt.Sprintf("http://%s%s", target.HTTPListenAddress(), target.DrainEndpoint())
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,14 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/grafana/agent/component/common/loki"
fnet "github.com/grafana/agent/component/common/net"
"github.com/grafana/loki/pkg/logproto"
herokuEncoding "github.com/heroku/x/logplex/encoding"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"

"github.com/grafana/agent/component/common/loki"
fnet "github.com/grafana/agent/component/common/net"

"github.com/grafana/loki/pkg/logproto"
)

const ReservedLabelTenantID = "__tenant_id__"
Expand All @@ -48,7 +46,7 @@ type HerokuTarget struct {
server *fnet.TargetServer
}

// NewTarget creates a brand new Heroku Drain target, capable of receiving logs from a Heroku application through an HTTP drain.
// NewHerokuTarget creates a brand new Heroku Drain target, capable of receiving logs from a Heroku application through an HTTP drain.
func NewHerokuTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, relabel []*relabel.Config, config *HerokuDrainTargetConfig, reg prometheus.Registerer) (*HerokuTarget, error) {
wrappedLogger := log.With(logger, "component", "heroku_drain")

Expand Down

0 comments on commit c5641e9

Please sign in to comment.