Skip to content

Commit

Permalink
Generalize config converter
Browse files Browse the repository at this point in the history
  • Loading branch information
pmcollins committed Sep 22, 2021
1 parent c73c660 commit 7ad09f0
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 57 deletions.
35 changes: 15 additions & 20 deletions cmd/otelcol/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.uber.org/zap"

"github.com/signalfx/splunk-otel-collector/internal/components"
"github.com/signalfx/splunk-otel-collector/internal/configconverter"
"github.com/signalfx/splunk-otel-collector/internal/configprovider"
"github.com/signalfx/splunk-otel-collector/internal/configsources"
"github.com/signalfx/splunk-otel-collector/internal/version"
Expand Down Expand Up @@ -67,7 +68,7 @@ func main() {

// Allow dumping configuration locally by default
// Used by support bundle script
os.Setenv(configServerEnabledEnvVar, "true")
_ = os.Setenv(configServerEnabledEnvVar, "true")

factories, err := components.Get()
if err != nil {
Expand All @@ -79,30 +80,24 @@ func main() {
Version: version.Version,
}

cfgSourcePP := configprovider.NewConfigSourceParserProvider(
newBaseParserProvider(),
zap.NewNop(), // The service logger is not available yet, setting it to NoP.
info,
configsources.Get()...,
)
configConverterPP := configconverter.ParserProvider(cfgSourcePP, configconverter.RemoveBallastKey)
serviceParams := service.CollectorSettings{
BuildInfo: info,
Factories: factories,
ParserProvider: memLimitRemoverParserProvider(info),
ParserProvider: configConverterPP,
}

if err := run(serviceParams); err != nil {
log.Fatal(err)
}
}

// memLimitRemoverParserProvider removes a ballast_size_mib key from a memory
// limiter processor, if it exists. Support for this key will be (or has been)
// removed and will cause the Collector to not start.
func memLimitRemoverParserProvider(info component.BuildInfo) parserprovider.ParserProvider {
cfgSourcePP := configprovider.NewConfigSourceParserProvider(
newBaseParserProvider(),
zap.NewNop(), // The service logger is not available yet, setting it to NoP.
info,
configsources.Get()...,
)
return configprovider.NewMemLimitRemoverParserProvider(cfgSourcePP)
}

// Check whether a string exists in an array of CLI arguments
// Support key/value with and without an equal sign
func contains(arr []string, str string) bool {
Expand Down Expand Up @@ -263,7 +258,7 @@ func setMemoryBallast(memTotalSizeMiB int) int {
if os.Getenv(ballastEnvVarName) != "" {
log.Fatalf("Both %v and '--mem-ballast-size-mib' were specified, but only one is allowed", ballastEnvVarName)
}
os.Setenv(ballastEnvVarName, ballastSizeFlag)
_ = os.Setenv(ballastEnvVarName, ballastSizeFlag)
}

ballastSize := memTotalSizeMiB * defaultMemoryBallastPercentage / 100
Expand All @@ -275,7 +270,7 @@ func setMemoryBallast(memTotalSizeMiB int) int {
}
}

os.Setenv(ballastEnvVarName, strconv.Itoa(ballastSize))
_ = os.Setenv(ballastEnvVarName, strconv.Itoa(ballastSize))
log.Printf("Set ballast to %d MiB", ballastSize)
return ballastSize
}
Expand All @@ -289,7 +284,7 @@ func setMemoryLimit(memTotalSizeMiB int) int {
memLimit = envVarAsInt(memLimitMiBEnvVarName)
}

os.Setenv(memLimitMiBEnvVarName, strconv.Itoa(memLimit))
_ = os.Setenv(memLimitMiBEnvVarName, strconv.Itoa(memLimit))
log.Printf("Set memory limit to %d MiB", memLimit)
return memLimit
}
Expand All @@ -307,15 +302,15 @@ func setDefaultEnvVars() {
for _, v := range testArgs {
_, ok := os.LookupEnv(v[0])
if !ok {
os.Setenv(v[0], v[1])
_ = os.Setenv(v[0], v[1])
}
}
}
token, tokenOk := os.LookupEnv("SPLUNK_ACCESS_TOKEN")
if tokenOk {
_, ok := os.LookupEnv("SPLUNK_HEC_TOKEN")
if !ok {
os.Setenv("SPLUNK_HEC_TOKEN", token)
_ = os.Setenv("SPLUNK_HEC_TOKEN", token)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package configprovider
package configconverter

import (
"fmt"
"log"
"regexp"

"go.opentelemetry.io/collector/config/configparser"
"go.opentelemetry.io/collector/service/parserprovider"
)

var ballastKeyRegexp *regexp.Regexp
Expand All @@ -30,36 +28,18 @@ func init() {
ballastKeyRegexp, _ = regexp.Compile(expr)
}

// memLimitBallastRemoverParserProvider implements ParserProvider, wraps a
// ParserProvider, and removes any ballast_size_mib key from the memory_limiter
// processor, if one exists, from the wrapped config. This is to ensure that the
// Collector will still start when support for this key gets removed.
type memLimitBallastRemoverParserProvider struct {
pp parserprovider.ParserProvider
}

var _ parserprovider.ParserProvider = (*memLimitBallastRemoverParserProvider)(nil)

func NewMemLimitRemoverParserProvider(pp parserprovider.ParserProvider) parserprovider.ParserProvider {
return &memLimitBallastRemoverParserProvider{pp: pp}
}

func (mpp memLimitBallastRemoverParserProvider) Get() (*configparser.ConfigMap, error) {
cfgMap, err := mpp.pp.Get()
if err != nil {
return nil, fmt.Errorf("memLimitBallastRemoverParserProvider.Get(): %w", err)
}
// RemoveBallastKey is a CfgMapFunc that removes a ballast_size_mib on a
// memory_limiter processor config if it exists. This config key will go away at
// some point (or already has) at which point its presence in a config will
// prevent the Collector from starting.
func RemoveBallastKey(cfgMap *configparser.ConfigMap) *configparser.ConfigMap {
out := configparser.NewParser()
for _, k := range cfgMap.AllKeys() {
if isMemLimitBallastKey(k) {
if ballastKeyRegexp.MatchString(k) {
log.Println("Deprecated memory_limiter processor `ballast_size_mib` key found. Removing from config.")
} else {
out.Set(k, cfgMap.Get(k))
}
}
return out, nil
}

func isMemLimitBallastKey(k string) bool {
return ballastKeyRegexp.MatchString(k)
return out
}
55 changes: 55 additions & 0 deletions internal/configconverter/parser_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package configconverter

import (
"fmt"

"go.opentelemetry.io/collector/config/configparser"
"go.opentelemetry.io/collector/service/parserprovider"
)

// converterProvider wraps a ParserProvider and accepts a list of functions that
// convert ConfigMaps. The idea is for this type to conform to the open-closed
// principle.
type converterProvider struct {
wrapped parserprovider.ParserProvider
cfgMapFuncs []CfgMapFunc
}

type CfgMapFunc func(*configparser.ConfigMap) *configparser.ConfigMap

var _ parserprovider.ParserProvider = (*converterProvider)(nil)

func ParserProvider(wrapped parserprovider.ParserProvider, funcs ...CfgMapFunc) parserprovider.ParserProvider {
return &converterProvider{wrapped: wrapped, cfgMapFuncs: funcs}
}

func (p converterProvider) Get() (*configparser.ConfigMap, error) {
cfgMap, err := p.wrapped.Get()
if err != nil {
return nil, fmt.Errorf("converterProvider.Get(): %w", err)
}

for _, cfgMapFunc := range p.cfgMapFuncs {
cfgMap = cfgMapFunc(cfgMap)
}

out := configparser.NewParser()
for _, k := range cfgMap.AllKeys() {
out.Set(k, cfgMap.Get(k))
}
return out, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package configprovider
package configconverter

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config/configparser"
)

func TestIsMemLimitBallastKey(t *testing.T) {
assert.False(t, isMemLimitBallastKey("processors::ballast_size_mib"))
assert.True(t, isMemLimitBallastKey("processors::memory_limiter::ballast_size_mib"))
assert.True(t, isMemLimitBallastKey("processors::memory_limiter/foo::ballast_size_mib"))
}

func TestMemLimitBallastRemoverPP(t *testing.T) {
tests := []struct {
name string
Expand All @@ -46,8 +41,9 @@ func TestMemLimitBallastRemoverPP(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pp := &memLimitBallastRemoverParserProvider{
&fileParserProvider{FileName: test.fname},
pp := &converterProvider{
wrapped: &fileParserProvider{fileName: test.fname},
cfgMapFuncs: []CfgMapFunc{RemoveBallastKey},
}
cfgMap, err := pp.Get()
require.NoError(t, err)
Expand All @@ -56,3 +52,11 @@ func TestMemLimitBallastRemoverPP(t *testing.T) {
})
}
}

type fileParserProvider struct {
fileName string
}

func (fpp fileParserProvider) Get() (*configparser.ConfigMap, error) {
return configparser.NewParserFromFile(fpp.fileName)
}

0 comments on commit 7ad09f0

Please sign in to comment.