feat(connector)!: single events table across namespaces (#410)
BREAKING CHANGE: This changes the query behind the materialized views that back meters; events from all namespaces now land in a single events table.
See #410 for migration.
hekike authored Nov 13, 2023
1 parent 83596a4 commit 2f48810
Showing 9 changed files with 449 additions and 329 deletions.
10 changes: 6 additions & 4 deletions cmd/server/main.go
@@ -385,10 +385,12 @@ func initClickHouseStreaming(config config.Configuration, meterRepository meter.
}

streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
Logger: logger,
ClickHouse: clickHouseClient,
Database: config.Aggregation.ClickHouse.Database,
Meters: meterRepository,
Logger: logger,
ClickHouse: clickHouseClient,
Database: config.Aggregation.ClickHouse.Database,
Meters: meterRepository,
CreateOrReplaceMeter: config.Aggregation.CreateOrReplaceMeter,
PopulateMeter: config.Aggregation.PopulateMeter,
})
if err != nil {
return nil, fmt.Errorf("init clickhouse streaming: %w", err)
31 changes: 19 additions & 12 deletions cmd/sink-worker/main.go
@@ -13,6 +13,7 @@ import (
health "github.com/AppsFlyer/go-sundheit"
healthhttp "github.com/AppsFlyer/go-sundheit/http"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/go-slog/otelslog"
@@ -253,21 +254,27 @@ func initSink(config config.Configuration, logger *slog.Logger, metricMeter metr

consumerKafkaConfig := config.Ingest.Kafka.CreateKafkaConfig()
_ = consumerKafkaConfig.SetKey("group.id", config.Sink.GroupId)
_ = consumerKafkaConfig.SetKey("session.timeout.ms", 6000)
_ = consumerKafkaConfig.SetKey("enable.auto.commit", false)
_ = consumerKafkaConfig.SetKey("enable.auto.offset.store", false)
_ = consumerKafkaConfig.SetKey("go.application.rebalance.enable", true)

producerKafkaConfig := config.Ingest.Kafka.CreateKafkaConfig()
consumer, err := kafka.NewConsumer(&consumerKafkaConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize kafka consumer: %s", err)
}

sinkConfig := sink.SinkConfig{
Logger: logger,
Tracer: tracer,
MetricMeter: metricMeter,
MeterRepository: meterRepository,
Storage: storage,
Deduplicator: deduplicator,
ConsumerKafkaConfig: consumerKafkaConfig,
ProducerKafkaConfig: producerKafkaConfig,
MinCommitCount: config.Sink.MinCommitCount,
MaxCommitWait: config.Sink.MaxCommitWait,
NamespaceRefetch: config.Sink.NamespaceRefetch,
Logger: logger,
Tracer: tracer,
MetricMeter: metricMeter,
MeterRepository: meterRepository,
Storage: storage,
Deduplicator: deduplicator,
Consumer: consumer,
MinCommitCount: config.Sink.MinCommitCount,
MaxCommitWait: config.Sink.MaxCommitWait,
NamespaceRefetch: config.Sink.NamespaceRefetch,
}

return sink.NewSink(sinkConfig)
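The sink no longer creates its own Kafka clients: the worker builds the consumer with manual offset management and injects it through SinkConfig.Consumer, and the producer is gone because dead-lettering now goes to storage (see internal/sink/sink.go below). A minimal sketch of that setup, using only the confluent-kafka-go calls visible in this diff; the broker address and group id are illustrative placeholders, not values from the repository:

package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// newSinkConsumer mirrors the worker wiring above: offsets are stored and
// committed manually by the sink after a successful flush, so auto-commit
// is disabled here. brokers and groupID stand in for application config values.
func newSinkConsumer(brokers, groupID string) (*kafka.Consumer, error) {
    cfg := kafka.ConfigMap{"bootstrap.servers": brokers}
    _ = cfg.SetKey("group.id", groupID)
    _ = cfg.SetKey("session.timeout.ms", 6000)
    _ = cfg.SetKey("enable.auto.commit", false)
    _ = cfg.SetKey("enable.auto.offset.store", false)
    _ = cfg.SetKey("go.application.rebalance.enable", true)

    consumer, err := kafka.NewConsumer(&cfg)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize kafka consumer: %w", err)
    }
    return consumer, nil
}

The returned consumer is then passed to sink.NewSink as SinkConfig.Consumer and is closed by the sink in Close().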
6 changes: 6 additions & 0 deletions config/aggregation.go
@@ -9,6 +9,12 @@ import (

type AggregationConfiguration struct {
ClickHouse ClickHouseAggregationConfiguration
// Populate creates the materialized view with data from the events table
// This is not safe to use in production as it requires stopping ingestion
PopulateMeter bool
// CreateOrReplace is used to force the recreation of the materialized view
// This is not safe to use in production as it will drop the existing views
CreateOrReplaceMeter bool
}

// Validate validates the configuration.
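Both new flags exist for one-off maintenance such as the migration referenced in #410 rather than for normal operation. A hedged fragment (not a complete program) showing the intended usage, assuming the config package already imported in cmd/server/main.go above:

// Illustrative only: enable the rebuild flags for a single maintenance run
// with ingestion stopped, then unset them once the views are recreated.
aggregation := config.AggregationConfiguration{
    ClickHouse:           config.ClickHouseAggregationConfiguration{}, // normally populated from the application config
    PopulateMeter:        true,                                        // backfill meter views from the events table
    CreateOrReplaceMeter: true,                                        // drop and recreate the existing meter views
}
_ = aggregation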
118 changes: 29 additions & 89 deletions internal/sink/sink.go
@@ -24,7 +24,6 @@ import (
)

var namespaceTopicRegexp = regexp.MustCompile(`^om_([A-Za-z0-9]+(?:_[A-Za-z0-9]+)*)_events$`)
var defaultDeadletterTopicTemplate = "om_%s_events_deadletter"

type SinkMessage struct {
Namespace string
@@ -34,8 +33,6 @@ type SinkMessage struct {
}

type Sink struct {
consumer *kafka.Consumer
producer *kafka.Producer
config SinkConfig
running bool
buffer *SinkBuffer
@@ -47,14 +44,13 @@ type Sink struct {
}

type SinkConfig struct {
Logger *slog.Logger
Tracer trace.Tracer
MetricMeter metric.Meter
MeterRepository meter.Repository
Storage Storage
Deduplicator dedupe.Deduplicator
ConsumerKafkaConfig kafka.ConfigMap
ProducerKafkaConfig kafka.ConfigMap
Logger *slog.Logger
Tracer trace.Tracer
MetricMeter metric.Meter
MeterRepository meter.Repository
Storage Storage
Deduplicator dedupe.Deduplicator
Consumer *kafka.Consumer
// MinCommitCount is the minimum number of messages to wait before flushing the buffer.
// Whichever happens earlier MinCommitCount or MaxCommitWait will trigger a flush.
MinCommitCount int
@@ -64,9 +60,6 @@ type SinkConfig struct {
// this information is used to configure which topics the consumer subscribes and
// the meter configs used in event validation.
NamespaceRefetch time.Duration
// DeadletterTopicTemplate is the template used to create the deadletter topic name per namespace.
// It is a sprintf template with the namespace as the only argument.
DeadletterTopicTemplate string
// OnFlushSuccess is an optional lifecycle hook
OnFlushSuccess func(string, int64)
}
@@ -76,22 +69,6 @@ func NewSink(config SinkConfig) (*Sink, error) {
config.Logger.Warn("deduplicator is not set, deduplication will be disabled")
}

// These are Kafka configs but also related to sink logic
_ = config.ConsumerKafkaConfig.SetKey("session.timeout.ms", 6000)
_ = config.ConsumerKafkaConfig.SetKey("enable.auto.commit", false)
_ = config.ConsumerKafkaConfig.SetKey("enable.auto.offset.store", false)
_ = config.ConsumerKafkaConfig.SetKey("go.application.rebalance.enable", true)

consumer, err := kafka.NewConsumer(&config.ConsumerKafkaConfig)
if err != nil {
return nil, fmt.Errorf("failed to create consumer: %s", err)
}

producer, err := kafka.NewProducer(&config.ProducerKafkaConfig)
if err != nil {
return nil, fmt.Errorf("failed to create producer: %w", err)
}

// Defaults
if config.MinCommitCount == 0 {
config.MinCommitCount = 1
@@ -102,9 +79,6 @@ func NewSink(config SinkConfig) (*Sink, error) {
if config.NamespaceRefetch == 0 {
config.NamespaceRefetch = 15 * time.Second
}
if config.DeadletterTopicTemplate == "" {
config.DeadletterTopicTemplate = defaultDeadletterTopicTemplate
}

// Initialize OTel metrics
messageCounter, err := config.MetricMeter.Int64Counter(
@@ -126,8 +100,6 @@ func NewSink(config SinkConfig) (*Sink, error) {
}

sink := &Sink{
consumer: consumer,
producer: producer,
config: config,
buffer: NewSinkBuffer(),
namespaceStore: NewNamespaceStore(),
@@ -259,9 +231,9 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([]
defer persistSpan.End()

deadletterMessages := []SinkMessage{}
batchesPerNamespace := map[string][]SinkMessage{}
batch := []SinkMessage{}

// Group messages per namespaces and filter out deadletter and drop messages
// Filter out deadletter and drop messages
for _, message := range messages {
if message.Error != nil {
switch message.Error.ProcessingControl {
@@ -277,25 +249,13 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([]
return deadletterMessages, fmt.Errorf("unknown error type: %s", message.Error)
}
}

batchesPerNamespace[message.Namespace] = append(batchesPerNamespace[message.Namespace], message)
batch = append(batch, message)
}

// Insert into permanent storage per namespace
for namespace, batch := range batchesPerNamespace {
// Start otel span for storage batch insert
// Storage Batch insert
if len(batch) > 0 {
storageCtx, storageSpan := s.config.Tracer.Start(persistCtx, "storage-batch-insert")
storageSpan.SetAttributes(
attribute.String("namespace", namespace),
attribute.Int("size", len(batch)),
)

list := []*serializer.CloudEventsKafkaPayload{}
for _, message := range batch {
list = append(list, message.Serialized)
}

err := s.config.Storage.BatchInsert(storageCtx, namespace, list)
err := s.config.Storage.BatchInsert(storageCtx, batch)
if err != nil {
// Note: a single error in batch will make the whole batch fail
if perr, ok := err.(*ProcessingError); ok {
@@ -305,7 +265,6 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([]
deadletterMessages = append(deadletterMessages, batch...)
case DROP:
storageSpan.SetStatus(codes.Error, "drop")
continue
default:
storageSpan.SetStatus(codes.Error, "unknown processing error type")
storageSpan.RecordError(err)
@@ -321,45 +280,26 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([]
}
}
logger.Debug("succeeded to sink to storage", "buffer size", len(messages))
storageSpan.End()
}

return deadletterMessages, nil
}

// deadLetter sends a message to the dead letter queue, useful permanent non-recoverable errors like json parsing
// deadLetter stores invalid messages, useful for permanent, non-recoverable errors like JSON parsing
func (s *Sink) deadLetter(ctx context.Context, messages ...SinkMessage) error {
logger := s.config.Logger.With("operation", "deadLetter")
_, deadletterSpan := s.config.Tracer.Start(ctx, "deadletter")

for _, message := range messages {
topic := fmt.Sprintf(s.config.DeadletterTopicTemplate, message.Namespace)
headers := message.KafkaMessage.Headers
headers = append(headers, kafka.Header{Key: "error", Value: []byte(message.Error.Error())})

msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Timestamp: message.KafkaMessage.Timestamp,
Headers: headers,
Key: message.KafkaMessage.Key,
Value: message.KafkaMessage.Value,
}

err := s.producer.Produce(msg, nil)
if err != nil {
deadletterSpan.SetStatus(codes.Error, "deadletter failure")
deadletterSpan.RecordError(err)
deadletterSpan.End()
err := s.config.Storage.BatchInsertInvalid(ctx, messages)
if err != nil {
deadletterSpan.SetStatus(codes.Error, "deadletter failure")
deadletterSpan.RecordError(err)
deadletterSpan.End()

return fmt.Errorf("producing kafka message to deadletter topic: %w", err)
}
deadletterSpan.AddEvent(
"deadletter",
trace.WithAttributes(attribute.String("namespace", message.Namespace)),
)
return fmt.Errorf("storing invalid messages: %w", err)
}

logger.Debug("succeeded to deadletter", "messages", len(messages))
logger.Debug("succeeded to store invalid", "messages", len(messages))
deadletterSpan.End()
return nil
}
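
With a single events table, persistToStorage now sends one batch for all namespaces and deadLetter routes invalid messages through the same storage layer instead of a Kafka producer. That implies a Storage interface roughly like the sketch below; it is inferred from the call sites in this diff, not copied from the repository, so the real definition may differ:

// Inferred from s.config.Storage.BatchInsert(ctx, batch) and
// s.config.Storage.BatchInsertInvalid(ctx, messages) above.
type Storage interface {
    // BatchInsert writes a batch of valid events, from any namespace, into the single events table.
    BatchInsert(ctx context.Context, messages []SinkMessage) error
    // BatchInsertInvalid persists messages that failed processing, replacing the per-namespace dead-letter topics.
    BatchInsertInvalid(ctx context.Context, messages []SinkMessage) error
}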
@@ -378,7 +318,7 @@ func (s *Sink) offsetCommit(ctx context.Context, messages []SinkMessage) error {
// We retry with exponential backoff as it's critical that either step #2 or #3 succeeds.
err := retry.Do(
func() error {
commitedOffsets, err := s.consumer.CommitOffsets(offsets)
commitedOffsets, err := s.config.Consumer.CommitOffsets(offsets)
if err != nil {
return err
}
@@ -484,7 +424,7 @@ func (s *Sink) subscribeToNamespaces() error {
topics := getTopics(*s.namespaceStore)
logger.Info("new namespaces detected, subscribing to topics", "topics", topics)

err = s.consumer.SubscribeTopics(topics, s.rebalance)
err = s.config.Consumer.SubscribeTopics(topics, s.rebalance)
if err != nil {
return fmt.Errorf("failed to subscribe to topics: %s", err)
}
@@ -558,7 +498,7 @@ func (s *Sink) Run() error {
logger.Error("caught signal, terminating", "sig", sig)
s.running = false
default:
ev := s.consumer.Poll(100)
ev := s.config.Consumer.Poll(100)
if ev == nil {
continue
}
@@ -587,7 +527,7 @@ func (s *Sink) Run() error {
logger.Debug("event added to buffer", "partition", e.TopicPartition.Partition, "offset", e.TopicPartition.Offset, "event", kafkaCloudEvent)

// Store message, this won't commit offset immediately just store it for the next manual commit
_, err = s.consumer.StoreMessage(e)
_, err = s.config.Consumer.StoreMessage(e)
if err != nil {
// Stop processing, non-recoverable error
return fmt.Errorf("failed to store kafka message for upcoming offset commit: %w", err)
@@ -625,7 +565,7 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error {
switch e := event.(type) {
case kafka.AssignedPartitions:
logger.Info("kafka assigned partitions", "partitions", e.Partitions)
err := s.consumer.Assign(e.Partitions)
err := s.config.Consumer.Assign(e.Partitions)
if err != nil {
return fmt.Errorf("failed to assign partitions: %w", err)
}
@@ -641,7 +581,7 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error {
// involuntarily. In this case, the partition might already be owned
// by another consumer, and operations including committing
// offsets may not work.
if s.consumer.AssignmentLost() {
if s.config.Consumer.AssignmentLost() {
// Our consumer has been kicked out of the group and the
// entire assignment is thus lost.
logger.Warn("assignment lost involuntarily, commit may fail")
@@ -653,7 +593,7 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error {
return fmt.Errorf("failed to flush: %w", err)
}

err = s.consumer.Unassign()
err = s.config.Consumer.Unassign()
if err != nil {
return fmt.Errorf("failed to unassign partitions: %w", err)
}
@@ -712,7 +652,7 @@ func (s *Sink) Close() error {
if s.flushTimer != nil {
s.flushTimer.Stop()
}
return s.consumer.Close()
return s.config.Consumer.Close()
}

// getNamespace from topic
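Event topics keep the om_<namespace>_events naming; only the sink-produced dead-letter topics disappear. Topic-to-namespace mapping therefore still relies on namespaceTopicRegexp from the top of this file; a small standalone illustration of that mapping:

package main

import (
    "fmt"
    "regexp"
)

// Same pattern as namespaceTopicRegexp in internal/sink/sink.go.
var namespaceTopicRegexp = regexp.MustCompile(`^om_([A-Za-z0-9]+(?:_[A-Za-z0-9]+)*)_events$`)

func main() {
    topics := []string{"om_default_events", "om_my_team_events", "om_default_events_deadletter"}
    for _, topic := range topics {
        if m := namespaceTopicRegexp.FindStringSubmatch(topic); m != nil {
            fmt.Printf("%s -> namespace %q\n", topic, m[1])
        } else {
            fmt.Printf("%s -> not an events topic\n", topic)
        }
    }
}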