Skip to content

Commit

Permalink
chore: merge branch 'main' into cache-pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
vikrantgupta25 committed Jan 3, 2025
2 parents 118c2f2 + c5938b6 commit 622aebd
Show file tree
Hide file tree
Showing 14 changed files with 612 additions and 7 deletions.
9 changes: 6 additions & 3 deletions ee/query-service/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type ServerOptions struct {
GatewayUrl string
UseLogsNewSchema bool
UseTraceNewSchema bool
SkipWebFrontend bool
}

// Server runs HTTP api service
Expand Down Expand Up @@ -385,9 +386,11 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web *web.Web) (*

handler = handlers.CompressHandler(handler)

err := web.AddToRouter(r)
if err != nil {
return nil, err
if !s.serverOptions.SkipWebFrontend {
err := web.AddToRouter(r)
if err != nil {
return nil, err
}
}

return &http.Server{
Expand Down
4 changes: 3 additions & 1 deletion ee/query-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func main() {
var dialTimeout time.Duration
var gatewayUrl string
var useLicensesV3 bool
var skipWebFrontend bool

flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces")
Expand All @@ -125,7 +126,7 @@ func main() {
flag.StringVar(&cluster, "cluster", "cluster", "(cluster name - defaults to 'cluster')")
flag.StringVar(&gatewayUrl, "gateway-url", "", "(url to the gateway)")
flag.BoolVar(&useLicensesV3, "use-licenses-v3", false, "use licenses_v3 schema for licenses")

flag.BoolVar(&skipWebFrontend, "skip-web-frontend", false, "skip web frontend")
flag.Parse()

loggerMgr := initZapLog(enableQueryServiceLogOTLPExport)
Expand Down Expand Up @@ -170,6 +171,7 @@ func main() {
GatewayUrl: gatewayUrl,
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
SkipWebFrontend: skipWebFrontend,
}

// Read the jwt secret key
Expand Down
2 changes: 1 addition & 1 deletion pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4196,7 +4196,7 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r *

switch req.DataSource {
case v3.DataSourceMetrics:
response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req, true)
response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req, false)
case v3.DataSourceLogs:
response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req)
case v3.DataSourceTraces:
Expand Down
5 changes: 5 additions & 0 deletions pkg/query-service/app/metrics/v3/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,19 +217,22 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery) (string, erro
// groupingSets returns a string of comma separated tags for group by clause
// `ts` is always added to the group by clause
func groupingSets(tags ...string) string {
tags = utils.AddBackTickToFormatTags(tags...)
withTs := append(tags, "ts")
return strings.Join(withTs, ", ")
}

// groupBy returns a string of comma separated tags for group by clause
// `ts` is always added to the group by clause
func groupBy(tags ...string) string {
tags = utils.AddBackTickToFormatTags(tags...)
tags = append(tags, "ts")
return strings.Join(tags, ",")
}

// groupSelect returns a string of comma separated tags for select clause
func groupSelect(tags ...string) string {
tags = utils.AddBackTickToFormatTags(tags...)
groupTags := strings.Join(tags, ",")
if len(tags) != 0 {
groupTags += ", "
Expand Down Expand Up @@ -270,11 +273,13 @@ func orderBy(items []v3.OrderBy, tags []string) string {
for _, item := range items {
if item.ColumnName == tag {
found = true
item.ColumnName = utils.AddBackTickToFormatTag(item.ColumnName)
orderBy = append(orderBy, fmt.Sprintf("%s %s", item.ColumnName, item.Order))
break
}
}
if !found {
tag = utils.AddBackTickToFormatTag(tag)
orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag))
}
}
Expand Down
160 changes: 160 additions & 0 deletions pkg/query-service/app/metrics/v3/query_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,163 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
})
}
}

func TestBuildQueryWithDotInMetricAndAttributes(t *testing.T) {
cases := []struct {
name string
params *v3.QueryRangeParamsV3
expected string
}{
{
name: "TestBuildQueryWithDotInMetricAndAttributes with dot in metric and attributes",
params: &v3.QueryRangeParamsV3{
Start: 1735036101000,
End: 1735637901000,
Step: 60,
Variables: map[string]interface{}{
"SIGNOZ_START_TIME": 1735034992000,
"SIGNOZ_END_TIME": 1735036792000,
},
FormatForWeb: false,
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeValue,
FillGaps: false,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorAvg,
AggregateAttribute: v3.AttributeKey{
Key: "system.memory.usage",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyType("Gauge"),
IsColumn: true,
},
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "os.type",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
Operator: v3.FilterOperatorEqual,
Value: "linux",
},
},
},
Expression: "A",
Disabled: false,
StepInterval: 60,
OrderBy: []v3.OrderBy{
{
ColumnName: "os.type",
Order: v3.DirectionAsc,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
},
GroupBy: []v3.AttributeKey{
{
Key: "os.type",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
},
Legend: "",
ReduceTo: v3.ReduceToOperatorAvg,
Having: []v3.Having{},
},
},
},
},
expected: "SELECT *, now() AS ts FROM (SELECT avgIf(value, toUnixTimestamp(ts) != 0) as value, anyIf(ts, toUnixTimestamp(ts) != 0) AS timestamp FROM (SELECT `os.type`, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'os.type') as `os.type`, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'system.memory.usage' AND temporality = '' AND unix_milli >= 1734998400000 AND unix_milli < 1735637880000 AND JSONExtractString(labels, 'os.type') = 'linux') as filtered_time_series USING fingerprint WHERE metric_name = 'system.memory.usage' AND unix_milli >= 1735036080000 AND unix_milli < 1735637880000 GROUP BY `os.type`, ts ORDER BY `os.type` asc, ts) )",
},
{
name: "TestBuildQueryWithDotInMetricAndAttributes with dot in metric and attributes with rate_avg aggregation",
params: &v3.QueryRangeParamsV3{
Start: 1735036101000,
End: 1735637901000,
Step: 60,
Variables: map[string]interface{}{
"SIGNOZ_START_TIME": 1735034992000,
"SIGNOZ_END_TIME": 1735036792000,
},
FormatForWeb: false,
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeValue,
FillGaps: false,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorRateAvg,
AggregateAttribute: v3.AttributeKey{
Key: "system.memory.usage",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyType("Gauge"),
IsColumn: true,
},
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "os.type",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
Operator: v3.FilterOperatorEqual,
Value: "linux",
},
},
},
Expression: "A",
Disabled: false,
StepInterval: 60,
OrderBy: []v3.OrderBy{
{
ColumnName: "os.type",
Order: v3.DirectionAsc,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
},
GroupBy: []v3.AttributeKey{
{
Key: "os.type",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
},
Legend: "",
ReduceTo: v3.ReduceToOperatorAvg,
Having: []v3.Having{},
},
},
},
},
expected: "SELECT *, now() AS ts FROM (SELECT avgIf(value, toUnixTimestamp(ts) != 0) as value, anyIf(ts, toUnixTimestamp(ts) != 0) AS timestamp FROM (SELECT `os.type`, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as value FROM(SELECT `os.type`, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'os.type') as `os.type`, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'system.memory.usage' AND temporality = '' AND unix_milli >= 1734998400000 AND unix_milli < 1735637880000 AND JSONExtractString(labels, 'os.type') = 'linux') as filtered_time_series USING fingerprint WHERE metric_name = 'system.memory.usage' AND unix_milli >= 1735036020000 AND unix_milli < 1735637880000 GROUP BY `os.type`, ts ORDER BY `os.type` asc, ts) WINDOW rate_window as (PARTITION BY `os.type` ORDER BY `os.type`, ts) ) )",
},
}
for _, testCase := range cases {
t.Run(testCase.name, func(t *testing.T) {
q := testCase.params
query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"], Options{PreferRPM: false})
require.NoError(t, err)

require.Contains(t, query, testCase.expected)
})
}
}
55 changes: 55 additions & 0 deletions pkg/query-service/app/metrics/v4/cumulative/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,61 @@ func TestPrepareTableQuery(t *testing.T) {
end: 1701796780000,
expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
},
{
name: "test time aggregation = avg, space aggregation = avg, temporality = unspecified, testing metrics and attribute name with dot",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorAvg,
AggregateAttribute: v3.AttributeKey{
Key: "system.memory.usage",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyType("Gauge"),
IsColumn: true,
},
Temporality: v3.Unspecified,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationAvg,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "host.name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
Operator: v3.FilterOperatorEqual,
Value: "signoz-host",
},
},
},
Expression: "A",
Disabled: false,
StepInterval: 60,
OrderBy: []v3.OrderBy{
{
ColumnName: "state",
Order: v3.DirectionDesc,
},
},
GroupBy: []v3.AttributeKey{
{
Key: "state",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
},
Legend: "",
ReduceTo: v3.ReduceToOperatorAvg,
Having: []v3.Having{},
},
start: 1735295140000,
end: 1735554340000,
expectedQueryContains: "SELECT state, ts, avg(per_series_value) as value FROM (SELECT fingerprint, any(state) as state, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum) / sum(count) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'state') as state, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'system.memory.usage' AND temporality = 'Unspecified' AND unix_milli >= 1735257600000 AND unix_milli < 1735554340000 AND JSONExtractString(labels, 'host.name') = 'signoz-host') as filtered_time_series USING fingerprint WHERE metric_name = 'system.memory.usage' AND unix_milli >= 1735295140000 AND unix_milli < 1735554340000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY state, ts ORDER BY state desc, ts ASC",
},
}

for _, testCase := range testCases {
Expand Down
55 changes: 55 additions & 0 deletions pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,61 @@ func TestPrepareTimeseriesQuery(t *testing.T) {
end: 1701796780000,
expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
},
{
name: "test time aggregation = avg, space aggregation = avg, temporality = unspecified, testing metrics and attribute name with dot",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorAvg,
AggregateAttribute: v3.AttributeKey{
Key: "system.memory.usage",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyType("Gauge"),
IsColumn: true,
},
Temporality: v3.Unspecified,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationAvg,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "host.name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
Operator: v3.FilterOperatorEqual,
Value: "signoz-host",
},
},
},
Expression: "A",
Disabled: false,
StepInterval: 60,
OrderBy: []v3.OrderBy{
{
ColumnName: "state",
Order: v3.DirectionDesc,
},
},
GroupBy: []v3.AttributeKey{
{
Key: "state",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
},
Legend: "",
ReduceTo: v3.ReduceToOperatorAvg,
Having: []v3.Having{},
},
start: 1735295140000,
end: 1735554340000,
expectedQueryContains: "SELECT state, ts, avg(per_series_value) as value FROM (SELECT fingerprint, any(state) as state, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum) / sum(count) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'state') as state, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'system.memory.usage' AND temporality = 'Unspecified' AND unix_milli >= 1735257600000 AND unix_milli < 1735554340000 AND JSONExtractString(labels, 'host.name') = 'signoz-host') as filtered_time_series USING fingerprint WHERE metric_name = 'system.memory.usage' AND unix_milli >= 1735295140000 AND unix_milli < 1735554340000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY state, ts ORDER BY state desc, ts ASC",
},
}

for _, testCase := range testCases {
Expand Down
Loading

0 comments on commit 622aebd

Please sign in to comment.