Promtail: Add pack stage #3401

Merged
merged 13 commits into from
Mar 1, 2021
1 change: 1 addition & 0 deletions docs/sources/clients/promtail/stages/_index.md
@@ -17,6 +17,7 @@ Parsing stages:
Transform stages:

- [template](template/): Use Go templates to modify extracted data.
- [pack](pack/): Packs a log line in a JSON object allowing extracted values and labels to be placed inside the log line.

Action stages:

84 changes: 84 additions & 0 deletions docs/sources/clients/promtail/stages/pack.md
@@ -0,0 +1,84 @@
---
title: pack
---
# `pack` stage

The `pack` stage is a transform stage which lets you embed extracted values and labels into the log line by packing the log line and labels inside a JSON object.

For example, if you want to remove the labels `container` and `pod` but still keep their values, you can use this stage to create the following output:

```json
{
"container": "myapp",
"pod": "pod-3223f",
"_entry": "original log message"
}
```

The original message will be stored under the `_entry` key.
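
As a rough illustration of the packing idea, the following sketch builds the same kind of object with Go's standard `encoding/json`. The `packLine` helper is hypothetical and is not Promtail's implementation. It assumes all values are strings, and because `encoding/json` sorts map keys, `_entry` appears first here rather than last; JSON object key order carries no meaning, so the result is equivalent.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// packLine is a hypothetical helper, not Promtail's implementation.
// It copies the chosen labels into a map, stores the original line under
// the "_entry" key, and marshals the map to a JSON object.
func packLine(line string, labels map[string]string) (string, error) {
	obj := make(map[string]string, len(labels)+1)
	for k, v := range labels {
		obj[k] = v
	}
	// The original log line always lives under "_entry".
	obj["_entry"] = line

	// encoding/json writes map keys in sorted order, so the output is stable.
	b, err := json.Marshal(obj)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	packed, err := packLine("original log message", map[string]string{
		"container": "myapp",
		"pod":       "pod-3223f",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(packed)
}
```

Marshalling the line as a JSON string also takes care of escaping, so a log line that contains quotes or is itself JSON can still be packed safely.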

This stage is useful when you have a label or other metadata that you want to keep but that doesn't make a good label, either because it isn't useful for querying or because its cardinality is too high.

The querying capabilities of Loki make it easy to still access this data and filter/aggregate on it at query time.

## Pack stage schema

```yaml
pack:
  # Names of extracted values and/or labels to pack into the log line.
  # Labels listed here are automatically removed from the output labels.
  labels:
    - [<string>]

  # Whether the packed log line keeps its existing timestamp or is stamped with
  # time.Now() when the line is processed.
  # To avoid out-of-order errors in Loki when combining several log streams
  # (separate source files) into one, set a new timestamp on each line with
  # `ingest_timestamp: true`.
  # If you are not combining multiple source files, or you know your log lines
  # won't have interleaved timestamps, you can set this value to false.
  [ingest_timestamp: <bool> | default = true]
```
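
The `default = true` behaviour for `ingest_timestamp` needs a way to tell "not set" apart from an explicit `false`. One common way to do that in Go is a `*bool` field, sketched below. The `packConfig` type and both helper functions are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// packConfig is a hypothetical stand-in for the stage configuration.
// A *bool distinguishes "not set" (nil) from an explicit false.
type packConfig struct {
	IngestTimestamp *bool
}

// useIngestTime reports whether the timestamp should be overwritten.
// An unset value falls back to the documented default of true.
func useIngestTime(cfg packConfig) bool {
	if cfg.IngestTimestamp == nil {
		return true
	}
	return *cfg.IngestTimestamp
}

// resolveTimestamp returns now when ingest time applies, and the
// original timestamp otherwise.
func resolveTimestamp(cfg packConfig, original, now time.Time) time.Time {
	if useIngestTime(cfg) {
		return now
	}
	return original
}

func main() {
	original := time.Date(2021, time.March, 1, 12, 0, 0, 0, time.UTC)
	now := original.Add(5 * time.Second)
	keep := false

	// Unset: the ingest time is used.
	fmt.Println(resolveTimestamp(packConfig{}, original, now))
	// Explicit false: the original timestamp is kept.
	fmt.Println(resolveTimestamp(packConfig{IngestTimestamp: &keep}, original, now))
}
```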

## Examples

Remove the `container` label and embed it into the log line (Kubernetes pods can have multiple containers):

```yaml
pack:
  labels:
    - container
```

This would create the following log line:

```json
{
"container": "myapp",
"_entry": "original log message"
}
```
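
Going the other way, a consumer can split a packed line back into the original entry and the packed values. The sketch below does this with the standard `encoding/json` package; `unpackLine` is a hypothetical helper, and it assumes every value in the object is a string, so a non-string value results in an error.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unpackLine is a hypothetical helper that reverses the packing.
// It decodes the object into a string map, pulls out the "_entry" key as
// the original log line, and returns the remaining keys as labels.
func unpackLine(packed string) (entry string, labels map[string]string, err error) {
	labels = map[string]string{}
	// Decoding into map[string]string fails if any value is not a string.
	if err = json.Unmarshal([]byte(packed), &labels); err != nil {
		return "", nil, err
	}
	entry = labels["_entry"]
	delete(labels, "_entry")
	return entry, labels, nil
}

func main() {
	entry, labels, err := unpackLine(`{"container":"myapp","_entry":"original log message"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(entry)
	fmt.Println(labels)
}
```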

Loki 2.0 has some tools to make querying packed log lines easier as well.

Display the log line as if it were never packed:

```
{cluster="us-central1", job="myjob"} | json | line_format "{{._entry}}"
```

Use the packed labels for filtering:

```
{cluster="us-central1", job="myjob"} | json | container="myapp" | line_format "{{._entry}}"
```

You can even use the `json` parser twice if your original message was JSON:

```
{cluster="us-central1", job="myjob"} | json | container="myapp" | line_format "{{._entry}}" | json | val_from_original_log_json="foo"
```

Or use any other parser, such as `logfmt`:

```
{cluster="us-central1", job="myjob"} | json | container="myapp" | line_format "{{._entry}}" | logfmt | val_from_original_log_json="foo"
```
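
To make the query pipeline above more concrete, here is a small sketch of what `| json | container="myapp" | line_format "{{._entry}}"` does conceptually to a single packed line: parse the JSON, filter on a field, then render the `_entry` field through a Go template. This only illustrates the idea and is not how Loki is implemented; `renderEntry` is a hypothetical name, and the sketch assumes all packed values are strings.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"text/template"
)

// renderEntry is a hypothetical illustration of the pipeline
// `| json | container="<want>" | line_format "{{._entry}}"`.
// It returns the rendered line and whether the line passed the filter.
func renderEntry(packed, wantContainer string) (string, bool, error) {
	// Step 1: parse the packed line into fields (the `json` parser).
	fields := map[string]string{}
	if err := json.Unmarshal([]byte(packed), &fields); err != nil {
		return "", false, err
	}

	// Step 2: keep only lines whose container field matches (the filter).
	if fields["container"] != wantContainer {
		return "", false, nil
	}

	// Step 3: rewrite the line from the fields (the `line_format` template).
	tmpl, err := template.New("line").Parse("{{._entry}}")
	if err != nil {
		return "", false, err
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, fields); err != nil {
		return "", false, err
	}
	return buf.String(), true, nil
}

func main() {
	packed := `{"container":"myapp","_entry":"original log message"}`
	line, ok, err := renderEntry(packed, "myapp")
	if err != nil {
		panic(err)
	}
	fmt.Println(ok, line)
}
```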
222 changes: 222 additions & 0 deletions pkg/logentry/stages/pack.go
@@ -0,0 +1,222 @@
package stages

import (
"bytes"
"errors"
"fmt"
"reflect"
"sort"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
json "github.com/json-iterator/go"
"github.com/mitchellh/mapstructure"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)

const (
entryKey = "_entry"
)

var (
reallyTrue = true
reallyFalse = false
)

type Packed struct {
Labels map[string]string `json:",inline"`
Entry string `json:"_entry"`
}

// UnmarshalJSON populates a Packed struct where every key except the _entry key is added to the Labels field
func (w *Packed) UnmarshalJSON(data []byte) error {
m := &map[string]interface{}{}
err := json.Unmarshal(data, m)
if err != nil {
return err
}
w.Labels = map[string]string{}
for k, v := range *m {
// _entry key goes to the Entry field, everything else becomes a label
if k == entryKey {
if s, ok := v.(string); ok {
w.Entry = s
} else {
return errors.New("failed to unmarshal json, all values must be of type string")
}
} else {
if s, ok := v.(string); ok {
w.Labels[k] = s
} else {
return errors.New("failed to unmarshal json, all values must be of type string")
}
}
}
return nil
}

// MarshalJSON creates a Packed struct as JSON where the Labels are flattened into the top level of the object
func (w Packed) MarshalJSON() ([]byte, error) {

// Marshal the entry to properly escape if it's json or contains quotes
b, err := json.Marshal(w.Entry)
if err != nil {
return nil, err
}

// Creating a map and marshalling from a map results in a non deterministic ordering of the resulting json object
// This is functionally ok but really annoying to humans and automated tests.
// Instead we will build the json ourselves after sorting all the labels to get a consistent output
keys := make([]string, 0, len(w.Labels))
for k := range w.Labels {
keys = append(keys, k)
}
sort.Strings(keys)

var buf bytes.Buffer

buf.WriteString("{")
for i, k := range keys {
if i != 0 {
buf.WriteString(",")
}
// marshal key
key, err := json.Marshal(k)
if err != nil {
return nil, err
}
buf.Write(key)
buf.WriteString(":")
// marshal value
val, err := json.Marshal(w.Labels[k])
if err != nil {
return nil, err
}
buf.Write(val)
}
// Only add the comma if something exists in the buffer other than "{"
if buf.Len() > 1 {
buf.WriteString(",")
}
// Add the line entry
buf.WriteString("\"" + entryKey + "\":")
buf.Write(b)

buf.WriteString("}")
return buf.Bytes(), nil
}

// PackConfig contains the configuration for a packStage
type PackConfig struct {
Labels []string `mapstructure:"labels"`
IngestTimestamp *bool `mapstructure:"ingest_timestamp"`
}

//nolint:unparam // Always returns nil until someone adds more validation and can remove this.
// validatePackConfig validates the PackConfig for the packStage
func validatePackConfig(cfg *PackConfig) error {
// Default the IngestTimestamp value to be true
if cfg.IngestTimestamp == nil {
cfg.IngestTimestamp = &reallyTrue
}
return nil
}

// newPackStage creates a packStage from config
func newPackStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
cfg := &PackConfig{}
err := mapstructure.WeakDecode(config, cfg)
if err != nil {
return nil, err
}
err = validatePackConfig(cfg)
if err != nil {
return nil, err
}

return &packStage{
logger: log.With(logger, "component", "stage", "type", "pack"),
cfg: cfg,
dropCount: getDropCountMetric(registerer),
}, nil
}

// packStage packs the configured labels and extracted values into the log line as a JSON object
type packStage struct {
logger log.Logger
cfg *PackConfig
dropCount *prometheus.CounterVec
}

func (m *packStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)
for e := range in {
out <- m.pack(e)
}
}()
return out
}

func (m *packStage) pack(e Entry) Entry {
lbls := e.Labels
packedLabels := make(map[string]string, len(m.cfg.Labels))
foundLabels := []model.LabelName{}

// Iterate through all the extracted map (which also includes all the labels)
for lk, lv := range e.Extracted {
for _, wl := range m.cfg.Labels {
if lk == wl {
sv, err := getString(lv)
if err != nil {
if Debug {
level.Debug(m.logger).Log("msg", fmt.Sprintf("value for key: '%s' cannot be converted to a string and cannot be packed", lk), "err", err, "type", reflect.TypeOf(lv))
}
continue
}
packedLabels[wl] = sv
foundLabels = append(foundLabels, model.LabelName(lk))
}
}
}

// Embed the extracted labels into the wrapper object
w := Packed{
Labels: packedLabels,
Entry: e.Line,
}

// Marshal to json
wl, err := json.Marshal(w)
if err != nil {
if Debug {
level.Debug(m.logger).Log("msg", "pack stage failed to marshal packed object to json, packing will be skipped", "err", err)
}
return e
}

// Remove anything found which is also a label, do this after the marshalling to not remove labels until
// we are sure the line can be successfully packed.
for _, fl := range foundLabels {
delete(lbls, fl)
}

// Replace the labels and the line with new values
e.Labels = lbls
e.Line = string(wl)

// If the config says to re-write the timestamp to the ingested time, do that now
if m.cfg.IngestTimestamp != nil && *m.cfg.IngestTimestamp {
e.Timestamp = time.Now()
}

return e
}

// Name implements Stage
func (m *packStage) Name() string {
return StageTypePack
}
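
The selection step in `pack` above can be tried in isolation with plain maps. The sketch below is a simplified, hypothetical version: `selectPacked` is not part of the PR, it only accepts values that are already strings (the stage itself converts values with its `getString` helper), and it removes labels immediately rather than waiting for marshalling to succeed.

```go
package main

import "fmt"

// selectPacked is a hypothetical, simplified version of the selection loop.
// It copies the string values named in names from extracted into a new map
// and deletes the same names from labels. Missing or non-string values are
// skipped, so their labels are left untouched.
func selectPacked(extracted map[string]interface{}, labels map[string]string, names []string) map[string]string {
	packed := make(map[string]string, len(names))
	for _, name := range names {
		v, ok := extracted[name]
		if !ok {
			continue
		}
		s, ok := v.(string)
		if !ok {
			continue
		}
		packed[name] = s
		// A packed value no longer needs to be a label.
		delete(labels, name)
	}
	return packed
}

func main() {
	extracted := map[string]interface{}{"container": "myapp", "job": "myjob", "count": 3}
	labels := map[string]string{"container": "myapp", "job": "myjob"}

	packed := selectPacked(extracted, labels, []string{"container", "count"})
	fmt.Println(packed)
	fmt.Println(labels)
}
```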