Skip to content

Commit

Permalink
Add Telemetry collector structure with some collected resources (#1497)
Browse files Browse the repository at this point in the history
Problem: We want to create a starting design to collect telemetry data.

Solution: Implemented design to collect multiple types of telemetry data: Cluster Node Count, Count of NGF Resources (Graph), Project and Version.
  • Loading branch information
bjee19 authored Feb 6, 2024
1 parent 8c4fb19 commit 95c8613
Show file tree
Hide file tree
Showing 23 changed files with 1,258 additions and 96 deletions.
1 change: 1 addition & 0 deletions cmd/gateway/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func createStaticModeCommand() *cobra.Command {
},
Plus: plus,
TelemetryReportPeriod: period,
Version: version,
}

if err := static.StartManager(conf); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions deploy/helm-chart/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ rules:
- namespaces
- services
- secrets
# FIXME(bjee19): make nodes permission dependent on telemetry being enabled.
# https://github.com/nginxinc/nginx-gateway-fabric/issues/1317.
- nodes
verbs:
- list
- watch
Expand Down
3 changes: 3 additions & 0 deletions deploy/manifests/nginx-gateway.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ rules:
- namespaces
- services
- secrets
# FIXME(bjee19): make nodes permission dependent on telemetry being enabled.
# https://github.com/nginxinc/nginx-gateway-fabric/issues/1317.
- nodes
verbs:
- list
- watch
Expand Down
9 changes: 9 additions & 0 deletions internal/framework/runnables/cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
type CronJobConfig struct {
// Worker is the function that will be run for every cronjob iteration.
Worker func(context.Context)
// ReadyCh delays the start of the job until the channel is closed.
ReadyCh <-chan struct{}
// Logger is the logger.
Logger logr.Logger
// Period defines the period of the cronjob. The cronjob will run every Period.
Expand All @@ -37,6 +39,13 @@ func NewCronJob(cfg CronJobConfig) *CronJob {
// Start starts the cronjob.
// Implements controller-runtime manager.Runnable
func (j *CronJob) Start(ctx context.Context) error {
select {
case <-j.cfg.ReadyCh:
case <-ctx.Done():
j.cfg.Logger.Info("Context canceled, failed to start cronjob")
return ctx.Err()
}

j.cfg.Logger.Info("Starting cronjob")

sliding := true // This means the period with jitter will be calculated after each worker call.
Expand Down
36 changes: 33 additions & 3 deletions internal/framework/runnables/cronjob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
func TestCronJob(t *testing.T) {
g := NewWithT(t)

readyChannel := make(chan struct{})

timeout := 10 * time.Second
var callCount int

Expand All @@ -22,9 +24,10 @@ func TestCronJob(t *testing.T) {
}

cfg := CronJobConfig{
Worker: worker,
Logger: zap.New(),
Period: 1 * time.Millisecond, // 1ms is much smaller than timeout so the CronJob should run a few times
Worker: worker,
Logger: zap.New(),
Period: 1 * time.Millisecond, // 1ms is much smaller than timeout so the CronJob should run a few times
ReadyCh: readyChannel,
}
job := NewCronJob(cfg)

Expand All @@ -35,6 +38,7 @@ func TestCronJob(t *testing.T) {
errCh <- job.Start(ctx)
close(errCh)
}()
close(readyChannel)

minReports := 2 // ensure that the CronJob reports more than once: it doesn't exit after the first run

Expand All @@ -44,3 +48,29 @@ func TestCronJob(t *testing.T) {
g.Eventually(errCh).Should(Receive(BeNil()))
g.Eventually(errCh).Should(BeClosed())
}

// TestCronJob_ContextCanceled verifies that Start returns the context's error
// when the context is canceled before ReadyCh is closed — i.e. the cronjob
// aborts without ever running its worker.
func TestCronJob_ContextCanceled(t *testing.T) {
	g := NewWithT(t)

	// Intentionally never closed: Start must stay blocked on ReadyCh.
	readyChannel := make(chan struct{})

	cfg := CronJobConfig{
		Worker: func(ctx context.Context) {},
		Logger: zap.New(),
		// Period is irrelevant in this test: ReadyCh is never closed, so the
		// worker never runs; any value works here.
		Period:  1 * time.Millisecond,
		ReadyCh: readyChannel,
	}
	job := NewCronJob(cfg)

	ctx, cancel := context.WithCancel(context.Background())

	errCh := make(chan error)
	go func() {
		errCh <- job.Start(ctx)
		close(errCh)
	}()

	// Cancel while Start is still waiting on ReadyCh; it must return ctx.Err().
	cancel()
	g.Eventually(errCh).Should(Receive(MatchError(context.Canceled)))
	g.Eventually(errCh).Should(BeClosed())
}
2 changes: 2 additions & 0 deletions internal/mode/static/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
)

type Config struct {
// Version is the running NGF version.
Version string
// AtomicLevel is an atomically changeable, dynamic logging level.
AtomicLevel zap.AtomicLevel
// GatewayNsName is the namespaced name of a Gateway resource that the Gateway will use.
Expand Down
50 changes: 39 additions & 11 deletions internal/mode/static/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package static
import (
"context"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -58,8 +59,8 @@ type eventHandlerConfig struct {
logLevelSetter logLevelSetter
// metricsCollector collects metrics for this controller.
metricsCollector handlerMetricsCollector
// healthChecker sets the health of the Pod to Ready once we've written out our initial config
healthChecker *healthChecker
// nginxConfiguredOnStartChecker sets the health of the Pod to Ready once we've written out our initial config.
nginxConfiguredOnStartChecker *nginxConfiguredOnStartChecker
// controlConfigNSName is the NamespacedName of the NginxGateway config for this controller.
controlConfigNSName types.NamespacedName
// version is the current version number of the nginx config.
Expand All @@ -72,7 +73,10 @@ type eventHandlerConfig struct {
// (2) Keeping the statuses of the Gateway API resources updated.
// (3) Updating control plane configuration.
type eventHandlerImpl struct {
cfg eventHandlerConfig
// latestConfiguration is the latest Configuration generation.
latestConfiguration *dataplane.Configuration
cfg eventHandlerConfig
lock sync.Mutex
}

// newEventHandlerImpl creates a new eventHandlerImpl.
Expand Down Expand Up @@ -105,36 +109,44 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
switch changeType {
case state.NoChange:
logger.Info("Handling events didn't result into NGINX configuration changes")
if !h.cfg.healthChecker.ready && h.cfg.healthChecker.firstBatchError == nil {
h.cfg.healthChecker.setAsReady()
if !h.cfg.nginxConfiguredOnStartChecker.ready && h.cfg.nginxConfiguredOnStartChecker.firstBatchError == nil {
h.cfg.nginxConfiguredOnStartChecker.setAsReady()
}
return
case state.EndpointsOnlyChange:
h.cfg.version++
cfg := dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version)

h.setLatestConfiguration(&cfg)

err = h.updateUpstreamServers(
ctx,
logger,
dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version),
cfg,
)
case state.ClusterStateChange:
h.cfg.version++
cfg := dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version)

h.setLatestConfiguration(&cfg)

err = h.updateNginxConf(
ctx,
dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version),
cfg,
)
}

var nginxReloadRes nginxReloadResult
if err != nil {
logger.Error(err, "Failed to update NGINX configuration")
nginxReloadRes.error = err
if !h.cfg.healthChecker.ready {
h.cfg.healthChecker.firstBatchError = err
if !h.cfg.nginxConfiguredOnStartChecker.ready {
h.cfg.nginxConfiguredOnStartChecker.firstBatchError = err
}
} else {
logger.Info("NGINX configuration was successfully updated")
if !h.cfg.healthChecker.ready {
h.cfg.healthChecker.setAsReady()
if !h.cfg.nginxConfiguredOnStartChecker.ready {
h.cfg.nginxConfiguredOnStartChecker.setAsReady()
}
}

Expand Down Expand Up @@ -384,3 +396,19 @@ func getGatewayAddresses(

return gwAddresses, nil
}

// GetLatestConfiguration returns the most recently stored dataplane
// configuration. Access is serialized with the handler's mutex so it is safe
// to call concurrently with setLatestConfiguration.
func (h *eventHandlerImpl) GetLatestConfiguration() *dataplane.Configuration {
	h.lock.Lock()
	latest := h.latestConfiguration
	h.lock.Unlock()

	return latest
}

// setLatestConfiguration records cfg as the latest built configuration.
// Access is serialized with the handler's mutex so it is safe to call
// concurrently with GetLatestConfiguration.
func (h *eventHandlerImpl) setLatestConfiguration(cfg *dataplane.Configuration) {
	h.lock.Lock()
	h.latestConfiguration = cfg
	h.lock.Unlock()
}
Loading

0 comments on commit 95c8613

Please sign in to comment.