Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Discovery properties provider and config incorporation #2494

Merged
merged 1 commit into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/otelcol/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func main() {
ResolverSettings: confmap.ResolverSettings{
URIs: collectorSettings.ResolverURIs(),
Providers: map[string]confmap.Provider{
discovery.PropertyScheme(): configprovider.NewConfigSourceConfigMapProvider(
discovery.PropertyProvider(), zap.NewNop(), info, hooks, configsources.Get()...,
),
discovery.ConfigDScheme(): configprovider.NewConfigSourceConfigMapProvider(
discovery.ConfigDProvider(),
zap.NewNop(), // The service logger is not available yet, setting it to Nop.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/signalfx/splunk-otel-collector
go 1.19

require (
github.com/alecthomas/participle/v2 v2.0.0-beta.5
github.com/antonmedv/expr v1.9.0
github.com/apache/pulsar-client-go v0.9.0
github.com/cenkalti/backoff/v4 v4.2.0
Expand Down Expand Up @@ -152,7 +153,6 @@ require (
github.com/Shopify/sarama v1.37.2 // indirect
github.com/Showmax/go-fqdn v1.0.0 // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/alecthomas/participle/v2 v2.0.0-beta.5 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
github.com/apache/thrift v0.17.0 // indirect
Expand Down
94 changes: 82 additions & 12 deletions internal/confmapprovider/discovery/discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/base64"
"fmt"
"os"
"strings"
"sync"
"time"

Expand All @@ -42,11 +43,15 @@ import (

"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
"github.com/signalfx/splunk-otel-collector/internal/components"
"github.com/signalfx/splunk-otel-collector/internal/confmapprovider/discovery/properties"
"github.com/signalfx/splunk-otel-collector/internal/receiver/discoveryreceiver"
"github.com/signalfx/splunk-otel-collector/internal/version"
)

const durationEnvVar = "SPLUNK_DISCOVERY_DURATION"
const (
durationEnvVar = "SPLUNK_DISCOVERY_DURATION"
logLevelEnvVar = "SPLUNK_DISCOVERY_LOG_LEVEL"
)

// discoverer provides the mechanism for a "preflight" collector service
// that will stand up the observers and discovery receivers based on the .discovery.yaml
Expand All @@ -64,9 +69,12 @@ type discoverer struct {
unexpandedReceiverEntries map[component.ID]map[component.ID]map[string]any
discoveredConfig map[component.ID]map[string]any
discoveredObservers map[component.ID]discovery.StatusType
info component.BuildInfo
duration time.Duration
mu sync.Mutex
// propertiesConf is a store of all properties from cmdline args and env vars
// that's merged with receiver/observer configs before creation
propertiesConf *confmap.Conf
info component.BuildInfo
duration time.Duration
mu sync.Mutex
}

func newDiscoverer(logger *zap.Logger) (*discoverer, error) {
Expand All @@ -87,7 +95,6 @@ func newDiscoverer(logger *zap.Logger) (*discoverer, error) {
if err != nil {
return (*discoverer)(nil), err
}

m := &discoverer{
logger: logger,
info: info,
Expand All @@ -102,9 +109,31 @@ func newDiscoverer(logger *zap.Logger) (*discoverer, error) {
discoveredConfig: map[component.ID]map[string]any{},
discoveredObservers: map[component.ID]discovery.StatusType{},
}
m.propertiesConf = m.propertiesConfFromEnv()
return m, nil
}

func (d *discoverer) propertiesConfFromEnv() *confmap.Conf {
propertiesConf := confmap.New()
for _, env := range os.Environ() {
equalsIdx := strings.Index(env, "=")
if equalsIdx != -1 && len(env) > equalsIdx+1 {
envVar := env[:equalsIdx]
if envVar == logLevelEnvVar || envVar == durationEnvVar {
continue
}
if p, ok, e := properties.NewPropertyFromEnvVar(envVar, env[equalsIdx+1:]); ok {
if e != nil {
d.logger.Info(fmt.Sprintf("invalid discovery property environment variable %q", env), zap.Error(e))
continue
}
propertiesConf.Merge(confmap.NewFromStringMap(p.ToStringMap()))
}
}
}
return propertiesConf
}

// discover will create all .discovery.yaml components, start them, wait the configured
// duration, and tear them down before returning the discovery config.
func (d *discoverer) discover(cfg *Config) (map[string]any, error) {
Expand Down Expand Up @@ -197,19 +226,38 @@ func (d *discoverer) createDiscoveryReceiversAndObservers(cfg *Config) (map[comp
discoveryReceiverRaw := map[string]any{}
receivers := map[string]any{}

receiversPropertiesConf := confmap.New()
if d.propertiesConf.IsSet("receivers") {
receiversPropertiesConf, err = d.propertiesConf.Sub("receivers")
if err != nil {
return nil, nil, fmt.Errorf("failed obtaining receivers properties config: %w", err)
}
}
for receiverID, receiver := range cfg.ReceiversToDiscover {
if ok, err = d.updateReceiverForObserver(receiverID, receiver, observerID); err != nil {
return nil, nil, err
} else if !ok {
continue
}
d.addUnexpandedReceiverConfig(receiverID, observerID, receiver.Entry.ToStringMap())
receivers[receiverID.String()] = receiver.Entry.ToStringMap()
receiverEntry := receiver.Entry.ToStringMap()
if receiversPropertiesConf.IsSet(receiverID.String()) {
receiverPropertiesConf, e := receiversPropertiesConf.Sub(receiverID.String())
if e != nil {
return nil, nil, fmt.Errorf("failed obtaining receiver properties config: %w", e)
}
entryConf := confmap.NewFromStringMap(receiverEntry)
if err = entryConf.Merge(receiverPropertiesConf); err != nil {
return nil, nil, fmt.Errorf("failed merging receiver properties config: %w", err)
}
receiverEntry = entryConf.ToStringMap()
}

d.addUnexpandedReceiverConfig(receiverID, observerID, receiverEntry)
receivers[receiverID.String()] = receiverEntry
}

discoveryReceiverRaw["receivers"] = receivers
discoveryReceiverConfMap := confmap.NewFromStringMap(discoveryReceiverRaw)

if err = d.expandConverter.Convert(context.Background(), discoveryReceiverConfMap); err != nil {
return nil, nil, fmt.Errorf("error converting environment variables in receiver config: %w", err)
}
Expand Down Expand Up @@ -241,6 +289,24 @@ func (d *discoverer) createObserver(observerID component.ID, cfg *Config) (otelc

observerConfig := observerFactory.CreateDefaultConfig()
observerCfgMap := confmap.NewFromStringMap(cfg.DiscoveryObservers[observerID].ToStringMap())

if d.propertiesConf.IsSet("extensions") {
propertiesConf, e := d.propertiesConf.Sub("extensions")
if e != nil {
return nil, fmt.Errorf("failed obtaining extensions properties config: %w", e)
}
if propertiesConf.IsSet(observerID.String()) {
propertiesConf, e = propertiesConf.Sub(observerID.String())
if e != nil {
return nil, fmt.Errorf("failed obtaining observer properties config: %w", e)
}
if err = observerCfgMap.Merge(propertiesConf); err != nil {
return nil, fmt.Errorf("failed merging observer properties config: %w", err)
}
cfg.DiscoveryObservers[observerID] = ExtensionEntry{observerCfgMap.ToStringMap()}
}
}

if err = d.expandConverter.Convert(context.Background(), observerCfgMap); err != nil {
return nil, fmt.Errorf("error converting environment variables in %q config: %w", observerID.String(), err)
}
Expand Down Expand Up @@ -323,15 +389,17 @@ func (d *discoverer) discoveryConfig(cfg *Config) (map[string]any, error) {
}
}
if receiverAdded {
dCfg.Merge(confmap.NewFromStringMap(map[string]any{
if err := dCfg.Merge(confmap.NewFromStringMap(map[string]any{
"service": map[string]any{
"pipelines": map[string]any{
"metrics": map[string]any{
"receivers": []string{"receiver_creator/discovery"},
},
},
},
}))
})); err != nil {
return nil, fmt.Errorf("failed adding receiver_creator/discovery to metrics pipeline: %w", err)
}
}

extensions := confmap.NewFromStringMap(map[string]any{"extensions": map[string]any{}})
Expand Down Expand Up @@ -573,10 +641,12 @@ func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error {

func determineCurrentStatus(current, observed discovery.StatusType) discovery.StatusType {
switch {
case current == discovery.Successful:
// once successful never revert
case observed == discovery.Successful:
current = discovery.Successful
case current == discovery.Failed && observed == discovery.Partial:
current = discovery.Partial
case current == discovery.Partial:
// only update if observed successful (above)
default:
current = observed
}
Expand Down
23 changes: 23 additions & 0 deletions internal/confmapprovider/discovery/discoverer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package discovery

import (
"fmt"
"os"
"strings"
"testing"
Expand All @@ -25,6 +26,8 @@ import (
"go.opentelemetry.io/collector/confmap"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
)

func TestDiscovererDurationFromEnv(t *testing.T) {
Expand Down Expand Up @@ -140,3 +143,23 @@ func TestMergeEntries(t *testing.T) {
"five.key": "five.val",
}, first)
}

func TestDetermineCurrentStatus(t *testing.T) {
for _, test := range []struct {
current, observed, expected discovery.StatusType
}{
{"failed", "failed", "failed"},
{"failed", "partial", "partial"},
{"failed", "successful", "successful"},
{"partial", "failed", "partial"},
{"partial", "partial", "partial"},
{"partial", "successful", "successful"},
{"successful", "failed", "successful"},
{"successful", "partial", "successful"},
{"successful", "successful", "successful"},
} {
t.Run(fmt.Sprintf("%s:%s->%s", test.current, test.observed, test.expected), func(t *testing.T) {
require.Equal(t, test.expected, determineCurrentStatus(test.current, test.observed))
})
}
}
57 changes: 57 additions & 0 deletions internal/confmapprovider/discovery/properties/env_var.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright Splunk, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package properties

import (
"fmt"

"github.com/alecthomas/participle/v2"
"github.com/alecthomas/participle/v2/lexer"
)

// SPLUNK_DISCOVERY_RECEIVERS_receiver_x2d_type_x2f_receiver_x2d_name_CONFIG_field_x3a_x3a_subfield=val
// SPLUNK_DISCOVERY_EXTENSIONS_observer_x2d_type_x2f_observer_x2d_name_CONFIG_field_x3a_x3a_subfield=val

var envVarLex = lexer.MustSimple([]lexer.SimpleRule{
{Name: "Underscore", Pattern: `_`},
{Name: "String", Pattern: `[^_]+`},
})

var envVarParser = participle.MustBuild[EnvVarProperty](
participle.Lexer(envVarLex),
participle.UseLookahead(participle.MaxLookahead),
)

type EnvVarProperty struct {
ComponentType string `parser:"'SPLUNK' Underscore 'DISCOVERY' Underscore @('RECEIVERS' | 'EXTENSIONS') Underscore"`
Component EnvVarComponentID `parser:"@@"`
Key string `parser:"Underscore 'CONFIG' Underscore @(String|Underscore)+"`
Val string
}

type EnvVarComponentID struct {
Type string `parser:"@~(Underscore (?= 'CONFIG'))+"`
// _x2f_ -> '/'
Name string `parser:"(Underscore 'x2f' Underscore @(~(?= Underscore (?= 'CONFIG'))+|''))?"`
}

func NewEnvVarProperty(property, val string) (*EnvVarProperty, error) {
p, err := envVarParser.ParseString("SPLUNK_DISCOVERY", property)
if err != nil {
return nil, fmt.Errorf("invalid property env var (parsing error): %w", err)
}
p.Val = val
return p, nil
}
Loading