Skip to content

Commit

Permalink
fix: add bucketing
Browse files Browse the repository at this point in the history
  • Loading branch information
shivanshuraj1333 committed Dec 18, 2024
1 parent 83aa48c commit b21a5e8
Showing 1 changed file with 55 additions and 12 deletions.
67 changes: 55 additions & 12 deletions pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
timeRange := (end - start) / 1000000000
tsBucketStart := (start / 1000000000) - 1800
tsBucketEnd := end / 1000000000
query := fmt.Sprintf(`
WITH consumer_query AS (
SELECT
Expand All @@ -18,6 +20,8 @@ WITH consumer_query AS (
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND ts_bucket_start >= '%d'
AND ts_bucket_start <= '%d'
AND kind = 5
AND attribute_string_messaging$$system = '%s'
AND attributes_string['messaging.destination.name'] = '%s'
Expand All @@ -36,13 +40,15 @@ FROM
consumer_query
ORDER BY
resource_string_service$$name;
`, start, end, queueType, topic, partition, consumerGroup, timeRange)
`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, consumerGroup, timeRange)
return query
}

// S1 landing
func generatePartitionLatencySQL(start, end int64, queueType string) string {
timeRange := (end - start) / 1000000000
tsBucketStart := (start / 1000000000) - 1800
tsBucketEnd := end / 1000000000
query := fmt.Sprintf(`
WITH partition_query AS (
SELECT
Expand All @@ -54,6 +60,8 @@ WITH partition_query AS (
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND ts_bucket_start >= '%d'
AND ts_bucket_start <= '%d'
AND kind = 4
AND attribute_string_messaging$$system = '%s'
GROUP BY topic, partition
Expand All @@ -68,13 +76,15 @@ FROM
partition_query
ORDER BY
topic;
`, start, end, queueType, timeRange)
`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange)
return query
}

// S1 consumer
func generateConsumerPartitionLatencySQL(start, end int64, topic, partition, queueType string) string {
timeRange := (end - start) / 1000000000
tsBucketStart := (start / 1000000000) - 1800
tsBucketEnd := end / 1000000000
query := fmt.Sprintf(`
WITH consumer_pl AS (
SELECT
Expand All @@ -87,6 +97,8 @@ WITH consumer_pl AS (
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND ts_bucket_start >= '%d'
AND ts_bucket_start <= '%d'
AND kind = 5
AND attribute_string_messaging$$system = '%s'
AND attributes_string['messaging.destination.name'] = '%s'
Expand All @@ -104,14 +116,15 @@ FROM
consumer_pl
ORDER BY
consumer_group;
`, start, end, queueType, topic, partition, timeRange)
`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, timeRange)
return query
}

// S3, producer overview
func generateProducerPartitionThroughputSQL(start, end int64, queueType string) string {
timeRange := (end - start) / 1000000000
// t, svc, rps, byte*, p99, err
tsBucketStart := (start / 1000000000) - 1800
tsBucketEnd := end / 1000000000 // t, svc, rps, byte*, p99, err
query := fmt.Sprintf(`
WITH producer_latency AS (
SELECT
Expand All @@ -124,6 +137,8 @@ WITH producer_latency AS (
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND ts_bucket_start >= '%d'
AND ts_bucket_start <= '%d'
AND kind = 4
AND attribute_string_messaging$$system = '%s'
GROUP BY topic, resource_string_service$$name
Expand All @@ -137,13 +152,15 @@ SELECT
COALESCE(total_requests / %d, 0) AS throughput
FROM
producer_latency
`, start, end, queueType, timeRange)
`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange)
return query
}

// S3, producer topic/service overview
func generateProducerTopicLatencySQL(start, end int64, topic, service, queueType string) string {
timeRange := (end - start) / 1000000000
tsBucketStart := (start / 1000000000) - 1800
tsBucketEnd := end / 1000000000
query := fmt.Sprintf(`
WITH consumer_latency AS (
SELECT
Expand All @@ -155,6 +172,8 @@ WITH consumer_latency AS (
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND ts_bucket_start >= '%d'
AND ts_bucket_start <= '%d'
AND kind = 4
AND resource_string_service$$name = '%s'
AND attribute_string_messaging$$system = '%s'
Expand All @@ -169,13 +188,15 @@ SELECT
COALESCE(total_requests / %d, 0) AS throughput
FROM
consumer_latency
`, start, end, service, queueType, topic, timeRange)
`, start, end, tsBucketStart, tsBucketEnd, service, queueType, topic, timeRange)
return query
}

// S3 consumer overview
func generateConsumerLatencySQL(start, end int64, queueType string) string {
timeRange := (end - start) / 1000000000
tsBucketStart := (start / 1000000000) - 1800
tsBucketEnd := end / 1000000000
query := fmt.Sprintf(`
WITH consumer_latency AS (
SELECT
Expand All @@ -189,6 +210,8 @@ WITH consumer_latency AS (
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND ts_bucket_start >= '%d'
AND ts_bucket_start <= '%d'
AND kind = 5
AND attribute_string_messaging$$system = '%s'
GROUP BY topic, resource_string_service$$name
Expand All @@ -205,13 +228,15 @@ FROM
consumer_latency
ORDER BY
topic;
`, start, end, queueType, timeRange, timeRange)
`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange, timeRange)
return query
}

// S3 consumer topic/service
func generateConsumerServiceLatencySQL(start, end int64, topic, service, queueType string) string {
timeRange := (end - start) / 1000000000
tsBucketStart := (start / 1000000000) - 1800
tsBucketEnd := end / 1000000000
query := fmt.Sprintf(`
WITH consumer_latency AS (
SELECT
Expand All @@ -223,6 +248,8 @@ WITH consumer_latency AS (
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND ts_bucket_start >= '%d'
AND ts_bucket_start <= '%d'
AND kind = 5
AND resource_string_service$$name = '%s'
AND attribute_string_messaging$$system = '%s'
Expand All @@ -237,7 +264,7 @@ SELECT
COALESCE(total_requests / %d, 0) AS throughput
FROM
consumer_latency
`, start, end, service, queueType, topic, timeRange)
`, start, end, tsBucketStart, tsBucketEnd, service, queueType, topic, timeRange)
return query
}

Expand Down Expand Up @@ -293,6 +320,8 @@ GROUP BY

func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
timeRange := (end - start) / 1000000000
tsBucketStart := (start / 1000000000) - 1800
tsBucketEnd := end / 1000000000
query := fmt.Sprintf(`
WITH producer_query AS (
SELECT
Expand All @@ -304,6 +333,8 @@ WITH producer_query AS (
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND ts_bucket_start >= '%d'
AND ts_bucket_start <= '%d'
AND kind = 4
AND attribute_string_messaging$$system = '%s'
AND attributes_string['messaging.destination.name'] = '%s'
Expand All @@ -320,12 +351,14 @@ FROM
producer_query
ORDER BY
resource_string_service$$name;
`, start, end, queueType, topic, partition, timeRange)
`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, timeRange)
return query
}

func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, partitionID, queueType string) string {
timeRange := (end - start) / 1000000000
tsBucketStart := (start / 1000000000) - 1800
tsBucketEnd := end / 1000000000
query := fmt.Sprintf(`
SELECT
attributes_string['messaging.client_id'] AS client_id,
Expand All @@ -336,17 +369,21 @@ FROM signoz_traces.distributed_signoz_index_v3
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND ts_bucket_start >= '%d'
AND ts_bucket_start <= '%d'
AND kind = 5
AND attribute_string_messaging$$system = '%s'
AND attributes_string['messaging.kafka.consumer.group'] = '%s'
AND attributes_string['messaging.destination.partition.id'] = '%s'
GROUP BY service_name, client_id, service_instance_id
ORDER BY throughput DESC
`, timeRange, start, end, queueType, consumerGroup, partitionID)
`, timeRange, start, end, tsBucketStart, tsBucketEnd, queueType, consumerGroup, partitionID)
return query
}

func onboardProducersSQL(start, end int64, queueType string) string {
tsBucketStart := (start / 1000000000) - 1800
tsBucketEnd := end / 1000000000
query := fmt.Sprintf(`
SELECT
COUNT(*) = 0 AS entries,
Expand All @@ -358,11 +395,15 @@ FROM
signoz_traces.distributed_signoz_index_v3
WHERE
timestamp >= '%d'
AND timestamp <= '%d';`, queueType, start, end)
AND timestamp <= '%d'
AND ts_bucket_start >= '%d'
AND ts_bucket_start <= '%d';`, queueType, start, end, tsBucketStart, tsBucketEnd)
return query
}

func onboardConsumerSQL(start, end int64, queueType string) string {
tsBucketStart := (start / 1000000000) - 1800
tsBucketEnd := end / 1000000000
query := fmt.Sprintf(`
SELECT
COUNT(*) = 0 AS entries,
Expand All @@ -378,6 +419,8 @@ SELECT
FROM signoz_traces.distributed_signoz_index_v3
WHERE
timestamp >= '%d'
AND timestamp <= '%d';`, queueType, start, end)
AND timestamp <= '%d'
AND ts_bucket_start >= '%d'
AND ts_bucket_start <= '%d' ;`, queueType, start, end, tsBucketStart, tsBucketEnd)
return query
}

0 comments on commit b21a5e8

Please sign in to comment.