Skip to content

Commit

Permalink
Merge pull request #2780 from lpegoraro/eng-1056-orb-policies-opentel…
Browse files Browse the repository at this point in the history
…emetry-configuration

feat(backend): add opentelemetry backend in orb-agent, policies.
  • Loading branch information
lpegoraro authored Nov 17, 2023
2 parents ac169e5 + 02078db commit 39a654b
Show file tree
Hide file tree
Showing 43 changed files with 1,737 additions and 282 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ docker/otel-collector-config.yaml

kind/*
!kind/README.md

otelcol-contrib

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 16 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ DOCKERS = $(addprefix docker_,$(SERVICES))
DOCKERS_DEV = $(addprefix docker_dev_,$(SERVICES))
CGO_ENABLED ?= 0
GOARCH ?= $(shell dpkg-architecture -q DEB_BUILD_ARCH)
GOOS ?= $(shell dpkg-architecture -q DEB_TARGET_ARCH_OS)
DIODE_TAG ?= develop
ORB_VERSION = $(shell cat VERSION)
COMMIT_HASH = $(shell git rev-parse --short HEAD)
OTEL_COLLECTOR_CONTRIB_VERSION ?= 0.87.0
OTEL_CONTRIB_URL ?= "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v$(OTEL_COLLECTOR_CONTRIB_VERSION)/otelcol-contrib_$(OTEL_COLLECTOR_CONTRIB_VERSION)_$(GOOS)_$(GOARCH).tar.gz"

define compile_service
echo "ORB_VERSION: $(ORB_VERSION)"
Expand Down Expand Up @@ -228,6 +232,7 @@ agent_bin:

agent:
docker build --no-cache \
--build-arg GOARCH=$(GOARCH) \
--build-arg PKTVISOR_TAG=$(PKTVISOR_TAG) \
--tag=$(ORB_DOCKERHUB_REPO)/$(DOCKER_IMAGE_NAME_PREFIX)-agent:$(REF_TAG) \
--tag=$(ORB_DOCKERHUB_REPO)/$(DOCKER_IMAGE_NAME_PREFIX)-agent:$(ORB_VERSION) \
Expand All @@ -236,9 +241,11 @@ agent:

agent_full:
docker build --no-cache \
--build-arg GOARCH=$(GOARCH) \
--build-arg PKTVISOR_TAG=$(PKTVISOR_TAG) \
--build-arg DIODE_TAG=$(DIODE_TAG) \
--build-arg ORB_TAG=${ORB_TAG} \
--build-arg ORB_TAG=${REF_TAG} \
--build-arg OTEL_TAG=${OTEL_COLLECTOR_CONTRIB_VERSION} \
--tag=$(ORB_DOCKERHUB_REPO)/$(DOCKER_IMAGE_NAME_PREFIX)-agent-full:$(REF_TAG) \
--tag=$(ORB_DOCKERHUB_REPO)/$(DOCKER_IMAGE_NAME_PREFIX)-agent-full:$(ORB_VERSION) \
--tag=$(ORB_DOCKERHUB_REPO)/$(DOCKER_IMAGE_NAME_PREFIX)-agent-full:$(ORB_VERSION)-$(COMMIT_HASH) \
Expand Down Expand Up @@ -284,3 +291,11 @@ ui:
-f docker/Dockerfile .

platform: dockers_dev agent ui

pull-latest-otel-collector-contrib:
wget -O ./agent/backend/otel/otelcol_contrib.tar.gz $(OTEL_CONTRIB_URL)
tar -xvf ./agent/backend/otel/otelcol_contrib.tar.gz -C ./agent/backend/otel/
cp ./agent/backend/otel/otelcol-contrib .
rm ./agent/backend/otel/otelcol_contrib.tar.gz
rm ./agent/backend/otel/LICENSE
rm ./agent/backend/otel/README.md
53 changes: 33 additions & 20 deletions agent/agent_prof.go → agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ type orbAgent struct {
logger *zap.Logger
config config.Config
client mqtt.Client
agent_id string
db *sqlx.DB
backends map[string]backend.Backend
backendState map[string]*backend.State
cancelFunction context.CancelFunc
rpcFromCancelFunc context.CancelFunc
// TODO: look for a better way to do this, context shouldn't be inside structs

asyncContext context.Context

hbTicker *time.Ticker
Expand Down Expand Up @@ -91,6 +92,11 @@ func New(logger *zap.Logger, c config.Config) (Agent, error) {

pm, err := manager.New(logger, c, db)
if err != nil {
logger.Error("error during create policy manager, exiting", zap.Error(err))
return nil, err
}
if pm.GetRepo() == nil {
logger.Error("policy manager failed to get repository", zap.Error(err))
return nil, err
}
return &orbAgent{logger: logger, config: c, policyManager: pm, db: db, groupsInfos: make(map[string]GroupInfo)}, nil
Expand All @@ -112,6 +118,7 @@ func (a *orbAgent) startBackends(agentCtx context.Context) error {
configuration := structs.Map(a.config.OrbAgent.Otel)
configuration["agent_tags"] = a.config.OrbAgent.Tags
if err := be.Configure(a.logger, a.policyManager.GetRepo(), configurationEntry, configuration); err != nil {
a.logger.Info("failed to configure backend", zap.String("backend", name), zap.Error(err))
return err
}
backendCtx := context.WithValue(agentCtx, "routine", name)
Expand All @@ -120,14 +127,20 @@ func (a *orbAgent) startBackends(agentCtx context.Context) error {
} else {
backendCtx = context.WithValue(backendCtx, "agent_id", "auto-provisioning-without-id")
}
if err := be.Start(context.WithCancel(backendCtx)); err != nil {
return err
}
a.backends[name] = be
a.backendState[name] = &backend.State{
Status: backend.Unknown,
Status: be.GetInitialState(),
LastRestartTS: time.Now(),
}
if err := be.Start(context.WithCancel(backendCtx)); err != nil {
a.logger.Info("failed to start backend", zap.String("backend", name), zap.Error(err))
a.backendState[name] = &backend.State{
Status: be.GetInitialState(),
LastError: err.Error(),
LastRestartTS: time.Now(),
}
return err
}
}
return nil
}
Expand All @@ -151,10 +164,6 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err
mqtt.DEBUG = &agentLoggerDebug{a: a}
}

if err := a.startBackends(ctx); err != nil {
return err
}

ccm, err := cloud_config.New(a.logger, a.config, a.db)
if err != nil {
return err
Expand All @@ -170,6 +179,10 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err
return err
}

if err := a.startBackends(ctx); err != nil {
return err
}

a.logonWithHearbeat()

return nil
Expand All @@ -184,7 +197,9 @@ func (a *orbAgent) logonWithHearbeat() {

func (a *orbAgent) logoffWithHeartbeat(ctx context.Context) {
a.logger.Debug("stopping heartbeat, going offline status", zap.Any("routine", ctx.Value("routine")))
a.hbTicker.Stop()
if a.hbTicker != nil {
a.hbTicker.Stop()
}
if a.rpcFromCancelFunc != nil {
a.rpcFromCancelFunc()
}
Expand Down Expand Up @@ -253,9 +268,9 @@ func (a *orbAgent) RestartBackend(ctx context.Context, name string, reason strin
a.backendState[name].LastError = fmt.Sprintf("failed to reset backend: %v", err)
a.logger.Error("failed to reset backend", zap.String("backend", name), zap.Error(err))
}
be.SetCommsClient(a.config.OrbAgent.Cloud.MQTT.Id, &a.client, fmt.Sprintf("%s/?/%s", a.baseTopic, name))
err := a.sendAgentPoliciesReq()
if err != nil {
be.SetCommsClient(a.agent_id, &a.client, fmt.Sprintf("%s/?/%s", a.baseTopic, name))

if err := a.sendAgentPoliciesReq(); err != nil {
a.logger.Error("failed to send agent policies request", zap.Error(err))
}
return nil
Expand Down Expand Up @@ -288,20 +303,18 @@ func (a *orbAgent) RestartAll(ctx context.Context, reason string) error {
} else {
ctx = context.WithValue(ctx, "agent_id", "auto-provisioning-without-id")
}
a.logger.Info("restarting all backends", zap.String("reason", reason))
a.logoffWithHeartbeat(ctx)
a.logger.Info("restarting comms", zap.String("reason", reason))
if err := a.restartComms(ctx); err != nil {
a.logger.Error("failed to restart comms", zap.Error(err))
}
for name := range a.backends {
a.logger.Info("restarting backend", zap.String("backend", name), zap.String("reason", reason))
err := a.RestartBackend(ctx, name, reason)
if err != nil {
a.logger.Error("failed to restart backend", zap.Error(err))
}
}
a.logger.Info("restarting comms", zap.String("reason", reason))
a.logoffWithHeartbeat(ctx)
err := a.restartComms(ctx)
if err != nil {
a.logger.Error("failed to restart comms", zap.Error(err))
}
a.logonWithHearbeat()
a.logger.Info("all backends and comms were restarted")

Expand Down
4 changes: 4 additions & 0 deletions agent/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
BackendError
AgentError
Offline
Waiting
)

type RunningStatus int
Expand All @@ -29,6 +30,7 @@ var runningStatusMap = [...]string{
"backend_error",
"agent_error",
"offline",
"waiting",
}

var runningStatusRevMap = map[string]RunningStatus{
Expand All @@ -37,6 +39,7 @@ var runningStatusRevMap = map[string]RunningStatus{
"backend_error": BackendError,
"agent_error": AgentError,
"offline": Offline,
"waiting": Waiting,
}

type State struct {
Expand All @@ -62,6 +65,7 @@ type Backend interface {
GetStartTime() time.Time
GetCapabilities() (map[string]interface{}, error)
GetRunningStatus() (RunningStatus, string, error)
GetInitialState() RunningStatus

ApplyPolicy(data policies.PolicyData, updatePolicy bool) error
RemovePolicy(data policies.PolicyData) error
Expand Down
4 changes: 4 additions & 0 deletions agent/backend/diode/diode.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func (d *diodeBackend) GetStartTime() time.Time {
return d.startTime
}

func (d *diodeBackend) GetInitialState() backend.RunningStatus {
return backend.Unknown
}

func (d *diodeBackend) GetCapabilities() (map[string]interface{}, error) {
return make(map[string]interface{}), nil
}
Expand Down
3 changes: 3 additions & 0 deletions agent/backend/diode/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func (d *diodeBackend) receiveOtlp() {
Logger: d.logger,
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: metric.NewMeterProvider(),
ReportComponentStatus: func(*component.StatusEvent) error {
return nil
},
},
BuildInfo: component.NewDefaultBuildInfo(),
}
Expand Down
15 changes: 15 additions & 0 deletions agent/backend/otel/comms.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package otel

import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"strings"
)

func (o *openTelemetryBackend) SetCommsClient(agentID string, client *mqtt.Client, baseTopic string) {
o.mqttClient = client
otelBaseTopic := strings.Replace(baseTopic, "?", "otlp", 1)
o.otlpMetricsTopic = fmt.Sprintf("%s/m/%c", otelBaseTopic, agentID[0])
o.otlpTracesTopic = fmt.Sprintf("%s/t/%c", otelBaseTopic, agentID[0])
o.otlpLogsTopic = fmt.Sprintf("%s/l/%c", otelBaseTopic, agentID[0])
}
Loading

0 comments on commit 39a654b

Please sign in to comment.