Skip to content

Commit

Permalink
Merge pull request #69 from xataio/improve-instrumentation
Browse files Browse the repository at this point in the history
Improve instrumentation
  • Loading branch information
eminano authored Sep 3, 2024
2 parents bab0a8e + 41e4ecc commit 0f761ef
Show file tree
Hide file tree
Showing 11 changed files with 456 additions and 42 deletions.
124 changes: 124 additions & 0 deletions pkg/kafka/instrumentation/instrumented_kafka_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// SPDX-License-Identifier: Apache-2.0

package instrumentation

import (
"context"
"fmt"
"time"

"github.com/xataio/pgstream/pkg/kafka"
"github.com/xataio/pgstream/pkg/otel"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

type Reader struct {
inner kafka.MessageReader
meter metric.Meter
tracer trace.Tracer
metrics *readerMetrics
}

type readerMetrics struct {
msgBytes metric.Int64Histogram
fetchLatency metric.Int64Histogram
commitLatency metric.Int64Histogram
commitBatchSize metric.Int64Histogram
}

func NewReader(inner kafka.MessageReader, instrumentation *otel.Instrumentation) (kafka.MessageReader, error) {
if instrumentation == nil {
return inner, nil
}

i := &Reader{
inner: inner,
meter: instrumentation.Meter,
tracer: instrumentation.Tracer,
metrics: &readerMetrics{},
}

if err := i.initMetrics(); err != nil {
return nil, fmt.Errorf("error initialising kafka reader metrics: %w", err)
}

return i, nil
}

func (i *Reader) initMetrics() error {
if i.meter == nil {
return nil
}

var err error
i.metrics.msgBytes, err = i.meter.Int64Histogram("pgstream.kafka.reader.msg.bytes",
metric.WithUnit("bytes"),
metric.WithDescription("Distribution of message bytes read by the kafka reader"))
if err != nil {
return err
}

i.metrics.fetchLatency, err = i.meter.Int64Histogram("pgstream.kafka.reader.fetch.latency",
metric.WithUnit("ms"),
metric.WithDescription("Distribution of time taken by the reader to fetch messages from kafka"))
if err != nil {
return err
}

i.metrics.commitLatency, err = i.meter.Int64Histogram("pgstream.kafka.reader.commit.latency",
metric.WithUnit("ms"),
metric.WithDescription("Distribution of time taken by the reader to commit messages to kafka"))
if err != nil {
return err
}

i.metrics.commitBatchSize, err = i.meter.Int64Histogram("pgstream.kafka.reader.commit.batch.size",
metric.WithUnit("offsets"),
metric.WithDescription("Distribution of the offset batch size committed by the kafka reader"))
if err != nil {
return err
}

return nil
}

func (i *Reader) FetchMessage(ctx context.Context) (msg *kafka.Message, err error) {
ctx, span := otel.StartSpan(ctx, i.tracer, "kafka.FetchMessages")
defer otel.CloseSpan(span, err)

if i.meter != nil {
startTime := time.Now()
defer func() {
i.metrics.fetchLatency.Record(ctx, time.Since(startTime).Milliseconds())
}()

}

msg, err = i.inner.FetchMessage(ctx)
if msg != nil && i.meter != nil {
i.metrics.msgBytes.Record(ctx, int64(len(msg.Value)))
}

return msg, err
}

func (i *Reader) CommitOffsets(ctx context.Context, offsets ...*kafka.Offset) (err error) {
ctx, span := otel.StartSpan(ctx, i.tracer, "kafka.CommitOffsets")
defer otel.CloseSpan(span, err)

if i.meter != nil {
startTime := time.Now()
defer func() {
i.metrics.commitLatency.Record(ctx, time.Since(startTime).Milliseconds())
}()
i.metrics.commitBatchSize.Record(ctx, int64(len(offsets)))
}

return i.inner.CommitOffsets(ctx, offsets...)
}

func (i *Reader) Close() error {
return i.inner.Close()
}
47 changes: 32 additions & 15 deletions pkg/kafka/instrumentation/instrumented_kafka_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import (
"time"

"github.com/xataio/pgstream/pkg/kafka"
"github.com/xataio/pgstream/pkg/otel"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

type Writer struct {
inner kafka.MessageWriter
meter metric.Meter
tracer trace.Tracer
metrics *writerMetrics
}

Expand All @@ -24,10 +27,15 @@ type writerMetrics struct {
writeLatency metric.Int64Histogram
}

func NewWriter(inner kafka.MessageWriter, meter metric.Meter) (*Writer, error) {
func NewWriter(inner kafka.MessageWriter, instrumentation *otel.Instrumentation) (kafka.MessageWriter, error) {
if instrumentation == nil {
return inner, nil
}

i := &Writer{
inner: inner,
meter: meter,
meter: instrumentation.Meter,
tracer: instrumentation.Tracer,
metrics: &writerMetrics{},
}

Expand All @@ -39,6 +47,10 @@ func NewWriter(inner kafka.MessageWriter, meter metric.Meter) (*Writer, error) {
}

func (i *Writer) initMetrics() error {
if i.meter == nil {
return nil
}

var err error
i.metrics.batchSize, err = i.meter.Int64Histogram("pgstream.kafka.writer.batch.size",
metric.WithUnit("messages"),
Expand All @@ -65,19 +77,24 @@ func (i *Writer) initMetrics() error {
}

func (i *Writer) WriteMessages(ctx context.Context, msgs ...kafka.Message) (err error) {
startTime := time.Now()
defer func() {
i.metrics.writeLatency.Record(ctx, time.Since(startTime).Milliseconds())
}()
i.metrics.batchSize.Record(ctx, int64(len(msgs)))

go func() {
batchBytes := 0
for _, msg := range msgs {
batchBytes += len(msg.Value)
}
i.metrics.batchBytes.Record(ctx, int64(batchBytes))
}()
ctx, span := otel.StartSpan(ctx, i.tracer, "kafka.WriteMessages")
defer otel.CloseSpan(span, err)

if i.meter != nil {
startTime := time.Now()
defer func() {
i.metrics.writeLatency.Record(ctx, time.Since(startTime).Milliseconds())
}()
i.metrics.batchSize.Record(ctx, int64(len(msgs)))

go func() {
batchBytes := 0
for _, msg := range msgs {
batchBytes += len(msg.Value)
}
i.metrics.batchBytes.Record(ctx, int64(batchBytes))
}()
}

return i.inner.WriteMessages(ctx, msgs...)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/kafka/kafka_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import (
loglib "github.com/xataio/pgstream/pkg/log"
)

type MessageReader interface {
FetchMessage(ctx context.Context) (*Message, error)
CommitOffsets(ctx context.Context, offsets ...*Offset) error
Close() error
}

type Reader struct {
reader *kafka.Reader
}
Expand Down
48 changes: 48 additions & 0 deletions pkg/otel/otel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// SPDX-License-Identifier: Apache-2.0

package otel

import (
"context"

"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

type Instrumentation struct {
Meter metric.Meter
Tracer trace.Tracer
}

func (i *Instrumentation) IsEnabled() bool {
return i != nil && (i.Meter != nil || i.Tracer != nil)
}

// StartSpan will start a span using the tracer on input. If the tracer is nil,
// the context returned is the same as on input, and the span will be nil.
func StartSpan(ctx context.Context, tracer trace.Tracer, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
if tracer == nil {
return ctx, nil
}
return tracer.Start(ctx, name, opts...)
}

// CloseSpan closes a span and records the given error if not nil. If the span
// is nil, this is a noop.
func CloseSpan(span trace.Span, err error) {
if span == nil {
return
}
recordSpanResult(span, err)
span.End()
}

func recordSpanResult(span trace.Span, err error) {
if err == nil {
return
}

span.RecordError(err)
span.SetStatus(codes.Error, "")
}
44 changes: 44 additions & 0 deletions pkg/schemalog/instrumentation/instrumented_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// SPDX-License-Identifier: Apache-2.0

package instrumentation

import (
"context"

"github.com/xataio/pgstream/pkg/otel"
"github.com/xataio/pgstream/pkg/schemalog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type Store struct {
inner schemalog.Store
tracer trace.Tracer
}

func NewStore(inner schemalog.Store, instrumentation *otel.Instrumentation) schemalog.Store {
if instrumentation == nil {
return inner
}

return &Store{
inner: inner,
tracer: instrumentation.Tracer,
}
}

func (s *Store) Fetch(ctx context.Context, schemaName string, acked bool) (le *schemalog.LogEntry, err error) {
ctx, span := otel.StartSpan(ctx, s.tracer, "schemalogstore.Fetch", trace.WithAttributes(attribute.String("schema", schemaName)))
defer otel.CloseSpan(span, err)
return s.inner.Fetch(ctx, schemaName, acked)
}

func (s *Store) Ack(ctx context.Context, le *schemalog.LogEntry) (err error) {
ctx, span := otel.StartSpan(ctx, s.tracer, "schemalogstore.Ack", trace.WithAttributes(attribute.String("schema", le.SchemaName)))
defer otel.CloseSpan(span, err)
return s.inner.Ack(ctx, le)
}

func (s *Store) Close() error {
return s.inner.Close()
}
Loading

0 comments on commit 0f761ef

Please sign in to comment.