diff --git a/pkg/query-service/app/clickhouseReader/filter_suggestions.go b/pkg/query-service/app/clickhouseReader/filter_suggestions.go
index 4de924ddc3..eeda450050 100644
--- a/pkg/query-service/app/clickhouseReader/filter_suggestions.go
+++ b/pkg/query-service/app/clickhouseReader/filter_suggestions.go
@@ -138,17 +138,17 @@ func (r *ClickHouseReader) getValuesForLogAttributes(
     ```
     select * from (
         (
-            select tagKey, stringTagValue, int64TagValue, float64TagValue
-            from signoz_logs.distributed_tag_attributes
-            where tagKey = $1 and (
-                stringTagValue != '' or int64TagValue is not null or float64TagValue is not null
+            select tag_key, string_value, number_value
+            from signoz_logs.distributed_tag_attributes_v2
+            where tag_key = $1 and (
+                string_value != '' or number_value is not null
             )
             limit 2
         ) UNION DISTINCT (
-            select tagKey, stringTagValue, int64TagValue, float64TagValue
-            from signoz_logs.distributed_tag_attributes
-            where tagKey = $2 and (
-                stringTagValue != '' or int64TagValue is not null or float64TagValue is not null
+            select tag_key, string_value, number_value
+            from signoz_logs.distributed_tag_attributes_v2
+            where tag_key = $2 and (
+                string_value != '' or number_value is not null
             )
             limit 2
         )
@@ -156,9 +156,6 @@ func (r *ClickHouseReader) getValuesForLogAttributes(
     ```
     Since tag_attributes table uses ReplacingMergeTree, the values would be distinct and no order by
     is being used to ensure the `limit` clause minimizes the amount of data scanned.
-
-    This query scanned ~30k rows per attribute on fiscalnote-v2 for attributes like `message` and `time`
-    that had >~110M values each
     */
 
     if len(attributes) > 10 {
@@ -173,13 +170,13 @@ func (r *ClickHouseReader) getValuesForLogAttributes(
     tagKeyQueryArgs := []any{}
     for idx, attrib := range attributes {
         tagQueries = append(tagQueries, fmt.Sprintf(`(
-            select tagKey, stringTagValue, int64TagValue, float64TagValue
+            select tag_key, string_value, number_value
             from %s.%s
-            where tagKey = $%d and (
-                stringTagValue != '' or int64TagValue is not null or float64TagValue is not null
+            where tag_key = $%d and (
+                string_value != '' or number_value is not null
             )
             limit %d
-        )`, r.logsDB, r.logsTagAttributeTable, idx+1, limit))
+        )`, r.logsDB, r.logsTagAttributeTableV2, idx+1, limit))
 
         tagKeyQueryArgs = append(tagKeyQueryArgs, attrib.Key)
     }
@@ -211,10 +208,9 @@ func (r *ClickHouseReader) getValuesForLogAttributes(
         var tagKey string
         var stringValue string
         var float64Value sql.NullFloat64
-        var int64Value sql.NullInt64
 
         err := rows.Scan(
-            &tagKey, &stringValue, &int64Value, &float64Value,
+            &tagKey, &stringValue, &float64Value,
         )
         if err != nil {
             return nil, model.InternalError(fmt.Errorf(
@@ -228,12 +224,6 @@ func (r *ClickHouseReader) getValuesForLogAttributes(
                 result[attrResultIdx] = append(result[attrResultIdx], stringValue)
             }
 
-        } else if int64Value.Valid {
-            attrResultIdx := resultIdxForAttrib(tagKey, v3.AttributeKeyDataTypeInt64)
-            if attrResultIdx >= 0 {
-                result[attrResultIdx] = append(result[attrResultIdx], int64Value.Int64)
-            }
-
         } else if float64Value.Valid {
             attrResultIdx := resultIdxForAttrib(tagKey, v3.AttributeKeyDataTypeFloat64)
             if attrResultIdx >= 0 {
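Aside (not part of the patch): the rewritten loop above fans out one `limit`-bounded subquery per attribute key against `distributed_tag_attributes_v2` and stitches them together with `UNION DISTINCT`. A minimal, self-contained sketch of that query shape; the helper name, hard-coded table name, and example keys are illustrative assumptions, not SigNoz code:

```go
package main

import (
    "fmt"
    "strings"
)

// buildTagValuesQuery is an illustrative helper (not part of the diff) that
// mirrors the query shape built in getValuesForLogAttributes: one limited
// subquery per attribute key against the v2 tag attributes table, combined
// with UNION DISTINCT.
func buildTagValuesQuery(attributeKeys []string, limit int) (string, []any) {
    subQueries := []string{}
    args := []any{}
    for idx, key := range attributeKeys {
        subQueries = append(subQueries, fmt.Sprintf(`(
    select tag_key, string_value, number_value
    from signoz_logs.distributed_tag_attributes_v2
    where tag_key = $%d and (
        string_value != '' or number_value is not null
    )
    limit %d
)`, idx+1, limit))
        args = append(args, key)
    }
    query := "select * from (\n" + strings.Join(subQueries, " UNION DISTINCT ") + "\n)"
    return query, args
}

func main() {
    query, args := buildTagValuesQuery([]string{"service.name", "http.method"}, 2)
    fmt.Println(query)
    fmt.Println(args)
}
```

Each subquery carries its own `$n` placeholder, so the argument order must match the order in which the attribute keys are appended.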
"distributed_dependency_graph_minutes_v2" defaultTopLevelOperationsTable string = "distributed_top_level_operations" - defaultSpanAttributeTable string = "distributed_span_attributes" + defaultSpanAttributeTableV2 string = "distributed_tag_attributes_v2" defaultSpanAttributeKeysTable string = "distributed_span_attributes_keys" defaultLogsDB string = "signoz_logs" defaultLogsTable string = "distributed_logs" defaultLogsLocalTable string = "logs" defaultLogAttributeKeysTable string = "distributed_logs_attribute_keys" defaultLogResourceKeysTable string = "distributed_logs_resource_keys" - defaultLogTagAttributeTable string = "distributed_tag_attributes" + defaultLogTagAttributeTableV2 string = "distributed_tag_attributes_v2" defaultLiveTailRefreshSeconds int = 5 defaultWriteBatchDelay time.Duration = 5 * time.Second defaultWriteBatchSize int = 10000 @@ -69,7 +69,7 @@ type namespaceConfig struct { UsageExplorerTable string SpansTable string ErrorTable string - SpanAttributeTable string + SpanAttributeTableV2 string SpanAttributeKeysTable string DependencyGraphTable string TopLevelOperationsTable string @@ -78,7 +78,7 @@ type namespaceConfig struct { LogsLocalTable string LogsAttributeKeysTable string LogsResourceKeysTable string - LogsTagAttributeTable string + LogsTagAttributeTableV2 string LiveTailRefreshSeconds int WriteBatchDelay time.Duration WriteBatchSize int @@ -167,7 +167,7 @@ func NewOptions( DurationTable: defaultDurationTable, UsageExplorerTable: defaultUsageExplorerTable, SpansTable: defaultSpansTable, - SpanAttributeTable: defaultSpanAttributeTable, + SpanAttributeTableV2: defaultSpanAttributeTableV2, SpanAttributeKeysTable: defaultSpanAttributeKeysTable, DependencyGraphTable: defaultDependencyGraphTable, TopLevelOperationsTable: defaultTopLevelOperationsTable, @@ -176,7 +176,7 @@ func NewOptions( LogsLocalTable: defaultLogsLocalTable, LogsAttributeKeysTable: defaultLogAttributeKeysTable, LogsResourceKeysTable: defaultLogResourceKeysTable, - LogsTagAttributeTable: defaultLogTagAttributeTable, + LogsTagAttributeTableV2: defaultLogTagAttributeTableV2, LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds, WriteBatchDelay: defaultWriteBatchDelay, WriteBatchSize: defaultWriteBatchSize, diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 4f1aa99dfc..9c7828af45 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -118,7 +118,7 @@ type ClickHouseReader struct { errorTable string usageExplorerTable string SpansTable string - spanAttributeTable string + spanAttributeTableV2 string spanAttributesKeysTable string dependencyGraphTable string topLevelOperationsTable string @@ -127,7 +127,7 @@ type ClickHouseReader struct { logsLocalTable string logsAttributeKeys string logsResourceKeys string - logsTagAttributeTable string + logsTagAttributeTableV2 string queryEngine *promql.Engine remoteStorage *remote.Storage fanoutStorage *storage.Storage @@ -246,7 +246,7 @@ func NewReaderFromClickhouseConnection( usageExplorerTable: options.primary.UsageExplorerTable, durationTable: options.primary.DurationTable, SpansTable: options.primary.SpansTable, - spanAttributeTable: options.primary.SpanAttributeTable, + spanAttributeTableV2: options.primary.SpanAttributeTableV2, spanAttributesKeysTable: options.primary.SpanAttributeKeysTable, dependencyGraphTable: options.primary.DependencyGraphTable, topLevelOperationsTable: options.primary.TopLevelOperationsTable, @@ -255,7 +255,7 
@@ func NewReaderFromClickhouseConnection( logsLocalTable: options.primary.LogsLocalTable, logsAttributeKeys: options.primary.LogsAttributeKeysTable, logsResourceKeys: options.primary.LogsResourceKeysTable, - logsTagAttributeTable: options.primary.LogsTagAttributeTable, + logsTagAttributeTableV2: options.primary.LogsTagAttributeTableV2, liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds, promConfigFile: configFile, featureFlags: featureFlag, @@ -1035,29 +1035,6 @@ func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args } -func excludeTags(_ context.Context, tags []string) []string { - excludedTagsMap := map[string]bool{ - "http.code": true, - "http.route": true, - "http.method": true, - "http.url": true, - "http.status_code": true, - "http.host": true, - "messaging.system": true, - "messaging.operation": true, - "error": true, - "service.name": true, - } - newTags := make([]string, 0) - for _, tag := range tags { - _, ok := excludedTagsMap[tag] - if !ok { - newTags = append(newTags, tag) - } - } - return newTags -} - func (r *ClickHouseReader) GetTopOperationsV2(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) { namedArgs := []interface{}{ @@ -3503,7 +3480,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v case v3.AggregateOperatorCountDistinct, v3.AggregateOperatorCount: - where = "tagKey ILIKE $1" + where = "tag_key ILIKE $1" stringAllowed = true case v3.AggregateOperatorRateSum, @@ -3524,7 +3501,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: - where = "tagKey ILIKE $1 AND (tagDataType='int64' or tagDataType='float64')" + where = "tag_key ILIKE $1 AND (tag_data_type='int64' or tag_data_type='float64')" stringAllowed = false case v3.AggregateOperatorNoOp: @@ -3533,7 +3510,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v return nil, fmt.Errorf("unsupported aggregate operator") } - query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, tagDataType from %s.%s WHERE %s limit $2", r.logsDB, r.logsTagAttributeTable, where) + query = fmt.Sprintf("SELECT DISTINCT(tag_key), tag_type, tag_data_type from %s.%s WHERE %s limit $2", r.logsDB, r.logsTagAttributeTableV2, where) rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), req.Limit) if err != nil { zap.L().Error("Error while executing query", zap.Error(err)) @@ -3582,10 +3559,10 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt var response v3.FilterAttributeKeyResponse if len(req.SearchText) != 0 { - query = fmt.Sprintf("select distinct tagKey, tagType, tagDataType from %s.%s where tagKey ILIKE $1 limit $2", r.logsDB, r.logsTagAttributeTable) + query = fmt.Sprintf("select distinct tag_key, tag_type, tag_data_type from %s.%s where tag_key ILIKE $1 limit $2", r.logsDB, r.logsTagAttributeTableV2) rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), req.Limit) } else { - query = fmt.Sprintf("select distinct tagKey, tagType, tagDataType from %s.%s limit $1", r.logsDB, r.logsTagAttributeTable) + query = fmt.Sprintf("select distinct tag_key, tag_type, tag_data_type from %s.%s limit $1", r.logsDB, r.logsTagAttributeTableV2) rows, err = r.db.Query(ctx, query, req.Limit) } @@ -3662,11 +3639,11 @@ func (r 
*ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi query := "select distinct" switch req.FilterAttributeKeyDataType { case v3.AttributeKeyDataTypeInt64: - filterValueColumn = "int64TagValue" + filterValueColumn = "number_value" case v3.AttributeKeyDataTypeFloat64: - filterValueColumn = "float64TagValue" + filterValueColumn = "number_value" case v3.AttributeKeyDataTypeString: - filterValueColumn = "stringTagValue" + filterValueColumn = "string_value" } searchText := fmt.Sprintf("%%%s%%", req.SearchText) @@ -3694,10 +3671,10 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString { filterValueColumnWhere = fmt.Sprintf("toString(%s)", filterValueColumn) } - query = fmt.Sprintf("select distinct %s from %s.%s where tagKey=$1 and %s ILIKE $2 and tagType=$3 limit $4", filterValueColumn, r.logsDB, r.logsTagAttributeTable, filterValueColumnWhere) + query = fmt.Sprintf("SELECT DISTINCT %s FROM %s.%s WHERE tag_key=$1 AND %s ILIKE $2 AND tag_type=$3 LIMIT $4", filterValueColumn, r.logsDB, r.logsTagAttributeTableV2, filterValueColumnWhere) rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, searchText, req.TagType, req.Limit) } else { - query = fmt.Sprintf("select distinct %s from %s.%s where tagKey=$1 and tagType=$2 limit $3", filterValueColumn, r.logsDB, r.logsTagAttributeTable) + query = fmt.Sprintf("SELECT DISTINCT %s FROM %s.%s WHERE tag_key=$1 AND tag_type=$2 LIMIT $3", filterValueColumn, r.logsDB, r.logsTagAttributeTableV2) rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, req.TagType, req.Limit) } @@ -4162,7 +4139,7 @@ func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req case v3.AggregateOperatorCountDistinct, v3.AggregateOperatorCount: - where = "tagKey ILIKE $1" + where = "tag_key ILIKE $1" stringAllowed = true case v3.AggregateOperatorRateSum, @@ -4183,7 +4160,7 @@ func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: - where = "tagKey ILIKE $1 AND dataType='float64'" + where = "tag_key ILIKE $1 AND tag_data_type='float64'" stringAllowed = false case v3.AggregateOperatorNoOp: @@ -4191,7 +4168,7 @@ func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req default: return nil, fmt.Errorf("unsupported aggregate operator") } - query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType FROM %s.%s WHERE %s", r.TraceDB, r.spanAttributeTable, where) + query = fmt.Sprintf("SELECT DISTINCT(tag_key), tag_type, tag_data_type FROM %s.%s WHERE %s", r.TraceDB, r.spanAttributeTableV2, where) if req.Limit != 0 { query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) } @@ -4253,7 +4230,7 @@ func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.Fi var rows driver.Rows var response v3.FilterAttributeKeyResponse - query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType FROM %s.%s WHERE tagKey ILIKE $1 LIMIT $2", r.TraceDB, r.spanAttributeTable) + query = fmt.Sprintf("SELECT DISTINCT(tag_key), tag_type, tag_data_type FROM %s.%s WHERE tag_key ILIKE $1 LIMIT $2", r.TraceDB, r.spanAttributeTableV2) rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), req.Limit) @@ -4335,12 +4312,12 @@ func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3. 
}, nil } - query = "select distinct" + query = "SELECT DISTINCT" switch req.FilterAttributeKeyDataType { case v3.AttributeKeyDataTypeFloat64: - filterValueColumn = "float64TagValue" + filterValueColumn = "number_value" case v3.AttributeKeyDataTypeString: - filterValueColumn = "stringTagValue" + filterValueColumn = "string_value" } searchText := fmt.Sprintf("%%%s%%", req.SearchText) @@ -4361,14 +4338,14 @@ func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3. if r.useTraceNewSchema { where += " AND ts_bucket_start >= toUInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR))" } - query = fmt.Sprintf("select distinct %s from %s.%s where %s and %s ILIKE $1 limit $2", selectKey, r.TraceDB, r.traceTableName, where, filterValueColumnWhere) + query = fmt.Sprintf("SELECT DISTINCT %s FROM %s.%s WHERE %s AND %s ILIKE $1 LIMIT $2", selectKey, r.TraceDB, r.traceTableName, where, filterValueColumnWhere) rows, err = r.db.Query(ctx, query, searchText, req.Limit) } else { filterValueColumnWhere := filterValueColumn if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString { filterValueColumnWhere = fmt.Sprintf("toString(%s)", filterValueColumn) } - query = fmt.Sprintf("select distinct %s from %s.%s where tagKey=$1 and %s ILIKE $2 and tagType=$3 limit $4", filterValueColumn, r.TraceDB, r.spanAttributeTable, filterValueColumnWhere) + query = fmt.Sprintf("SELECT DISTINCT %s FROM %s.%s WHERE tag_key=$1 AND %s ILIKE $2 AND tag_type=$3 LIMIT $4", filterValueColumn, r.TraceDB, r.spanAttributeTableV2, filterValueColumnWhere) rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, searchText, req.TagType, req.Limit) } diff --git a/pkg/query-service/tests/integration/filter_suggestions_test.go b/pkg/query-service/tests/integration/filter_suggestions_test.go index 6c8224be50..793ca7d442 100644 --- a/pkg/query-service/tests/integration/filter_suggestions_test.go +++ b/pkg/query-service/tests/integration/filter_suggestions_test.go @@ -199,9 +199,9 @@ func (tb *FilterSuggestionsTestBed) mockAttribKeysQueryResponse( attribsToReturn []v3.AttributeKey, ) { cols := []mockhouse.ColumnType{} - cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tagKey"}) - cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tagType"}) - cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tagDataType"}) + cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tag_key"}) + cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tag_type"}) + cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tag_data_type"}) values := [][]any{} for _, a := range attribsToReturn { @@ -213,7 +213,7 @@ func (tb *FilterSuggestionsTestBed) mockAttribKeysQueryResponse( } tb.mockClickhouse.ExpectQuery( - "select.*from.*signoz_logs.distributed_tag_attributes.*", + "select.*from.*signoz_logs.distributed_tag_attributes_v2.*", ).WithArgs( constants.DefaultFilterSuggestionsAttributesLimit, ).WillReturnRows( @@ -236,10 +236,9 @@ func (tb *FilterSuggestionsTestBed) mockAttribValuesQueryResponse( stringValuesToReturn [][]string, ) { resultCols := []mockhouse.ColumnType{ - {Type: "String", Name: "tagKey"}, - {Type: "String", Name: "stringTagValue"}, - {Type: "Nullable(Int64)", Name: "int64TagValue"}, - {Type: "Nullable(Float64)", Name: "float64TagValue"}, + {Type: "String", Name: "tag_key"}, + {Type: "String", Name: "string_value"}, + {Type: "Nullable(Int64)", Name: "number_value"}, } expectedAttribKeysInQuery := []any{} @@ -248,13 +247,13 @@ func (tb *FilterSuggestionsTestBed) 
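Aside (not part of the patch): in the updated `GetLogAttributeValues` and `GetTraceAttributeValues`, both `v3.AttributeKeyDataTypeInt64` and `v3.AttributeKeyDataTypeFloat64` now resolve to the single `number_value` column of the v2 table. A standalone sketch of that column selection; the helper name, the loop, and the plain strings standing in for the `v3` constants are assumptions for illustration only:

```go
package main

import "fmt"

// filterValueColumn is a hypothetical helper mirroring the mapping in the
// updated reader code: int64 and float64 attributes share number_value,
// while string attributes read string_value.
func filterValueColumn(dataType string) string {
    switch dataType {
    case "int64", "float64":
        return "number_value"
    default:
        return "string_value"
    }
}

func main() {
    for _, dt := range []string{"int64", "float64", "string"} {
        query := fmt.Sprintf(
            "SELECT DISTINCT %s FROM signoz_logs.distributed_tag_attributes_v2 WHERE tag_key=$1 AND tag_type=$2 LIMIT $3",
            filterValueColumn(dt),
        )
        fmt.Println(dt, "->", query)
    }
}
```

The SQL shape matches the rewritten queries in the diff; only the helper and the loop are invented for illustration.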
diff --git a/pkg/query-service/tests/integration/filter_suggestions_test.go b/pkg/query-service/tests/integration/filter_suggestions_test.go
index 6c8224be50..793ca7d442 100644
--- a/pkg/query-service/tests/integration/filter_suggestions_test.go
+++ b/pkg/query-service/tests/integration/filter_suggestions_test.go
@@ -199,9 +199,9 @@ func (tb *FilterSuggestionsTestBed) mockAttribKeysQueryResponse(
     attribsToReturn []v3.AttributeKey,
 ) {
     cols := []mockhouse.ColumnType{}
-    cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tagKey"})
-    cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tagType"})
-    cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tagDataType"})
+    cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tag_key"})
+    cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tag_type"})
+    cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tag_data_type"})
 
     values := [][]any{}
     for _, a := range attribsToReturn {
@@ -213,7 +213,7 @@ func (tb *FilterSuggestionsTestBed) mockAttribKeysQueryResponse(
     }
 
     tb.mockClickhouse.ExpectQuery(
-        "select.*from.*signoz_logs.distributed_tag_attributes.*",
+        "select.*from.*signoz_logs.distributed_tag_attributes_v2.*",
     ).WithArgs(
         constants.DefaultFilterSuggestionsAttributesLimit,
     ).WillReturnRows(
@@ -236,10 +236,9 @@ func (tb *FilterSuggestionsTestBed) mockAttribValuesQueryResponse(
     stringValuesToReturn [][]string,
 ) {
     resultCols := []mockhouse.ColumnType{
-        {Type: "String", Name: "tagKey"},
-        {Type: "String", Name: "stringTagValue"},
-        {Type: "Nullable(Int64)", Name: "int64TagValue"},
-        {Type: "Nullable(Float64)", Name: "float64TagValue"},
+        {Type: "String", Name: "tag_key"},
+        {Type: "String", Name: "string_value"},
+        {Type: "Nullable(Int64)", Name: "number_value"},
     }
 
     expectedAttribKeysInQuery := []any{}
@@ -248,13 +247,13 @@ func (tb *FilterSuggestionsTestBed) mockAttribValuesQueryResponse(
         expectedAttribKeysInQuery = append(expectedAttribKeysInQuery, attrib.Key)
         for _, stringTagValue := range stringValuesToReturn[idx] {
             mockResultRows = append(mockResultRows, []any{
-                attrib.Key, stringTagValue, nil, nil,
+                attrib.Key, stringTagValue, nil,
             })
         }
     }
 
     tb.mockClickhouse.ExpectQuery(
-        "select.*tagKey.*stringTagValue.*int64TagValue.*float64TagValue.*distributed_tag_attributes.*tagKey",
+        "select.*tag_key.*string_value.*number_value.*distributed_tag_attributes_v2.*tag_key",
     ).WithArgs(expectedAttribKeysInQuery...).WillReturnRows(mockhouse.NewRows(resultCols, mockResultRows))
 }
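Aside (not part of the patch): with `int64TagValue` gone, a row scanned from `distributed_tag_attributes_v2` carries only a string value and a nullable numeric value, which is why the mocked result set above drops to three columns. A minimal sketch of how such a scanned row can be bucketed; the helper name and example values are hypothetical:

```go
package main

import (
    "database/sql"
    "fmt"
)

// bucketTagValue is an illustrative helper (not from the SigNoz codebase)
// mirroring the simplified branch in getValuesForLogAttributes: a non-empty
// string becomes a string suggestion, a valid number_value becomes a float64
// suggestion, and anything else is skipped.
func bucketTagValue(stringValue string, numberValue sql.NullFloat64) (dataType string, value any, ok bool) {
    switch {
    case stringValue != "":
        return "string", stringValue, true
    case numberValue.Valid:
        return "float64", numberValue.Float64, true
    default:
        return "", nil, false
    }
}

func main() {
    fmt.Println(bucketTagValue("GET", sql.NullFloat64{}))
    fmt.Println(bucketTagValue("", sql.NullFloat64{Float64: 429, Valid: true}))
    fmt.Println(bucketTagValue("", sql.NullFloat64{}))
}
```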