Skip to content

Commit

Permalink
Add delay in the agent between each metrics push
Browse files Browse the repository at this point in the history
Signed-off-by: Kévin Lambert <kevin.lambert.ca@gmail.com>
  • Loading branch information
knlambert committed Apr 11, 2022
1 parent a9b94f4 commit 12d4fca
Show file tree
Hide file tree
Showing 9 changed files with 555 additions and 6 deletions.
45 changes: 40 additions & 5 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ type Agent struct {

// Field selector for the k8s resources that the agent watches
agentWatchFieldSelector string

// A mutex related to the metrics endpoint action, to avoid concurrent (and useless) pushes.
metricsPushMutex sync.Mutex
// Timestamp to keep in memory to Prevent from making too many requests to the Ambassador
// Cloud API.
metricsRelayDeadline time.Time
}

func getEnvWithDefault(envVarKey string, defaultValue string) string {
Expand Down Expand Up @@ -149,6 +155,7 @@ func NewAgent(directiveHandler DirectiveHandler, rolloutsGetterFactory rolloutsG
directiveHandler: directiveHandler,
reportRunning: atomicBool{value: false},
agentWatchFieldSelector: getEnvWithDefault("AGENT_WATCH_FIELD_SELECTOR", "metadata.namespace!=kube-system"),
metricsRelayDeadline: time.Now(),
}
}

Expand Down Expand Up @@ -609,15 +616,33 @@ func (a *Agent) ProcessSnapshot(ctx context.Context, snapshot *snapshotTypes.Sna

var allowedMetricsSuffixes = []string{"upstream_rq_total", "upstream_rq_time", "upstream_rq_5xx"}

func (a *Agent) MetricsRelayHandler(logCtx context.Context, in *envoyMetrics.StreamMetricsMessage) {
// MetricsRelayHandler is invoked as a callback when the agent receive metrics from Envoy (sink).
func (a *Agent) MetricsRelayHandler(
logCtx context.Context,
in *envoyMetrics.StreamMetricsMessage,
) {
a.metricsPushMutex.Lock()
defer a.metricsPushMutex.Unlock()

metrics := in.GetEnvoyMetrics()
dlog.Debugf(logCtx, "received %d metrics", len(metrics))
metricCount := len(metrics)

if !time.Now().After(a.metricsRelayDeadline) {
dlog.Debugf(logCtx, "Drop %d metric(s); next push scheduled for %s",
metricCount, a.metricsRelayDeadline.String())
return
}

if a.comm != nil && !a.reportingStopped {

dlog.Infof(logCtx, "Received %d metric(s)", metricCount)

a.ambassadorAPIKeyMutex.Lock()
apikey := a.ambassadorAPIKey
a.ambassadorAPIKeyMutex.Unlock()

outMetrics := make([]*io_prometheus_client.MetricFamily, 0, len(metrics))

for _, metricFamily := range metrics {
for _, suffix := range allowedMetricsSuffixes {
if strings.HasSuffix(metricFamily.GetName(), suffix) {
Expand All @@ -631,9 +656,19 @@ func (a *Agent) MetricsRelayHandler(logCtx context.Context, in *envoyMetrics.Str
Identity: a.agentID,
EnvoyMetrics: outMetrics,
}
dlog.Debugf(logCtx, "relaying %d metrics", len(outMessage.GetEnvoyMetrics()))
if err := a.comm.StreamMetrics(logCtx, outMessage, apikey); err != nil {
dlog.Errorf(logCtx, "Error streaming metrics: %+v", err)

if relayedMetricCount := len(outMessage.GetEnvoyMetrics()); relayedMetricCount > 0 {

dlog.Infof(logCtx, "Relaying %d metric(s)", relayedMetricCount)

if err := a.comm.StreamMetrics(logCtx, outMessage, apikey); err != nil {
dlog.Errorf(logCtx, "error streaming metric(s): %+v", err)
}

a.metricsRelayDeadline = time.Now().Add(defaultMinReportPeriod)

dlog.Infof(logCtx, "Next metrics relay scheduled for %s",
a.metricsRelayDeadline.UTC().String())
}
}
}
Expand Down
98 changes: 98 additions & 0 deletions pkg/agent/agent_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package agent

import (
"context"
"github.com/datawire/ambassador/v2/pkg/api/agent"
envoyMetrics "github.com/datawire/ambassador/v2/pkg/api/envoy/service/metrics/v3"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"testing"
"time"
)

var (
counterType = io_prometheus_client.MetricType_COUNTER
acceptedMetric = &io_prometheus_client.MetricFamily{
Name: StrToPointer("cluster.apple_prod_443.upstream_rq_total"),
Type: &counterType,
Metric: []*io_prometheus_client.Metric{
{
Counter: &io_prometheus_client.Counter{
Value: Float64ToPointer(42),
},
TimestampMs: Int64ToPointer(time.Now().Unix() * 1000),
},
},
}
ignoredMetric = &io_prometheus_client.MetricFamily{
Name: StrToPointer("cluster.apple_prod_443.metric_to_ignore"),
Type: &counterType,
Metric: []*io_prometheus_client.Metric{
{
Counter: &io_prometheus_client.Counter{
Value: Float64ToPointer(42),
},
TimestampMs: Int64ToPointer(time.Now().Unix() * 1000),
},
},
}
)

type AgentMetricsSuite struct {
suite.Suite

clientMock *MockClient

stubbedAgent *Agent
}

func (s *AgentMetricsSuite) SetupTest() {
s.clientMock = &MockClient{}

s.stubbedAgent = &Agent{
metricsRelayDeadline: time.Time{},
comm: &RPCComm{
client: s.clientMock,
},
}
}

func (s *AgentMetricsSuite) AfterTest(suiteName, testName string) {
return
}

func (s *AgentMetricsSuite) TestMetricsHandlerWithRelay() {
//given
ctx := context.TODO()

//when
s.stubbedAgent.MetricsRelayHandler(ctx, &envoyMetrics.StreamMetricsMessage{
Identifier: nil,
EnvoyMetrics: []*io_prometheus_client.MetricFamily{ignoredMetric, acceptedMetric},
})

//then
assert.Equal(s.T(), []*agent.StreamMetricsMessage{{
EnvoyMetrics: []*io_prometheus_client.MetricFamily{acceptedMetric},
}}, s.clientMock.SentMetrics)
}

func (s *AgentMetricsSuite) TestMetricsHandlerWithRelayPass() {
//given
ctx := context.TODO()
s.stubbedAgent.metricsRelayDeadline = time.Now().Add(defaultMinReportPeriod)

//when
s.stubbedAgent.MetricsRelayHandler(ctx, &envoyMetrics.StreamMetricsMessage{
Identifier: nil,
EnvoyMetrics: []*io_prometheus_client.MetricFamily{acceptedMetric},
})

//then
assert.Equal(s.T(), 0, len(s.clientMock.SentMetrics))
}

func TestSuiteAgentMetrics(t *testing.T) {
suite.Run(t, new(AgentMetricsSuite))
}
28 changes: 27 additions & 1 deletion pkg/agent/comm_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
type MockClient struct {
Counter int64
grpc.ClientStream
SentMetrics []*agent.StreamMetricsMessage
SentSnapshots []*agent.Snapshot
snapMux sync.Mutex
reportFunc func(context.Context, *agent.Snapshot) (*agent.SnapshotResponse, error)
Expand Down Expand Up @@ -64,9 +65,34 @@ func (m *MockClient) Report(ctx context.Context, in *agent.Snapshot, opts ...grp
}

func (m *MockClient) StreamMetrics(ctx context.Context, opts ...grpc.CallOption) (agent.Director_StreamMetricsClient, error) {
panic("implement me")
return &mockStreamMetricsClient{
ctx: ctx,
opts: opts,
parent: m,
}, nil
}

type mockStreamMetricsClient struct {
ctx context.Context
opts []grpc.CallOption
parent *MockClient
}

func (s *mockStreamMetricsClient) Send(msg *agent.StreamMetricsMessage) error {
s.parent.SentMetrics = append(s.parent.SentMetrics, msg)
return nil
}
func (s *mockStreamMetricsClient) CloseAndRecv() (*agent.StreamMetricsResponse, error) {
return nil, nil
}

func (s *mockStreamMetricsClient) Header() (metadata.MD, error) { return nil, nil }
func (s *mockStreamMetricsClient) Trailer() metadata.MD { return nil }
func (s *mockStreamMetricsClient) CloseSend() error { return nil }
func (s *mockStreamMetricsClient) Context() context.Context { return s.ctx }
func (s *mockStreamMetricsClient) SendMsg(m interface{}) error { return nil }
func (s *mockStreamMetricsClient) RecvMsg(m interface{}) error { return nil }

type mockReportStreamClient struct {
ctx context.Context
opts []grpc.CallOption
Expand Down
16 changes: 16 additions & 0 deletions pkg/agent/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package agent

// StrToPointer will return the pointer to the given string.
func StrToPointer(str string) *string {
return &str
}

// Float64ToPointer will return the pointer to the given float.
func Float64ToPointer(f float64) *float64 {
return &f
}

// Int64ToPointer will return the pointer to the given int64.
func Int64ToPointer(i int64) *int64 {
return &i
}
65 changes: 65 additions & 0 deletions vendor/github.com/stretchr/testify/suite/doc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions vendor/github.com/stretchr/testify/suite/interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 12d4fca

Please sign in to comment.