Skip to content

Commit

Permalink
refactor(ingest): use collector metrics middleware
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Sagi-Kazar <mark.sagikazar@gmail.com>
  • Loading branch information
sagikazarmark committed Sep 3, 2024
1 parent e9ab54f commit 6bcf3b2
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 25 deletions.
11 changes: 9 additions & 2 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/openmeterio/openmeter/config"
"github.com/openmeterio/openmeter/openmeter/debug"
"github.com/openmeterio/openmeter/openmeter/ingest"
"github.com/openmeterio/openmeter/openmeter/ingest/ingestadapter"
"github.com/openmeterio/openmeter/openmeter/ingest/ingestdriver"
"github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest"
"github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/serializer"
Expand Down Expand Up @@ -573,17 +574,23 @@ func initKafkaProducer(ctx context.Context, config config.Configuration, logger
return producer, nil
}

func initKafkaIngest(producer *kafka.Producer, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, serializer serializer.Serializer) (*kafkaingest.Collector, *kafkaingest.NamespaceHandler, error) {
func initKafkaIngest(producer *kafka.Producer, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, serializer serializer.Serializer) (ingest.Collector, *kafkaingest.NamespaceHandler, error) {
var collector ingest.Collector

collector, err := kafkaingest.NewCollector(
producer,
serializer,
config.Ingest.Kafka.EventsTopicTemplate,
metricMeter,
)
if err != nil {
return nil, nil, fmt.Errorf("init kafka ingest: %w", err)
}

collector, err = ingestadapter.WithMetrics(collector, metricMeter)
if err != nil {
return nil, nil, fmt.Errorf("init kafka ingest: %w", err)
}

kafkaAdminClient, err := kafka.NewAdminClientFromProducer(producer)
if err != nil {
return nil, nil, err
Expand Down
23 changes: 0 additions & 23 deletions openmeter/ingest/kafkaingest/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (

"github.com/cloudevents/sdk-go/v2/event"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/serializer"
kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics"
Expand All @@ -25,15 +23,12 @@ type Collector struct {
// NamespacedTopicTemplate needs to contain at least one string parameter passed to fmt.Sprintf.
// For example: "om_%s_events"
NamespacedTopicTemplate string

ingestEventCounter metric.Int64Counter
}

func NewCollector(
producer *kafka.Producer,
serializer serializer.Serializer,
namespacedTopicTemplate string,
metricMeter metric.Meter,
) (*Collector, error) {
if producer == nil {
return nil, fmt.Errorf("producer is required")
Expand All @@ -44,25 +39,11 @@ func NewCollector(
if namespacedTopicTemplate == "" {
return nil, fmt.Errorf("namespaced topic template is required")
}
if metricMeter == nil {
return nil, fmt.Errorf("metric meter is required")
}

// Initialize OTel metrics
ingestEventCounter, err := metricMeter.Int64Counter(
"ingest.events",
metric.WithDescription("The number of events ingested"),
metric.WithUnit("{event}"),
)
if err != nil {
return nil, fmt.Errorf("failed to create events counter: %w", err)
}

return &Collector{
Producer: producer,
Serializer: serializer,
NamespacedTopicTemplate: namespacedTopicTemplate,
ingestEventCounter: ingestEventCounter,
}, nil
}

Expand Down Expand Up @@ -96,10 +77,6 @@ func (s Collector) Ingest(ctx context.Context, namespace string, ev event.Event)
return fmt.Errorf("producing kafka message: %w", err)
}

// Increment the ingest event counter metric
namespaceAttr := attribute.String("namespace", namespace)
s.ingestEventCounter.Add(ctx, 1, metric.WithAttributes(namespaceAttr))

return nil
}

Expand Down

0 comments on commit 6bcf3b2

Please sign in to comment.