Skip to content

Commit

Permalink
Improve OpenTelemetry Collector JSON Marshalling
Browse files Browse the repository at this point in the history
Signed-off-by: Israel Blancas <iblancas@redhat.com>
  • Loading branch information
iblancasa committed Sep 12, 2024
1 parent e203cbc commit cb6600e
Show file tree
Hide file tree
Showing 20 changed files with 532 additions and 33 deletions.
16 changes: 16 additions & 0 deletions .chloggen/3281-marshal-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: operator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix Configuration marshalling for v1beta1 OpenTelemetryCollector instances.

# One or more tracking issues related to the change
issues: [3281]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
34 changes: 31 additions & 3 deletions apis/v1beta1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,24 @@ func (c *AnyConfig) UnmarshalJSON(b []byte) error {
}

// MarshalJSON specifies how to convert this object into JSON.
func (c *AnyConfig) MarshalJSON() ([]byte, error) {
if c == nil {
func (c AnyConfig) MarshalJSON() ([]byte, error) {
if c.Object == nil {
return []byte("{}"), nil
}
return json.Marshal(c.Object)

nonNilMap := make(map[string]interface{})

for k, v := range c.Object {
if v == nil {
nonNilMap[k] = nil
} else if emptyMap, ok := v.(map[string]interface{}); ok && len(emptyMap) == 0 {
nonNilMap[k] = emptyMap
} else {
nonNilMap[k] = v
}
}

return json.Marshal(nonNilMap)
}

// Pipeline is a struct of component type to a list of component IDs.
Expand Down Expand Up @@ -198,6 +211,21 @@ func (c *Config) Yaml() (string, error) {
return buf.String(), nil
}

func (c Config) MarshalJSON() ([]byte, error) {
type Alias Config
receiversJSON, err := json.Marshal(c.Receivers)
if err != nil {
return nil, err
}
return json.Marshal(&struct {
*Alias
Receivers json.RawMessage `json:"receivers,omitempty"`
}{
Alias: (*Alias)(&c),
Receivers: receiversJSON,
})
}

// Returns null objects in the config.
func (c *Config) nullObjects() []string {
var nullKeys []string
Expand Down
276 changes: 276 additions & 0 deletions apis/v1beta1/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
package v1beta1

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"path"
"path/filepath"
"strings"
"testing"

Expand All @@ -26,11 +30,216 @@ import (
"github.com/stretchr/testify/require"
go_yaml "gopkg.in/yaml.v3"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/yaml"
)

func TestCreateConfigInKubernetesEmptyValues(t *testing.T) {
testScheme := scheme.Scheme
err := AddToScheme(testScheme)
require.NoError(t, err)

testEnv := &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
}

cfg, err := testEnv.Start()
require.NoError(t, err)
defer func() {
errStop := testEnv.Stop()
require.NoError(t, errStop)
}()

k8sClient, err := client.New(cfg, client.Options{Scheme: testScheme})
if err != nil {
fmt.Printf("failed to setup a Kubernetes client: %v", err)
os.Exit(1)
}

newCollector := &OpenTelemetryCollector{
ObjectMeta: metav1.ObjectMeta{
Name: "my-collector",
Namespace: "default",
},
Spec: OpenTelemetryCollectorSpec{
UpgradeStrategy: UpgradeStrategyNone,
Config: Config{
Exporters: AnyConfig{
Object: map[string]interface{}{
"logging": map[string]interface{}{},
},
},
Receivers: AnyConfig{
Object: map[string]interface{}{
"otlp": map[string]interface{}{
"protocols": map[string]interface{}{
"grpc": map[string]interface{}{},
"http": map[string]interface{}{},
},
},
},
},
Service: Service{
Pipelines: map[string]*Pipeline{
"traces": {
Receivers: []string{"otlp"},
Exporters: []string{"logging"},
},
},
},
},
},
}

err = k8sClient.Create(context.TODO(), newCollector)
if err != nil {
log.Fatal(err)
}

// Fetch the created OpenTelemetryCollector
otelCollector := &OpenTelemetryCollector{}
err = k8sClient.Get(context.TODO(), types.NamespacedName{
Name: "my-collector",
Namespace: "default",
}, otelCollector)

if err != nil {
log.Fatal(err)
}

jsonData, err := json.Marshal(otelCollector.Spec)
if err != nil {
log.Fatal(err)
}

require.Contains(t, string(jsonData), "{\"grpc\":{},\"http\":{}}")

unmarshalledCollector := &OpenTelemetryCollector{}
err = json.Unmarshal(jsonData, &unmarshalledCollector.Spec)
require.NoError(t, err)

require.NotNil(t, unmarshalledCollector.Spec.Config.Receivers.Object["otlp"])

otlpReceiver, ok := unmarshalledCollector.Spec.Config.Receivers.Object["otlp"].(map[string]interface{})
require.True(t, ok, "otlp receiver should be a map")
protocols, ok := otlpReceiver["protocols"].(map[string]interface{})
require.True(t, ok, "protocols should be a map")

grpc, ok := protocols["grpc"]
require.True(t, ok, "grpc protocol should exist")
require.NotNil(t, grpc, "grpc protocol should be nil")

http, ok := protocols["http"]
require.True(t, ok, "http protocol should exist")
require.NotNil(t, http, "http protocol should be nil")
}

func TestCreateConfigInKubernetesNullValues(t *testing.T) {
testScheme := scheme.Scheme
err := AddToScheme(testScheme)
require.NoError(t, err)

testEnv := &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
}

cfg, err := testEnv.Start()
require.NoError(t, err)
defer func() {
errStop := testEnv.Stop()
require.NoError(t, errStop)
}()

k8sClient, err := client.New(cfg, client.Options{Scheme: testScheme})
if err != nil {
fmt.Printf("failed to setup a Kubernetes client: %v", err)
os.Exit(1)
}

newCollector := &OpenTelemetryCollector{
ObjectMeta: metav1.ObjectMeta{
Name: "my-collector",
Namespace: "default",
},
Spec: OpenTelemetryCollectorSpec{
UpgradeStrategy: UpgradeStrategyNone,
Config: Config{
Exporters: AnyConfig{
Object: map[string]interface{}{
"logging": map[string]interface{}{},
},
},
Receivers: AnyConfig{
Object: map[string]interface{}{
"otlp": map[string]interface{}{
"protocols": map[string]interface{}{
"grpc": nil,
"http": nil,
},
},
},
},
Service: Service{
Pipelines: map[string]*Pipeline{
"traces": {
Receivers: []string{"otlp"},
Exporters: []string{"logging"},
},
},
},
},
},
}

err = k8sClient.Create(context.TODO(), newCollector)
if err != nil {
log.Fatal(err)
}

// Fetch the created OpenTelemetryCollector
otelCollector := &OpenTelemetryCollector{}
err = k8sClient.Get(context.TODO(), types.NamespacedName{
Name: "my-collector",
Namespace: "default",
}, otelCollector)

if err != nil {
log.Fatal(err)
}

jsonData, err := json.Marshal(otelCollector.Spec)
if err != nil {
log.Fatal(err)
}

require.Contains(t, string(jsonData), "{\"grpc\":null,\"http\":null")

unmarshalledCollector := &OpenTelemetryCollector{}
err = json.Unmarshal(jsonData, &unmarshalledCollector.Spec)
require.NoError(t, err)

require.NotNil(t, unmarshalledCollector.Spec.Config.Receivers.Object["otlp"])

otlpReceiver, ok := unmarshalledCollector.Spec.Config.Receivers.Object["otlp"].(map[string]interface{})
require.True(t, ok, "otlp receiver should be a map")
protocols, ok := otlpReceiver["protocols"].(map[string]interface{})
require.True(t, ok, "protocols should be a map")

grpc, ok := protocols["grpc"]
require.True(t, ok, "grpc protocol should exist")
require.Nil(t, grpc, "grpc protocol should be nil")

http, ok := protocols["http"]
require.True(t, ok, "http protocol should exist")
require.Nil(t, http, "http protocol should be nil")
}

func TestConfigFiles(t *testing.T) {
files, err := os.ReadDir("./testdata")
require.NoError(t, err)
Expand Down Expand Up @@ -546,3 +755,70 @@ func TestConfig_GetExporterPorts(t *testing.T) {
})
}
}

func TestAnyConfig_MarshalJSON(t *testing.T) {
tests := []struct {
name string
config AnyConfig
want string
wantErr bool
}{
{
name: "nil Object",
config: AnyConfig{},
want: "{}",
},
{
name: "empty Object",
config: AnyConfig{
Object: map[string]interface{}{},
},
want: "{}",
},
{
name: "Object with nil value",
config: AnyConfig{
Object: map[string]interface{}{
"key": nil,
},
},
want: `{"key":null}`,
},
{
name: "Object with empty map value",
config: AnyConfig{
Object: map[string]interface{}{
"key": map[string]interface{}{},
},
},
want: `{"key":{}}`,
},
{
name: "Object with non-empty values",
config: AnyConfig{
Object: map[string]interface{}{
"string": "value",
"number": 42,
"bool": true,
"map": map[string]interface{}{
"nested": "data",
},
},
},
want: `{"bool":true,"map":{"nested":"data"},"number":42,"string":"value"}`,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.config.MarshalJSON()
if (err != nil) != tt.wantErr {
t.Errorf("AnyConfig.MarshalJSON() error = %v, wantErr %v", err, tt.wantErr)
return
}
if string(got) != tt.want {
t.Errorf("AnyConfig.MarshalJSON() = %v, want %v", string(got), tt.want)
}
})
}
}
Loading

0 comments on commit cb6600e

Please sign in to comment.