feat(connector): single events table across namespaces #410

Merged · 10 commits · Nov 13, 2023
cmd/server/main.go (10 changes: 6 additions & 4 deletions)

@@ -385,10 +385,12 @@ func initClickHouseStreaming(config config.Configuration, meterRepository meter.
     }

     streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
-        Logger:     logger,
-        ClickHouse: clickHouseClient,
-        Database:   config.Aggregation.ClickHouse.Database,
-        Meters:     meterRepository,
+        Logger:               logger,
+        ClickHouse:           clickHouseClient,
+        Database:             config.Aggregation.ClickHouse.Database,
+        Meters:               meterRepository,
+        CreateOrReplaceMeter: config.Aggregation.CreateOrReplaceMeter,
+        PopulateMeter:        config.Aggregation.PopulateMeter,
     })
     if err != nil {
         return nil, fmt.Errorf("init clickhouse streaming: %w", err)
cmd/sink-worker/main.go (31 changes: 19 additions & 12 deletions)

@@ -13,6 +13,7 @@ import (
     health "github.com/AppsFlyer/go-sundheit"
     healthhttp "github.com/AppsFlyer/go-sundheit/http"
     "github.com/ClickHouse/clickhouse-go/v2"
+    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
     "github.com/go-chi/chi/v5"
     "github.com/go-chi/chi/v5/middleware"
     "github.com/go-slog/otelslog"
@@ -253,21 +254,27 @@ func initSink(config config.Configuration, logger *slog.Logger, metricMeter metr

     consumerKafkaConfig := config.Ingest.Kafka.CreateKafkaConfig()
     _ = consumerKafkaConfig.SetKey("group.id", config.Sink.GroupId)
+    _ = consumerKafkaConfig.SetKey("session.timeout.ms", 6000)
+    _ = consumerKafkaConfig.SetKey("enable.auto.commit", false)
+    _ = consumerKafkaConfig.SetKey("enable.auto.offset.store", false)
+    _ = consumerKafkaConfig.SetKey("go.application.rebalance.enable", true)

-    producerKafkaConfig := config.Ingest.Kafka.CreateKafkaConfig()
+    consumer, err := kafka.NewConsumer(&consumerKafkaConfig)
+    if err != nil {
+        return nil, fmt.Errorf("failed to initialize kafka consumer: %s", err)
+    }

     sinkConfig := sink.SinkConfig{
-        Logger:              logger,
-        Tracer:              tracer,
-        MetricMeter:         metricMeter,
-        MeterRepository:     meterRepository,
-        Storage:             storage,
-        Deduplicator:        deduplicator,
-        ConsumerKafkaConfig: consumerKafkaConfig,
-        ProducerKafkaConfig: producerKafkaConfig,
-        MinCommitCount:      config.Sink.MinCommitCount,
-        MaxCommitWait:       config.Sink.MaxCommitWait,
-        NamespaceRefetch:    config.Sink.NamespaceRefetch,
+        Logger:           logger,
+        Tracer:           tracer,
+        MetricMeter:      metricMeter,
+        MeterRepository:  meterRepository,
+        Storage:          storage,
+        Deduplicator:     deduplicator,
+        Consumer:         consumer,
+        MinCommitCount:   config.Sink.MinCommitCount,
+        MaxCommitWait:    config.Sink.MaxCommitWait,
+        NamespaceRefetch: config.Sink.NamespaceRefetch,
     }

     return sink.NewSink(sinkConfig)
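The Kafka consumer is now created by the caller and injected into the sink, with auto-commit and implicit offset storage disabled so the sink fully controls when offsets are committed. Below is a minimal, self-contained sketch of that poll/store/commit pattern with confluent-kafka-go; the broker address, group id, and topic name are placeholders rather than values from this PR, and the real sink batches many messages before committing.

```go
package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	// Mirrors the settings this PR moves into the caller: commits are manual.
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        "localhost:9092", // placeholder
		"group.id":                 "sink-sketch",    // placeholder
		"session.timeout.ms":       6000,
		"enable.auto.commit":       false,
		"enable.auto.offset.store": false,
	})
	if err != nil {
		log.Fatalf("create consumer: %v", err)
	}
	defer consumer.Close()

	// Topic name follows the om_<namespace>_events convention from sink.go.
	if err := consumer.SubscribeTopics([]string{"om_default_events"}, nil); err != nil {
		log.Fatalf("subscribe: %v", err)
	}

	for {
		ev := consumer.Poll(100)
		if ev == nil {
			continue
		}
		switch e := ev.(type) {
		case *kafka.Message:
			// ... validate and buffer the event here ...

			// Record the offset for the next manual commit; with
			// enable.auto.offset.store=false nothing is stored implicitly.
			if _, err := consumer.StoreMessage(e); err != nil {
				log.Fatalf("store offset: %v", err)
			}
			// The real sink flushes a whole buffer before committing;
			// committing per message here just keeps the sketch short.
			if _, err := consumer.Commit(); err != nil {
				log.Printf("commit: %v", err)
			}
		case kafka.Error:
			log.Printf("kafka error: %v", e)
		}
	}
}
```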
config/aggregation.go (6 changes: 6 additions & 0 deletions)

@@ -9,6 +9,12 @@ import (

 type AggregationConfiguration struct {
     ClickHouse ClickHouseAggregationConfiguration
+    // PopulateMeter creates the materialized view with data from the events table.
+    // This is not safe to use in production, as it requires ingestion to be stopped.
+    PopulateMeter bool
+    // CreateOrReplaceMeter forces recreation of the materialized view.
+    // This is not safe to use in production, as it drops the existing views.
+    CreateOrReplaceMeter bool
 }

 // Validate validates the configuration.
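Both flags rebuild ClickHouse state destructively, which is why their comments warn against production use. A sketch of enabling them in code, assuming the repository's config import path (the zero-valued ClickHouse field is for brevity only):

```go
package main

import "github.com/openmeterio/openmeter/config" // assumed import path

func main() {
	// Sketch only: suitable for local development, not production.
	aggregation := config.AggregationConfiguration{
		ClickHouse:           config.ClickHouseAggregationConfiguration{}, // zero value for brevity
		PopulateMeter:        true, // backfill the materialized view from the events table
		CreateOrReplaceMeter: true, // drop and recreate the existing meter views
	}
	_ = aggregation // passed to the ClickHouse connector as in cmd/server/main.go
}
```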
internal/sink/sink.go (118 changes: 29 additions & 89 deletions)

@@ -24,7 +24,6 @@ import (
 )

 var namespaceTopicRegexp = regexp.MustCompile(`^om_([A-Za-z0-9]+(?:_[A-Za-z0-9]+)*)_events$`)
-var defaultDeadletterTopicTemplate = "om_%s_events_deadletter"

 type SinkMessage struct {
     Namespace string
@@ -34,8 +33,6 @@ type SinkMessage struct {
 }

 type Sink struct {
-    consumer       *kafka.Consumer
-    producer       *kafka.Producer
     config         SinkConfig
     running        bool
     buffer         *SinkBuffer
@@ -47,14 +44,13 @@ type Sink struct {
 }

 type SinkConfig struct {
-    Logger              *slog.Logger
-    Tracer              trace.Tracer
-    MetricMeter         metric.Meter
-    MeterRepository     meter.Repository
-    Storage             Storage
-    Deduplicator        dedupe.Deduplicator
-    ConsumerKafkaConfig kafka.ConfigMap
-    ProducerKafkaConfig kafka.ConfigMap
+    Logger          *slog.Logger
+    Tracer          trace.Tracer
+    MetricMeter     metric.Meter
+    MeterRepository meter.Repository
+    Storage         Storage
+    Deduplicator    dedupe.Deduplicator
+    Consumer        *kafka.Consumer
     // MinCommitCount is the minimum number of messages to wait before flushing the buffer.
     // Whichever happens earlier, MinCommitCount or MaxCommitWait, will trigger a flush.
     MinCommitCount int
@@ -64,9 +60,6 @@ type SinkConfig struct {
     // this information is used to configure which topics the consumer subscribes to and
     // the meter configs used in event validation.
     NamespaceRefetch time.Duration
-    // DeadletterTopicTemplate is the template used to create the deadletter topic name per namespace.
-    // It is a sprintf template with the namespace as the only argument.
-    DeadletterTopicTemplate string
     // OnFlushSuccess is an optional lifecycle hook
     OnFlushSuccess func(string, int64)
 }
@@ -76,22 +69,6 @@ func NewSink(config SinkConfig) (*Sink, error) {
         config.Logger.Warn("deduplicator is not set, deduplication will be disabled")
     }

-    // These are Kafka configs but also related to sink logic
-    _ = config.ConsumerKafkaConfig.SetKey("session.timeout.ms", 6000)
-    _ = config.ConsumerKafkaConfig.SetKey("enable.auto.commit", false)
-    _ = config.ConsumerKafkaConfig.SetKey("enable.auto.offset.store", false)
-    _ = config.ConsumerKafkaConfig.SetKey("go.application.rebalance.enable", true)
-
-    consumer, err := kafka.NewConsumer(&config.ConsumerKafkaConfig)
-    if err != nil {
-        return nil, fmt.Errorf("failed to create consumer: %s", err)
-    }
-
-    producer, err := kafka.NewProducer(&config.ProducerKafkaConfig)
-    if err != nil {
-        return nil, fmt.Errorf("failed to create producer: %w", err)
-    }
-
     // Defaults
     if config.MinCommitCount == 0 {
         config.MinCommitCount = 1
@@ -102,9 +79,6 @@ func NewSink(config SinkConfig) (*Sink, error) {
     if config.NamespaceRefetch == 0 {
         config.NamespaceRefetch = 15 * time.Second
     }
-    if config.DeadletterTopicTemplate == "" {
-        config.DeadletterTopicTemplate = defaultDeadletterTopicTemplate
-    }

     // Initialize OTel metrics
     messageCounter, err := config.MetricMeter.Int64Counter(
@@ -126,8 +100,6 @@ func NewSink(config SinkConfig) (*Sink, error) {
     }

     sink := &Sink{
-        consumer:       consumer,
-        producer:       producer,
         config:         config,
         buffer:         NewSinkBuffer(),
         namespaceStore: NewNamespaceStore(),
@@ -259,9 +231,9 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([]
     defer persistSpan.End()

     deadletterMessages := []SinkMessage{}
-    batchesPerNamespace := map[string][]SinkMessage{}
+    batch := []SinkMessage{}

-    // Group messages per namespaces and filter out deadletter and drop messages
+    // Filter out deadletter and drop messages
     for _, message := range messages {
         if message.Error != nil {
             switch message.Error.ProcessingControl {
@@ -277,25 +249,13 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([]
                 return deadletterMessages, fmt.Errorf("unknown error type: %s", message.Error)
             }
         }

-        batchesPerNamespace[message.Namespace] = append(batchesPerNamespace[message.Namespace], message)
+        batch = append(batch, message)
     }

-    // Insert into permanent storage per namespace
-    for namespace, batch := range batchesPerNamespace {
-        // Start otel span for storage batch insert
+    // Storage batch insert
+    if len(batch) > 0 {
         storageCtx, storageSpan := s.config.Tracer.Start(persistCtx, "storage-batch-insert")
         storageSpan.SetAttributes(
-            attribute.String("namespace", namespace),
             attribute.Int("size", len(batch)),
         )

-        list := []*serializer.CloudEventsKafkaPayload{}
-        for _, message := range batch {
-            list = append(list, message.Serialized)
-        }

-        err := s.config.Storage.BatchInsert(storageCtx, namespace, list)
+        err := s.config.Storage.BatchInsert(storageCtx, batch)
         if err != nil {
             // Note: a single error in batch will make the whole batch fail
             if perr, ok := err.(*ProcessingError); ok {
@@ -305,7 +265,6 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([]
                 deadletterMessages = append(deadletterMessages, batch...)
             case DROP:
                 storageSpan.SetStatus(codes.Error, "drop")
-                continue
             default:
                 storageSpan.SetStatus(codes.Error, "unknown processing error type")
                 storageSpan.RecordError(err)
@@ -321,45 +280,26 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([]
             }
         }
         logger.Debug("succeeded to sink to storage", "buffer size", len(messages))
         storageSpan.End()
     }

     return deadletterMessages, nil
 }
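With a single events table across namespaces, persistToStorage no longer groups messages per namespace: the whole buffer is flushed through one BatchInsert call. The Storage interface itself is not visible in this excerpt; inferred from the two call sites (BatchInsert here, BatchInsertInvalid in deadLetter below), it might look like the following fragment, so treat the exact method signatures as assumptions:

```go
package sink

import "context"

// Inferred from the call sites in this diff, not the actual declaration.
type Storage interface {
	// BatchInsert persists valid events into the shared events table.
	BatchInsert(ctx context.Context, messages []SinkMessage) error
	// BatchInsertInvalid persists messages that failed processing,
	// the storage-backed dead letter path introduced by this PR.
	BatchInsertInvalid(ctx context.Context, messages []SinkMessage) error
}
```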

-// deadLetter sends a message to the dead letter queue, useful permanent non-recoverable errors like json parsing
+// deadLetter stores invalid messages; useful for permanent, non-recoverable errors like JSON parsing
 func (s *Sink) deadLetter(ctx context.Context, messages ...SinkMessage) error {
     logger := s.config.Logger.With("operation", "deadLetter")
     _, deadletterSpan := s.config.Tracer.Start(ctx, "deadletter")

-    for _, message := range messages {
-        topic := fmt.Sprintf(s.config.DeadletterTopicTemplate, message.Namespace)
-        headers := message.KafkaMessage.Headers
-        headers = append(headers, kafka.Header{Key: "error", Value: []byte(message.Error.Error())})
-
-        msg := &kafka.Message{
-            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
-            Timestamp:      message.KafkaMessage.Timestamp,
-            Headers:        headers,
-            Key:            message.KafkaMessage.Key,
-            Value:          message.KafkaMessage.Value,
-        }
-
-        err := s.producer.Produce(msg, nil)
-        if err != nil {
-            deadletterSpan.SetStatus(codes.Error, "deadletter failure")
-            deadletterSpan.RecordError(err)
-            deadletterSpan.End()
+    err := s.config.Storage.BatchInsertInvalid(ctx, messages)
+    if err != nil {
+        deadletterSpan.SetStatus(codes.Error, "deadletter failure")
+        deadletterSpan.RecordError(err)
+        deadletterSpan.End()

-            return fmt.Errorf("producing kafka message to deadletter topic: %w", err)
-        }
-        deadletterSpan.AddEvent(
-            "deadletter",
-            trace.WithAttributes(attribute.String("namespace", message.Namespace)),
-        )
+        return fmt.Errorf("storing invalid messages: %w", err)
     }

-    logger.Debug("succeeded to deadletter", "messages", len(messages))
+    logger.Debug("succeeded to store invalid", "messages", len(messages))
     deadletterSpan.End()
     return nil
 }
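Dead-lettering therefore changes transport: instead of producing to a per-namespace om_%s_events_deadletter topic (the template removed above), invalid messages are written to storage. A sketch of how a permanently failed message might reach this path; the DEADLETTER constant and the ProcessingError literal are assumptions, since the diff only shows the ProcessingControl switch and the DROP case:

```go
// Fragment, package sink: a sketch of the flow, not code from this PR.
msg := SinkMessage{
	Namespace: "default", // illustrative
	Error: &ProcessingError{
		ProcessingControl: DEADLETTER, // non-recoverable, e.g. a JSON parse failure
	},
}
// persistToStorage collects such messages and hands them to deadLetter,
// which now stores them via Storage.BatchInsertInvalid.
if err := s.deadLetter(ctx, msg); err != nil {
	return err
}
```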
@@ -378,7 +318,7 @@ func (s *Sink) offsetCommit(ctx context.Context, messages []SinkMessage) error {
     // We retry with exponential backoff as it's critical that either step #2 or #3 succeeds.
     err := retry.Do(
         func() error {
-            commitedOffsets, err := s.consumer.CommitOffsets(offsets)
+            commitedOffsets, err := s.config.Consumer.CommitOffsets(offsets)
             if err != nil {
                 return err
             }
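offsetCommit wraps the commit in retry.Do with exponential backoff, since losing the commit after a successful flush could mean reprocessing already persisted events. A compilable sketch of that pattern, assuming github.com/avast/retry-go/v4 (the actual retry import is not visible in this excerpt) and illustrative attempt and delay values:

```go
package sinkutil

import (
	"time"

	"github.com/avast/retry-go/v4"
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// commitWithBackoff retries an offset commit with exponential backoff,
// mirroring the pattern around s.config.Consumer.CommitOffsets above.
func commitWithBackoff(consumer *kafka.Consumer, offsets []kafka.TopicPartition) error {
	return retry.Do(
		func() error {
			_, err := consumer.CommitOffsets(offsets)
			return err
		},
		retry.Attempts(5),                   // illustrative
		retry.Delay(100*time.Millisecond),   // illustrative initial delay
		retry.DelayType(retry.BackOffDelay), // exponential backoff between attempts
	)
}
```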
@@ -484,7 +424,7 @@ func (s *Sink) subscribeToNamespaces() error {
     topics := getTopics(*s.namespaceStore)
     logger.Info("new namespaces detected, subscribing to topics", "topics", topics)

-    err = s.consumer.SubscribeTopics(topics, s.rebalance)
+    err = s.config.Consumer.SubscribeTopics(topics, s.rebalance)
     if err != nil {
         return fmt.Errorf("failed to subscribe to topics: %s", err)
     }
@@ -558,7 +498,7 @@ func (s *Sink) Run() error {
             logger.Error("caught signal, terminating", "sig", sig)
             s.running = false
         default:
-            ev := s.consumer.Poll(100)
+            ev := s.config.Consumer.Poll(100)
             if ev == nil {
                 continue
             }
@@ -587,7 +527,7 @@ func (s *Sink) Run() error {
             logger.Debug("event added to buffer", "partition", e.TopicPartition.Partition, "offset", e.TopicPartition.Offset, "event", kafkaCloudEvent)

             // Store message; this won't commit the offset immediately, it just stores it for the next manual commit
-            _, err = s.consumer.StoreMessage(e)
+            _, err = s.config.Consumer.StoreMessage(e)
             if err != nil {
                 // Stop processing, non-recoverable error
                 return fmt.Errorf("failed to store kafka message for upcoming offset commit: %w", err)
@@ -625,7 +565,7 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error {
     switch e := event.(type) {
     case kafka.AssignedPartitions:
         logger.Info("kafka assigned partitions", "partitions", e.Partitions)
-        err := s.consumer.Assign(e.Partitions)
+        err := s.config.Consumer.Assign(e.Partitions)
         if err != nil {
             return fmt.Errorf("failed to assign partitions: %w", err)
         }
@@ -641,7 +581,7 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error {
         // involuntarily. In this case, the partition might already be owned
         // by another consumer, and operations including committing
         // offsets may not work.
-        if s.consumer.AssignmentLost() {
+        if s.config.Consumer.AssignmentLost() {
             // Our consumer has been kicked out of the group and the
             // entire assignment is thus lost.
             logger.Warn("assignment lost involuntarily, commit may fail")
@@ -653,7 +593,7 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error {
             return fmt.Errorf("failed to flush: %w", err)
         }

-        err = s.consumer.Unassign()
+        err = s.config.Consumer.Unassign()
         if err != nil {
             return fmt.Errorf("failed to unassign partitions: %w", err)
         }
@@ -712,7 +652,7 @@ func (s *Sink) Close() error {
     if s.flushTimer != nil {
         s.flushTimer.Stop()
     }
-    return s.consumer.Close()
+    return s.config.Consumer.Close()
 }

 // getNamespace from topic
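The diff is truncated at getNamespace, but namespaceTopicRegexp at the top of sink.go defines the topic naming scheme, om_<namespace>_events. A runnable sketch of recovering a namespace from a topic name with that exact pattern; the helper's shape is assumed, not copied from the hidden function body:

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as namespaceTopicRegexp in internal/sink/sink.go.
var namespaceTopicRegexp = regexp.MustCompile(`^om_([A-Za-z0-9]+(?:_[A-Za-z0-9]+)*)_events$`)

// getNamespace is a sketch; the real implementation is not shown in the diff.
func getNamespace(topic string) (string, bool) {
	m := namespaceTopicRegexp.FindStringSubmatch(topic)
	if len(m) != 2 {
		return "", false
	}
	return m[1], true
}

func main() {
	fmt.Println(getNamespace("om_default_events")) // default true
	fmt.Println(getNamespace("om_my_team_events")) // my_team true
	fmt.Println(getNamespace("deadletter_topic"))  // "" false
}
```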