Skip to content

Commit

Permalink
add nested attribute handling
Browse files Browse the repository at this point in the history
  • Loading branch information
obs-gh-alexlew committed Jul 24, 2024
1 parent dcedaac commit 1bcc3fc
Show file tree
Hide file tree
Showing 5 changed files with 668 additions and 604 deletions.
12 changes: 12 additions & 0 deletions components/processors/observek8sattributesprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Observe K8s Attributes Processor
This processor operates on K8s resource logs from the `k8sobjectsreceiver` and adds additional attributes.


## Caveats
This processor currently expects the `kind` field to be set at the base level of the event. In the case of `watch` events from the `k8sobjectsreceiver`, this field is instead present inside of the `object` field. This processor currently expects this field to be lifted from inside the `object` field to the base level by a transform processor earlier in the pipeline. If that isn't set up, this processor will only calculate status for `pull` events from the `k8sobjectsreceiver`.

## Emitted Attributes

| Attribute Key | Description |
|-----------------------------------|--------------------------------------------------------------|
| `observe_transform.facets.status` | The derived Pod status based on the current Pod description. |
19 changes: 14 additions & 5 deletions components/processors/observek8sattributesprocessor/podstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

const (
PodStatusAttributeKey = "observe_transform.facets.status"
PodStatusAttributeKey = "status"
// from https://github.com/kubernetes/kubernetes/blob/abe6321296123aaba8e83978f7d17951ab1b64fd/pkg/util/node/node.go#L43
nodeUnreachablePodReason = "NodeLost"
)
Expand All @@ -18,14 +18,23 @@ type OTELKubernetesEvent struct {
Object v1.Pod `json:"object"`
}

var PodStatusAction = K8sEventProcessorAction{
Key: PodStatusAttributeKey,
ValueFn: getStatus,
FilterFn: filterFn,
}

func filterFn(event K8sEvent) bool {
return event.Kind == "Pod"
}

func getStatus(objLog plog.LogRecord) string {
var event OTELKubernetesEvent
err := json.Unmarshal([]byte(objLog.Body().AsString()), &event)
var p v1.Pod
err := json.Unmarshal([]byte(objLog.Body().AsString()), &p)
if err != nil {
fmt.Printf("Error %v", err)
return "Unknown"
}
p := event.Object
// based on https://github.com/kubernetes/kubernetes/blob/0d3b859af81e6a5f869a7766c8d45afd1c600b04/pkg/printers/internalversion/printers.go#L901
reason := string(p.Status.Phase)
if p.Status.Reason != "" {
reason = p.Status.Reason
Expand Down
90 changes: 53 additions & 37 deletions components/processors/observek8sattributesprocessor/processor.go
Original file line number Diff line number Diff line change
@@ -1,68 +1,84 @@
package observek8sattributesprocessor

import (
// "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/attraction"
"context"
"fmt"
"encoding/json"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
)

type k8sEventsProcessor struct {
cfg component.Config
logger *zap.Logger
type K8sEvent struct {
Kind string `json:"kind,omitempty"`
ApiVersion string `json:"apiVersion,omitempty"`
}

// func newK8sEventAttributesProcessor(logger *zap.Logger, status string) *attraction.AttrProc {
// actions := []attraction.Action{
// {
// Key: "observe_transform.facets.status",
// AttributeValue: status,
// Action: attraction.INSERT,
// },
// }
// return &attraction.AttrProc{actions: actions}
// }
type K8sEventsProcessor struct {
cfg component.Config
logger *zap.Logger
actions []K8sEventProcessorAction
}

type K8sEventProcessorAction struct {
Key string
ValueFn func(plog.LogRecord) string
FilterFn func(K8sEvent) bool
}

func newK8sEventsProcessor(logger *zap.Logger, cfg component.Config) *k8sEventsProcessor {
return &k8sEventsProcessor{
func newK8sEventsProcessor(logger *zap.Logger, cfg component.Config) *K8sEventsProcessor {
return &K8sEventsProcessor{
cfg: cfg,
logger: logger,
actions: []K8sEventProcessorAction{
PodStatusAction,
},
}
}

func (kep *k8sEventsProcessor) Start(_ context.Context, _ component.Host) error {
func (kep *K8sEventsProcessor) Start(_ context.Context, _ component.Host) error {
kep.logger.Info("observek8sattributes processor has started.")
return nil
}

func (kep *k8sEventsProcessor) Shutdown(_ context.Context) error {
func (kep *K8sEventsProcessor) Shutdown(_ context.Context) error {
kep.logger.Info("observek8sattributes processor shutting down.")
return nil
}

func (kep *k8sEventsProcessor) processLogs(_ context.Context, logs plog.Logs) (plog.Logs, error) {
rl := logs.ResourceLogs()
for i := 0; i < rl.Len(); i++ {
sl := rl.At(i).ScopeLogs()
for j := 0; j < sl.Len(); j++ {
lr := sl.At(j).LogRecords()
for k := 0; k < lr.Len(); k++ {
lr := lr.At(k)
status := getStatus(lr)
if status != "" {
logLine := fmt.Sprintf("calculated status: %s for event: %s", status, lr.Body().AsString())
kep.logger.Info(logLine)
func (kep *K8sEventsProcessor) processLogs(_ context.Context, logs plog.Logs) (plog.Logs, error) {
rls := logs.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
sls := rls.At(i).ScopeLogs()
for j := 0; j < sls.Len(); j++ {
lrs := sls.At(j).LogRecords()
for k := 0; k < lrs.Len(); k++ {
lr := lrs.At(k)
var event K8sEvent
err := json.Unmarshal([]byte(lr.Body().AsString()), &event)
if err != nil {
kep.logger.Error("failed to unmarshal event", zap.Error(err))
continue
}
currObserveFacets, exists := lr.Attributes().Get("observe_transform")
if exists {
facets := currObserveFacets.Map()

facets.PutStr(PodStatusAttributeKey, getStatus(lr))
for _, action := range kep.actions {
if action.FilterFn != nil && !action.FilterFn(event) {
continue
}
transform, exists := lr.Attributes().Get("observe_transform")
if exists {
facets, exists := transform.Map().Get("facets")
if exists {
facets.Map().PutStr(action.Key, action.ValueFn(lr))
} else {
facets := transform.Map().PutEmptyMap("facets")
facets.PutStr(action.Key, action.ValueFn(lr))
}
} else {
transform := lr.Attributes().PutEmptyMap("observe_transform")
facets := transform.PutEmptyMap("facets")
facets.PutStr(action.Key, action.ValueFn(lr))
}
}
lr.Attributes().PutStr(PodStatusAttributeKey, getStatus(lr))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package observek8sattributesprocessor

import (
"context"
"io/ioutil"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
)

type observeTransformFacets struct {
facets map[string]string
}

type logWithResource struct {
logName string
resourceAttributes map[string]any
Expand All @@ -18,7 +24,7 @@ type logWithResource struct {
severityNumber plog.SeverityNumber
}

type k8sEventProcessorTest struct {
type K8sEventProcessorTest struct {
name string
inLogs plog.Logs
outAttributes []map[string]interface{}
Expand All @@ -27,20 +33,44 @@ type k8sEventProcessorTest struct {
var (
podStatusInLogs = []logWithResource{
{
logName: "test1",
logName: "noObserveTransformAttributes",
testBodyFilepath: "./testdata/podObjectEvent.json",
},
{
logName: "existingObserveTransformAttributes",
testBodyFilepath: "./testdata/podObjectEvent.json",
recordAttributes: map[string]any{
"observe_transform": map[string]any{
"facets": map[string]any{
"other_key": "test",
},
},
},
},
}
podStatusOutAttributes = []map[string]interface{}{
{
"observe_transform.facets.status": "Terminating",
"name": "test1",
"observe_transform": map[string]interface{}{
"facets": map[string]interface{}{
"status": "Terminating",
},
},
"name": "noObserveTransformAttributes",
},
{
"observe_transform": map[string]interface{}{
"facets": map[string]interface{}{
"status": "Terminating",
"other_key": "test",
},
},
"name": "existingObserveTransformAttributes",
},
}

tests = []k8sEventProcessorTest{
tests = []K8sEventProcessorTest{
{
name: "test1",
name: "noObserveTransformAttributes",
inLogs: testResourceLogs(podStatusInLogs),
outAttributes: podStatusOutAttributes,
},
Expand All @@ -51,12 +81,12 @@ func TestK8sEventsProcessor(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
kep := newK8sEventsProcessor(zap.NewNop(), &Config{})
logs, err := kep.processLogs(test.inLogs)
logs, err := kep.processLogs(context.Background(), test.inLogs)
for i := 0; i < logs.ResourceLogs().Len(); i++ {
sl := logs.ResourceLogs().At(i).ScopeLogs()
for j := 0; j < sl.Len(); j++ {
lr := sl.At(j).LogRecords()
require.Equal(t, test.outAttributes[j], lr.At(0).Attributes().AsRaw())
require.Equal(t, test.outAttributes[i], lr.At(0).Attributes().AsRaw())
}
}

Expand Down
Loading

0 comments on commit 1bcc3fc

Please sign in to comment.