Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agent): add metrics to agent workers #3674

Merged
merged 5 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions .github/workflows/scheduled-jobs.yml
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This update is hitching a ride on this PR. I'm adjusting the synthetic monitoring timeouts so that we can still send alerts about what is happening when the jobs fail or become idle.

Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@ jobs:
pokeshop-trace-based-tests:
name: Run trace based tests for Pokeshop
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 20

steps:
- name: Checkout
uses: actions/checkout@v3

- name: Configure Tracetest CLI
timeout-minutes: 5
uses: kubeshop/tracetest-github-action@v1
with:
token: ${{secrets.TRACETEST_POKESHOP_TOKEN}}

- name: Run synthetic monitoring tests
timeout-minutes: 10
run: |
tracetest run testsuite --file ./testing/synthetic-monitoring/pokeshop/_testsuite.yaml --vars ./testing/synthetic-monitoring/pokeshop/_variableset.yaml

Expand Down Expand Up @@ -65,18 +67,20 @@ jobs:
pokeshop-serverless-trace-based-tests:
name: Run trace based tests for Pokeshop Serverless
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 20

steps:
- name: Checkout
uses: actions/checkout@v3

- name: Configure Tracetest CLI
timeout-minutes: 5
uses: kubeshop/tracetest-github-action@v1
with:
token: ${{secrets.TRACETEST_SERVERLESS_POKESHOP_TOKEN}}

- name: Run synthetic monitoring tests
timeout-minutes: 10
run: |
tracetest run testsuite --file ./testing/synthetic-monitoring/pokeshop-serverless/_testsuite.yaml --vars ./testing/synthetic-monitoring/pokeshop-serverless/_variableset.yaml

Expand Down Expand Up @@ -119,18 +123,20 @@ jobs:
otel-demo-trace-based-tests:
name: Run trace based tests for Open Telemetry demo
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 20

steps:
- name: Checkout
uses: actions/checkout@v3

- name: Configure Tracetest CLI
timeout-minutes: 5
uses: kubeshop/tracetest-github-action@v1
with:
token: ${{secrets.TRACETEST_OTELDEMO_TOKEN}}

- name: Run synthetic monitoring tests
timeout-minutes: 10
run: |
tracetest run testsuite --file ./testing/synthetic-monitoring/otel-demo/_testsuite.yaml --vars ./testing/synthetic-monitoring/otel-demo/_variableset.yaml

Expand Down
16 changes: 14 additions & 2 deletions agent/runner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/kubeshop/tracetest/agent/workers"
"github.com/kubeshop/tracetest/agent/workers/poller"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -43,8 +44,14 @@ func StartSession(ctx context.Context, cfg config.Config, observer event.Observe
return nil, err
}

meter, err := telemetry.GetMeter(ctx, cfg.CollectorEndpoint, cfg.Name)
if err != nil {
observer.Error(err)
return nil, err
}

traceCache := collector.NewTraceCache()
controlPlaneClient, err := newControlPlaneClient(ctx, cfg, traceCache, observer, logger, tracer)
controlPlaneClient, err := newControlPlaneClient(ctx, cfg, traceCache, observer, logger, tracer, meter)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -107,7 +114,7 @@ func StartCollector(ctx context.Context, config config.Config, traceCache collec
return collector, nil
}

func newControlPlaneClient(ctx context.Context, config config.Config, traceCache collector.TraceCache, observer event.Observer, logger *zap.Logger, tracer trace.Tracer) (*client.Client, error) {
func newControlPlaneClient(ctx context.Context, config config.Config, traceCache collector.TraceCache, observer event.Observer, logger *zap.Logger, tracer trace.Tracer, meter metric.Meter) (*client.Client, error) {
controlPlaneClient, err := client.Connect(ctx, config.ServerURL,
client.WithAPIKey(config.APIKey),
client.WithAgentName(config.Name),
Expand All @@ -123,7 +130,9 @@ func newControlPlaneClient(ctx context.Context, config config.Config, traceCache
stopWorker := workers.NewStopperWorker(
workers.WithStopperObserver(observer),
workers.WithStopperCancelFuncList(processStopper.CancelMap()),
workers.WithStopperLogger(logger),
workers.WithStopperTracer(tracer),
workers.WithStopperMeter(meter),
)

triggerWorker := workers.NewTriggerWorker(
Expand All @@ -133,6 +142,7 @@ func newControlPlaneClient(ctx context.Context, config config.Config, traceCache
workers.WithTriggerStoppableProcessRunner(processStopper.RunStoppableProcess),
workers.WithTriggerLogger(logger),
workers.WithTriggerTracer(tracer),
workers.WithTriggerMeter(meter),
)

pollingWorker := workers.NewPollerWorker(
Expand All @@ -142,13 +152,15 @@ func newControlPlaneClient(ctx context.Context, config config.Config, traceCache
workers.WithPollerStoppableProcessRunner(processStopper.RunStoppableProcess),
workers.WithPollerLogger(logger),
workers.WithPollerTracer(tracer),
workers.WithPollerMeter(meter),
)

dataStoreTestConnectionWorker := workers.NewTestConnectionWorker(
controlPlaneClient,
workers.WithTestConnectionLogger(logger),
workers.WithTestConnectionObserver(observer),
workers.WithTestConnectionTracer(tracer),
workers.WithTestConnectionMeter(meter),
)

controlPlaneClient.OnDataStoreTestConnectionRequest(dataStoreTestConnectionWorker.Test)
Expand Down
37 changes: 37 additions & 0 deletions agent/telemetry/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package telemetry

import (
"fmt"
"os"

"github.com/kubeshop/tracetest/server/version"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)

// getAgentServiceName returns the service name under which an agent reports
// its telemetry: serviceName prefixed with "tracetest.agent-".
func getAgentServiceName(serviceName string) string {
	const nameFormat = "tracetest.agent-%s"

	return fmt.Sprintf(nameFormat, serviceName)
}

// getResource builds the OpenTelemetry resource shared by the agent's
// telemetry providers. It merges the SDK default resource with attributes for
// the given service name, the OS hostname and version.Version.
//
// An error is returned when the hostname cannot be read or when the two
// resources cannot be merged.
func getResource(serviceName string) (*resource.Resource, error) {
	host, hostErr := os.Hostname()
	if hostErr != nil {
		return nil, fmt.Errorf("could not get OS hostname: %w", hostErr)
	}

	sdkDefaults := resource.Default()
	agentAttributes := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String(serviceName),
		semconv.HostName(host),
		semconv.ServiceVersion(version.Version), // TODO: should we consider a version file for the agent?
	)

	// NOTE(review): resource.Merge is documented to report an error when both
	// resources carry different non-empty schema URLs; confirm semconv.SchemaURL
	// matches the one used by resource.Default() in the pinned SDK version.
	merged, mergeErr := resource.Merge(sdkDefaults, agentAttributes)
	if mergeErr != nil {
		return nil, fmt.Errorf("could not merge resources: %w", mergeErr)
	}

	return merged, nil
}
77 changes: 77 additions & 0 deletions agent/telemetry/meter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package telemetry

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
)

const (
// metricsReaderInterval is the interval passed to the periodic metric reader
// created in newMeterProvider.
metricsReaderInterval = 30 * time.Second
// metricExporterTimeout bounds the context used while creating the metric
// exporter in getMetricExporter.
metricExporterTimeout = 5 * time.Second
)

// GetNoopMeter returns a meter named "noop" obtained from the OpenTelemetry
// no-op meter provider. It is used as the fallback when no real meter is
// configured.
func GetNoopMeter() metric.Meter {
	noopProvider := noop.NewMeterProvider()

	return noopProvider.Meter("noop")
}

// GetMeter returns the meter used by the agent to record metrics.
//
// When otelExporterEndpoint is empty, a no-op meter is returned and no
// provider is created. Otherwise a meter provider is built for that endpoint,
// using the agent-prefixed form of serviceName, and a meter named
// "tracetest.agent" is returned from it.
//
// An error is returned when the meter provider cannot be created.
func GetMeter(ctx context.Context, otelExporterEndpoint, serviceName string) (metric.Meter, error) {
	// No collector endpoint configured: fall back to the no-op meter.
	if len(otelExporterEndpoint) == 0 {
		return GetNoopMeter(), nil
	}

	agentServiceName := getAgentServiceName(serviceName)

	meterProvider, err := newMeterProvider(ctx, otelExporterEndpoint, agentServiceName)
	if err != nil {
		return nil, fmt.Errorf("could not create meter provider: %w", err)
	}

	agentMeter := meterProvider.Meter("tracetest.agent")

	return agentMeter, nil
}

// newMeterProvider assembles an SDK meter provider for the agent.
//
// The provider is configured with the resource built by getResource for
// serviceName and with a periodic reader that pushes metrics through the
// exporter created for otelExporterEndpoint every metricsReaderInterval.
//
// An error is returned when either the resource or the exporter cannot be
// created.
func newMeterProvider(ctx context.Context, otelExporterEndpoint, serviceName string) (metric.MeterProvider, error) {
	agentResource, err := getResource(serviceName)
	if err != nil {
		return nil, fmt.Errorf("could not get resource: %w", err)
	}

	metricExporter, err := getMetricExporter(ctx, otelExporterEndpoint)
	if err != nil {
		return nil, fmt.Errorf("could not create metric exporter: %w", err)
	}

	reader := metricsdk.NewPeriodicReader(metricExporter, metricsdk.WithInterval(metricsReaderInterval))

	return metricsdk.NewMeterProvider(
		metricsdk.WithResource(agentResource),
		metricsdk.WithReader(reader),
	), nil
}

// getMetricExporter creates the OTLP metric exporter that sends metrics over
// gRPC to otelExporterEndpoint, with gzip compression and the insecure
// transport option enabled.
//
// The context handed to the exporter constructor is limited to
// metricExporterTimeout and is cancelled when this function returns.
//
// An error is returned when the exporter cannot be constructed.
func getMetricExporter(ctx context.Context, otelExporterEndpoint string) (*otlpmetricgrpc.Exporter, error) {
	ctx, cancel := context.WithTimeout(ctx, metricExporterTimeout)
	defer cancel()

	exporter, err := otlpmetricgrpc.New(ctx,
		otlpmetricgrpc.WithEndpoint(otelExporterEndpoint),
		otlpmetricgrpc.WithCompressor("gzip"),
		otlpmetricgrpc.WithInsecure(),
	)
	if err != nil {
		// newMeterProvider already wraps this error with "could not create
		// metric exporter"; name the failing step and the endpoint here so the
		// chained message is not the same phrase repeated twice.
		return nil, fmt.Errorf("could not set up OTLP gRPC exporter for endpoint %q: %w", otelExporterEndpoint, err)
	}

	return exporter, nil
}
22 changes: 8 additions & 14 deletions agent/telemetry/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,25 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdkTrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// spanExporterTimeout is a one-minute duration.
// NOTE(review): its use site is not visible in this hunk; presumably it bounds
// span exporter creation in newSpanExporter — confirm.
const spanExporterTimeout = 1 * time.Minute

// GetNoopTracer returns a tracer named "noop" obtained from the OpenTelemetry
// no-op tracer provider. It is used as the fallback when no real tracer is
// configured.
//
// NOTE(review): newer OpenTelemetry releases appear to deprecate
// trace.NewNoopTracerProvider in favor of the trace/noop package — confirm
// against the pinned version before migrating.
func GetNoopTracer() trace.Tracer {
	noopProvider := trace.NewNoopTracerProvider()

	return noopProvider.Tracer("noop")
}

func GetTracer(ctx context.Context, otelExporterEndpoint, serviceName string) (trace.Tracer, error) {
if otelExporterEndpoint == "" {
// fallback, return noop
return trace.NewNoopTracerProvider().Tracer("noop"), nil
return GetNoopTracer(), nil
}

realServiceName := fmt.Sprintf("tracetestAgent_%s", serviceName)
realServiceName := getAgentServiceName(serviceName)

spanExporter, err := newSpanExporter(ctx, otelExporterEndpoint)
if err != nil {
Expand Down Expand Up @@ -57,22 +59,14 @@ func newSpanExporter(ctx context.Context, otelExporterEndpoint string) (sdkTrace
}

func newTraceProvider(ctx context.Context, spanExporter sdkTrace.SpanExporter, serviceName string) (*sdkTrace.TracerProvider, error) {
defaultResource := resource.Default()

mergedResource, err := resource.Merge(
defaultResource,
resource.NewWithAttributes(
defaultResource.SchemaURL(),
semconv.ServiceNameKey.String(serviceName),
),
)
resource, err := getResource(serviceName)
if err != nil {
return nil, fmt.Errorf("failed to create otel resource: %w", err)
}

tp := sdkTrace.NewTracerProvider(
sdkTrace.WithBatcher(spanExporter),
sdkTrace.WithResource(mergedResource),
sdkTrace.WithResource(resource),
)

otel.SetTracerProvider(tp)
Expand Down
21 changes: 20 additions & 1 deletion agent/workers/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/event"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/telemetry"
"github.com/kubeshop/tracetest/server/datastore"
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/tracedb"
"github.com/kubeshop/tracetest/server/tracedb/connection"
"github.com/kubeshop/tracetest/server/traces"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
Expand All @@ -29,6 +31,7 @@ type PollerWorker struct {
observer event.Observer
stoppableProcessRunner StoppableProcessRunner
tracer trace.Tracer
meter metric.Meter
}

// PollerOption is a function that configures a PollerWorker. NewPollerWorker
// accepts a variadic list of them.
type PollerOption func(*PollerWorker)
Expand Down Expand Up @@ -63,13 +66,20 @@ func WithPollerTracer(tracer trace.Tracer) PollerOption {
}
}

// WithPollerMeter returns a PollerOption that stores meter on the
// PollerWorker, replacing whatever meter it held before.
func WithPollerMeter(meter metric.Meter) PollerOption {
	setMeter := func(worker *PollerWorker) {
		worker.meter = meter
	}

	return setMeter
}

func NewPollerWorker(client *client.Client, opts ...PollerOption) *PollerWorker {
pollerWorker := &PollerWorker{
client: client,
sentSpanIDs: gocache.New[string, bool](),
logger: zap.NewNop(),
observer: event.NewNopObserver(),
tracer: trace.NewNoopTracerProvider().Tracer("noop"),
tracer: telemetry.GetNoopTracer(),
meter: telemetry.GetNoopMeter(),
}

for _, opt := range opts {
Expand All @@ -83,6 +93,11 @@ func (w *PollerWorker) Poll(ctx context.Context, request *proto.PollingRequest)
ctx, span := w.tracer.Start(ctx, "PollingRequest Worker operation")
defer span.End()

runCounter, _ := w.meter.Int64Counter("tracetest.agent.pollerworker.runs")
runCounter.Add(ctx, 1)

errorCounter, _ := w.meter.Int64Counter("tracetest.agent.pollerworker.errors")

w.logger.Debug("Received polling request", zap.Any("request", request))
w.observer.StartTracePoll(request)

Expand Down Expand Up @@ -120,9 +135,13 @@ func (w *PollerWorker) Poll(ctx context.Context, request *proto.PollingRequest)

formattedErr := fmt.Errorf("could not report polling error back to the server: %w. Original error: %s", sendErr, err.Error())
span.RecordError(formattedErr)
errorCounter.Add(ctx, 1)

return formattedErr
}

span.RecordError(err)
errorCounter.Add(ctx, 1)
}

w.observer.EndTracePoll(request, nil)
Expand Down
Loading
Loading