diff --git a/cmd/server/main.go b/cmd/server/main.go index 6884368d7..8276329ea 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -385,10 +385,12 @@ func initClickHouseStreaming(config config.Configuration, meterRepository meter. } streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{ - Logger: logger, - ClickHouse: clickHouseClient, - Database: config.Aggregation.ClickHouse.Database, - Meters: meterRepository, + Logger: logger, + ClickHouse: clickHouseClient, + Database: config.Aggregation.ClickHouse.Database, + Meters: meterRepository, + CreateOrReplaceMeter: config.Aggregation.CreateOrReplaceMeter, + PopulateMeter: config.Aggregation.PopulateMeter, }) if err != nil { return nil, fmt.Errorf("init clickhouse streaming: %w", err) diff --git a/cmd/sink-worker/main.go b/cmd/sink-worker/main.go index c6f7e4fc0..198591eaf 100644 --- a/cmd/sink-worker/main.go +++ b/cmd/sink-worker/main.go @@ -13,6 +13,7 @@ import ( health "github.com/AppsFlyer/go-sundheit" healthhttp "github.com/AppsFlyer/go-sundheit/http" "github.com/ClickHouse/clickhouse-go/v2" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/go-slog/otelslog" @@ -253,21 +254,27 @@ func initSink(config config.Configuration, logger *slog.Logger, metricMeter metr consumerKafkaConfig := config.Ingest.Kafka.CreateKafkaConfig() _ = consumerKafkaConfig.SetKey("group.id", config.Sink.GroupId) + _ = consumerKafkaConfig.SetKey("session.timeout.ms", 6000) + _ = consumerKafkaConfig.SetKey("enable.auto.commit", false) + _ = consumerKafkaConfig.SetKey("enable.auto.offset.store", false) + _ = consumerKafkaConfig.SetKey("go.application.rebalance.enable", true) - producerKafkaConfig := config.Ingest.Kafka.CreateKafkaConfig() + consumer, err := kafka.NewConsumer(&consumerKafkaConfig) + if err != nil { + return nil, fmt.Errorf("failed to initialize kafka consumer: %s", err) + } 
sinkConfig := sink.SinkConfig{ - Logger: logger, - Tracer: tracer, - MetricMeter: metricMeter, - MeterRepository: meterRepository, - Storage: storage, - Deduplicator: deduplicator, - ConsumerKafkaConfig: consumerKafkaConfig, - ProducerKafkaConfig: producerKafkaConfig, - MinCommitCount: config.Sink.MinCommitCount, - MaxCommitWait: config.Sink.MaxCommitWait, - NamespaceRefetch: config.Sink.NamespaceRefetch, + Logger: logger, + Tracer: tracer, + MetricMeter: metricMeter, + MeterRepository: meterRepository, + Storage: storage, + Deduplicator: deduplicator, + Consumer: consumer, + MinCommitCount: config.Sink.MinCommitCount, + MaxCommitWait: config.Sink.MaxCommitWait, + NamespaceRefetch: config.Sink.NamespaceRefetch, } return sink.NewSink(sinkConfig) diff --git a/config/aggregation.go b/config/aggregation.go index 545347730..f4982aebe 100644 --- a/config/aggregation.go +++ b/config/aggregation.go @@ -9,6 +9,12 @@ import ( type AggregationConfiguration struct { ClickHouse ClickHouseAggregationConfiguration + // Populate creates the materialized view with data from the events table + // This is not safe to use in production as requires to stop ingestion + PopulateMeter bool + // CreateOrReplace is used to force the recreation of the materialized view + // This is not safe to use in production as it will drop the existing views + CreateOrReplaceMeter bool } // Validate validates the configuration. 
diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 8525e468d..390bad579 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -24,7 +24,6 @@ import ( ) var namespaceTopicRegexp = regexp.MustCompile(`^om_([A-Za-z0-9]+(?:_[A-Za-z0-9]+)*)_events$`) -var defaultDeadletterTopicTemplate = "om_%s_events_deadletter" type SinkMessage struct { Namespace string @@ -34,8 +33,6 @@ type SinkMessage struct { } type Sink struct { - consumer *kafka.Consumer - producer *kafka.Producer config SinkConfig running bool buffer *SinkBuffer @@ -47,14 +44,13 @@ type Sink struct { } type SinkConfig struct { - Logger *slog.Logger - Tracer trace.Tracer - MetricMeter metric.Meter - MeterRepository meter.Repository - Storage Storage - Deduplicator dedupe.Deduplicator - ConsumerKafkaConfig kafka.ConfigMap - ProducerKafkaConfig kafka.ConfigMap + Logger *slog.Logger + Tracer trace.Tracer + MetricMeter metric.Meter + MeterRepository meter.Repository + Storage Storage + Deduplicator dedupe.Deduplicator + Consumer *kafka.Consumer // MinCommitCount is the minimum number of messages to wait before flushing the buffer. // Whichever happens earlier MinCommitCount or MaxCommitWait will trigger a flush. MinCommitCount int @@ -64,9 +60,6 @@ type SinkConfig struct { // this information is used to configure which topics the consumer subscribes and // the meter configs used in event validation. NamespaceRefetch time.Duration - // DeadletterTopicTemplate is the template used to create the deadletter topic name per namespace. - // It is a sprintf template with the namespace as the only argument. 
- DeadletterTopicTemplate string // OnFlushSuccess is an optional lifecycle hook OnFlushSuccess func(string, int64) } @@ -76,22 +69,6 @@ func NewSink(config SinkConfig) (*Sink, error) { config.Logger.Warn("deduplicator is not set, deduplication will be disabled") } - // These are Kafka configs but also related to sink logic - _ = config.ConsumerKafkaConfig.SetKey("session.timeout.ms", 6000) - _ = config.ConsumerKafkaConfig.SetKey("enable.auto.commit", false) - _ = config.ConsumerKafkaConfig.SetKey("enable.auto.offset.store", false) - _ = config.ConsumerKafkaConfig.SetKey("go.application.rebalance.enable", true) - - consumer, err := kafka.NewConsumer(&config.ConsumerKafkaConfig) - if err != nil { - return nil, fmt.Errorf("failed to create consumer: %s", err) - } - - producer, err := kafka.NewProducer(&config.ProducerKafkaConfig) - if err != nil { - return nil, fmt.Errorf("failed to create producer: %w", err) - } - // Defaults if config.MinCommitCount == 0 { config.MinCommitCount = 1 @@ -102,9 +79,6 @@ func NewSink(config SinkConfig) (*Sink, error) { if config.NamespaceRefetch == 0 { config.NamespaceRefetch = 15 * time.Second } - if config.DeadletterTopicTemplate == "" { - config.DeadletterTopicTemplate = defaultDeadletterTopicTemplate - } // Initialize OTel metrics messageCounter, err := config.MetricMeter.Int64Counter( @@ -126,8 +100,6 @@ func NewSink(config SinkConfig) (*Sink, error) { } sink := &Sink{ - consumer: consumer, - producer: producer, config: config, buffer: NewSinkBuffer(), namespaceStore: NewNamespaceStore(), @@ -259,9 +231,9 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([] defer persistSpan.End() deadletterMessages := []SinkMessage{} - batchesPerNamespace := map[string][]SinkMessage{} + batch := []SinkMessage{} - // Group messages per namespaces and filter out deadletter and drop messages + // Filter out deadletter and drop messages for _, message := range messages { if message.Error != nil { switch
message.Error.ProcessingControl { @@ -277,25 +249,13 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([] return deadletterMessages, fmt.Errorf("unknown error type: %s", message.Error) } } - - batchesPerNamespace[message.Namespace] = append(batchesPerNamespace[message.Namespace], message) + batch = append(batch, message) } - // Insert into permanent storage per namespace - for namespace, batch := range batchesPerNamespace { - // Start otel span for storage batch insert + // Storage Batch insert + if len(batch) > 0 { storageCtx, storageSpan := s.config.Tracer.Start(persistCtx, "storage-batch-insert") - storageSpan.SetAttributes( - attribute.String("namespace", namespace), - attribute.Int("size", len(batch)), - ) - - list := []*serializer.CloudEventsKafkaPayload{} - for _, message := range batch { - list = append(list, message.Serialized) - } - - err := s.config.Storage.BatchInsert(storageCtx, namespace, list) + err := s.config.Storage.BatchInsert(storageCtx, batch) if err != nil { // Note: a single error in batch will make the whole batch fail if perr, ok := err.(*ProcessingError); ok { @@ -305,7 +265,6 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([] deadletterMessages = append(deadletterMessages, batch...) 
case DROP: storageSpan.SetStatus(codes.Error, "drop") - continue default: storageSpan.SetStatus(codes.Error, "unknown processing error type") storageSpan.RecordError(err) @@ -321,45 +280,26 @@ func (s *Sink) persistToStorage(ctx context.Context, messages []SinkMessage) ([] } } logger.Debug("succeeded to sink to storage", "buffer size", len(messages)) - storageSpan.End() } return deadletterMessages, nil } -// deadLetter sends a message to the dead letter queue, useful permanent non-recoverable errors like json parsing +// deadLetter stores invalid messages, useful for permanent non-recoverable errors like json parsing func (s *Sink) deadLetter(ctx context.Context, messages ...SinkMessage) error { logger := s.config.Logger.With("operation", "deadLetter") _, deadletterSpan := s.config.Tracer.Start(ctx, "deadletter") - for _, message := range messages { - topic := fmt.Sprintf(s.config.DeadletterTopicTemplate, message.Namespace) - headers := message.KafkaMessage.Headers - headers = append(headers, kafka.Header{Key: "error", Value: []byte(message.Error.Error())}) - - msg := &kafka.Message{ - TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, - Timestamp: message.KafkaMessage.Timestamp, - Headers: headers, - Key: message.KafkaMessage.Key, - Value: message.KafkaMessage.Value, - } - - err := s.producer.Produce(msg, nil) - if err != nil { - deadletterSpan.SetStatus(codes.Error, "deadletter failure") - deadletterSpan.RecordError(err) - deadletterSpan.End() + err := s.config.Storage.BatchInsertInvalid(ctx, messages) + if err != nil { + deadletterSpan.SetStatus(codes.Error, "deadletter failure") + deadletterSpan.RecordError(err) + deadletterSpan.End() - return fmt.Errorf("producing kafka message to deadletter topic: %w", err) - } - deadletterSpan.AddEvent( - "deadletter", - trace.WithAttributes(attribute.String("namespace", message.Namespace)), - ) + return fmt.Errorf("storing invalid messages: %w", err) } - logger.Debug("succeeded to deadletter",
"messages", len(messages)) + logger.Debug("succeeded to store invalid", "messages", len(messages)) deadletterSpan.End() return nil } @@ -378,7 +318,7 @@ func (s *Sink) offsetCommit(ctx context.Context, messages []SinkMessage) error { // We retry with exponential backoff as it's critical that either step #2 or #3 succeeds. err := retry.Do( func() error { - commitedOffsets, err := s.consumer.CommitOffsets(offsets) + commitedOffsets, err := s.config.Consumer.CommitOffsets(offsets) if err != nil { return err } @@ -484,7 +424,7 @@ func (s *Sink) subscribeToNamespaces() error { topics := getTopics(*s.namespaceStore) logger.Info("new namespaces detected, subscribing to topics", "topics", topics) - err = s.consumer.SubscribeTopics(topics, s.rebalance) + err = s.config.Consumer.SubscribeTopics(topics, s.rebalance) if err != nil { return fmt.Errorf("failed to subscribe to topics: %s", err) } @@ -558,7 +498,7 @@ func (s *Sink) Run() error { logger.Error("caught signal, terminating", "sig", sig) s.running = false default: - ev := s.consumer.Poll(100) + ev := s.config.Consumer.Poll(100) if ev == nil { continue } @@ -587,7 +527,7 @@ func (s *Sink) Run() error { logger.Debug("event added to buffer", "partition", e.TopicPartition.Partition, "offset", e.TopicPartition.Offset, "event", kafkaCloudEvent) // Store message, this won't commit offset immediately just store it for the next manual commit - _, err = s.consumer.StoreMessage(e) + _, err = s.config.Consumer.StoreMessage(e) if err != nil { // Stop processing, non-recoverable error return fmt.Errorf("failed to store kafka message for upcoming offset commit: %w", err) @@ -625,7 +565,7 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error { switch e := event.(type) { case kafka.AssignedPartitions: logger.Info("kafka assigned partitions", "partitions", e.Partitions) - err := s.consumer.Assign(e.Partitions) + err := s.config.Consumer.Assign(e.Partitions) if err != nil { return fmt.Errorf("failed to assign 
partitions: %w", err) } @@ -641,7 +581,7 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error { // involuntarily. In this case, the partition might already be owned // by another consumer, and operations including committing // offsets may not work. - if s.consumer.AssignmentLost() { + if s.config.Consumer.AssignmentLost() { // Our consumer has been kicked out of the group and the // entire assignment is thus lost. logger.Warn("assignment lost involuntarily, commit may fail") @@ -653,7 +593,7 @@ func (s *Sink) rebalance(c *kafka.Consumer, event kafka.Event) error { return fmt.Errorf("failed to flush: %w", err) } - err = s.consumer.Unassign() + err = s.config.Consumer.Unassign() if err != nil { return fmt.Errorf("failed to unassign partitions: %w", err) } @@ -712,7 +652,7 @@ func (s *Sink) Close() error { if s.flushTimer != nil { s.flushTimer.Stop() } - return s.consumer.Close() + return s.config.Consumer.Close() } // getNamespace from topic diff --git a/internal/sink/storage.go b/internal/sink/storage.go index 16764a648..3afc45c20 100644 --- a/internal/sink/storage.go +++ b/internal/sink/storage.go @@ -9,14 +9,14 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/huandu/go-sqlbuilder" - "github.com/openmeterio/openmeter/internal/ingest/kafkaingest/serializer" "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" ) var codeRegexp = regexp.MustCompile(`code: ([0-9]+)`) type Storage interface { - BatchInsert(ctx context.Context, namespace string, events []*serializer.CloudEventsKafkaPayload) error + BatchInsert(ctx context.Context, messages []SinkMessage) error + BatchInsertInvalid(ctx context.Context, messages []SinkMessage) error } type ClickHouseStorageConfig struct { @@ -34,11 +34,10 @@ type ClickHouseStorage struct { config ClickHouseStorageConfig } -func (c *ClickHouseStorage) BatchInsert(ctx context.Context, namespace string, events []*serializer.CloudEventsKafkaPayload) error { +func (c *ClickHouseStorage)
BatchInsert(ctx context.Context, messages []SinkMessage) error { query := InsertEventsQuery{ - Database: c.config.Database, - EventsTableName: clickhouse_connector.GetEventsTableName(namespace), - Events: events, + Database: c.config.Database, + Messages: messages, } sql, args, err := query.ToSQL() if err != nil { @@ -82,21 +81,66 @@ func (c *ClickHouseStorage) BatchInsert(ctx context.Context, namespace string, e return nil } +func (c *ClickHouseStorage) BatchInsertInvalid(ctx context.Context, messages []SinkMessage) error { + query := InsertInvalidQuery{ + Database: c.config.Database, + Messages: messages, + } + sql, args, err := query.ToSQL() + if err != nil { + return err + } + + err = c.config.ClickHouse.Exec(ctx, sql, args...) + if err != nil { + return err + } + + return nil +} + type InsertEventsQuery struct { - Database string - EventsTableName string - Events []*serializer.CloudEventsKafkaPayload + Database string + Messages []SinkMessage } func (q InsertEventsQuery) ToSQL() (string, []interface{}, error) { - tableName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(q.Database), sqlbuilder.Escape(q.EventsTableName)) + tableName := clickhouse_connector.GetEventsTableName(q.Database) + + query := sqlbuilder.ClickHouse.NewInsertBuilder() + query.InsertInto(tableName) + query.Cols("namespace", "id", "type", "source", "subject", "time", "data") + + for _, message := range q.Messages { + query.Values( + message.Namespace, + message.Serialized.Id, + message.Serialized.Type, + message.Serialized.Source, + message.Serialized.Subject, + message.Serialized.Time, + message.Serialized.Data, + ) + } + + sql, args := query.Build() + return sql, args, nil +} + +type InsertInvalidQuery struct { + Database string + Messages []SinkMessage +} + +func (q InsertInvalidQuery) ToSQL() (string, []interface{}, error) { + tableName := clickhouse_connector.GetInvalidEventsTableName(q.Database) query := sqlbuilder.ClickHouse.NewInsertBuilder() query.InsertInto(tableName) - query.Cols("id", 
"type", "source", "subject", "time", "data") + query.Cols("namespace", "time", "error", "event") - for _, event := range q.Events { - query.Values(event.Id, event.Type, event.Source, event.Subject, event.Time, event.Data) + for _, message := range q.Messages { + query.Values(message.Namespace, message.KafkaMessage.Timestamp, message.Error.Message, string(message.KafkaMessage.Value)) } sql, args := query.Build() diff --git a/internal/sink/storage_test.go b/internal/sink/storage_test.go index 64f154360..2eeaad075 100644 --- a/internal/sink/storage_test.go +++ b/internal/sink/storage_test.go @@ -14,24 +14,29 @@ func TestInsertEventsQuery(t *testing.T) { now := time.Now() query := sink.InsertEventsQuery{ - Database: "database", - EventsTableName: "events_table", - Events: []*serializer.CloudEventsKafkaPayload{ + Database: "database", + Messages: []sink.SinkMessage{ { - Id: "1", - Source: "source", - Subject: "subject-1", - Time: now.UnixMilli(), - Type: "api-calls", - Data: `{"duration_ms": 100, "method": "GET", "path": "/api/v1"}`, + Namespace: "my_namespace", + Serialized: &serializer.CloudEventsKafkaPayload{ + Id: "1", + Source: "source", + Subject: "subject-1", + Time: now.UnixMilli(), + Type: "api-calls", + Data: `{"duration_ms": 100, "method": "GET", "path": "/api/v1"}`, + }, }, { - Id: "2", - Source: "source", - Subject: "subject-2", - Time: now.UnixMilli(), - Type: "api-calls", - Data: `{"duration_ms": 80, "method": "GET", "path": "/api/v1"}`, + Namespace: "my_namespace", + Serialized: &serializer.CloudEventsKafkaPayload{ + Id: "2", + Source: "source", + Subject: "subject-2", + Time: now.UnixMilli(), + Type: "api-calls", + Data: `{"duration_ms": 80, "method": "GET", "path": "/api/v1"}`, + }, }, }, } @@ -39,9 +44,9 @@ func TestInsertEventsQuery(t *testing.T) { sql, args, err := query.ToSQL() assert.NoError(t, err) assert.Equal(t, args, []interface{}{ - "1", "api-calls", "source", "subject-1", now.UnixMilli(), `{"duration_ms": 100, "method": "GET", "path": 
"/api/v1"}`, - "2", "api-calls", "source", "subject-2", now.UnixMilli(), `{"duration_ms": 80, "method": "GET", "path": "/api/v1"}`, + "my_namespace", "1", "api-calls", "source", "subject-1", now.UnixMilli(), `{"duration_ms": 100, "method": "GET", "path": "/api/v1"}`, + "my_namespace", "2", "api-calls", "source", "subject-2", now.UnixMilli(), `{"duration_ms": 80, "method": "GET", "path": "/api/v1"}`, }) - assert.Equal(t, `INSERT INTO database.events_table (id, type, source, subject, time, data) VALUES (?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?)`, sql) + assert.Equal(t, `INSERT INTO database.om_events (namespace, id, type, source, subject, time, data) VALUES (?, ?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?, ?)`, sql) } diff --git a/internal/streaming/clickhouse_connector/connector.go b/internal/streaming/clickhouse_connector/connector.go index b78f4bacc..9e3a23905 100644 --- a/internal/streaming/clickhouse_connector/connector.go +++ b/internal/streaming/clickhouse_connector/connector.go @@ -17,8 +17,9 @@ import ( ) var ( - prefix = "om" - eventsTableName = "events" + tablePrefix = "om_" + EventsTableName = "events" + InvalidEventsTableName = "invalid_events" ) // ClickhouseConnector implements `ingest.Connector“ and `namespace.Handler interfaces. 
@@ -27,10 +28,12 @@ type ClickhouseConnector struct { } type ClickhouseConnectorConfig struct { - Logger *slog.Logger - ClickHouse clickhouse.Conn - Database string - Meters meter.Repository + Logger *slog.Logger + ClickHouse clickhouse.Conn + Database string + Meters meter.Repository + CreateOrReplaceMeter bool + PopulateMeter bool } func NewClickhouseConnector(config ClickhouseConnectorConfig) (*ClickhouseConnector, error) { @@ -141,13 +144,17 @@ func (c *ClickhouseConnector) CreateNamespace(ctx context.Context, namespace str return fmt.Errorf("create namespace in clickhouse: %w", err) } + err = c.createInvalidEventsTable(ctx) + if err != nil { + return fmt.Errorf("create namespace in clickhouse: %w", err) + } + return nil } func (c *ClickhouseConnector) createEventsTable(ctx context.Context, namespace string) error { table := createEventsTable{ - Database: c.config.Database, - EventsTableName: GetEventsTableName(namespace), + Database: c.config.Database, } err := c.config.ClickHouse.Exec(ctx, table.toSQL()) @@ -158,11 +165,24 @@ func (c *ClickhouseConnector) createEventsTable(ctx context.Context, namespace s return nil } +func (c *ClickhouseConnector) createInvalidEventsTable(ctx context.Context) error { + table := createInvalidEventsTable{ + Database: c.config.Database, + } + + err := c.config.ClickHouse.Exec(ctx, table.toSQL()) + if err != nil { + return fmt.Errorf("create invalid events table: %w", err) + } + + return nil +} + func (c *ClickhouseConnector) queryEventsTable(ctx context.Context, namespace string, params streaming.ListEventsParams) ([]event.Event, error) { table := queryEventsTable{ - Database: c.config.Database, - EventsTableName: GetEventsTableName(namespace), - Limit: params.Limit, + Database: c.config.Database, + Namespace: namespace, + Limit: params.Limit, } sql, _, err := table.toSQL() @@ -217,14 +237,24 @@ func (c *ClickhouseConnector) queryEventsTable(ctx context.Context, namespace st } func (c *ClickhouseConnector) createMeterView(ctx 
context.Context, namespace string, meter *models.Meter) error { + // CreateOrReplace is used to force the recreation of the materialized view + // This is not safe to use in production as it will drop the existing views + if c.config.CreateOrReplaceMeter { + err := c.deleteMeterView(ctx, namespace, meter.Slug) + if err != nil { + return fmt.Errorf("drop meter view: %w", err) + } + } + view := createMeterView{ - Database: c.config.Database, - EventsTableName: GetEventsTableName(namespace), - Aggregation: meter.Aggregation, - EventType: meter.EventType, - MeterViewName: getMeterViewNameBySlug(namespace, meter.Slug), - ValueProperty: meter.ValueProperty, - GroupBy: meter.GroupBy, + Populate: c.config.PopulateMeter, + Database: c.config.Database, + Namespace: namespace, + MeterSlug: meter.Slug, + Aggregation: meter.Aggregation, + EventType: meter.EventType, + ValueProperty: meter.ValueProperty, + GroupBy: meter.GroupBy, } sql, args, err := view.toSQL() if err != nil { @@ -240,8 +270,9 @@ func (c *ClickhouseConnector) createMeterView(ctx context.Context, namespace str func (c *ClickhouseConnector) deleteMeterView(ctx context.Context, namespace string, meterSlug string) error { query := deleteMeterView{ - Database: c.config.Database, - MeterViewName: getMeterViewNameBySlug(namespace, meterSlug), + Database: c.config.Database, + Namespace: namespace, + MeterSlug: meterSlug, } sql, args := query.toSQL() err := c.config.ClickHouse.Exec(ctx, sql, args...) 
@@ -259,7 +290,8 @@ func (c *ClickhouseConnector) deleteMeterView(ctx context.Context, namespace str func (c *ClickhouseConnector) queryMeterView(ctx context.Context, namespace string, meterSlug string, params *streaming.QueryParams) ([]*models.MeterValue, error) { queryMeter := queryMeterView{ Database: c.config.Database, - MeterViewName: getMeterViewNameBySlug(namespace, meterSlug), + Namespace: namespace, + MeterSlug: meterSlug, Aggregation: params.Aggregation, From: params.From, To: params.To, @@ -339,10 +371,11 @@ func (c *ClickhouseConnector) queryMeterView(ctx context.Context, namespace stri func (c *ClickhouseConnector) listMeterViewSubjects(ctx context.Context, namespace string, meterSlug string, from *time.Time, to *time.Time) ([]string, error) { query := listMeterViewSubjects{ - Database: c.config.Database, - MeterViewName: getMeterViewNameBySlug(namespace, meterSlug), - From: from, - To: to, + Database: c.config.Database, + Namespace: namespace, + MeterSlug: meterSlug, + From: from, + To: to, } sql, args, err := query.toSQL() @@ -371,11 +404,3 @@ func (c *ClickhouseConnector) listMeterViewSubjects(ctx context.Context, namespa return subjects, nil } - -func GetEventsTableName(namespace string) string { - return fmt.Sprintf("%s_%s_%s", prefix, namespace, eventsTableName) -} - -func getMeterViewNameBySlug(namespace string, meterSlug string) string { - return fmt.Sprintf("%s_%s_%s", prefix, namespace, meterSlug) -} diff --git a/internal/streaming/clickhouse_connector/query.go b/internal/streaming/clickhouse_connector/query.go index c93a8c065..89437fe8a 100644 --- a/internal/streaming/clickhouse_connector/query.go +++ b/internal/streaming/clickhouse_connector/query.go @@ -20,16 +20,16 @@ type column struct { // Create Events Table type createEventsTable struct { - Database string - EventsTableName string + Database string } func (d createEventsTable) toSQL() string { - tableName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), 
sqlbuilder.Escape(d.EventsTableName)) + tableName := GetEventsTableName(d.Database) sb := sqlbuilder.ClickHouse.NewCreateTableBuilder() sb.CreateTable(tableName) sb.IfNotExists() + sb.Define("namespace", "String") sb.Define("id", "String") sb.Define("type", "LowCardinality(String)") sb.Define("subject", "String") @@ -38,24 +38,48 @@ func (d createEventsTable) toSQL() string { sb.Define("data", "String") sb.SQL("ENGINE = MergeTree") sb.SQL("PARTITION BY toYYYYMM(time)") - sb.SQL("ORDER BY (time, type, subject)") + sb.SQL("ORDER BY (namespace, time, type, subject)") + + sql, _ := sb.Build() + return sql +} + +// Create Invalid Events Table +type createInvalidEventsTable struct { + Database string +} + +func (d createInvalidEventsTable) toSQL() string { + tableName := GetInvalidEventsTableName(d.Database) + + sb := sqlbuilder.ClickHouse.NewCreateTableBuilder() + sb.CreateTable(tableName) + sb.IfNotExists() + sb.Define("namespace", "String") + sb.Define("time", "DateTime") + sb.Define("error", "String") + sb.Define("event", "String") + sb.SQL("ENGINE = MergeTree") + sb.SQL("PARTITION BY toYYYYMM(time)") + sb.SQL("ORDER BY (namespace, time)") sql, _ := sb.Build() return sql } type queryEventsTable struct { - Database string - EventsTableName string - Limit int + Database string + Namespace string + Limit int } func (d queryEventsTable) toSQL() (string, []interface{}, error) { - tableName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.EventsTableName)) + tableName := GetEventsTableName(d.Database) query := sqlbuilder.ClickHouse.NewSelectBuilder() query.Select("id", "type", "subject", "source", "time", "data") query.From(tableName) + query.Where(query.Equal("namespace", d.Namespace)) query.Desc().OrderBy("time") query.Limit(d.Limit) @@ -64,69 +88,53 @@ func (d queryEventsTable) toSQL() (string, []interface{}, error) { } type createMeterView struct { - Database string - Aggregation models.MeterAggregation - EventsTableName string - MeterViewName 
string - EventType string - ValueProperty string - GroupBy map[string]string + Database string + Aggregation models.MeterAggregation + Namespace string + MeterSlug string + EventType string + ValueProperty string + GroupBy map[string]string + // Populate creates the materialized view with data from the events table + // This is not safe to use in production as requires to stop ingestion + Populate bool } func (d createMeterView) toSQL() (string, []interface{}, error) { - viewName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.MeterViewName)) - eventsTableName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.EventsTableName)) + viewName := GetMeterViewName(d.Database, d.Namespace, d.MeterSlug) columns := []column{ {Name: "subject", Type: "String"}, {Name: "windowstart", Type: "DateTime"}, {Name: "windowend", Type: "DateTime"}, } - asSelects := []string{ - "subject", - "tumbleStart(time, toIntervalMinute(1)) AS windowstart", - "tumbleEnd(time, toIntervalMinute(1)) AS windowend", - } // Value agg := "" - aggStateFn := "" switch d.Aggregation { case models.MeterAggregationSum: agg = "sum" - aggStateFn = "sumState" case models.MeterAggregationAvg: agg = "avg" - aggStateFn = "avgState" case models.MeterAggregationMin: agg = "min" - aggStateFn = "minState" case models.MeterAggregationMax: agg = "max" - aggStateFn = "maxState" case models.MeterAggregationCount: agg = "count" - aggStateFn = "countState" default: return "", nil, fmt.Errorf("invalid aggregation type: %s", d.Aggregation) } columns = append(columns, column{Name: "value", Type: fmt.Sprintf("AggregateFunction(%s, Float64)", agg)}) - if d.ValueProperty == "" && d.Aggregation == models.MeterAggregationCount { - asSelects = append(asSelects, fmt.Sprintf("%s(*) AS value", aggStateFn)) - } else { - asSelects = append(asSelects, fmt.Sprintf("%s(cast(JSON_VALUE(data, '%s'), 'Float64')) AS value", aggStateFn, sqlbuilder.Escape(d.ValueProperty))) - } // Group by 
orderBy := []string{"windowstart", "windowend", "subject"} sortedGroupBy := sortedKeys(d.GroupBy) for _, k := range sortedGroupBy { - v := d.GroupBy[k] columnName := sqlbuilder.Escape(k) orderBy = append(orderBy, sqlbuilder.Escape(columnName)) columns = append(columns, column{Name: columnName, Type: "String"}) - asSelects = append(asSelects, fmt.Sprintf("JSON_VALUE(data, '%s') as %s", sqlbuilder.Escape(v), sqlbuilder.Escape(k))) } sb := sqlbuilder.ClickHouse.NewCreateTableBuilder() @@ -137,15 +145,17 @@ func (d createMeterView) toSQL() (string, []interface{}, error) { } sb.SQL("ENGINE = AggregatingMergeTree()") sb.SQL(fmt.Sprintf("ORDER BY (%s)", strings.Join(orderBy, ", "))) + if d.Populate { + sb.SQL("POPULATE") + } sb.SQL("AS") - sbAs := sqlbuilder.ClickHouse.NewSelectBuilder() - sbAs.Select(asSelects...) - sbAs.From(eventsTableName) - // We use absolute path for type to avoid shadowing in the case the materialized view have a `type` column due to group by - sbAs.Where(fmt.Sprintf("%s.type = '%s'", eventsTableName, sqlbuilder.Escape(d.EventType))) - sbAs.GroupBy(orderBy...) - sb.SQL(sbAs.String()) + selectQuery, err := d.toSelectSQL() + if err != nil { + return "", nil, err + } + + sb.SQL(selectQuery) sql, args := sb.Build() // TODO: can we do it differently? 
@@ -154,19 +164,73 @@ func (d createMeterView) toSQL() (string, []interface{}, error) { return sql, args, nil } +func (d createMeterView) toSelectSQL() (string, error) { + eventsTableName := GetEventsTableName(d.Database) + + aggStateFn := "" + switch d.Aggregation { + case models.MeterAggregationSum: + aggStateFn = "sumState" + case models.MeterAggregationAvg: + aggStateFn = "avgState" + case models.MeterAggregationMin: + aggStateFn = "minState" + case models.MeterAggregationMax: + aggStateFn = "maxState" + case models.MeterAggregationCount: + aggStateFn = "countState" + default: + return "", fmt.Errorf("invalid aggregation type: %s", d.Aggregation) + } + + // Selects + selects := []string{ + "subject", + "tumbleStart(time, toIntervalMinute(1)) AS windowstart", + "tumbleEnd(time, toIntervalMinute(1)) AS windowend", + } + if d.ValueProperty == "" && d.Aggregation == models.MeterAggregationCount { + selects = append(selects, fmt.Sprintf("%s(*) AS value", aggStateFn)) + } else { + selects = append(selects, fmt.Sprintf("%s(cast(JSON_VALUE(data, '%s'), 'Float64')) AS value", aggStateFn, sqlbuilder.Escape(d.ValueProperty))) + } + + // Group by + orderBy := []string{"windowstart", "windowend", "subject"} + sortedGroupBy := sortedKeys(d.GroupBy) + for _, k := range sortedGroupBy { + v := d.GroupBy[k] + columnName := sqlbuilder.Escape(k) + orderBy = append(orderBy, sqlbuilder.Escape(columnName)) + selects = append(selects, fmt.Sprintf("JSON_VALUE(data, '%s') as %s", sqlbuilder.Escape(v), sqlbuilder.Escape(k))) + } + + query := sqlbuilder.ClickHouse.NewSelectBuilder() + query.Select(selects...) + query.From(eventsTableName) + // We use an absolute path for type to avoid shadowing in case the materialized view has a `type` column due to group by + query.Where(fmt.Sprintf("namespace = '%s'", sqlbuilder.Escape(d.Namespace))) + query.Where(fmt.Sprintf("%s.type = '%s'", eventsTableName, sqlbuilder.Escape(d.EventType))) + query.GroupBy(orderBy...)
+ + return query.String(), nil +} + type deleteMeterView struct { - Database string - MeterViewName string + Database string + Namespace string + MeterSlug string } func (d deleteMeterView) toSQL() (string, []interface{}) { - viewName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.MeterViewName)) + viewName := GetMeterViewName(d.Database, d.Namespace, d.MeterSlug) return fmt.Sprintf("DROP VIEW %s", viewName), nil } type queryMeterView struct { Database string - MeterViewName string + Namespace string + MeterSlug string Aggregation models.MeterAggregation Subject []string From *time.Time @@ -178,7 +242,7 @@ type queryMeterView struct { } func (d queryMeterView) toSQL() (string, []interface{}, error) { - viewName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.MeterViewName)) + viewName := GetMeterViewName(d.Database, d.Namespace, d.MeterSlug) var selectColumns, groupByColumns, where []string @@ -300,14 +364,15 @@ func sortedKeys(m map[string]string) []string { } type listMeterViewSubjects struct { - Database string - MeterViewName string - From *time.Time - To *time.Time + Database string + Namespace string + MeterSlug string + From *time.Time + To *time.Time } func (d listMeterViewSubjects) toSQL() (string, []interface{}, error) { - viewName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.MeterViewName)) + viewName := GetMeterViewName(d.Database, d.Namespace, d.MeterSlug) var where []string sb := sqlbuilder.ClickHouse.NewSelectBuilder() @@ -331,3 +396,16 @@ func (d listMeterViewSubjects) toSQL() (string, []interface{}, error) { sql, args := sb.Build() return sql, args, nil } + +func GetEventsTableName(database string) string { + return fmt.Sprintf("%s.%s%s", sqlbuilder.Escape(database), tablePrefix, EventsTableName) +} + +func GetInvalidEventsTableName(database string) string { + return fmt.Sprintf("%s.%s%s", sqlbuilder.Escape(database), tablePrefix, InvalidEventsTableName) +} + +func 
GetMeterViewName(database string, namespace string, meterSlug string) string { + meterViewName := fmt.Sprintf("%s%s_%s", tablePrefix, namespace, meterSlug) + return fmt.Sprintf("%s.%s", sqlbuilder.Escape(database), sqlbuilder.Escape(meterViewName)) +} diff --git a/internal/streaming/clickhouse_connector/query_test.go b/internal/streaming/clickhouse_connector/query_test.go index d79c1bc99..d07bcaf7e 100644 --- a/internal/streaming/clickhouse_connector/query_test.go +++ b/internal/streaming/clickhouse_connector/query_test.go @@ -16,10 +16,9 @@ func TestCreateEventsTable(t *testing.T) { }{ { data: createEventsTable{ - Database: "openmeter", - EventsTableName: "meter_events", + Database: "openmeter", }, - want: "CREATE TABLE IF NOT EXISTS openmeter.meter_events (id String, type LowCardinality(String), subject String, source String, time DateTime, data String) ENGINE = MergeTree PARTITION BY toYYYYMM(time) ORDER BY (time, type, subject)", + want: "CREATE TABLE IF NOT EXISTS openmeter.om_events (namespace String, id String, type LowCardinality(String), subject String, source String, time DateTime, data String) ENGINE = MergeTree PARTITION BY toYYYYMM(time) ORDER BY (namespace, time, type, subject)", }, } @@ -40,12 +39,12 @@ func TestQueryEventsTable(t *testing.T) { }{ { query: queryEventsTable{ - Database: "openmeter", - EventsTableName: "meter_events", - Limit: 100, + Database: "openmeter", + Namespace: "my_namespace", + Limit: 100, }, - wantSQL: "SELECT id, type, subject, source, time, data FROM openmeter.meter_events ORDER BY time DESC LIMIT 100", - wantArgs: nil, + wantSQL: "SELECT id, type, subject, source, time, data FROM openmeter.om_events WHERE namespace = ? 
ORDER BY time DESC LIMIT 100", + wantArgs: []interface{}{"my_namespace"}, }, } @@ -72,54 +71,54 @@ func TestCreateMeterView(t *testing.T) { }{ { query: createMeterView{ - Database: "openmeter", - EventsTableName: "meter_events", - Aggregation: models.MeterAggregationSum, - EventType: "myevent", - MeterViewName: "meter_meter1", - ValueProperty: "$.duration_ms", - GroupBy: map[string]string{"group1": "$.group1", "group2": "$.group2"}, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + EventType: "myevent", + ValueProperty: "$.duration_ms", + GroupBy: map[string]string{"group1": "$.group1", "group2": "$.group2"}, }, - wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(sum, Float64), group1 String, group2 String) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject, group1, group2) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, sumState(cast(JSON_VALUE(data, '$.duration_ms'), 'Float64')) AS value, JSON_VALUE(data, '$.group1') as group1, JSON_VALUE(data, '$.group2') as group2 FROM openmeter.meter_events WHERE openmeter.meter_events.type = 'myevent' GROUP BY windowstart, windowend, subject, group1, group2", + wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.om_my_namespace_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(sum, Float64), group1 String, group2 String) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject, group1, group2) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, sumState(cast(JSON_VALUE(data, '$.duration_ms'), 'Float64')) AS value, JSON_VALUE(data, '$.group1') as group1, JSON_VALUE(data, '$.group2') as group2 FROM openmeter.om_events 
WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject, group1, group2", wantArgs: nil, }, { query: createMeterView{ - Database: "openmeter", - EventsTableName: "meter_events", - Aggregation: models.MeterAggregationAvg, - EventType: "myevent", - MeterViewName: "meter_meter1", - ValueProperty: "$.token_count", - GroupBy: map[string]string{}, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationAvg, + EventType: "myevent", + ValueProperty: "$.token_count", + GroupBy: map[string]string{}, }, - wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(avg, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, avgState(cast(JSON_VALUE(data, '$.token_count'), 'Float64')) AS value FROM openmeter.meter_events WHERE openmeter.meter_events.type = 'myevent' GROUP BY windowstart, windowend, subject", + wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.om_my_namespace_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(avg, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, avgState(cast(JSON_VALUE(data, '$.token_count'), 'Float64')) AS value FROM openmeter.om_events WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject", wantArgs: nil, }, { query: createMeterView{ - Database: "openmeter", - EventsTableName: "meter_events", - Aggregation: models.MeterAggregationCount, - EventType: "myevent", - MeterViewName: "meter_meter1", - ValueProperty: "", - 
GroupBy: map[string]string{}, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationCount, + EventType: "myevent", + ValueProperty: "", + GroupBy: map[string]string{}, }, - wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(count, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, countState(*) AS value FROM openmeter.meter_events WHERE openmeter.meter_events.type = 'myevent' GROUP BY windowstart, windowend, subject", + wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.om_my_namespace_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(count, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, countState(*) AS value FROM openmeter.om_events WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject", wantArgs: nil, }, { query: createMeterView{ - Database: "openmeter", - EventsTableName: "meter_events", - Aggregation: models.MeterAggregationCount, - EventType: "myevent", - MeterViewName: "meter_meter1", - ValueProperty: "", - GroupBy: map[string]string{}, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationCount, + EventType: "myevent", + ValueProperty: "", + GroupBy: map[string]string{}, }, - wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.meter_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(count, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, 
subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, countState(*) AS value FROM openmeter.meter_events WHERE openmeter.meter_events.type = 'myevent' GROUP BY windowstart, windowend, subject", + wantSQL: "CREATE MATERIALIZED VIEW IF NOT EXISTS openmeter.om_my_namespace_meter1 (subject String, windowstart DateTime, windowend DateTime, value AggregateFunction(count, Float64)) ENGINE = AggregatingMergeTree() ORDER BY (windowstart, windowend, subject) AS SELECT subject, tumbleStart(time, toIntervalMinute(1)) AS windowstart, tumbleEnd(time, toIntervalMinute(1)) AS windowend, countState(*) AS value FROM openmeter.om_events WHERE namespace = 'my_namespace' AND openmeter.om_events.type = 'myevent' GROUP BY windowstart, windowend, subject", wantArgs: nil, }, } @@ -147,10 +146,11 @@ func TestDeleteMeterView(t *testing.T) { }{ { data: deleteMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", }, - wantSQL: "DROP VIEW openmeter.meter_meter1", + wantSQL: "DROP VIEW openmeter.om_my_namespace_meter1", wantArgs: nil, }, } @@ -180,111 +180,121 @@ func TestQueryMeterView(t *testing.T) { }{ { query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, - Subject: []string{subject}, - From: &from, - To: &to, - GroupBy: []string{"group1", "group2"}, - WindowSize: &windowSize, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + Subject: []string{subject}, + From: &from, + To: &to, + GroupBy: []string{"group1", "group2"}, + WindowSize: &windowSize, }, - wantSQL: "SELECT tumbleStart(windowstart, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(windowstart, toIntervalHour(1), 'UTC') AS windowend, subject, sumMerge(value) AS value, group1, group2 FROM openmeter.meter_meter1 WHERE 
(subject = ?) AND windowstart >= ? AND windowend <= ? GROUP BY windowstart, windowend, subject, group1, group2 ORDER BY windowstart", + wantSQL: "SELECT tumbleStart(windowstart, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(windowstart, toIntervalHour(1), 'UTC') AS windowend, subject, sumMerge(value) AS value, group1, group2 FROM openmeter.om_my_namespace_meter1 WHERE (subject = ?) AND windowstart >= ? AND windowend <= ? GROUP BY windowstart, windowend, subject, group1, group2 ORDER BY windowstart", wantArgs: []interface{}{"subject1", from.Unix(), to.Unix()}, }, { // Aggregate all available data query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, }, - wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.meter_meter1", + wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1", wantArgs: nil, }, { // Aggregate with count aggregation query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationCount, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationCount, }, - wantSQL: "SELECT min(windowstart), max(windowend), toFloat64(countMerge(value)) AS value FROM openmeter.meter_meter1", + wantSQL: "SELECT min(windowstart), max(windowend), toFloat64(countMerge(value)) AS value FROM openmeter.om_my_namespace_meter1", wantArgs: nil, }, { // Aggregate data from start query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, - From: &from, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + From: &from, }, - wantSQL: "SELECT 
min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.meter_meter1 WHERE windowstart >= ?", + wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE windowstart >= ?", wantArgs: []interface{}{from.Unix()}, }, { // Aggregate data between period query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, - From: &from, - To: &to, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + From: &from, + To: &to, }, - wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.meter_meter1 WHERE windowstart >= ? AND windowend <= ?", + wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE windowstart >= ? AND windowend <= ?", wantArgs: []interface{}{from.Unix(), to.Unix()}, }, { // Aggregate data between period, groupped by window size query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, - From: &from, - To: &to, - WindowSize: &windowSize, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + From: &from, + To: &to, + WindowSize: &windowSize, }, - wantSQL: "SELECT tumbleStart(windowstart, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(windowstart, toIntervalHour(1), 'UTC') AS windowend, sumMerge(value) AS value FROM openmeter.meter_meter1 WHERE windowstart >= ? AND windowend <= ? GROUP BY windowstart, windowend ORDER BY windowstart", + wantSQL: "SELECT tumbleStart(windowstart, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(windowstart, toIntervalHour(1), 'UTC') AS windowend, sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE windowstart >= ? AND windowend <= ? 
GROUP BY windowstart, windowend ORDER BY windowstart", wantArgs: []interface{}{from.Unix(), to.Unix()}, }, { // Aggregate data between period in a different timezone, groupped by window size query: queryMeterView{ Database: "openmeter", - MeterViewName: "meter_meter1", + Namespace: "my_namespace", + MeterSlug: "meter1", Aggregation: models.MeterAggregationSum, From: &from, To: &to, WindowSize: &windowSize, WindowTimeZone: tz, }, - wantSQL: "SELECT tumbleStart(windowstart, toIntervalHour(1), 'Asia/Shanghai') AS windowstart, tumbleEnd(windowstart, toIntervalHour(1), 'Asia/Shanghai') AS windowend, sumMerge(value) AS value FROM openmeter.meter_meter1 WHERE windowstart >= ? AND windowend <= ? GROUP BY windowstart, windowend ORDER BY windowstart", + wantSQL: "SELECT tumbleStart(windowstart, toIntervalHour(1), 'Asia/Shanghai') AS windowstart, tumbleEnd(windowstart, toIntervalHour(1), 'Asia/Shanghai') AS windowend, sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE windowstart >= ? AND windowend <= ? GROUP BY windowstart, windowend ORDER BY windowstart", wantArgs: []interface{}{from.Unix(), to.Unix()}, }, { // Aggregate data for a single subject query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, - Subject: []string{subject}, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + Subject: []string{subject}, }, - wantSQL: "SELECT min(windowstart), max(windowend), subject, sumMerge(value) AS value FROM openmeter.meter_meter1 WHERE (subject = ?) GROUP BY subject", + wantSQL: "SELECT min(windowstart), max(windowend), subject, sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE (subject = ?) 
GROUP BY subject", wantArgs: []interface{}{"subject1"}, }, { // Aggregate data for a single subject and group by additional fields query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, - Subject: []string{subject}, - GroupBy: []string{"group1", "group2"}, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + Subject: []string{subject}, + GroupBy: []string{"group1", "group2"}, }, - wantSQL: "SELECT min(windowstart), max(windowend), subject, sumMerge(value) AS value, group1, group2 FROM openmeter.meter_meter1 WHERE (subject = ?) GROUP BY subject, group1, group2", + wantSQL: "SELECT min(windowstart), max(windowend), subject, sumMerge(value) AS value, group1, group2 FROM openmeter.om_my_namespace_meter1 WHERE (subject = ?) GROUP BY subject, group1, group2", wantArgs: []interface{}{"subject1"}, }, { // Aggregate data for a multiple subjects query: queryMeterView{ - Database: "openmeter", - MeterViewName: "meter_meter1", - Aggregation: models.MeterAggregationSum, - Subject: []string{subject, "subject2"}, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + Subject: []string{subject, "subject2"}, }, - wantSQL: "SELECT min(windowstart), max(windowend), subject, sumMerge(value) AS value FROM openmeter.meter_meter1 WHERE (subject = ? OR subject = ?) GROUP BY subject", + wantSQL: "SELECT min(windowstart), max(windowend), subject, sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE (subject = ? OR subject = ?) 
GROUP BY subject", wantArgs: []interface{}{"subject1", "subject2"}, }, } @@ -315,29 +325,32 @@ func TestListMeterViewSubjects(t *testing.T) { }{ { query: listMeterViewSubjects{ - Database: "openmeter", - MeterViewName: "meter_meter1", + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", }, - wantSQL: "SELECT DISTINCT subject FROM openmeter.meter_meter1 ORDER BY subject", + wantSQL: "SELECT DISTINCT subject FROM openmeter.om_my_namespace_meter1 ORDER BY subject", wantArgs: nil, }, { query: listMeterViewSubjects{ - Database: "openmeter", - MeterViewName: "meter_meter1", - From: &from, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + From: &from, }, - wantSQL: "SELECT DISTINCT subject FROM openmeter.meter_meter1 WHERE windowstart >= ? ORDER BY subject", + wantSQL: "SELECT DISTINCT subject FROM openmeter.om_my_namespace_meter1 WHERE windowstart >= ? ORDER BY subject", wantArgs: []interface{}{from.Unix()}, }, { query: listMeterViewSubjects{ - Database: "openmeter", - MeterViewName: "meter_meter1", - From: &from, - To: &to, + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + From: &from, + To: &to, }, - wantSQL: "SELECT DISTINCT subject FROM openmeter.meter_meter1 WHERE windowstart >= ? AND windowend <= ? ORDER BY subject", + wantSQL: "SELECT DISTINCT subject FROM openmeter.om_my_namespace_meter1 WHERE windowstart >= ? AND windowend <= ? ORDER BY subject", wantArgs: []interface{}{from.Unix(), to.Unix()}, }, }