From 2591ee6647ee45c0702010cac7dd49db5e79702a Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Sun, 12 Nov 2023 16:31:56 -0800 Subject: [PATCH 01/10] feat(sink): save invalid events to storage --- cmd/sink-worker/main.go | 31 ++++--- internal/sink/sink.go | 92 +++++-------------- internal/sink/storage.go | 39 ++++++++ .../clickhouse_connector/connector.go | 24 ++++- .../streaming/clickhouse_connector/query.go | 24 +++++ 5 files changed, 127 insertions(+), 83 deletions(-) diff --git a/cmd/sink-worker/main.go b/cmd/sink-worker/main.go index c6f7e4fc0..198591eaf 100644 --- a/cmd/sink-worker/main.go +++ b/cmd/sink-worker/main.go @@ -13,6 +13,7 @@ import ( health "github.com/AppsFlyer/go-sundheit" healthhttp "github.com/AppsFlyer/go-sundheit/http" "github.com/ClickHouse/clickhouse-go/v2" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/go-slog/otelslog" @@ -253,21 +254,27 @@ func initSink(config config.Configuration, logger *slog.Logger, metricMeter metr consumerKafkaConfig := config.Ingest.Kafka.CreateKafkaConfig() _ = consumerKafkaConfig.SetKey("group.id", config.Sink.GroupId) + _ = consumerKafkaConfig.SetKey("session.timeout.ms", 6000) + _ = consumerKafkaConfig.SetKey("enable.auto.commit", false) + _ = consumerKafkaConfig.SetKey("enable.auto.offset.store", false) + _ = consumerKafkaConfig.SetKey("go.application.rebalance.enable", true) - producerKafkaConfig := config.Ingest.Kafka.CreateKafkaConfig() + consumer, err := kafka.NewConsumer(&consumerKafkaConfig) + if err != nil { + return nil, fmt.Errorf("failed to initialize kafka consumer: %s", err) + } sinkConfig := sink.SinkConfig{ - Logger: logger, - Tracer: tracer, - MetricMeter: metricMeter, - MeterRepository: meterRepository, - Storage: storage, - Deduplicator: deduplicator, - ConsumerKafkaConfig: consumerKafkaConfig, - ProducerKafkaConfig: producerKafkaConfig, - MinCommitCount: config.Sink.MinCommitCount, - MaxCommitWait: config.Sink.MaxCommitWait, - NamespaceRefetch: config.Sink.NamespaceRefetch, + Logger: logger, + Tracer: tracer, + MetricMeter: metricMeter, + MeterRepository: meterRepository, + Storage: storage, + Deduplicator: deduplicator, + Consumer: consumer, + MinCommitCount: config.Sink.MinCommitCount, + MaxCommitWait: config.Sink.MaxCommitWait, + NamespaceRefetch: config.Sink.NamespaceRefetch, } return sink.NewSink(sinkConfig) diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 8525e468d..8069d3437 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -24,7 +24,6 @@ import ( ) var namespaceTopicRegexp = regexp.MustCompile(`^om_([A-Za-z0-9]+(?:_[A-Za-z0-9]+)*)_events$`) -var defaultDeadletterTopicTemplate = "om_%s_events_deadletter" type SinkMessage struct { Namespace string @@ -34,8 +33,6 @@ type SinkMessage struct { } type Sink struct { - consumer *kafka.Consumer - producer *kafka.Producer config SinkConfig running bool buffer *SinkBuffer @@ -47,14 +44,13 @@ type Sink struct { } type SinkConfig struct { - Logger *slog.Logger - Tracer trace.Tracer - MetricMeter metric.Meter - MeterRepository meter.Repository - Storage Storage - Deduplicator dedupe.Deduplicator - ConsumerKafkaConfig kafka.ConfigMap - ProducerKafkaConfig kafka.ConfigMap + Logger *slog.Logger + Tracer trace.Tracer + MetricMeter metric.Meter + MeterRepository meter.Repository + Storage Storage + Deduplicator dedupe.Deduplicator + Consumer *kafka.Consumer // MinCommitCount is the minimum number of messages to wait before flushing the buffer. // Whichever happens earlier MinCommitCount or MaxCommitWait will trigger a flush. MinCommitCount int @@ -64,9 +60,6 @@ type SinkConfig struct { // this information is used to configure which topics the consumer subscribes and // the meter configs used in event validation. NamespaceRefetch time.Duration - // DeadletterTopicTemplate is the template used to create the deadletter topic name per namespace. - // It is a sprintf template with the namespace as the only argument. - DeadletterTopicTemplate string // OnFlushSuccess is an optional lifecycle hook OnFlushSuccess func(string, int64) } @@ -76,22 +69,6 @@ func NewSink(config SinkConfig) (*Sink, error) { config.Logger.Warn("deduplicator is not set, deduplication will be disabled") } - // These are Kafka configs but also related to sink logic - _ = config.ConsumerKafkaConfig.SetKey("session.timeout.ms", 6000) - _ = config.ConsumerKafkaConfig.SetKey("enable.auto.commit", false) - _ = config.ConsumerKafkaConfig.SetKey("enable.auto.offset.store", false) - _ = config.ConsumerKafkaConfig.SetKey("go.application.rebalance.enable", true) - - consumer, err := kafka.NewConsumer(&config.ConsumerKafkaConfig) - if err != nil { - return nil, fmt.Errorf("failed to create consumer: %s", err) - } - - producer, err := kafka.NewProducer(&config.ProducerKafkaConfig) - if err != nil { - return nil, fmt.Errorf("failed to create producer: %w", err) - } - // Defaults if config.MinCommitCount == 0 { config.MinCommitCount = 1 @@ -102,9 +79,6 @@ func NewSink(config SinkConfig) (*Sink, error) { if config.NamespaceRefetch == 0 { config.NamespaceRefetch = 15 * time.Second } - if config.DeadletterTopicTemplate == "" { - config.DeadletterTopicTemplate = defaultDeadletterTopicTemplate - } // Initialize OTel metrics messageCounter, err := config.MetricMeter.Int64Counter( @@ -126,8 +100,6 @@ func NewSink(config SinkConfig) (*Sink, error) { } sink := &Sink{ - consumer: consumer, - producer: producer, config: config, buffer: NewSinkBuffer(), namespaceStore: NewNamespaceStore(), @@ -327,39 +299,21 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([] return deadletterMessages, nil } -// deadLetter sends a message to the dead letter queue, useful permanent non-recoverable errors like json parsing +// deadLetter stores invalid message, useful permanent non-recoverable errors like json parsing func (s *Sink) deadLetter(ctx context.Context, messages ...SinkMessage) error { logger := s.config.Logger.With("operation", "deadLetter") _, deadletterSpan := s.config.Tracer.Start(ctx, "deadletter") - for _, message := range messages { - topic := fmt.Sprintf(s.config.DeadletterTopicTemplate, message.Namespace) - headers := message.KafkaMessage.Headers - headers = append(headers, kafka.Header{Key: "error", Value: []byte(message.Error.Error())}) - - msg := &kafka.Message{ - TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, - Timestamp: message.KafkaMessage.Timestamp, - Headers: headers, - Key: message.KafkaMessage.Key, - Value: message.KafkaMessage.Value, - } - - err := s.producer.Produce(msg, nil) - if err != nil { - deadletterSpan.SetStatus(codes.Error, "deadletter failure") - deadletterSpan.RecordError(err) - deadletterSpan.End() + err := s.config.Storage.BatchInsertInvalid(ctx, messages) + if err != nil { + deadletterSpan.SetStatus(codes.Error, "deadletter failure") + deadletterSpan.RecordError(err) + deadletterSpan.End() - return fmt.Errorf("producing kafka message to deadletter topic: %w", err) - } - deadletterSpan.AddEvent( - "deadletter", - trace.WithAttributes(attribute.String("namespace", message.Namespace)), - ) + return fmt.Errorf("storing invalid messages: %w", err) } - logger.Debug("succeeded to deadletter", "messages", len(messages)) + logger.Debug("succeeded to store invalid", "messages", len(messages)) deadletterSpan.End() return nil } @@ -378,7 +332,7 @@ func (s *Sink) offsetCommit(ctx context.Context, messages []SinkMessage) error { // We retry with exponential backoff as it's critical that either step #2 or #3 succeeds. err := retry.Do( func() error { - commitedOffsets, err := s.consumer.CommitOffsets(offsets) + commitedOffsets, err := s.config.Consumer.CommitOffsets(offsets) if err != nil { return err } @@ -484,7 +438,7 @@ func (s *Sink) subscribeToNamespaces() error { topics := getTopics(*s.namespaceStore) logger.Info("new namespaces detected, subscribing to topics", "topics", topics) - err = s.consumer.SubscribeTopics(topics, s.rebalance) + err = s.config.Consumer.SubscribeTopics(topics, s.rebalance) if err != nil { return fmt.Errorf("failed to subscribe to topics: %s", err) } @@ -558,7 +512,7 @@ func (s *Sink) Run() error { logger.Error("caught signal, terminating", "sig", sig) s.running = false default: - ev := s.consumer.Poll(100) + ev := s.config.Consumer.Poll(100) if ev == nil { continue } @@ -587,7 +541,7 @@ func (s *Sink) Run() error { logger.Debug("event added to buffer", "partition", e.TopicPartition.Partition, "offset", e.TopicPartition.Offset, "event", kafkaCloudEvent) // Store message, this won't commit offset immediately just store it for the next manual commit - _, err = s.consumer.StoreMessage(e) + _, err = s.config.Consumer.StoreMessage(e) if err != nil { // Stop processing, non-recoverable error return fmt.Errorf("failed to store kafka message for upcoming offset commit: %w", err) @@ -625,7 +579,7 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error { switch e := event.(type) { case kafka.AssignedPartitions: logger.Info("kafka assigned partitions", "partitions", e.Partitions) - err := s.consumer.Assign(e.Partitions) + err := s.config.Consumer.Assign(e.Partitions) if err != nil { return fmt.Errorf("failed to assign partitions: %w", err) } @@ -641,7 +595,7 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error { // involuntarily. In this case, the partition might already be owned // by another consumer, and operations including committing // offsets may not work. - if s.consumer.AssignmentLost() { + if s.config.Consumer.AssignmentLost() { // Our consumer has been kicked out of the group and the // entire assignment is thus lost. logger.Warn("assignment lost involuntarily, commit may fail") @@ -653,7 +607,7 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error { return fmt.Errorf("failed to flush: %w", err) } - err = s.consumer.Unassign() + err = s.config.Consumer.Unassign() if err != nil { return fmt.Errorf("failed to unassign partitions: %w", err) } @@ -712,7 +666,7 @@ func (s *Sink) Close() error { if s.flushTimer != nil { s.flushTimer.Stop() } - return s.consumer.Close() + return s.config.Consumer.Close() } // getNamespace from topic diff --git a/internal/sink/storage.go b/internal/sink/storage.go index 16764a648..01babcb9b 100644 --- a/internal/sink/storage.go +++ b/internal/sink/storage.go @@ -17,6 +17,7 @@ var codeRegexp = regexp.MustCompile(`code: (0-9]+)`) type Storage interface { BatchInsert(ctx context.Context, namespace string, events []*serializer.CloudEventsKafkaPayload) error + BatchInsertInvalid(ctx context.Context, messages []SinkMessage) error } type ClickHouseStorageConfig struct { @@ -82,6 +83,24 @@ func (c *ClickHouseStorage) BatchInsert(ctx context.Context, namespace string, e return nil } +func (c *ClickHouseStorage) BatchInsertInvalid(ctx context.Context, messages []SinkMessage) error { + query := InsertInvalidQuery{ + Database: c.config.Database, + Messages: messages, + } + sql, args, err := query.ToSQL() + if err != nil { + return err + } + + err = c.config.ClickHouse.Exec(ctx, sql, args...) + if err != nil { + return err + } + + return nil +} + type InsertEventsQuery struct { Database string EventsTableName string @@ -103,6 +122,26 @@ func (q InsertEventsQuery) ToSQL() (string, []interface{}, error) { return sql, args, nil } +type InsertInvalidQuery struct { + Database string + Messages []SinkMessage +} + +func (q InsertInvalidQuery) ToSQL() (string, []interface{}, error) { + tableName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(q.Database), clickhouse_connector.InvalidEventsTableName) + + query := sqlbuilder.ClickHouse.NewInsertBuilder() + query.InsertInto(tableName) + query.Cols("namespace", "time", "error", "event") + + for _, message := range q.Messages { + query.Values(message.Namespace, message.KafkaMessage.Timestamp, message.Error.Message, string(message.KafkaMessage.Value)) + } + + sql, args := query.Build() + return sql, args, nil +} + func getCode(err error) int { tmp := codeRegexp.FindStringSubmatch(err.Error()) if len(tmp) != 2 || tmp[1] == "" { diff --git a/internal/streaming/clickhouse_connector/connector.go b/internal/streaming/clickhouse_connector/connector.go index b78f4bacc..7797da873 100644 --- a/internal/streaming/clickhouse_connector/connector.go +++ b/internal/streaming/clickhouse_connector/connector.go @@ -17,8 +17,9 @@ import ( ) var ( - prefix = "om" - eventsTableName = "events" + prefix = "om" + eventsTableName = "events" + InvalidEventsTableName = "invalid_events" ) // ClickhouseConnector implements `ingest.Connector“ and `namespace.Handler interfaces. @@ -141,6 +142,11 @@ func (c *ClickhouseConnector) CreateNamespace(ctx context.Context, namespace str return fmt.Errorf("create namespace in clickhouse: %w", err) } + err = c.createInvalidEventsTable(ctx) + if err != nil { + return fmt.Errorf("create namespace in clickhouse: %w", err) + } + return nil } @@ -158,6 +164,20 @@ func (c *ClickhouseConnector) createEventsTable(ctx context.Context, namespace s return nil } +func (c *ClickhouseConnector) createInvalidEventsTable(ctx context.Context) error { + table := createInvalidEventsTable{ + Database: c.config.Database, + TableName: InvalidEventsTableName, + } + + err := c.config.ClickHouse.Exec(ctx, table.toSQL()) + if err != nil { + return fmt.Errorf("create invalid events table: %w", err) + } + + return nil +} + func (c *ClickhouseConnector) queryEventsTable(ctx context.Context, namespace string, params streaming.ListEventsParams) ([]event.Event, error) { table := queryEventsTable{ Database: c.config.Database, diff --git a/internal/streaming/clickhouse_connector/query.go b/internal/streaming/clickhouse_connector/query.go index c93a8c065..42b66cc98 100644 --- a/internal/streaming/clickhouse_connector/query.go +++ b/internal/streaming/clickhouse_connector/query.go @@ -44,6 +44,30 @@ func (d createEventsTable) toSQL() string { return sql } +// Create Invalid Events Table +type createInvalidEventsTable struct { + Database string + TableName string +} + +func (d createInvalidEventsTable) toSQL() string { + tableName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.TableName)) + + sb := sqlbuilder.ClickHouse.NewCreateTableBuilder() + sb.CreateTable(tableName) + sb.IfNotExists() + sb.Define("namespace", "String") + sb.Define("time", "DateTime") + sb.Define("error", "String") + sb.Define("event", "String") + sb.SQL("ENGINE = MergeTree") + sb.SQL("PARTITION BY toYYYYMM(time)") + sb.SQL("ORDER BY (namespace, time)") + + sql, _ := sb.Build() + return sql +} + type queryEventsTable struct { Database string EventsTableName string From 6e4f557ddcf83059fcaa63318041142b5fc0f887 Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Sun, 12 Nov 2023 16:57:32 -0800 Subject: [PATCH 02/10] feat(connector): single events table across namespaces --- internal/sink/storage.go | 5 +- .../clickhouse_connector/connector.go | 32 +++----- .../streaming/clickhouse_connector/query.go | 40 +++++----- .../clickhouse_connector/query_test.go | 79 +++++++++---------- 4 files changed, 75 insertions(+), 81 deletions(-) diff --git a/internal/sink/storage.go b/internal/sink/storage.go index 01babcb9b..70a13351a 100644 --- a/internal/sink/storage.go +++ b/internal/sink/storage.go @@ -37,9 +37,8 @@ type ClickHouseStorage struct { func (c *ClickHouseStorage) BatchInsert(ctx context.Context, namespace string, events []*serializer.CloudEventsKafkaPayload) error { query := InsertEventsQuery{ - Database: c.config.Database, - EventsTableName: clickhouse_connector.GetEventsTableName(namespace), - Events: events, + Database: c.config.Database, + Events: events, } sql, args, err := query.ToSQL() if err != nil { diff --git a/internal/streaming/clickhouse_connector/connector.go b/internal/streaming/clickhouse_connector/connector.go index 7797da873..b57e6a2cc 100644 --- a/internal/streaming/clickhouse_connector/connector.go +++ b/internal/streaming/clickhouse_connector/connector.go @@ -18,7 +18,7 @@ import ( var ( prefix = "om" - eventsTableName = "events" + EventsTableName = "events" InvalidEventsTableName = "invalid_events" ) @@ -152,8 +152,7 @@ func (c *ClickhouseConnector) CreateNamespace(ctx context.Context, namespace str func (c *ClickhouseConnector) createEventsTable(ctx context.Context, namespace string) error { table := createEventsTable{ - Database: c.config.Database, - EventsTableName: GetEventsTableName(namespace), + Database: c.config.Database, } err := c.config.ClickHouse.Exec(ctx, table.toSQL()) @@ -166,8 +165,7 @@ func (c *ClickhouseConnector) createEventsTable(ctx context.Context, namespace s func (c *ClickhouseConnector) createInvalidEventsTable(ctx context.Context) error { table := createInvalidEventsTable{ - Database: c.config.Database, - TableName: InvalidEventsTableName, + Database: c.config.Database, } err := c.config.ClickHouse.Exec(ctx, table.toSQL()) @@ -180,9 +178,9 @@ func (c *ClickhouseConnector) createInvalidEventsTable(ctx context.Context) erro func (c *ClickhouseConnector) queryEventsTable(ctx context.Context, namespace string, params streaming.ListEventsParams) ([]event.Event, error) { table := queryEventsTable{ - Database: c.config.Database, - EventsTableName: GetEventsTableName(namespace), - Limit: params.Limit, + Database: c.config.Database, + Namespace: namespace, + Limit: params.Limit, } sql, _, err := table.toSQL() @@ -238,13 +236,13 @@ func (c *ClickhouseConnector) queryEventsTable(ctx context.Context, namespace st func (c *ClickhouseConnector) createMeterView(ctx context.Context, namespace string, meter *models.Meter) error { view := createMeterView{ - Database: c.config.Database, - EventsTableName: GetEventsTableName(namespace), - Aggregation: meter.Aggregation, - EventType: meter.EventType, - MeterViewName: getMeterViewNameBySlug(namespace, meter.Slug), - ValueProperty: meter.ValueProperty, - GroupBy: meter.GroupBy, + Database: c.config.Database, + Namespace: namespace, + Aggregation: meter.Aggregation, + EventType: meter.EventType, + MeterViewName: getMeterViewNameBySlug(namespace, meter.Slug), + ValueProperty: meter.ValueProperty, + GroupBy: meter.GroupBy, } sql, args, err := view.toSQL() if err != nil { @@ -392,10 +390,6 @@ func (c *ClickhouseConnector) listMeterViewSubjects(ctx context.Context, namespa return subjects, nil } -func GetEventsTableName(namespace string) string { - return fmt.Sprintf("%s_%s_%s", prefix, namespace, eventsTableName) -} - func getMeterViewNameBySlug(namespace string, meterSlug string) string { return fmt.Sprintf("%s_%s_%s", prefix, namespace, meterSlug) } diff --git a/internal/streaming/clickhouse_connector/query.go b/internal/streaming/clickhouse_connector/query.go index 42b66cc98..a0b3dce69 100644 --- a/internal/streaming/clickhouse_connector/query.go +++ b/internal/streaming/clickhouse_connector/query.go @@ -20,16 +20,17 @@ type column struct { // Create Events Table type createEventsTable struct { - Database string - EventsTableName string + Database string + TablePrefix string } func (d createEventsTable) toSQL() string { - tableName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.EventsTableName)) + tableName := fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(d.Database), prefix, EventsTableName) sb := sqlbuilder.ClickHouse.NewCreateTableBuilder() sb.CreateTable(tableName) sb.IfNotExists() + sb.Define("namespace", "String") sb.Define("id", "String") sb.Define("type", "LowCardinality(String)") sb.Define("subject", "String") @@ -38,7 +39,7 @@ func (d createEventsTable) toSQL() string { sb.Define("data", "String") sb.SQL("ENGINE = MergeTree") sb.SQL("PARTITION BY toYYYYMM(time)") - sb.SQL("ORDER BY (time, type, subject)") + sb.SQL("ORDER BY (namespace, time, type, subject)") sql, _ := sb.Build() return sql @@ -46,12 +47,11 @@ func (d createEventsTable) toSQL() string { // Create Invalid Events Table type createInvalidEventsTable struct { - Database string - TableName string + Database string } func (d createInvalidEventsTable) toSQL() string { - tableName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.TableName)) + tableName := fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(d.Database), prefix, InvalidEventsTableName) sb := sqlbuilder.ClickHouse.NewCreateTableBuilder() sb.CreateTable(tableName) @@ -69,17 +69,18 @@ func (d createInvalidEventsTable) toSQL() string { } type queryEventsTable struct { - Database string - EventsTableName string - Limit int + Database string + Namespace string + Limit int } func (d queryEventsTable) toSQL() (string, []interface{}, error) { - tableName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.EventsTableName)) + tableName := fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(d.Database), prefix, EventsTableName) query := sqlbuilder.ClickHouse.NewSelectBuilder() query.Select("id", "type", "subject", "source", "time", "data") query.From(tableName) + query.Where(query.Equal("namespace", d.Namespace)) query.Desc().OrderBy("time") query.Limit(d.Limit) @@ -88,18 +89,18 @@ func (d queryEventsTable) toSQL() (string, []interface{}, error) { } type createMeterView struct { - Database string - Aggregation models.MeterAggregation - EventsTableName string - MeterViewName string - EventType string - ValueProperty string - GroupBy map[string]string + Database string + Aggregation models.MeterAggregation + Namespace string + MeterViewName string + EventType string + ValueProperty string + GroupBy map[string]string } func (d createMeterView) toSQL() (string, []interface{}, error) { viewName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.MeterViewName)) - eventsTableName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.EventsTableName)) + eventsTableName := fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(d.Database), prefix, EventsTableName) columns := []column{ {Name: "subject", Type: "String"}, {Name: "windowstart", Type: "DateTime"}, @@ -167,6 +168,7 @@ func (d createMeterView) toSQL() (string, []interface{}, error) { sbAs.Select(asSelects...) sbAs.From(eventsTableName) // We use absolute path for type to avoid shadowing in the case the materialized view have a `type` column due to group by + sbAs.Where(fmt.Sprintf("namespace = '%s'", sqlbuilder.Escape(d.Namespace))) sbAs.Where(fmt.Sprintf("%s.type = '%s'", eventsTableName, sqlbuilder.Escape(d.EventType))) sbAs.GroupBy(orderBy...) sb.SQL(sbAs.String()) diff --git a/internal/streaming/clickhouse_connector/query_test.go b/internal/streaming/clickhouse_connector/query_test.go index d79c1bc99..205a594c6 100644 --- a/internal/streaming/clickhouse_connector/query_test.go +++ b/internal/streaming/clickhouse_connector/query_test.go @@ -16,10 +16,9 @@ func TestCreateEventsTable(t *testing.T) { }{ { data: createEventsTable{ - Database: "openmeter", - EventsTableName: "meter_events", + Database: "openmeter", }, - want: "CREATE TABLE IF NOT EXISTS openmeter.meter_events (id String, type LowCardinality(String), subject String, source String, time DateTime, data String) ENGINE = MergeTree PARTITION BY toYYYYMM(time) ORDER BY (time, type, subject)", + want: "CREATE TABLE IF NOT EXISTS openmeter.om_events (namespace String, id String, type LowCardinality(String), subject String, source String, time DateTime, data String) ENGINE = MergeTree PARTITION BY toYYYYMM(time) ORDER BY (namespace, time, type, subject)", }, } @@ -40,12 +39,12 @@ func TestQueryEventsTable(t *testing.T) { }{ { query: queryEventsTable{ - Database: "openmeter", - EventsTableName: "meter_events", - Limit: 100, + Database: "openmeter", + Namespace: "my_namespace", + Limit: 100, }, - wantSQL: "SELECT id, type, subject, source, time, data FROM openmeter.meter_events ORDER BY time DESC LIMIT 100", - wantArgs: nil, + wantSQL: "SELECT id, type, subject, source, time, data FROM openmeter.om_events WHERE namespace = ? ORDER BY time DESC LIMIT 100", + wantArgs: []interface{}{"my_namespace"}, }, } @@ -72,54 +71,54 @@ func TestCreateMeterView(t *testing.T) { }{ { query: createMeterView{ - Database: "openmeter", - EventsTableName: "meter_events", - Aggregation: models.MeterAggregationSum, - EventType: "myevent", - MeterViewName: "meter_meter1", - ValueProperty: "$.duration_ms", - GroupBy: map[string]string{"group1": "$.group1", "group2": "$.group2"}, + Database: "openmeter", + Namespace: "my_namespace", + Aggregation: models.MeterAggregationSum, + EventType: "myevent", + MeterViewName: "meter_meter1", + ValueProperty: "$.duration_ms", + GroupBy: map[string]string{"group1": "$.group1", "group2": "$.group2"}, }, - wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(sum, Float64), group1 String, group2 String) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject, group1, group2) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, sumState(cast(JSON_VALUE(data, '$.duration_ms'), 'Float64')) AS value, JSON_VALUE(data, '$.group1') as group1, JSON_VALUE(data, '$.group2') as group2 FROM openmeter.meter_events WHERE openmeter.meter_events.type = 'myevent' GROUP BY windowstart, windowend, subject, group1, group2", + wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(sum, Float64), group1 String, group2 String) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject, group1, group2) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, sumState(cast(JSON_VALUE(data, '$.duration_ms'), 'Float64')) AS value, JSON_VALUE(data, '$.group1') as group1, JSON_VALUE(data, '$.group2') as group2 FROM openmeter.om_events WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject, group1, group2", wantArgs: nil, }, { query: createMeterView{ - Database: "openmeter", - EventsTableName: "meter_events", - Aggregation: models.MeterAggregationAvg, - EventType: "myevent", - MeterViewName: "meter_meter1", - ValueProperty: "$.token_count", - GroupBy: map[string]string{}, + Database: "openmeter", + Namespace: "my_namespace", + Aggregation: models.MeterAggregationAvg, + EventType: "myevent", + MeterViewName: "meter_meter1", + ValueProperty: "$.token_count", + GroupBy: map[string]string{}, }, - wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(avg, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, avgState(cast(JSON_VALUE(data, '$.token_count'), 'Float64')) AS value FROM openmeter.meter_events WHERE openmeter.meter_events.type = 'myevent' GROUP BY windowstart, windowend, subject", + wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(avg, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, avgState(cast(JSON_VALUE(data, '$.token_count'), 'Float64')) AS value FROM openmeter.om_events WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject", wantArgs: nil, }, { query: createMeterView{ - Database: "openmeter", - EventsTableName: "meter_events", - Aggregation: models.MeterAggregationCount, - EventType: "myevent", - MeterViewName: "meter_meter1", - ValueProperty: "", - GroupBy: map[string]string{}, + Database: "openmeter", + Namespace: "my_namespace", + Aggregation: models.MeterAggregationCount, + EventType: "myevent", + MeterViewName: "meter_meter1", + ValueProperty: "", + GroupBy: map[string]string{}, }, - wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(count, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, countState(*) AS value FROM openmeter.meter_events WHERE openmeter.meter_events.type = 'myevent' GROUP BY windowstart, windowend, subject", + wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(count, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, countState(*) AS value FROM openmeter.om_events WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject", wantArgs: nil, }, { query: createMeterView{ - Database: "openmeter", - EventsTableName: "meter_events", - Aggregation: models.MeterAggregationCount, - EventType: "myevent", - MeterViewName: "meter_meter1", - ValueProperty: "", - GroupBy: map[string]string{}, + Database: "openmeter", + Namespace: "my_namespace", + Aggregation: models.MeterAggregationCount, + EventType: "myevent", + MeterViewName: "meter_meter1", + ValueProperty: "", + GroupBy: map[string]string{}, }, - wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(count, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, countState(*) AS value FROM openmeter.meter_events WHERE openmeter.meter_events.type = 'myevent' GROUP BY windowstart, windowend, subject", + wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(count, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, countState(*) AS value FROM openmeter.om_events WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject", wantArgs: nil, }, } From 01ec1615a7bf8c43da174e9c175fecb44285bde7 Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Sun, 12 Nov 2023 17:21:16 -0800 Subject: [PATCH 03/10] fix(sink): events table --- internal/sink/storage.go | 21 ++- internal/sink/storage_test.go | 10 +- .../clickhouse_connector/connector.go | 25 ++- .../streaming/clickhouse_connector/query.go | 48 +++-- .../clickhouse_connector/query_test.go | 166 ++++++++++-------- 5 files changed, 151 insertions(+), 119 deletions(-) diff --git a/internal/sink/storage.go b/internal/sink/storage.go index 70a13351a..073dede03 100644 --- a/internal/sink/storage.go +++ b/internal/sink/storage.go @@ -35,10 +35,13 @@ type ClickHouseStorage struct { config ClickHouseStorageConfig } +// TODO: we could insert for all namespaces in one query but that would increase error blast radius +// consider this approach and tradeoffs func (c *ClickHouseStorage) BatchInsert(ctx context.Context, namespace string, events []*serializer.CloudEventsKafkaPayload) error { query := InsertEventsQuery{ - Database: c.config.Database, - Events: events, + Database: c.config.Database, + Namespace: namespace, + Events: events, } sql, args, err := query.ToSQL() if err != nil { @@ -101,20 +104,20 @@ func (c *ClickHouseStorage) BatchInsertInvalid(ctx context.Context, messages []S } type InsertEventsQuery struct { - Database string - EventsTableName string - Events []*serializer.CloudEventsKafkaPayload + Database string + Namespace string + Events []*serializer.CloudEventsKafkaPayload } func (q InsertEventsQuery) ToSQL() (string, []interface{}, error) { - tableName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(q.Database), sqlbuilder.Escape(q.EventsTableName)) + tableName := clickhouse_connector.GetEventsTableName(q.Database) query := sqlbuilder.ClickHouse.NewInsertBuilder() query.InsertInto(tableName) - query.Cols("id", "type", "source", "subject", "time", "data") + query.Cols("namespace", "id", "type", "source", "subject", "time", "data") for _, event := range q.Events { - query.Values(event.Id, event.Type, event.Source, event.Subject, event.Time, event.Data) + query.Values(q.Namespace, event.Id, event.Type, event.Source, event.Subject, event.Time, event.Data) } sql, args := query.Build() @@ -127,7 +130,7 @@ type InsertInvalidQuery struct { } func (q InsertInvalidQuery) ToSQL() (string, []interface{}, error) { - tableName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(q.Database), clickhouse_connector.InvalidEventsTableName) + tableName := fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(q.Database), clickhouse_connector.TablePrefix, clickhouse_connector.InvalidEventsTableName) query := sqlbuilder.ClickHouse.NewInsertBuilder() query.InsertInto(tableName) diff --git a/internal/sink/storage_test.go b/internal/sink/storage_test.go index 64f154360..b722a7e0f 100644 --- a/internal/sink/storage_test.go +++ b/internal/sink/storage_test.go @@ -14,8 +14,8 @@ func TestInsertEventsQuery(t *testing.T) { now := time.Now() query := sink.InsertEventsQuery{ - Database: "database", - EventsTableName: "events_table", + Database: "database", + Namespace: "my_namespace", Events: []*serializer.CloudEventsKafkaPayload{ { Id: "1", @@ -39,9 +39,9 @@ func TestInsertEventsQuery(t *testing.T) { sql, args, err := query.ToSQL() assert.NoError(t, err) assert.Equal(t, args, []interface{}{ - "1", "api-calls", "source", "subject-1", now.UnixMilli(), `{"duration_ms": 100, "method": "GET", "path": "/api/v1"}`, - "2", "api-calls", "source", "subject-2", now.UnixMilli(), `{"duration_ms": 80, "method": "GET", "path": "/api/v1"}`, + "my_namespace", "1", "api-calls", "source", "subject-1", now.UnixMilli(), `{"duration_ms": 100, "method": "GET", "path": "/api/v1"}`, + "my_namespace", "2", "api-calls", "source", "subject-2", now.UnixMilli(), `{"duration_ms": 80, "method": "GET", "path": "/api/v1"}`, }) - assert.Equal(t, `INSERT INTO database.events_table (id, type, source, subject, time, data) VALUES (?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?)`, sql) + assert.Equal(t, `INSERT INTO database.om_events (namespace, id, type, source, subject, time, data) VALUES (?, ?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?, ?)`, sql) } diff --git a/internal/streaming/clickhouse_connector/connector.go b/internal/streaming/clickhouse_connector/connector.go index b57e6a2cc..3dc700500 100644 --- a/internal/streaming/clickhouse_connector/connector.go +++ b/internal/streaming/clickhouse_connector/connector.go @@ -17,7 +17,7 @@ import ( ) var ( - prefix = "om" + TablePrefix = "om" EventsTableName = "events" InvalidEventsTableName = "invalid_events" ) @@ -238,9 +238,9 @@ func (c *ClickhouseConnector) createMeterView(ctx context.Context, namespace str view := createMeterView{ Database: c.config.Database, Namespace: namespace, + MeterSlug: meter.Slug, Aggregation: meter.Aggregation, EventType: meter.EventType, - MeterViewName: getMeterViewNameBySlug(namespace, meter.Slug), ValueProperty: meter.ValueProperty, GroupBy: meter.GroupBy, } @@ -258,8 +258,9 @@ func (c *ClickhouseConnector) createMeterView(ctx context.Context, namespace str func (c *ClickhouseConnector) deleteMeterView(ctx context.Context, namespace string, meterSlug string) error { query := deleteMeterView{ - Database: c.config.Database, - MeterViewName: getMeterViewNameBySlug(namespace, meterSlug), + Database: c.config.Database, + Namespace: namespace, + MeterSlug: meterSlug, } sql, args := query.toSQL() err := c.config.ClickHouse.Exec(ctx, sql, args...) @@ -277,7 +278,8 @@ func (c *ClickhouseConnector) deleteMeterView(ctx context.Context, namespace str func (c *ClickhouseConnector) queryMeterView(ctx context.Context, namespace string, meterSlug string, params *streaming.QueryParams) ([]*models.MeterValue, error) { queryMeter := queryMeterView{ Database: c.config.Database, - MeterViewName: getMeterViewNameBySlug(namespace, meterSlug), + Namespace: namespace, + MeterSlug: meterSlug, Aggregation: params.Aggregation, From: params.From, To: params.To, @@ -357,10 +359,11 @@ func (c *ClickhouseConnector) queryMeterView(ctx context.Context, namespace stri func (c *ClickhouseConnector) listMeterViewSubjects(ctx context.Context, namespace string, meterSlug string, from *time.Time, to *time.Time) ([]string, error) { query := listMeterViewSubjects{ - Database: c.config.Database, - MeterViewName: getMeterViewNameBySlug(namespace, meterSlug), - From: from, - To: to, + Database: c.config.Database, + Namespace: namespace, + MeterSlug: meterSlug, + From: from, + To: to, } sql, args, err := query.toSQL() @@ -389,7 +392,3 @@ func (c *ClickhouseConnector) listMeterViewSubjects(ctx context.Context, namespa return subjects, nil } - -func getMeterViewNameBySlug(namespace string, meterSlug string) string { - return fmt.Sprintf("%s_%s_%s", prefix, namespace, meterSlug) -} diff --git a/internal/streaming/clickhouse_connector/query.go b/internal/streaming/clickhouse_connector/query.go index a0b3dce69..b7acbf0e3 100644 --- a/internal/streaming/clickhouse_connector/query.go +++ b/internal/streaming/clickhouse_connector/query.go @@ -25,7 +25,7 @@ type createEventsTable struct { } func (d createEventsTable) toSQL() string { - tableName := fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(d.Database), prefix, EventsTableName) + tableName := GetEventsTableName(d.Database) sb := sqlbuilder.ClickHouse.NewCreateTableBuilder() sb.CreateTable(tableName) @@ -51,7 +51,7 @@ type createInvalidEventsTable struct { } func (d createInvalidEventsTable) toSQL() string { - tableName := fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(d.Database), prefix, InvalidEventsTableName) + tableName := GetInvalidEventsTableName(d.Database) sb := sqlbuilder.ClickHouse.NewCreateTableBuilder() sb.CreateTable(tableName) @@ -75,7 +75,7 @@ type queryEventsTable struct { } func (d queryEventsTable) toSQL() (string, []interface{}, error) { - tableName := fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(d.Database), prefix, EventsTableName) + tableName := GetEventsTableName(d.Database) query := sqlbuilder.ClickHouse.NewSelectBuilder() query.Select("id", "type", "subject", "source", "time", "data") @@ -92,15 +92,15 @@ type createMeterView struct { Database string Aggregation models.MeterAggregation Namespace string - MeterViewName string + MeterSlug string EventType string ValueProperty string GroupBy map[string]string } func (d createMeterView) toSQL() (string, []interface{}, error) { - viewName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.MeterViewName)) - eventsTableName := fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(d.Database), prefix, EventsTableName) + viewName := GetMeterViewName(d.Database, d.Namespace, d.MeterSlug) + eventsTableName := GetEventsTableName(d.Database) columns := []column{ {Name: "subject", Type: "String"}, {Name: "windowstart", Type: "DateTime"}, @@ -181,18 +181,20 @@ func (d createMeterView) toSQL() (string, []interface{}, error) { } type deleteMeterView struct { - Database string - MeterViewName string + Database string + Namespace string + MeterSlug string } func (d deleteMeterView) toSQL() (string, []interface{}) { - viewName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.MeterViewName)) + viewName := GetMeterViewName(d.Database, d.Namespace, d.MeterSlug) return fmt.Sprintf("DROP VIEW %s", viewName), nil } type queryMeterView struct { Database string - MeterViewName string + Namespace string + MeterSlug string Aggregation models.MeterAggregation Subject []string From *time.Time @@ -204,7 +206,7 @@ type queryMeterView struct { } func (d queryMeterView) toSQL() (string, []interface{}, error) { - viewName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.MeterViewName)) + viewName := GetMeterViewName(d.Database, d.Namespace, d.MeterSlug) var selectColumns, groupByColumns, where []string @@ -326,14 +328,15 @@ func sortedKeys(m map[string]string) []string { } type listMeterViewSubjects struct { - Database string - MeterViewName string - From *time.Time - To *time.Time + Database string + Namespace string + MeterSlug string + From *time.Time + To *time.Time } func (d listMeterViewSubjects) toSQL() (string, []interface{}, error) { - viewName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.MeterViewName)) + viewName := GetMeterViewName(d.Database, d.Namespace, d.MeterSlug) var where []string sb := sqlbuilder.ClickHouse.NewSelectBuilder() @@ -357,3 +360,16 @@ func (d listMeterViewSubjects) toSQL() (string, []interface{}, error) { sql, args := sb.Build() return sql, args, nil } + +func GetEventsTableName(database string) string { + return fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(database), TablePrefix, EventsTableName) +} + +func GetInvalidEventsTableName(database string) string { + return fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(database), TablePrefix, InvalidEventsTableName) +} + +func GetMeterViewName(database string, namespace string, meterSlug string) string { + meterViewName := fmt.Sprintf("%s_%s_%s", TablePrefix, namespace, meterSlug) + return fmt.Sprintf("%s.%s", sqlbuilder.Escape(database), sqlbuilder.Escape(meterViewName)) +} diff --git a/internal/streaming/clickhouse_connector/query_test.go b/internal/streaming/clickhouse_connector/query_test.go index 205a594c6..d07bcaf7e 100644 --- a/internal/streaming/clickhouse_connector/query_test.go +++ b/internal/streaming/clickhouse_connector/query_test.go @@ -73,52 +73,52 @@ func TestCreateMeterView(t *testing.T) { query: createMeterView{ Database: "openmeter", Namespace: "my_namespace", + MeterSlug: "meter1", Aggregation: models.MeterAggregationSum, EventType: "myevent", - MeterViewName: "meter_meter1", ValueProperty: "$.duration_ms", GroupBy: map[string]string{"group1": "$.group1", "group2": "$.group2"}, }, - wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(sum, Float64), group1 String, group2 String) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject, group1, group2) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, sumState(cast(JSON_VALUE(data, '$.duration_ms'), 'Float64')) AS value, JSON_VALUE(data, '$.group1') as group1, JSON_VALUE(data, '$.group2') as group2 FROM openmeter.om_events WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject, group1, group2", + wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.om_my_namespace_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(sum, Float64), group1 String, group2 String) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject, group1, group2) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, sumState(cast(JSON_VALUE(data, '$.duration_ms'), 'Float64')) AS value, JSON_VALUE(data, '$.group1') as group1, JSON_VALUE(data, '$.group2') as group2 FROM openmeter.om_events WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject, group1, group2", wantArgs: nil, }, { query: createMeterView{ Database: "openmeter", Namespace: "my_namespace", + MeterSlug: "meter1", Aggregation: models.MeterAggregationAvg, EventType: "myevent", - MeterViewName: "meter_meter1", ValueProperty: "$.token_count", GroupBy: map[string]string{}, }, - wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(avg, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, avgState(cast(JSON_VALUE(data, '$.token_count'), 'Float64')) AS value FROM openmeter.om_events WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject", + wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.om_my_namespace_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(avg, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, avgState(cast(JSON_VALUE(data, '$.token_count'), 'Float64')) AS value FROM openmeter.om_events WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject", wantArgs: nil, }, { query: createMeterView{ Database: "openmeter", Namespace: "my_namespace", + MeterSlug: "meter1", Aggregation: models.MeterAggregationCount, EventType: "myevent", - MeterViewName: "meter_meter1", ValueProperty: "", GroupBy: map[string]string{}, }, - wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(count, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, countState(*) AS value FROM openmeter.om_events WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject", + wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.om_my_namespace_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(count, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, countState(*) AS value FROM openmeter.om_events WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject", wantArgs: nil, }, { query: createMeterView{ Database: "openmeter", Namespace: "my_namespace", + MeterSlug: "meter1", Aggregation: models.MeterAggregationCount, EventType: "myevent", - MeterViewName: "meter_meter1", ValueProperty: "", GroupBy: map[string]string{}, }, - wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(count, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, countState(*) AS value FROM openmeter.om_events WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject", + wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.om_my_namespace_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(count, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, countState(*) AS value FROM openmeter.om_events WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject", wantArgs: nil, }, } @@ -146,10 +146,11 @@ func TestDeleteMeterView(t *testing.T) { }{ { data: deleteMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", }, - wantSQL: "DROP VIEW openmeter.meter_meter1", + wantSQL: "DROP VIEW openmeter.om_my_namespace_meter1", wantArgs: nil, }, } @@ -179,111 +180,121 @@ func TestQueryMeterView(t *testing.T) { }{ { query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, - Subject: []string{subject}, - From: &from, - To: &to, - GroupBy: []string{"group1", "group2"}, - WindowSize: &windowSize, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + Subject: []string{subject}, + From: &from, + To: &to, + GroupBy: []string{"group1", "group2"}, + WindowSize: &windowSize, }, - wantSQL: "SELECT tumbleStart(windowstart, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(windowstart, toIntervalHour(1), 'UTC') AS windowend, subject, sumMerge(value) AS value, group1, group2 FROM openmeter.meter_meter1 WHERE (subject = ?) AND windowstart >= ? AND windowend <= ? GROUP BY windowstart, windowend, subject, group1, group2 ORDER BY windowstart", + wantSQL: "SELECT tumbleStart(windowstart, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(windowstart, toIntervalHour(1), 'UTC') AS windowend, subject, sumMerge(value) AS value, group1, group2 FROM openmeter.om_my_namespace_meter1 WHERE (subject = ?) AND windowstart >= ? AND windowend <= ? GROUP BY windowstart, windowend, subject, group1, group2 ORDER BY windowstart", wantArgs: []interface{}{"subject1", from.Unix(), to.Unix()}, }, { // Aggregate all available data query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, }, - wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.meter_meter1", + wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1", wantArgs: nil, }, { // Aggregate with count aggregation query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationCount, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationCount, }, - wantSQL: "SELECT min(windowstart), max(windowend), toFloat64(countMerge(value)) AS value FROM openmeter.meter_meter1", + wantSQL: "SELECT min(windowstart), max(windowend), toFloat64(countMerge(value)) AS value FROM openmeter.om_my_namespace_meter1", wantArgs: nil, }, { // Aggregate data from start query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, - From: &from, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + From: &from, }, - wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.meter_meter1 WHERE windowstart >= ?", + wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE windowstart >= ?", wantArgs: []interface{}{from.Unix()}, }, { // Aggregate data between period query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, - From: &from, - To: &to, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + From: &from, + To: &to, }, - wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.meter_meter1 WHERE windowstart >= ? AND windowend <= ?", + wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE windowstart >= ? AND windowend <= ?", wantArgs: []interface{}{from.Unix(), to.Unix()}, }, { // Aggregate data between period, groupped by window size query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, - From: &from, - To: &to, - WindowSize: &windowSize, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + From: &from, + To: &to, + WindowSize: &windowSize, }, - wantSQL: "SELECT tumbleStart(windowstart, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(windowstart, toIntervalHour(1), 'UTC') AS windowend, sumMerge(value) AS value FROM openmeter.meter_meter1 WHERE windowstart >= ? AND windowend <= ? GROUP BY windowstart, windowend ORDER BY windowstart", + wantSQL: "SELECT tumbleStart(windowstart, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(windowstart, toIntervalHour(1), 'UTC') AS windowend, sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE windowstart >= ? AND windowend <= ? GROUP BY windowstart, windowend ORDER BY windowstart", wantArgs: []interface{}{from.Unix(), to.Unix()}, }, { // Aggregate data between period in a different timezone, groupped by window size query: queryMeterView{ Database: "openmeter", - MeterViewName: "meter_meter1", + Namespace: "my_namespace", + MeterSlug: "meter1", Aggregation: models.MeterAggregationSum, From: &from, To: &to, WindowSize: &windowSize, WindowTimeZone: tz, }, - wantSQL: "SELECT tumbleStart(windowstart, toIntervalHour(1), 'Asia/Shanghai') AS windowstart, tumbleEnd(windowstart, toIntervalHour(1), 'Asia/Shanghai') AS windowend, sumMerge(value) AS value FROM openmeter.meter_meter1 WHERE windowstart >= ? AND windowend <= ? GROUP BY windowstart, windowend ORDER BY windowstart", + wantSQL: "SELECT tumbleStart(windowstart, toIntervalHour(1), 'Asia/Shanghai') AS windowstart, tumbleEnd(windowstart, toIntervalHour(1), 'Asia/Shanghai') AS windowend, sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE windowstart >= ? AND windowend <= ? GROUP BY windowstart, windowend ORDER BY windowstart", wantArgs: []interface{}{from.Unix(), to.Unix()}, }, { // Aggregate data for a single subject query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, - Subject: []string{subject}, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + Subject: []string{subject}, }, - wantSQL: "SELECT min(windowstart), max(windowend), subject, sumMerge(value) AS value FROM openmeter.meter_meter1 WHERE (subject = ?) GROUP BY subject", + wantSQL: "SELECT min(windowstart), max(windowend), subject, sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE (subject = ?) GROUP BY subject", wantArgs: []interface{}{"subject1"}, }, { // Aggregate data for a single subject and group by additional fields query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, - Subject: []string{subject}, - GroupBy: []string{"group1", "group2"}, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + Subject: []string{subject}, + GroupBy: []string{"group1", "group2"}, }, - wantSQL: "SELECT min(windowstart), max(windowend), subject, sumMerge(value) AS value, group1, group2 FROM openmeter.meter_meter1 WHERE (subject = ?) GROUP BY subject, group1, group2", + wantSQL: "SELECT min(windowstart), max(windowend), subject, sumMerge(value) AS value, group1, group2 FROM openmeter.om_my_namespace_meter1 WHERE (subject = ?) GROUP BY subject, group1, group2", wantArgs: []interface{}{"subject1"}, }, { // Aggregate data for a multiple subjects query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, - Subject: []string{subject, "subject2"}, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + Subject: []string{subject, "subject2"}, }, - wantSQL: "SELECT min(windowstart), max(windowend), subject, sumMerge(value) AS value FROM openmeter.meter_meter1 WHERE (subject = ? OR subject = ?) GROUP BY subject", + wantSQL: "SELECT min(windowstart), max(windowend), subject, sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE (subject = ? OR subject = ?) GROUP BY subject", wantArgs: []interface{}{"subject1", "subject2"}, }, } @@ -314,29 +325,32 @@ func TestListMeterViewSubjects(t *testing.T) { }{ { query: listMeterViewSubjects{ - Database: "openmeter", - MeterViewName: "meter_meter1", + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", }, - wantSQL: "SELECT DISTINCT subject FROM openmeter.meter_meter1 ORDER BY subject", + wantSQL: "SELECT DISTINCT subject FROM openmeter.om_my_namespace_meter1 ORDER BY subject", wantArgs: nil, }, { query: listMeterViewSubjects{ - Database: "openmeter", - MeterViewName: "meter_meter1", - From: &from, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + From: &from, }, - wantSQL: "SELECT DISTINCT subject FROM openmeter.meter_meter1 WHERE windowstart >= ? ORDER BY subject", + wantSQL: "SELECT DISTINCT subject FROM openmeter.om_my_namespace_meter1 WHERE windowstart >= ? ORDER BY subject", wantArgs: []interface{}{from.Unix()}, }, { query: listMeterViewSubjects{ - Database: "openmeter", - MeterViewName: "meter_meter1", - From: &from, - To: &to, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + From: &from, + To: &to, }, - wantSQL: "SELECT DISTINCT subject FROM openmeter.meter_meter1 WHERE windowstart >= ? AND windowend <= ? ORDER BY subject", + wantSQL: "SELECT DISTINCT subject FROM openmeter.om_my_namespace_meter1 WHERE windowstart >= ? AND windowend <= ? ORDER BY subject", wantArgs: []interface{}{from.Unix(), to.Unix()}, }, } From f4f859f1ca0080b9292c09e749d912335079ffb4 Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Sun, 12 Nov 2023 19:48:44 -0800 Subject: [PATCH 04/10] feat(sink): batch insert --- internal/sink/sink.go | 26 ++--- internal/sink/storage.go | 29 +++--- internal/sink/storage_test.go | 35 ++++--- .../streaming/clickhouse_connector/query.go | 97 +++++++++++++------ 4 files changed, 112 insertions(+), 75 deletions(-) diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 8069d3437..390bad579 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -231,9 +231,9 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([] defer persistSpan.End() deadletterMessages := []SinkMessage{} - batchesPerNamespace := map[string][]SinkMessage{} + batch := []SinkMessage{} - // Group messages per namespaces and filter out deadletter and drop messages + // Flter out deadletter and drop messages for _, message := range messages { if message.Error != nil { switch message.Error.ProcessingControl { @@ -249,25 +249,13 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([] return deadletterMessages, fmt.Errorf("unknown error type: %s", message.Error) } } - - batchesPerNamespace[message.Namespace] = append(batchesPerNamespace[message.Namespace], message) + batch = append(batch, message) } - // Insert into permanent storage per namespace - for namespace, batch := range batchesPerNamespace { - // Start otel span for storage batch insert + // Storage Batch insert + if len(batch) > 0 { storageCtx, storageSpan := s.config.Tracer.Start(persistCtx, "storage-batch-insert") - storageSpan.SetAttributes( - attribute.String("namespace", namespace), - attribute.Int("size", len(batch)), - ) - - list := []*serializer.CloudEventsKafkaPayload{} - for _, message := range batch { - list = append(list, message.Serialized) - } - - err := s.config.Storage.BatchInsert(storageCtx, namespace, list) + err := s.config.Storage.BatchInsert(storageCtx, batch) if err != nil { // Note: a single error in batch will make the whole batch fail if perr, ok := err.(*ProcessingError); ok { @@ -277,7 +265,6 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([] deadletterMessages = append(deadletterMessages, batch...) case DROP: storageSpan.SetStatus(codes.Error, "drop") - continue default: storageSpan.SetStatus(codes.Error, "unknown processing error type") storageSpan.RecordError(err) @@ -293,7 +280,6 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([] } } logger.Debug("succeeded to sink to storage", "buffer size", len(messages)) - storageSpan.End() } return deadletterMessages, nil diff --git a/internal/sink/storage.go b/internal/sink/storage.go index 073dede03..b02a52d1c 100644 --- a/internal/sink/storage.go +++ b/internal/sink/storage.go @@ -9,14 +9,13 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/huandu/go-sqlbuilder" - "github.com/openmeterio/openmeter/internal/ingest/kafkaingest/serializer" "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" ) var codeRegexp = regexp.MustCompile(`code: (0-9]+)`) type Storage interface { - BatchInsert(ctx context.Context, namespace string, events []*serializer.CloudEventsKafkaPayload) error + BatchInsert(ctx context.Context, messages []SinkMessage) error BatchInsertInvalid(ctx context.Context, messages []SinkMessage) error } @@ -35,13 +34,10 @@ type ClickHouseStorage struct { config ClickHouseStorageConfig } -// TODO: we could insert for all namespaces in one query but that would increase error blast radius -// consider this approach and tradeoffs -func (c *ClickHouseStorage) BatchInsert(ctx context.Context, namespace string, events []*serializer.CloudEventsKafkaPayload) error { +func (c *ClickHouseStorage) BatchInsert(ctx context.Context, messages []SinkMessage) error { query := InsertEventsQuery{ - Database: c.config.Database, - Namespace: namespace, - Events: events, + Database: c.config.Database, + Messages: messages, } sql, args, err := query.ToSQL() if err != nil { @@ -104,9 +100,8 @@ func (c *ClickHouseStorage) BatchInsertInvalid(ctx context.Context, messages []S } type InsertEventsQuery struct { - Database string - Namespace string - Events []*serializer.CloudEventsKafkaPayload + Database string + Messages []SinkMessage } func (q InsertEventsQuery) ToSQL() (string, []interface{}, error) { @@ -116,8 +111,16 @@ func (q InsertEventsQuery) ToSQL() (string, []interface{}, error) { query.InsertInto(tableName) query.Cols("namespace", "id", "type", "source", "subject", "time", "data") - for _, event := range q.Events { - query.Values(q.Namespace, event.Id, event.Type, event.Source, event.Subject, event.Time, event.Data) + for _, message := range q.Messages { + query.Values( + message.Namespace, + message.Serialized.Id, + message.Serialized.Type, + message.Serialized.Source, + message.Serialized.Subject, + message.Serialized.Time, + message.Serialized.Data, + ) } sql, args := query.Build() diff --git a/internal/sink/storage_test.go b/internal/sink/storage_test.go index b722a7e0f..2eeaad075 100644 --- a/internal/sink/storage_test.go +++ b/internal/sink/storage_test.go @@ -14,24 +14,29 @@ func TestInsertEventsQuery(t *testing.T) { now := time.Now() query := sink.InsertEventsQuery{ - Database: "database", - Namespace: "my_namespace", - Events: []*serializer.CloudEventsKafkaPayload{ + Database: "database", + Messages: []sink.SinkMessage{ { - Id: "1", - Source: "source", - Subject: "subject-1", - Time: now.UnixMilli(), - Type: "api-calls", - Data: `{"duration_ms": 100, "method": "GET", "path": "/api/v1"}`, + Namespace: "my_namespace", + Serialized: &serializer.CloudEventsKafkaPayload{ + Id: "1", + Source: "source", + Subject: "subject-1", + Time: now.UnixMilli(), + Type: "api-calls", + Data: `{"duration_ms": 100, "method": "GET", "path": "/api/v1"}`, + }, }, { - Id: "2", - Source: "source", - Subject: "subject-2", - Time: now.UnixMilli(), - Type: "api-calls", - Data: `{"duration_ms": 80, "method": "GET", "path": "/api/v1"}`, + Namespace: "my_namespace", + Serialized: &serializer.CloudEventsKafkaPayload{ + Id: "2", + Source: "source", + Subject: "subject-2", + Time: now.UnixMilli(), + Type: "api-calls", + Data: `{"duration_ms": 80, "method": "GET", "path": "/api/v1"}`, + }, }, }, } diff --git a/internal/streaming/clickhouse_connector/query.go b/internal/streaming/clickhouse_connector/query.go index b7acbf0e3..dccbff3b1 100644 --- a/internal/streaming/clickhouse_connector/query.go +++ b/internal/streaming/clickhouse_connector/query.go @@ -100,58 +100,39 @@ type createMeterView struct { func (d createMeterView) toSQL() (string, []interface{}, error) { viewName := GetMeterViewName(d.Database, d.Namespace, d.MeterSlug) - eventsTableName := GetEventsTableName(d.Database) columns := []column{ {Name: "subject", Type: "String"}, {Name: "windowstart", Type: "DateTime"}, {Name: "windowend", Type: "DateTime"}, } - asSelects := []string{ - "subject", - "tumbleStart(time, toIntervalMinute(1)) AS windowstart", - "tumbleEnd(time, toIntervalMinute(1)) AS windowend", - } // Value agg := "" - aggStateFn := "" switch d.Aggregation { case models.MeterAggregationSum: agg = "sum" - aggStateFn = "sumState" case models.MeterAggregationAvg: agg = "avg" - aggStateFn = "avgState" case models.MeterAggregationMin: agg = "min" - aggStateFn = "minState" case models.MeterAggregationMax: agg = "max" - aggStateFn = "maxState" case models.MeterAggregationCount: agg = "count" - aggStateFn = "countState" default: return "", nil, fmt.Errorf("invalid aggregation type: %s", d.Aggregation) } columns = append(columns, column{Name: "value", Type: fmt.Sprintf("AggregateFunction(%s, Float64)", agg)}) - if d.ValueProperty == "" && d.Aggregation == models.MeterAggregationCount { - asSelects = append(asSelects, fmt.Sprintf("%s(*) AS value", aggStateFn)) - } else { - asSelects = append(asSelects, fmt.Sprintf("%s(cast(JSON_VALUE(data, '%s'), 'Float64')) AS value", aggStateFn, sqlbuilder.Escape(d.ValueProperty))) - } // Group by orderBy := []string{"windowstart", "windowend", "subject"} sortedGroupBy := sortedKeys(d.GroupBy) for _, k := range sortedGroupBy { - v := d.GroupBy[k] columnName := sqlbuilder.Escape(k) orderBy = append(orderBy, sqlbuilder.Escape(columnName)) columns = append(columns, column{Name: columnName, Type: "String"}) - asSelects = append(asSelects, fmt.Sprintf("JSON_VALUE(data, '%s') as %s", sqlbuilder.Escape(v), sqlbuilder.Escape(k))) } sb := sqlbuilder.ClickHouse.NewCreateTableBuilder() @@ -164,14 +145,12 @@ func (d createMeterView) toSQL() (string, []interface{}, error) { sb.SQL(fmt.Sprintf("ORDER BY (%s)", strings.Join(orderBy, ", "))) sb.SQL("AS") - sbAs := sqlbuilder.ClickHouse.NewSelectBuilder() - sbAs.Select(asSelects...) - sbAs.From(eventsTableName) - // We use absolute path for type to avoid shadowing in the case the materialized view have a `type` column due to group by - sbAs.Where(fmt.Sprintf("namespace = '%s'", sqlbuilder.Escape(d.Namespace))) - sbAs.Where(fmt.Sprintf("%s.type = '%s'", eventsTableName, sqlbuilder.Escape(d.EventType))) - sbAs.GroupBy(orderBy...) - sb.SQL(sbAs.String()) + selectQuery, err := d.toSelectSQL() + if err != nil { + return "", nil, err + } + + sb.SQL(selectQuery) sql, args := sb.Build() // TODO: can we do it differently? @@ -180,6 +159,70 @@ func (d createMeterView) toSQL() (string, []interface{}, error) { return sql, args, nil } +func (d createMeterView) updateQuery() (string, error) { + viewName := GetMeterViewName(d.Database, d.Namespace, d.MeterSlug) + selectQuery, err := d.toSelectSQL() + if err != nil { + return "", err + } + + return fmt.Sprintf("ALTER TABLE %s MODIFY QUERY %s", viewName, selectQuery), nil +} + +func (d createMeterView) toSelectSQL() (string, error) { + eventsTableName := GetEventsTableName(d.Database) + + asSelects := []string{ + "subject", + "tumbleStart(time, toIntervalMinute(1)) AS windowstart", + "tumbleEnd(time, toIntervalMinute(1)) AS windowend", + } + + // Value + aggStateFn := "" + + switch d.Aggregation { + case models.MeterAggregationSum: + aggStateFn = "sumState" + case models.MeterAggregationAvg: + aggStateFn = "avgState" + case models.MeterAggregationMin: + aggStateFn = "minState" + case models.MeterAggregationMax: + aggStateFn = "maxState" + case models.MeterAggregationCount: + aggStateFn = "countState" + default: + return "", fmt.Errorf("invalid aggregation type: %s", d.Aggregation) + } + + if d.ValueProperty == "" && d.Aggregation == models.MeterAggregationCount { + asSelects = append(asSelects, fmt.Sprintf("%s(*) AS value", aggStateFn)) + } else { + asSelects = append(asSelects, fmt.Sprintf("%s(cast(JSON_VALUE(data, '%s'), 'Float64')) AS value", aggStateFn, sqlbuilder.Escape(d.ValueProperty))) + } + + // Group by + orderBy := []string{"windowstart", "windowend", "subject"} + sortedGroupBy := sortedKeys(d.GroupBy) + for _, k := range sortedGroupBy { + v := d.GroupBy[k] + columnName := sqlbuilder.Escape(k) + orderBy = append(orderBy, sqlbuilder.Escape(columnName)) + asSelects = append(asSelects, fmt.Sprintf("JSON_VALUE(data, '%s') as %s", sqlbuilder.Escape(v), sqlbuilder.Escape(k))) + } + + query := sqlbuilder.ClickHouse.NewSelectBuilder() + query.Select(asSelects...) + query.From(eventsTableName) + // We use absolute path for type to avoid shadowing in the case the materialized view have a `type` column due to group by + query.Where(fmt.Sprintf("namespace = '%s'", sqlbuilder.Escape(d.Namespace))) + query.Where(fmt.Sprintf("%s.type = '%s'", eventsTableName, sqlbuilder.Escape(d.EventType))) + query.GroupBy(orderBy...) + + return query.String(), nil +} + type deleteMeterView struct { Database string Namespace string From 965120e9fe78971c7e1ec5c26b3eb2de3a1a668f Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Sun, 12 Nov 2023 19:53:50 -0800 Subject: [PATCH 05/10] feat(sink): batch insert --- internal/streaming/clickhouse_connector/query.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/internal/streaming/clickhouse_connector/query.go b/internal/streaming/clickhouse_connector/query.go index dccbff3b1..843756510 100644 --- a/internal/streaming/clickhouse_connector/query.go +++ b/internal/streaming/clickhouse_connector/query.go @@ -159,16 +159,6 @@ func (d createMeterView) toSQL() (string, []interface{}, error) { return sql, args, nil } -func (d createMeterView) updateQuery() (string, error) { - viewName := GetMeterViewName(d.Database, d.Namespace, d.MeterSlug) - selectQuery, err := d.toSelectSQL() - if err != nil { - return "", err - } - - return fmt.Sprintf("ALTER TABLE %s MODIFY QUERY %s", viewName, selectQuery), nil -} - func (d createMeterView) toSelectSQL() (string, error) { eventsTableName := GetEventsTableName(d.Database) From a8f8d601bf5aba6abae940f23d7385e7684095f7 Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Sun, 12 Nov 2023 20:09:21 -0800 Subject: [PATCH 06/10] refactor(sink): query --- internal/sink/storage.go | 2 +- internal/streaming/clickhouse_connector/connector.go | 2 +- internal/streaming/clickhouse_connector/query.go | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/sink/storage.go b/internal/sink/storage.go index b02a52d1c..3afc45c20 100644 --- a/internal/sink/storage.go +++ b/internal/sink/storage.go @@ -133,7 +133,7 @@ type InsertInvalidQuery struct { } func (q InsertInvalidQuery) ToSQL() (string, []interface{}, error) { - tableName := fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(q.Database), clickhouse_connector.TablePrefix, clickhouse_connector.InvalidEventsTableName) + tableName := clickhouse_connector.GetInvalidEventsTableName(q.Database) query := sqlbuilder.ClickHouse.NewInsertBuilder() query.InsertInto(tableName) diff --git a/internal/streaming/clickhouse_connector/connector.go b/internal/streaming/clickhouse_connector/connector.go index 3dc700500..8f0392fbe 100644 --- a/internal/streaming/clickhouse_connector/connector.go +++ b/internal/streaming/clickhouse_connector/connector.go @@ -17,7 +17,7 @@ import ( ) var ( - TablePrefix = "om" + tablePrefix = "om" EventsTableName = "events" InvalidEventsTableName = "invalid_events" ) diff --git a/internal/streaming/clickhouse_connector/query.go b/internal/streaming/clickhouse_connector/query.go index 843756510..2c8b43c89 100644 --- a/internal/streaming/clickhouse_connector/query.go +++ b/internal/streaming/clickhouse_connector/query.go @@ -395,14 +395,14 @@ func (d listMeterViewSubjects) toSQL() (string, []interface{}, error) { } func GetEventsTableName(database string) string { - return fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(database), TablePrefix, EventsTableName) + return fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(database), tablePrefix, EventsTableName) } func GetInvalidEventsTableName(database string) string { - return fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(database), TablePrefix, InvalidEventsTableName) + return fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(database), tablePrefix, InvalidEventsTableName) } func GetMeterViewName(database string, namespace string, meterSlug string) string { - meterViewName := fmt.Sprintf("%s_%s_%s", TablePrefix, namespace, meterSlug) + meterViewName := fmt.Sprintf("%s_%s_%s", tablePrefix, namespace, meterSlug) return fmt.Sprintf("%s.%s", sqlbuilder.Escape(database), sqlbuilder.Escape(meterViewName)) } From 0237661f731ce78e4b843e6f5de6822a70aa8bb5 Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Sun, 12 Nov 2023 20:10:30 -0800 Subject: [PATCH 07/10] refactor(sink): query --- internal/streaming/clickhouse_connector/query.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/streaming/clickhouse_connector/query.go b/internal/streaming/clickhouse_connector/query.go index 2c8b43c89..2bdb777d5 100644 --- a/internal/streaming/clickhouse_connector/query.go +++ b/internal/streaming/clickhouse_connector/query.go @@ -20,8 +20,7 @@ type column struct { // Create Events Table type createEventsTable struct { - Database string - TablePrefix string + Database string } func (d createEventsTable) toSQL() string { From b4b16db3ff572038d13d38aee306177e59cc6910 Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Sun, 12 Nov 2023 20:12:07 -0800 Subject: [PATCH 08/10] refactor(connector): query --- internal/streaming/clickhouse_connector/connector.go | 2 +- internal/streaming/clickhouse_connector/query.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/streaming/clickhouse_connector/connector.go b/internal/streaming/clickhouse_connector/connector.go index 8f0392fbe..966dc2f97 100644 --- a/internal/streaming/clickhouse_connector/connector.go +++ b/internal/streaming/clickhouse_connector/connector.go @@ -17,7 +17,7 @@ import ( ) var ( - tablePrefix = "om" + tablePrefix = "om_" EventsTableName = "events" InvalidEventsTableName = "invalid_events" ) diff --git a/internal/streaming/clickhouse_connector/query.go b/internal/streaming/clickhouse_connector/query.go index 2bdb777d5..d96a54a9a 100644 --- a/internal/streaming/clickhouse_connector/query.go +++ b/internal/streaming/clickhouse_connector/query.go @@ -394,14 +394,14 @@ func (d listMeterViewSubjects) toSQL() (string, []interface{}, error) { } func GetEventsTableName(database string) string { - return fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(database), tablePrefix, EventsTableName) + return fmt.Sprintf("%s.%s%s", sqlbuilder.Escape(database), tablePrefix, EventsTableName) } func GetInvalidEventsTableName(database string) string { - return fmt.Sprintf("%s.%s_%s", sqlbuilder.Escape(database), tablePrefix, InvalidEventsTableName) + return fmt.Sprintf("%s.%s%s", sqlbuilder.Escape(database), tablePrefix, InvalidEventsTableName) } func GetMeterViewName(database string, namespace string, meterSlug string) string { - meterViewName := fmt.Sprintf("%s_%s_%s", tablePrefix, namespace, meterSlug) + meterViewName := fmt.Sprintf("%s%s_%s", tablePrefix, namespace, meterSlug) return fmt.Sprintf("%s.%s", sqlbuilder.Escape(database), sqlbuilder.Escape(meterViewName)) } From 23bf2f6501ec73a8062eb357ae9909f5fd093649 Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Sun, 12 Nov 2023 20:14:24 -0800 Subject: [PATCH 09/10] refactor(connector): query --- .../streaming/clickhouse_connector/query.go | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/internal/streaming/clickhouse_connector/query.go b/internal/streaming/clickhouse_connector/query.go index d96a54a9a..73f8d644b 100644 --- a/internal/streaming/clickhouse_connector/query.go +++ b/internal/streaming/clickhouse_connector/query.go @@ -161,15 +161,7 @@ func (d createMeterView) toSQL() (string, []interface{}, error) { func (d createMeterView) toSelectSQL() (string, error) { eventsTableName := GetEventsTableName(d.Database) - asSelects := []string{ - "subject", - "tumbleStart(time, toIntervalMinute(1)) AS windowstart", - "tumbleEnd(time, toIntervalMinute(1)) AS windowend", - } - - // Value aggStateFn := "" - switch d.Aggregation { case models.MeterAggregationSum: aggStateFn = "sumState" @@ -185,10 +177,16 @@ func (d createMeterView) toSelectSQL() (string, error) { return "", fmt.Errorf("invalid aggregation type: %s", d.Aggregation) } + // Selects + selects := []string{ + "subject", + "tumbleStart(time, toIntervalMinute(1)) AS windowstart", + "tumbleEnd(time, toIntervalMinute(1)) AS windowend", + } if d.ValueProperty == "" && d.Aggregation == models.MeterAggregationCount { - asSelects = append(asSelects, fmt.Sprintf("%s(*) AS value", aggStateFn)) + selects = append(selects, fmt.Sprintf("%s(*) AS value", aggStateFn)) } else { - asSelects = append(asSelects, fmt.Sprintf("%s(cast(JSON_VALUE(data, '%s'), 'Float64')) AS value", aggStateFn, sqlbuilder.Escape(d.ValueProperty))) + selects = append(selects, fmt.Sprintf("%s(cast(JSON_VALUE(data, '%s'), 'Float64')) AS value", aggStateFn, sqlbuilder.Escape(d.ValueProperty))) } // Group by @@ -198,11 +196,11 @@ func (d createMeterView) toSelectSQL() (string, error) { v := d.GroupBy[k] columnName := sqlbuilder.Escape(k) orderBy = append(orderBy, sqlbuilder.Escape(columnName)) - asSelects = append(asSelects, fmt.Sprintf("JSON_VALUE(data, '%s') as %s", sqlbuilder.Escape(v), sqlbuilder.Escape(k))) + selects = append(selects, fmt.Sprintf("JSON_VALUE(data, '%s') as %s", sqlbuilder.Escape(v), sqlbuilder.Escape(k))) } query := sqlbuilder.ClickHouse.NewSelectBuilder() - query.Select(asSelects...) + query.Select(selects...) query.From(eventsTableName) // We use absolute path for type to avoid shadowing in the case the materialized view have a `type` column due to group by query.Where(fmt.Sprintf("namespace = '%s'", sqlbuilder.Escape(d.Namespace))) From 1e8998e3eb7b2e60bc689b0ad69fdc5d45cf6f14 Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Mon, 13 Nov 2023 04:52:25 -0800 Subject: [PATCH 10/10] feat(connector): query --- cmd/server/main.go | 10 ++++++---- config/aggregation.go | 6 ++++++ .../clickhouse_connector/connector.go | 20 +++++++++++++++---- .../streaming/clickhouse_connector/query.go | 6 ++++++ 4 files changed, 34 insertions(+), 8 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 6884368d7..8276329ea 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -385,10 +385,12 @@ func initClickHouseStreaming(config config.Configuration, meterRepository meter. } streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{ - Logger: logger, - ClickHouse: clickHouseClient, - Database: config.Aggregation.ClickHouse.Database, - Meters: meterRepository, + Logger: logger, + ClickHouse: clickHouseClient, + Database: config.Aggregation.ClickHouse.Database, + Meters: meterRepository, + CreateOrReplaceMeter: config.Aggregation.CreateOrReplaceMeter, + PopulateMeter: config.Aggregation.PopulateMeter, }) if err != nil { return nil, fmt.Errorf("init clickhouse streaming: %w", err) diff --git a/config/aggregation.go b/config/aggregation.go index 545347730..f4982aebe 100644 --- a/config/aggregation.go +++ b/config/aggregation.go @@ -9,6 +9,12 @@ import ( type AggregationConfiguration struct { ClickHouse ClickHouseAggregationConfiguration + // Populate creates the materialized view with data from the events table + // This is not safe to use in production as requires to stop ingestion + PopulateMeter bool + // CreateOrReplace is used to force the recreation of the materialized view + // This is not safe to use in production as it will drop the existing views + CreateOrReplaceMeter bool } // Validate validates the configuration. diff --git a/internal/streaming/clickhouse_connector/connector.go b/internal/streaming/clickhouse_connector/connector.go index 966dc2f97..9e3a23905 100644 --- a/internal/streaming/clickhouse_connector/connector.go +++ b/internal/streaming/clickhouse_connector/connector.go @@ -28,10 +28,12 @@ type ClickhouseConnector struct { } type ClickhouseConnectorConfig struct { - Logger *slog.Logger - ClickHouse clickhouse.Conn - Database string - Meters meter.Repository + Logger *slog.Logger + ClickHouse clickhouse.Conn + Database string + Meters meter.Repository + CreateOrReplaceMeter bool + PopulateMeter bool } func NewClickhouseConnector(config ClickhouseConnectorConfig) (*ClickhouseConnector, error) { @@ -235,7 +237,17 @@ func (c *ClickhouseConnector) queryEventsTable(ctx context.Context, namespace st } func (c *ClickhouseConnector) createMeterView(ctx context.Context, namespace string, meter *models.Meter) error { + // CreateOrReplace is used to force the recreation of the materialized view + // This is not safe to use in production as it will drop the existing views + if c.config.CreateOrReplaceMeter { + err := c.deleteMeterView(ctx, namespace, meter.Slug) + if err != nil { + return fmt.Errorf("drop meter view: %w", err) + } + } + view := createMeterView{ + Populate: c.config.PopulateMeter, Database: c.config.Database, Namespace: namespace, MeterSlug: meter.Slug, diff --git a/internal/streaming/clickhouse_connector/query.go b/internal/streaming/clickhouse_connector/query.go index 73f8d644b..89437fe8a 100644 --- a/internal/streaming/clickhouse_connector/query.go +++ b/internal/streaming/clickhouse_connector/query.go @@ -95,6 +95,9 @@ type createMeterView struct { EventType string ValueProperty string GroupBy map[string]string + // Populate creates the materialized view with data from the events table + // This is not safe to use in production as requires to stop ingestion + Populate bool } func (d createMeterView) toSQL() (string, []interface{}, error) { @@ -142,6 +145,9 @@ func (d createMeterView) toSQL() (string, []interface{}, error) { } sb.SQL("ENGINE = AggregatingMergeTree()") sb.SQL(fmt.Sprintf("ORDER BY (%s)", strings.Join(orderBy, ", "))) + if d.Populate { + sb.SQL("POPULATE") + } sb.SQL("AS") selectQuery, err := d.toSelectSQL()