Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Support for Jaeger tags with binary value
Browse files Browse the repository at this point in the history
Since we are using the OTEL translator, when ingesting Jaeger traces all
the tags with `VType=ValueType_BINARY` are being encoded into base64 and
stored as strings. The context of the type of value is lost after this
translation occurs, meaning that, when we return those tags in a Jaeger
query we return the base64 string as `VType=ValueType_STRING`.

To preserve the type of the value, the prefix
`data:application/octet-stream; base64,` will be added to the base64
string before storing in the DB. When retrieving a trace we look for
tags that are string and have this prefix, we remove it and return a
`ValueType_BINARY` with the result of decoding the base64 string.

The prefix was choosen from the `The "data" URL scheme` RFC
https://www.rfc-editor.org/rfc/rfc2397#section-2 .

Jaeger allows these type of tags for Spans, Process and Logs.
  • Loading branch information
alejandrodnm committed Sep 27, 2022
1 parent 024c34b commit 3c2a61e
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 33 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ We use the following categories for changes:
### Changed
- Log throughput in the same line for samples, spans and metric metadata [#1643]
- The `chunks_created` metrics was removed. [#1634]
- When querying for Jaeger tags with binary values the binary data will be
returned instead of the base64 representation of the string [#1649].

### Fixed
- Do not collect telemetry if `timescaledb.telemetry_level=off` [#1612]
Expand Down
136 changes: 136 additions & 0 deletions pkg/jaeger/store/binary_tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package store

import (
"encoding/base64"
"fmt"
"strings"

"github.com/jaegertracing/jaeger/model"
)

const MEDIA_TYPE_ENCODED_BINARY = "data:application/octet-stream; base64,"
const MEDIA_TYPE_ENCODED_BINARY_LEN = len(MEDIA_TYPE_ENCODED_BINARY)

// spanBinaryTags is the set of keys from Jaeger model.KeyValue items used in
// model.Span.Tags, model.Process.Tags and model.Log.Fields that have
// `VType=ValueType_BINARY` and had to be encoded to be store as strings.
type spanBinaryTags struct {
spanID int64
spanTags map[string]struct{}
processTags map[string]struct{}
logsTags map[int]map[string]struct{}
}

func (t *spanBinaryTags) isEmpty() bool {
return len(t.spanTags) == 0 && len(t.logsTags) == 0 && len(t.processTags) == 0
}

func isEncodedBinaryValue(v interface{}) bool {
if val, isStr := v.(string); isStr {
if strings.HasPrefix(val, MEDIA_TYPE_ENCODED_BINARY) {
return true
}
}
return false
}

// decodeSpanBinaryTags decodes the tags with binary values that are present in
// the binaryTags sets for the span, process and logs.
//
// The spanBinaryTags struct is necesary in order to not go through all the
// tags for each Span that the query is returning. We are taking advantage on
// the fact that when translating from OTEL to Jaeger we had to visit all the
// tags and were able to construct this set.
//
// When writing binary tags we encode the slice of bytes into a base64 string
// representation and add the prefix `__ValueType_BINARY__`. Decoding implies
// removing the prefix and decoding the base64 string.
func decodeSpanBinaryTags(span *model.Span, binaryTags *spanBinaryTags) {
if len(binaryTags.spanTags) != 0 {
decodeBinaryTags(span.Tags, binaryTags.spanTags)
}

if len(binaryTags.processTags) != 0 {
decodeBinaryTags(span.Process.Tags, binaryTags.processTags)
}

if len(binaryTags.logsTags) != 0 {
for i, logsTags := range binaryTags.logsTags {
decodeBinaryTags(span.Logs[i].Fields, logsTags)
}
}
}

func decodeBinaryTags(actualTags []model.KeyValue, binaryTagsToDecode map[string]struct{}) {
for i, tag := range actualTags {
_, ok := binaryTagsToDecode[tag.Key]
if !ok {
continue
}

if tag.GetVType() != model.ValueType_STRING {
continue
}

encoded := tag.VStr
if !strings.HasPrefix(encoded, MEDIA_TYPE_ENCODED_BINARY) {
continue
}

vBin, err := decodeBinaryTagValue(encoded)

// If we can't decode it means that we didn't encode it in the
// first place, so we should keep it as is.
if err != nil {
continue
}
actualTags[i] = model.KeyValue{
Key: tag.Key,
VType: model.ValueType_BINARY,
VBinary: vBin,
}
}
}

func decodeBinaryTagValue(encoded string) ([]byte, error) {
v := encoded[MEDIA_TYPE_ENCODED_BINARY_LEN:]
return base64.StdEncoding.DecodeString(v)
}

func encodeBinaryTagToStr(tag model.KeyValue) model.KeyValue {
value := fmt.Sprintf("%s%s", MEDIA_TYPE_ENCODED_BINARY, base64.StdEncoding.EncodeToString(tag.GetVBinary()))
return model.KeyValue{
Key: tag.Key,
VType: model.ValueType_STRING,
VStr: value,
}
}

func encodeBinaryTags(span *model.Span) {
for i, tag := range span.Tags {
if !isBinaryTag(tag) {
continue
}
span.Tags[i] = encodeBinaryTagToStr(tag)
}

for _, log := range span.Logs {
for i, tag := range log.Fields {
if !isBinaryTag(tag) {
continue
}
log.Fields[i] = encodeBinaryTagToStr(tag)
}
}

for i, tag := range span.Process.Tags {
if !isBinaryTag(tag) {
continue
}
span.Process.Tags[i] = encodeBinaryTagToStr(tag)
}
}

func isBinaryTag(tag model.KeyValue) bool {
return tag.GetVType() == model.ValueType_BINARY
}
174 changes: 174 additions & 0 deletions pkg/jaeger/store/binary_tags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package store

import (
"encoding/base64"
"fmt"
"testing"

"github.com/jaegertracing/jaeger/model"
"github.com/stretchr/testify/assert"
)

var binaryValue1 = []byte{66, 105, 110, 97, 114, 121}
var binaryValue2 = []byte{66, 105, 110, 97, 114, 121}

func keyValuesFixture(prefix string) []model.KeyValue {
return []model.KeyValue{
{
Key: fmt.Sprintf("%s-binary1-data", prefix),
VBinary: binaryValue1,
VType: model.ValueType_BINARY,
},
{
Key: fmt.Sprintf("%s-string-data", prefix),
VStr: "My string",
VType: model.ValueType_STRING,
},
{
Key: fmt.Sprintf("%s-int64-data", prefix),
VInt64: 42,
VType: model.ValueType_INT64,
},
{
Key: fmt.Sprintf("%s-binary2-data", prefix),
VBinary: binaryValue2,
VType: model.ValueType_BINARY,
},
{
Key: fmt.Sprintf("%s-float64-data", prefix),
VFloat64: 42.42,
VType: model.ValueType_FLOAT64,
},
{
Key: fmt.Sprintf("%s-bool-data", prefix),
VBool: true,
VType: model.ValueType_BOOL,
},
}
}

func getExpectedStrV(key string, binaryValue []byte) model.KeyValue {
return model.KeyValue{
Key: key,
VStr: fmt.Sprintf("%s%s", MEDIA_TYPE_ENCODED_BINARY, base64.StdEncoding.EncodeToString(binaryValue)),
VType: model.ValueType_STRING,
}
}

func assertEncoded(t *testing.T, prefix string, tags []model.KeyValue) {
original := keyValuesFixture(prefix)
assert.Equal(t, len(original), len(tags))

// Binary values are at position 0 and 3
key1 := fmt.Sprintf("%s-binary1-data", prefix)
assert.Equal(t, getExpectedStrV(key1, binaryValue1), tags[0])
assert.NotEqual(t, original[0], tags[0])

key2 := fmt.Sprintf("%s-binary2-data", prefix)
assert.Equal(t, getExpectedStrV(key2, binaryValue2), tags[3])
assert.NotEqual(t, original[3], tags[3])

notModified := []int{1, 2, 4, 5}
for _, i := range notModified {
assert.Equal(t, original[i], tags[i])
}
}

func TestEncodeBinaryTag(t *testing.T) {
logs := []model.Log{
{
Fields: keyValuesFixture("log1"),
},
{
Fields: keyValuesFixture("log2"),
},
}
process := model.Process{
Tags: keyValuesFixture("process"),
}
span := model.Span{
Tags: keyValuesFixture("span"),
Process: &process,
Logs: logs,
}

encodeBinaryTags(&span)

assertEncoded(t, "span", span.Tags)
assertEncoded(t, "process", span.Process.Tags)
assertEncoded(t, "log1", span.Logs[0].Fields)
assertEncoded(t, "log2", span.Logs[1].Fields)
}

func keyValuesEncodedFixture() []model.KeyValue {
return []model.KeyValue{
{
Key: "binary-data",
VStr: fmt.Sprintf("%s%s", MEDIA_TYPE_ENCODED_BINARY, base64.StdEncoding.EncodeToString(binaryValue1)),
VType: model.ValueType_STRING,
},
{
Key: "string-data",
VStr: "My string",
VType: model.ValueType_STRING,
},
{
Key: "int64-data",
VInt64: 42,
VType: model.ValueType_INT64,
},
{
Key: "binary2-data",
VStr: fmt.Sprintf("%s%s", MEDIA_TYPE_ENCODED_BINARY, base64.StdEncoding.EncodeToString(binaryValue2)),
VType: model.ValueType_STRING,
},
{
Key: "float64-data",
VFloat64: 42.42,
VType: model.ValueType_FLOAT64,
},
{
Key: "bool-data",
VBool: true,
VType: model.ValueType_BOOL,
},
{
Key: "no-prefix",
VStr: base64.StdEncoding.EncodeToString(binaryValue1),
VType: model.ValueType_STRING,
},
{
Key: "no-binary-with-prefix",
VStr: MEDIA_TYPE_ENCODED_BINARY + "a normal string tag",
VType: model.ValueType_STRING,
},
}
}

func TestDecodeBinaryTags(t *testing.T) {
fixture := keyValuesEncodedFixture()
fixtureClone := keyValuesEncodedFixture()
// binary2-data is ignored on purpose
toDecode := map[string]struct{}{
"binary-data": {},
// The following won't be modified because they don't
// match the condition to be decoded.
"string-data": {},
"int64-data": {},
"float64-data": {},
"bool-data": {},
"no-prefix": {},
"no-binary-with-prefix": {},
}
decodeBinaryTags(fixture, toDecode)
assert.Equal(t, len(fixtureClone), len(fixture))

expectedBinaryV := model.KeyValue{
Key: "binary-data",
VBinary: binaryValue1,
VType: model.ValueType_BINARY,
}
// Only the "binary-data" key was decoded
assert.Equal(t, expectedBinaryV, fixture[0])
assert.Equal(t, fixtureClone[1:], fixture[1:])
}
15 changes: 12 additions & 3 deletions pkg/jaeger/store/find_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,19 @@ func findTraces(ctx context.Context, builder *Builder, conn pgxconn.PgxConn, q *

func scanTraces(rows pgxconn.PgxRows) ([]*model.Trace, error) {
traces := ptrace.NewTraces()
spansBinaryTags := map[int64]*spanBinaryTags{}
for rows.Next() {
if rows.Err() != nil {
return nil, fmt.Errorf("trace row iterator: %w", rows.Err())
}
if err := ScanRow(rows, &traces); err != nil {
var err error
spanBinaryTags, err := ScanRow(rows, &traces)
if err != nil {
return nil, fmt.Errorf("error scanning trace: %w", err)
}
if !spanBinaryTags.isEmpty() {
spansBinaryTags[spanBinaryTags.spanID] = spanBinaryTags
}

}
if rows.Err() != nil {
Expand All @@ -54,10 +60,10 @@ func scanTraces(rows pgxconn.PgxRows) ([]*model.Trace, error) {
return nil, fmt.Errorf("internal-traces-to-jaeger-proto: %w", err)
}

return batchSliceToTraceSlice(batch), nil
return batchSliceToTraceSlice(batch, spansBinaryTags), nil
}

func batchSliceToTraceSlice(bSlice []*model.Batch) []*model.Trace {
func batchSliceToTraceSlice(bSlice []*model.Batch, spansWithBinaryTag map[int64]*spanBinaryTags) []*model.Trace {
// Mostly Copied from Jaeger's grpc_client.go
// https://github.com/jaegertracing/jaeger/blob/067dff713ab635ade66315bbd05518d7b28f40c6/plugin/storage/grpc/shared/grpc_client.go#L179
traces := make([]*model.Trace, 0)
Expand All @@ -74,6 +80,9 @@ func batchSliceToTraceSlice(bSlice []*model.Batch) []*model.Trace {
}
//copy over the process from the batch
span.Process = batch.Process
if binaryTags, ok := spansWithBinaryTag[int64(span.SpanID)]; ok {
decodeSpanBinaryTags(span, binaryTags)
}
trace.Spans = append(trace.Spans, span)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/jaeger/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (p *Store) StreamingSpanWriter() spanstore.Writer {
}

func (p *Store) WriteSpan(ctx context.Context, span *model.Span) error {
encodeBinaryTags(span)
batches := []*model.Batch{
{
Spans: []*model.Span{span},
Expand Down
Loading

0 comments on commit 3c2a61e

Please sign in to comment.