Skip to content

Commit

Permalink
Move model pdata interfaces to pdata, expose them publicly (#3455)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Jun 22, 2021
1 parent 6c7274c commit 9bb6ed6
Show file tree
Hide file tree
Showing 41 changed files with 708 additions and 870 deletions.
9 changes: 9 additions & 0 deletions consumer/pdata/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,13 @@
// is non-nil. Several structures also provide New*Slice functions that allow creating
// more than one instance of the struct more efficiently instead of calling New*
// repeatedly. Use it where appropriate.
//
// This package also provides common ways for decoding serialized bytes into protocol-specific
// in-memory data models (e.g. Zipkin Span). These data models can then be translated to pdata
// representations. Similarly, pdata types can be translated from a data model which can then
// be serialized into bytes.
//
// * Encoding: Common interfaces for serializing/deserializing bytes from/to protocol-specific data models.
// * Translation: Common interfaces for translating protocol-specific data models from/to pdata types.
// * Marshaling: Common higher level APIs that do both encoding and translation of bytes and data model if going directly pdata types to bytes.
package pdata
2 changes: 1 addition & 1 deletion internal/model/errors.go → consumer/pdata/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package model
package pdata

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package model
package pdata

import (
"testing"
Expand Down
96 changes: 95 additions & 1 deletion consumer/pdata/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,106 @@
package pdata

import (
"fmt"

"go.opentelemetry.io/collector/internal"
otlpcollectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
otlplogs "go.opentelemetry.io/collector/internal/data/protogen/logs/v1"
)

// This file defines in-memory data structures to represent logs.
// LogsDecoder is an interface to decode bytes into protocol-specific data model.
type LogsDecoder interface {
// DecodeLogs decodes bytes into protocol-specific data model.
// If the error is not nil, the returned interface cannot be used.
DecodeLogs(buf []byte) (interface{}, error)
}

// LogsEncoder is an interface to encode protocol-specific data model into bytes.
type LogsEncoder interface {
// EncodeLogs encodes protocol-specific data model into bytes.
// If the error is not nil, the returned bytes slice cannot be used.
EncodeLogs(model interface{}) ([]byte, error)
}

// FromLogsTranslator is an interface to translate pdata.Logs into protocol-specific data model.
type FromLogsTranslator interface {
// FromLogs translates pdata.Logs into protocol-specific data model.
// If the error is not nil, the returned pdata.Logs cannot be used.
FromLogs(ld Logs) (interface{}, error)
}

// ToLogsTranslator is an interface to translate a protocol-specific data model into pdata.Traces.
type ToLogsTranslator interface {
// ToLogs translates a protocol-specific data model into pdata.Logs.
// If the error is not nil, the returned pdata.Logs cannot be used.
ToLogs(src interface{}) (Logs, error)
}

// LogsMarshaler marshals pdata.Logs into bytes.
type LogsMarshaler interface {
// Marshal the given pdata.Logs into bytes.
// If the error is not nil, the returned bytes slice cannot be used.
Marshal(td Logs) ([]byte, error)
}

type logsMarshaler struct {
encoder LogsEncoder
translator FromLogsTranslator
}

// NewLogsMarshaler returns a new LogsMarshaler.
func NewLogsMarshaler(encoder LogsEncoder, translator FromLogsTranslator) LogsMarshaler {
return &logsMarshaler{
encoder: encoder,
translator: translator,
}
}

// Marshal pdata.Logs into bytes.
func (t *logsMarshaler) Marshal(td Logs) ([]byte, error) {
model, err := t.translator.FromLogs(td)
if err != nil {
return nil, fmt.Errorf("converting pdata to model failed: %w", err)
}
buf, err := t.encoder.EncodeLogs(model)
if err != nil {
return nil, fmt.Errorf("marshal failed: %w", err)
}
return buf, nil
}

// LogsUnmarshaler unmarshalls bytes into pdata.Logs.
type LogsUnmarshaler interface {
// Unmarshal the given bytes into pdata.Logs.
// If the error is not nil, the returned pdata.Logs cannot be used.
Unmarshal(buf []byte) (Logs, error)
}

type logsUnmarshaler struct {
decoder LogsDecoder
translator ToLogsTranslator
}

// NewLogsUnmarshaler returns a new LogsUnmarshaler.
func NewLogsUnmarshaler(decoder LogsDecoder, translator ToLogsTranslator) LogsUnmarshaler {
return &logsUnmarshaler{
decoder: decoder,
translator: translator,
}
}

// Unmarshal bytes into pdata.Logs. On error pdata.Logs is invalid.
func (t *logsUnmarshaler) Unmarshal(buf []byte) (Logs, error) {
model, err := t.decoder.DecodeLogs(buf)
if err != nil {
return Logs{}, fmt.Errorf("unmarshal failed: %w", err)
}
td, err := t.translator.ToLogs(model)
if err != nil {
return Logs{}, fmt.Errorf("converting model to pdata failed: %w", err)
}
return td, nil
}

// Logs is the top-level struct that is propagated through the logs pipeline.
//
Expand Down
96 changes: 96 additions & 0 deletions consumer/pdata/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package pdata

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -25,6 +26,101 @@ import (
otlplogs "go.opentelemetry.io/collector/internal/data/protogen/logs/v1"
)

func TestLogsMarshal_TranslationError(t *testing.T) {
translator := &mockTranslator{}
encoder := &mockEncoder{}

lm := NewLogsMarshaler(encoder, translator)
ld := NewLogs()

translator.On("FromLogs", ld).Return(nil, errors.New("translation failed"))

_, err := lm.Marshal(ld)
assert.Error(t, err)
assert.EqualError(t, err, "converting pdata to model failed: translation failed")
}

func TestLogsMarshal_SerializeError(t *testing.T) {
translator := &mockTranslator{}
encoder := &mockEncoder{}

lm := NewLogsMarshaler(encoder, translator)
ld := NewLogs()
expectedModel := struct{}{}

translator.On("FromLogs", ld).Return(expectedModel, nil)
encoder.On("EncodeLogs", expectedModel).Return(nil, errors.New("serialization failed"))

_, err := lm.Marshal(ld)
assert.Error(t, err)
assert.EqualError(t, err, "marshal failed: serialization failed")
}

func TestLogsMarshal_Encode(t *testing.T) {
translator := &mockTranslator{}
encoder := &mockEncoder{}

lm := NewLogsMarshaler(encoder, translator)
expectedLogs := NewLogs()
expectedBytes := []byte{1, 2, 3}
expectedModel := struct{}{}

translator.On("FromLogs", expectedLogs).Return(expectedModel, nil)
encoder.On("EncodeLogs", expectedModel).Return(expectedBytes, nil)

actualBytes, err := lm.Marshal(expectedLogs)
assert.NoError(t, err)
assert.Equal(t, expectedBytes, actualBytes)
}

func TestLogsUnmarshal_EncodingError(t *testing.T) {
translator := &mockTranslator{}
encoder := &mockEncoder{}

lu := NewLogsUnmarshaler(encoder, translator)
expectedBytes := []byte{1, 2, 3}
expectedModel := struct{}{}

encoder.On("DecodeLogs", expectedBytes).Return(expectedModel, errors.New("decode failed"))

_, err := lu.Unmarshal(expectedBytes)
assert.Error(t, err)
assert.EqualError(t, err, "unmarshal failed: decode failed")
}

func TestLogsUnmarshal_TranslationError(t *testing.T) {
translator := &mockTranslator{}
encoder := &mockEncoder{}

lu := NewLogsUnmarshaler(encoder, translator)
expectedBytes := []byte{1, 2, 3}
expectedModel := struct{}{}

encoder.On("DecodeLogs", expectedBytes).Return(expectedModel, nil)
translator.On("ToLogs", expectedModel).Return(NewLogs(), errors.New("translation failed"))

_, err := lu.Unmarshal(expectedBytes)
assert.Error(t, err)
assert.EqualError(t, err, "converting model to pdata failed: translation failed")
}

func TestLogsUnmarshal_Decode(t *testing.T) {
translator := &mockTranslator{}
encoder := &mockEncoder{}

lu := NewLogsUnmarshaler(encoder, translator)
expectedLogs := NewLogs()
expectedBytes := []byte{1, 2, 3}
expectedModel := struct{}{}

encoder.On("DecodeLogs", expectedBytes).Return(expectedModel, nil)
translator.On("ToLogs", expectedModel).Return(expectedLogs, nil)

actualLogs, err := lu.Unmarshal(expectedBytes)
assert.NoError(t, err)
assert.Equal(t, expectedLogs, actualLogs)
}

func TestLogRecordCount(t *testing.T) {
md := NewLogs()
assert.EqualValues(t, 0, md.LogRecordCount())
Expand Down
124 changes: 110 additions & 14 deletions consumer/pdata/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,105 @@
package pdata

import (
"fmt"

"go.opentelemetry.io/collector/internal"
otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
otlpmetrics "go.opentelemetry.io/collector/internal/data/protogen/metrics/v1"
)

// AggregationTemporality defines how a metric aggregator reports aggregated values.
// It describes how those values relate to the time interval over which they are aggregated.
type AggregationTemporality int32
// MetricsDecoder is an interface to decode bytes into protocol-specific data model.
type MetricsDecoder interface {
// DecodeMetrics decodes bytes into protocol-specific data model.
// If the error is not nil, the returned interface cannot be used.
DecodeMetrics(buf []byte) (interface{}, error)
}

const (
// AggregationTemporalityUnspecified is the default AggregationTemporality, it MUST NOT be used.
AggregationTemporalityUnspecified = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_UNSPECIFIED)
// AggregationTemporalityDelta is an AggregationTemporality for a metric aggregator which reports changes since last report time.
AggregationTemporalityDelta = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA)
// AggregationTemporalityCumulative is an AggregationTemporality for a metric aggregator which reports changes since a fixed start time.
AggregationTemporalityCumulative = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE)
)
// MetricsEncoder is an interface to encode protocol-specific data model into bytes.
type MetricsEncoder interface {
// EncodeMetrics encodes protocol-specific data model into bytes.
// If the error is not nil, the returned bytes slice cannot be used.
EncodeMetrics(model interface{}) ([]byte, error)
}

// String returns the string representation of the AggregationTemporality.
func (at AggregationTemporality) String() string {
return otlpmetrics.AggregationTemporality(at).String()
// FromMetricsTranslator is an interface to translate pdata.Metrics into protocol-specific data model.
type FromMetricsTranslator interface {
// FromMetrics translates pdata.Metrics into protocol-specific data model.
// If the error is not nil, the returned pdata.Metrics cannot be used.
FromMetrics(md Metrics) (interface{}, error)
}

// ToMetricsTranslator is an interface to translate a protocol-specific data model into pdata.Traces.
type ToMetricsTranslator interface {
// ToMetrics translates a protocol-specific data model into pdata.Metrics.
// If the error is not nil, the returned pdata.Metrics cannot be used.
ToMetrics(src interface{}) (Metrics, error)
}

// MetricsMarshaler marshals pdata.Metrics into bytes.
type MetricsMarshaler interface {
// Marshal the given pdata.Metrics into bytes.
// If the error is not nil, the returned bytes slice cannot be used.
Marshal(td Metrics) ([]byte, error)
}

type metricsMarshaler struct {
encoder MetricsEncoder
translator FromMetricsTranslator
}

// NewMetricsMarshaler returns a new MetricsMarshaler.
func NewMetricsMarshaler(encoder MetricsEncoder, translator FromMetricsTranslator) MetricsMarshaler {
return &metricsMarshaler{
encoder: encoder,
translator: translator,
}
}

// Marshal pdata.Metrics into bytes.
func (t *metricsMarshaler) Marshal(td Metrics) ([]byte, error) {
model, err := t.translator.FromMetrics(td)
if err != nil {
return nil, fmt.Errorf("converting pdata to model failed: %w", err)
}
buf, err := t.encoder.EncodeMetrics(model)
if err != nil {
return nil, fmt.Errorf("marshal failed: %w", err)
}
return buf, nil
}

// MetricsUnmarshaler unmarshalls bytes into pdata.Metrics.
type MetricsUnmarshaler interface {
// Unmarshal the given bytes into pdata.Metrics.
// If the error is not nil, the returned pdata.Metrics cannot be used.
Unmarshal(buf []byte) (Metrics, error)
}

type metricsUnmarshaler struct {
decoder MetricsDecoder
translator ToMetricsTranslator
}

// NewMetricsUnmarshaler returns a new MetricsUnmarshaler.
func NewMetricsUnmarshaler(decoder MetricsDecoder, translator ToMetricsTranslator) MetricsUnmarshaler {
return &metricsUnmarshaler{
decoder: decoder,
translator: translator,
}
}

// Unmarshal bytes into pdata.Metrics. On error pdata.Metrics is invalid.
func (t *metricsUnmarshaler) Unmarshal(buf []byte) (Metrics, error) {
model, err := t.decoder.DecodeMetrics(buf)
if err != nil {
return Metrics{}, fmt.Errorf("unmarshal failed: %w", err)
}
td, err := t.translator.ToMetrics(model)
if err != nil {
return Metrics{}, fmt.Errorf("converting model to pdata failed: %w", err)
}
return td, nil
}

// Metrics is an opaque interface that allows transition to the new internal Metrics data, but also facilitates the
Expand Down Expand Up @@ -313,3 +391,21 @@ func copyData(src, dest *otlpmetrics.Metric) {
dest.Data = data
}
}

// AggregationTemporality defines how a metric aggregator reports aggregated values.
// It describes how those values relate to the time interval over which they are aggregated.
type AggregationTemporality int32

const (
// AggregationTemporalityUnspecified is the default AggregationTemporality, it MUST NOT be used.
AggregationTemporalityUnspecified = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_UNSPECIFIED)
// AggregationTemporalityDelta is an AggregationTemporality for a metric aggregator which reports changes since last report time.
AggregationTemporalityDelta = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA)
// AggregationTemporalityCumulative is an AggregationTemporality for a metric aggregator which reports changes since a fixed start time.
AggregationTemporalityCumulative = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE)
)

// String returns the string representation of the AggregationTemporality.
func (at AggregationTemporality) String() string {
return otlpmetrics.AggregationTemporality(at).String()
}
Loading

0 comments on commit 9bb6ed6

Please sign in to comment.