Skip to content

Commit

Permalink
Initial Discovery properties provider and config incorporation (#2494)
Browse files Browse the repository at this point in the history
  • Loading branch information
rmfitzpatrick authored Jan 23, 2023
1 parent f8a513c commit f9bb21e
Show file tree
Hide file tree
Showing 20 changed files with 1,011 additions and 57 deletions.
3 changes: 3 additions & 0 deletions cmd/otelcol/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func main() {
ResolverSettings: confmap.ResolverSettings{
URIs: collectorSettings.ResolverURIs(),
Providers: map[string]confmap.Provider{
discovery.PropertyScheme(): configprovider.NewConfigSourceConfigMapProvider(
discovery.PropertyProvider(), zap.NewNop(), info, hooks, configsources.Get()...,
),
discovery.ConfigDScheme(): configprovider.NewConfigSourceConfigMapProvider(
discovery.ConfigDProvider(),
zap.NewNop(), // The service logger is not available yet, setting it to Nop.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/signalfx/splunk-otel-collector
go 1.19

require (
github.com/alecthomas/participle/v2 v2.0.0-beta.5
github.com/antonmedv/expr v1.9.0
github.com/apache/pulsar-client-go v0.9.0
github.com/cenkalti/backoff/v4 v4.2.0
Expand Down Expand Up @@ -152,7 +153,6 @@ require (
github.com/Shopify/sarama v1.37.2 // indirect
github.com/Showmax/go-fqdn v1.0.0 // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/alecthomas/participle/v2 v2.0.0-beta.5 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
github.com/apache/thrift v0.17.0 // indirect
Expand Down
94 changes: 82 additions & 12 deletions internal/confmapprovider/discovery/discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/base64"
"fmt"
"os"
"strings"
"sync"
"time"

Expand All @@ -42,11 +43,15 @@ import (

"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
"github.com/signalfx/splunk-otel-collector/internal/components"
"github.com/signalfx/splunk-otel-collector/internal/confmapprovider/discovery/properties"
"github.com/signalfx/splunk-otel-collector/internal/receiver/discoveryreceiver"
"github.com/signalfx/splunk-otel-collector/internal/version"
)

const durationEnvVar = "SPLUNK_DISCOVERY_DURATION"
const (
durationEnvVar = "SPLUNK_DISCOVERY_DURATION"
logLevelEnvVar = "SPLUNK_DISCOVERY_LOG_LEVEL"
)

// discoverer provides the mechanism for a "preflight" collector service
// that will stand up the observers and discovery receivers based on the .discovery.yaml
Expand All @@ -64,9 +69,12 @@ type discoverer struct {
unexpandedReceiverEntries map[component.ID]map[component.ID]map[string]any
discoveredConfig map[component.ID]map[string]any
discoveredObservers map[component.ID]discovery.StatusType
info component.BuildInfo
duration time.Duration
mu sync.Mutex
// propertiesConf is a store of all properties from cmdline args and env vars
// that's merged with receiver/observer configs before creation
propertiesConf *confmap.Conf
info component.BuildInfo
duration time.Duration
mu sync.Mutex
}

func newDiscoverer(logger *zap.Logger) (*discoverer, error) {
Expand All @@ -87,7 +95,6 @@ func newDiscoverer(logger *zap.Logger) (*discoverer, error) {
if err != nil {
return (*discoverer)(nil), err
}

m := &discoverer{
logger: logger,
info: info,
Expand All @@ -102,9 +109,31 @@ func newDiscoverer(logger *zap.Logger) (*discoverer, error) {
discoveredConfig: map[component.ID]map[string]any{},
discoveredObservers: map[component.ID]discovery.StatusType{},
}
m.propertiesConf = m.propertiesConfFromEnv()
return m, nil
}

func (d *discoverer) propertiesConfFromEnv() *confmap.Conf {
propertiesConf := confmap.New()
for _, env := range os.Environ() {
equalsIdx := strings.Index(env, "=")
if equalsIdx != -1 && len(env) > equalsIdx+1 {
envVar := env[:equalsIdx]
if envVar == logLevelEnvVar || envVar == durationEnvVar {
continue
}
if p, ok, e := properties.NewPropertyFromEnvVar(envVar, env[equalsIdx+1:]); ok {
if e != nil {
d.logger.Info(fmt.Sprintf("invalid discovery property environment variable %q", env), zap.Error(e))
continue
}
propertiesConf.Merge(confmap.NewFromStringMap(p.ToStringMap()))
}
}
}
return propertiesConf
}

// discover will create all .discovery.yaml components, start them, wait the configured
// duration, and tear them down before returning the discovery config.
func (d *discoverer) discover(cfg *Config) (map[string]any, error) {
Expand Down Expand Up @@ -197,19 +226,38 @@ func (d *discoverer) createDiscoveryReceiversAndObservers(cfg *Config) (map[comp
discoveryReceiverRaw := map[string]any{}
receivers := map[string]any{}

receiversPropertiesConf := confmap.New()
if d.propertiesConf.IsSet("receivers") {
receiversPropertiesConf, err = d.propertiesConf.Sub("receivers")
if err != nil {
return nil, nil, fmt.Errorf("failed obtaining receivers properties config: %w", err)
}
}
for receiverID, receiver := range cfg.ReceiversToDiscover {
if ok, err = d.updateReceiverForObserver(receiverID, receiver, observerID); err != nil {
return nil, nil, err
} else if !ok {
continue
}
d.addUnexpandedReceiverConfig(receiverID, observerID, receiver.Entry.ToStringMap())
receivers[receiverID.String()] = receiver.Entry.ToStringMap()
receiverEntry := receiver.Entry.ToStringMap()
if receiversPropertiesConf.IsSet(receiverID.String()) {
receiverPropertiesConf, e := receiversPropertiesConf.Sub(receiverID.String())
if e != nil {
return nil, nil, fmt.Errorf("failed obtaining receiver properties config: %w", e)
}
entryConf := confmap.NewFromStringMap(receiverEntry)
if err = entryConf.Merge(receiverPropertiesConf); err != nil {
return nil, nil, fmt.Errorf("failed merging receiver properties config: %w", err)
}
receiverEntry = entryConf.ToStringMap()
}

d.addUnexpandedReceiverConfig(receiverID, observerID, receiverEntry)
receivers[receiverID.String()] = receiverEntry
}

discoveryReceiverRaw["receivers"] = receivers
discoveryReceiverConfMap := confmap.NewFromStringMap(discoveryReceiverRaw)

if err = d.expandConverter.Convert(context.Background(), discoveryReceiverConfMap); err != nil {
return nil, nil, fmt.Errorf("error converting environment variables in receiver config: %w", err)
}
Expand Down Expand Up @@ -241,6 +289,24 @@ func (d *discoverer) createObserver(observerID component.ID, cfg *Config) (otelc

observerConfig := observerFactory.CreateDefaultConfig()
observerCfgMap := confmap.NewFromStringMap(cfg.DiscoveryObservers[observerID].ToStringMap())

if d.propertiesConf.IsSet("extensions") {
propertiesConf, e := d.propertiesConf.Sub("extensions")
if e != nil {
return nil, fmt.Errorf("failed obtaining extensions properties config: %w", e)
}
if propertiesConf.IsSet(observerID.String()) {
propertiesConf, e = propertiesConf.Sub(observerID.String())
if e != nil {
return nil, fmt.Errorf("failed obtaining observer properties config: %w", e)
}
if err = observerCfgMap.Merge(propertiesConf); err != nil {
return nil, fmt.Errorf("failed merging observer properties config: %w", err)
}
cfg.DiscoveryObservers[observerID] = ExtensionEntry{observerCfgMap.ToStringMap()}
}
}

if err = d.expandConverter.Convert(context.Background(), observerCfgMap); err != nil {
return nil, fmt.Errorf("error converting environment variables in %q config: %w", observerID.String(), err)
}
Expand Down Expand Up @@ -323,15 +389,17 @@ func (d *discoverer) discoveryConfig(cfg *Config) (map[string]any, error) {
}
}
if receiverAdded {
dCfg.Merge(confmap.NewFromStringMap(map[string]any{
if err := dCfg.Merge(confmap.NewFromStringMap(map[string]any{
"service": map[string]any{
"pipelines": map[string]any{
"metrics": map[string]any{
"receivers": []string{"receiver_creator/discovery"},
},
},
},
}))
})); err != nil {
return nil, fmt.Errorf("failed adding receiver_creator/discovery to metrics pipeline: %w", err)
}
}

extensions := confmap.NewFromStringMap(map[string]any{"extensions": map[string]any{}})
Expand Down Expand Up @@ -573,10 +641,12 @@ func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error {

func determineCurrentStatus(current, observed discovery.StatusType) discovery.StatusType {
switch {
case current == discovery.Successful:
// once successful never revert
case observed == discovery.Successful:
current = discovery.Successful
case current == discovery.Failed && observed == discovery.Partial:
current = discovery.Partial
case current == discovery.Partial:
// only update if observed successful (above)
default:
current = observed
}
Expand Down
23 changes: 23 additions & 0 deletions internal/confmapprovider/discovery/discoverer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package discovery

import (
"fmt"
"os"
"strings"
"testing"
Expand All @@ -25,6 +26,8 @@ import (
"go.opentelemetry.io/collector/confmap"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
)

func TestDiscovererDurationFromEnv(t *testing.T) {
Expand Down Expand Up @@ -140,3 +143,23 @@ func TestMergeEntries(t *testing.T) {
"five.key": "five.val",
}, first)
}

func TestDetermineCurrentStatus(t *testing.T) {
for _, test := range []struct {
current, observed, expected discovery.StatusType
}{
{"failed", "failed", "failed"},
{"failed", "partial", "partial"},
{"failed", "successful", "successful"},
{"partial", "failed", "partial"},
{"partial", "partial", "partial"},
{"partial", "successful", "successful"},
{"successful", "failed", "successful"},
{"successful", "partial", "successful"},
{"successful", "successful", "successful"},
} {
t.Run(fmt.Sprintf("%s:%s->%s", test.current, test.observed, test.expected), func(t *testing.T) {
require.Equal(t, test.expected, determineCurrentStatus(test.current, test.observed))
})
}
}
57 changes: 57 additions & 0 deletions internal/confmapprovider/discovery/properties/env_var.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright Splunk, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package properties

import (
"fmt"

"github.com/alecthomas/participle/v2"
"github.com/alecthomas/participle/v2/lexer"
)

// SPLUNK_DISCOVERY_RECEIVERS_receiver_x2d_type_x2f_receiver_x2d_name_CONFIG_field_x3a_x3a_subfield=val
// SPLUNK_DISCOVERY_EXTENSIONS_observer_x2d_type_x2f_observer_x2d_name_CONFIG_field_x3a_x3a_subfield=val

var envVarLex = lexer.MustSimple([]lexer.SimpleRule{
{Name: "Underscore", Pattern: `_`},
{Name: "String", Pattern: `[^_]+`},
})

var envVarParser = participle.MustBuild[EnvVarProperty](
participle.Lexer(envVarLex),
participle.UseLookahead(participle.MaxLookahead),
)

type EnvVarProperty struct {
ComponentType string `parser:"'SPLUNK' Underscore 'DISCOVERY' Underscore @('RECEIVERS' | 'EXTENSIONS') Underscore"`
Component EnvVarComponentID `parser:"@@"`
Key string `parser:"Underscore 'CONFIG' Underscore @(String|Underscore)+"`
Val string
}

type EnvVarComponentID struct {
Type string `parser:"@~(Underscore (?= 'CONFIG'))+"`
// _x2f_ -> '/'
Name string `parser:"(Underscore 'x2f' Underscore @(~(?= Underscore (?= 'CONFIG'))+|''))?"`
}

func NewEnvVarProperty(property, val string) (*EnvVarProperty, error) {
p, err := envVarParser.ParseString("SPLUNK_DISCOVERY", property)
if err != nil {
return nil, fmt.Errorf("invalid property env var (parsing error): %w", err)
}
p.Val = val
return p, nil
}
Loading

0 comments on commit f9bb21e

Please sign in to comment.