chore: use tag attributes v2 table (#6616)
srikanthccv authored Dec 16, 2024
1 parent bef5b96 commit e676602
Showing 4 changed files with 51 additions and 85 deletions.
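For orientation, the identifier changes repeated throughout the hunks below boil down to a small set of renames, with both v1 numeric value columns collapsing into a single number_value column. The sketch that follows is illustrative only and is not part of the commit; the pairs are read directly off the diffs.

```go
// Illustrative summary of the v1 -> v2 renames applied in this commit.
// Not part of the commit; the pairs below are read off the hunks that follow.
package main

import "fmt"

func main() {
	renames := [][2]string{
		// tables
		{"distributed_span_attributes", "distributed_tag_attributes_v2"},
		{"distributed_tag_attributes", "distributed_tag_attributes_v2"},
		// key/metadata columns
		{"tagKey", "tag_key"},
		{"tagType", "tag_type"},
		{"tagDataType (logs) / dataType (traces)", "tag_data_type"},
		// value columns: the two numeric columns collapse into one
		{"stringTagValue", "string_value"},
		{"int64TagValue", "number_value"},
		{"float64TagValue", "number_value"},
	}
	for _, r := range renames {
		fmt.Printf("%-40s -> %s\n", r[0], r[1])
	}
}
```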
36 changes: 13 additions & 23 deletions pkg/query-service/app/clickhouseReader/filter_suggestions.go
@@ -138,27 +138,24 @@ func (r *ClickHouseReader) getValuesForLogAttributes(
  ```
  select * from (
  (
- select tagKey, stringTagValue, int64TagValue, float64TagValue
- from signoz_logs.distributed_tag_attributes
- where tagKey = $1 and (
- stringTagValue != '' or int64TagValue is not null or float64TagValue is not null
+ select tag_key, string_value, number_value
+ from signoz_logs.distributed_tag_attributes_v2
+ where tag_key = $1 and (
+ string_value != '' or number_value is not null
  )
  limit 2
  ) UNION DISTINCT (
- select tagKey, stringTagValue, int64TagValue, float64TagValue
- from signoz_logs.distributed_tag_attributes
- where tagKey = $2 and (
- stringTagValue != '' or int64TagValue is not null or float64TagValue is not null
+ select tag_key, string_value, number_value
+ from signoz_logs.distributed_tag_attributes_v2
+ where tag_key = $2 and (
+ string_value != '' or number_value is not null
  )
  limit 2
  )
  ) settings max_threads=2
  ```
  Since tag_attributes table uses ReplacingMergeTree, the values would be distinct and no order by
  is being used to ensure the `limit` clause minimizes the amount of data scanned.
- This query scanned ~30k rows per attribute on fiscalnote-v2 for attributes like `message` and `time`
- that had >~110M values each
  */

  if len(attributes) > 10 {
@@ -173,13 +170,13 @@ func (r *ClickHouseReader) getValuesForLogAttributes(
  tagKeyQueryArgs := []any{}
  for idx, attrib := range attributes {
  tagQueries = append(tagQueries, fmt.Sprintf(`(
- select tagKey, stringTagValue, int64TagValue, float64TagValue
+ select tag_key, string_value, number_value
  from %s.%s
- where tagKey = $%d and (
- stringTagValue != '' or int64TagValue is not null or float64TagValue is not null
+ where tag_key = $%d and (
+ string_value != '' or number_value is not null
  )
  limit %d
- )`, r.logsDB, r.logsTagAttributeTable, idx+1, limit))
+ )`, r.logsDB, r.logsTagAttributeTableV2, idx+1, limit))

  tagKeyQueryArgs = append(tagKeyQueryArgs, attrib.Key)
  }
@@ -211,10 +208,9 @@ func (r *ClickHouseReader) getValuesForLogAttributes(
  var tagKey string
  var stringValue string
  var float64Value sql.NullFloat64
- var int64Value sql.NullInt64

  err := rows.Scan(
- &tagKey, &stringValue, &int64Value, &float64Value,
+ &tagKey, &stringValue, &float64Value,
  )
  if err != nil {
  return nil, model.InternalError(fmt.Errorf(
@@ -228,12 +224,6 @@ func (r *ClickHouseReader) getValuesForLogAttributes(
  result[attrResultIdx] = append(result[attrResultIdx], stringValue)
  }

- } else if int64Value.Valid {
- attrResultIdx := resultIdxForAttrib(tagKey, v3.AttributeKeyDataTypeInt64)
- if attrResultIdx >= 0 {
- result[attrResultIdx] = append(result[attrResultIdx], int64Value.Int64)
- }
-
  } else if float64Value.Valid {
  attrResultIdx := resultIdxForAttrib(tagKey, v3.AttributeKeyDataTypeFloat64)
  if attrResultIdx >= 0 {
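The doc comment in the hunk above explains why this query can lean on ReplacingMergeTree plus per-branch limit clauses instead of an order by; the Go-side consequence of the v2 schema is that each row now carries only tag_key, string_value and number_value. The sketch below is a trimmed, illustrative rendering of that per-row handling — v2Row and bucketValue are names invented for the example and do not exist in the repository; the real logic is the scan-and-append code shown in the hunks above.

```go
// Illustrative only: a trimmed-down version of the per-row handling after this
// change. The three values mirror the new query's columns; the driver.Rows
// plumbing and the surrounding function are omitted.
package main

import (
	"database/sql"
	"fmt"
)

// v2Row mirrors the three columns now selected from
// signoz_logs.distributed_tag_attributes_v2.
type v2Row struct {
	TagKey      string
	StringValue string
	NumberValue sql.NullFloat64 // replaces the separate int64/float64 columns
}

// bucketValue shows how a scanned row becomes a suggestion value: non-empty
// strings stay strings, and any numeric attribute (previously int64 or
// float64) is read from the single number_value column.
func bucketValue(r v2Row) (dataType string, value any, ok bool) {
	switch {
	case r.StringValue != "":
		return "string", r.StringValue, true
	case r.NumberValue.Valid:
		return "float64", r.NumberValue.Float64, true
	default:
		return "", nil, false
	}
}

func main() {
	rows := []v2Row{
		{TagKey: "service.name", StringValue: "frontend"},
		{TagKey: "http.status_code", NumberValue: sql.NullFloat64{Float64: 500, Valid: true}},
	}
	for _, r := range rows {
		if dt, v, ok := bucketValue(r); ok {
			fmt.Printf("%s (%s) -> %v\n", r.TagKey, dt, v)
		}
	}
}
```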
12 changes: 6 additions & 6 deletions pkg/query-service/app/clickhouseReader/options.go
@@ -29,14 +29,14 @@ const (
  defaultSpansTable string = "distributed_signoz_spans"
  defaultDependencyGraphTable string = "distributed_dependency_graph_minutes_v2"
  defaultTopLevelOperationsTable string = "distributed_top_level_operations"
- defaultSpanAttributeTable string = "distributed_span_attributes"
+ defaultSpanAttributeTableV2 string = "distributed_tag_attributes_v2"
  defaultSpanAttributeKeysTable string = "distributed_span_attributes_keys"
  defaultLogsDB string = "signoz_logs"
  defaultLogsTable string = "distributed_logs"
  defaultLogsLocalTable string = "logs"
  defaultLogAttributeKeysTable string = "distributed_logs_attribute_keys"
  defaultLogResourceKeysTable string = "distributed_logs_resource_keys"
- defaultLogTagAttributeTable string = "distributed_tag_attributes"
+ defaultLogTagAttributeTableV2 string = "distributed_tag_attributes_v2"
  defaultLiveTailRefreshSeconds int = 5
  defaultWriteBatchDelay time.Duration = 5 * time.Second
  defaultWriteBatchSize int = 10000
@@ -69,7 +69,7 @@ type namespaceConfig struct {
  UsageExplorerTable string
  SpansTable string
  ErrorTable string
- SpanAttributeTable string
+ SpanAttributeTableV2 string
  SpanAttributeKeysTable string
  DependencyGraphTable string
  TopLevelOperationsTable string
@@ -78,7 +78,7 @@ type namespaceConfig struct {
  LogsLocalTable string
  LogsAttributeKeysTable string
  LogsResourceKeysTable string
- LogsTagAttributeTable string
+ LogsTagAttributeTableV2 string
  LiveTailRefreshSeconds int
  WriteBatchDelay time.Duration
  WriteBatchSize int
@@ -167,7 +167,7 @@ func NewOptions(
  DurationTable: defaultDurationTable,
  UsageExplorerTable: defaultUsageExplorerTable,
  SpansTable: defaultSpansTable,
- SpanAttributeTable: defaultSpanAttributeTable,
+ SpanAttributeTableV2: defaultSpanAttributeTableV2,
  SpanAttributeKeysTable: defaultSpanAttributeKeysTable,
  DependencyGraphTable: defaultDependencyGraphTable,
  TopLevelOperationsTable: defaultTopLevelOperationsTable,
@@ -176,7 +176,7 @@
  LogsLocalTable: defaultLogsLocalTable,
  LogsAttributeKeysTable: defaultLogAttributeKeysTable,
  LogsResourceKeysTable: defaultLogResourceKeysTable,
- LogsTagAttributeTable: defaultLogTagAttributeTable,
+ LogsTagAttributeTableV2: defaultLogTagAttributeTableV2,
  LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds,
  WriteBatchDelay: defaultWriteBatchDelay,
  WriteBatchSize: defaultWriteBatchSize,
69 changes: 23 additions & 46 deletions pkg/query-service/app/clickhouseReader/reader.go
@@ -118,7 +118,7 @@ type ClickHouseReader struct {
  errorTable string
  usageExplorerTable string
  SpansTable string
- spanAttributeTable string
+ spanAttributeTableV2 string
  spanAttributesKeysTable string
  dependencyGraphTable string
  topLevelOperationsTable string
@@ -127,7 +127,7 @@ type ClickHouseReader struct {
  logsLocalTable string
  logsAttributeKeys string
  logsResourceKeys string
- logsTagAttributeTable string
+ logsTagAttributeTableV2 string
  queryEngine *promql.Engine
  remoteStorage *remote.Storage
  fanoutStorage *storage.Storage
@@ -246,7 +246,7 @@ func NewReaderFromClickhouseConnection(
  usageExplorerTable: options.primary.UsageExplorerTable,
  durationTable: options.primary.DurationTable,
  SpansTable: options.primary.SpansTable,
- spanAttributeTable: options.primary.SpanAttributeTable,
+ spanAttributeTableV2: options.primary.SpanAttributeTableV2,
  spanAttributesKeysTable: options.primary.SpanAttributeKeysTable,
  dependencyGraphTable: options.primary.DependencyGraphTable,
  topLevelOperationsTable: options.primary.TopLevelOperationsTable,
@@ -255,7 +255,7 @@
  logsLocalTable: options.primary.LogsLocalTable,
  logsAttributeKeys: options.primary.LogsAttributeKeysTable,
  logsResourceKeys: options.primary.LogsResourceKeysTable,
- logsTagAttributeTable: options.primary.LogsTagAttributeTable,
+ logsTagAttributeTableV2: options.primary.LogsTagAttributeTableV2,
  liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds,
  promConfigFile: configFile,
  featureFlags: featureFlag,
@@ -1035,29 +1035,6 @@ func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string
  return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args
  }

- func excludeTags(_ context.Context, tags []string) []string {
- excludedTagsMap := map[string]bool{
- "http.code": true,
- "http.route": true,
- "http.method": true,
- "http.url": true,
- "http.status_code": true,
- "http.host": true,
- "messaging.system": true,
- "messaging.operation": true,
- "error": true,
- "service.name": true,
- }
- newTags := make([]string, 0)
- for _, tag := range tags {
- _, ok := excludedTagsMap[tag]
- if !ok {
- newTags = append(newTags, tag)
- }
- }
- return newTags
- }
-
  func (r *ClickHouseReader) GetTopOperationsV2(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) {

  namedArgs := []interface{}{
@@ -3503,7 +3480,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v
  case
  v3.AggregateOperatorCountDistinct,
  v3.AggregateOperatorCount:
- where = "tagKey ILIKE $1"
+ where = "tag_key ILIKE $1"
  stringAllowed = true
  case
  v3.AggregateOperatorRateSum,
@@ -3524,7 +3501,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v
  v3.AggregateOperatorSum,
  v3.AggregateOperatorMin,
  v3.AggregateOperatorMax:
- where = "tagKey ILIKE $1 AND (tagDataType='int64' or tagDataType='float64')"
+ where = "tag_key ILIKE $1 AND (tag_data_type='int64' or tag_data_type='float64')"
  stringAllowed = false
  case
  v3.AggregateOperatorNoOp:
@@ -3533,7 +3510,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v
  return nil, fmt.Errorf("unsupported aggregate operator")
  }

- query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, tagDataType from %s.%s WHERE %s limit $2", r.logsDB, r.logsTagAttributeTable, where)
+ query = fmt.Sprintf("SELECT DISTINCT(tag_key), tag_type, tag_data_type from %s.%s WHERE %s limit $2", r.logsDB, r.logsTagAttributeTableV2, where)
  rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), req.Limit)
  if err != nil {
  zap.L().Error("Error while executing query", zap.Error(err))
@@ -3582,10 +3559,10 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt
  var response v3.FilterAttributeKeyResponse

  if len(req.SearchText) != 0 {
- query = fmt.Sprintf("select distinct tagKey, tagType, tagDataType from %s.%s where tagKey ILIKE $1 limit $2", r.logsDB, r.logsTagAttributeTable)
+ query = fmt.Sprintf("select distinct tag_key, tag_type, tag_data_type from %s.%s where tag_key ILIKE $1 limit $2", r.logsDB, r.logsTagAttributeTableV2)
  rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), req.Limit)
  } else {
- query = fmt.Sprintf("select distinct tagKey, tagType, tagDataType from %s.%s limit $1", r.logsDB, r.logsTagAttributeTable)
+ query = fmt.Sprintf("select distinct tag_key, tag_type, tag_data_type from %s.%s limit $1", r.logsDB, r.logsTagAttributeTableV2)
  rows, err = r.db.Query(ctx, query, req.Limit)
  }

@@ -3662,11 +3639,11 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi
  query := "select distinct"
  switch req.FilterAttributeKeyDataType {
  case v3.AttributeKeyDataTypeInt64:
- filterValueColumn = "int64TagValue"
+ filterValueColumn = "number_value"
  case v3.AttributeKeyDataTypeFloat64:
- filterValueColumn = "float64TagValue"
+ filterValueColumn = "number_value"
  case v3.AttributeKeyDataTypeString:
- filterValueColumn = "stringTagValue"
+ filterValueColumn = "string_value"
  }

  searchText := fmt.Sprintf("%%%s%%", req.SearchText)
Expand Down Expand Up @@ -3694,10 +3671,10 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi
if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString {
filterValueColumnWhere = fmt.Sprintf("toString(%s)", filterValueColumn)
}
query = fmt.Sprintf("select distinct %s from %s.%s where tagKey=$1 and %s ILIKE $2 and tagType=$3 limit $4", filterValueColumn, r.logsDB, r.logsTagAttributeTable, filterValueColumnWhere)
query = fmt.Sprintf("SELECT DISTINCT %s FROM %s.%s WHERE tag_key=$1 AND %s ILIKE $2 AND tag_type=$3 LIMIT $4", filterValueColumn, r.logsDB, r.logsTagAttributeTableV2, filterValueColumnWhere)
rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, searchText, req.TagType, req.Limit)
} else {
query = fmt.Sprintf("select distinct %s from %s.%s where tagKey=$1 and tagType=$2 limit $3", filterValueColumn, r.logsDB, r.logsTagAttributeTable)
query = fmt.Sprintf("SELECT DISTINCT %s FROM %s.%s WHERE tag_key=$1 AND tag_type=$2 LIMIT $3", filterValueColumn, r.logsDB, r.logsTagAttributeTableV2)
rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, req.TagType, req.Limit)
}

@@ -4162,7 +4139,7 @@ func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req
  case
  v3.AggregateOperatorCountDistinct,
  v3.AggregateOperatorCount:
- where = "tagKey ILIKE $1"
+ where = "tag_key ILIKE $1"
  stringAllowed = true
  case
  v3.AggregateOperatorRateSum,
@@ -4183,15 +4160,15 @@ func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req
  v3.AggregateOperatorSum,
  v3.AggregateOperatorMin,
  v3.AggregateOperatorMax:
- where = "tagKey ILIKE $1 AND dataType='float64'"
+ where = "tag_key ILIKE $1 AND tag_data_type='float64'"
  stringAllowed = false
  case
  v3.AggregateOperatorNoOp:
  return &v3.AggregateAttributeResponse{}, nil
  default:
  return nil, fmt.Errorf("unsupported aggregate operator")
  }
- query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType FROM %s.%s WHERE %s", r.TraceDB, r.spanAttributeTable, where)
+ query = fmt.Sprintf("SELECT DISTINCT(tag_key), tag_type, tag_data_type FROM %s.%s WHERE %s", r.TraceDB, r.spanAttributeTableV2, where)
  if req.Limit != 0 {
  query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
  }
@@ -4253,7 +4230,7 @@ func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.Fi
  var rows driver.Rows
  var response v3.FilterAttributeKeyResponse

- query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType FROM %s.%s WHERE tagKey ILIKE $1 LIMIT $2", r.TraceDB, r.spanAttributeTable)
+ query = fmt.Sprintf("SELECT DISTINCT(tag_key), tag_type, tag_data_type FROM %s.%s WHERE tag_key ILIKE $1 LIMIT $2", r.TraceDB, r.spanAttributeTableV2)

  rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), req.Limit)

@@ -4335,12 +4312,12 @@ func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3.
  }, nil
  }

- query = "select distinct"
+ query = "SELECT DISTINCT"
  switch req.FilterAttributeKeyDataType {
  case v3.AttributeKeyDataTypeFloat64:
- filterValueColumn = "float64TagValue"
+ filterValueColumn = "number_value"
  case v3.AttributeKeyDataTypeString:
- filterValueColumn = "stringTagValue"
+ filterValueColumn = "string_value"
  }

  searchText := fmt.Sprintf("%%%s%%", req.SearchText)
@@ -4361,14 +4338,14 @@ func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3.
  if r.useTraceNewSchema {
  where += " AND ts_bucket_start >= toUInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR))"
  }
- query = fmt.Sprintf("select distinct %s from %s.%s where %s and %s ILIKE $1 limit $2", selectKey, r.TraceDB, r.traceTableName, where, filterValueColumnWhere)
+ query = fmt.Sprintf("SELECT DISTINCT %s FROM %s.%s WHERE %s AND %s ILIKE $1 LIMIT $2", selectKey, r.TraceDB, r.traceTableName, where, filterValueColumnWhere)
  rows, err = r.db.Query(ctx, query, searchText, req.Limit)
  } else {
  filterValueColumnWhere := filterValueColumn
  if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString {
  filterValueColumnWhere = fmt.Sprintf("toString(%s)", filterValueColumn)
  }
- query = fmt.Sprintf("select distinct %s from %s.%s where tagKey=$1 and %s ILIKE $2 and tagType=$3 limit $4", filterValueColumn, r.TraceDB, r.spanAttributeTable, filterValueColumnWhere)
+ query = fmt.Sprintf("SELECT DISTINCT %s FROM %s.%s WHERE tag_key=$1 AND %s ILIKE $2 AND tag_type=$3 LIMIT $4", filterValueColumn, r.TraceDB, r.spanAttributeTableV2, filterValueColumnWhere)
  rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, searchText, req.TagType, req.Limit)
  }

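Several of the hunks above make the same move: the per-data-type column switch in GetLogAttributeValues and GetTraceAttributeValues now sends both int64 and float64 attributes to number_value, strings keep string_value, and the lookup runs against distributed_tag_attributes_v2. The sketch below is illustrative only — attributeKeyDataType and valueColumn are stand-ins for the v3 package constants and the inlined switch in reader.go, and the query string mirrors the non-search branch of the log-attribute-values query.

```go
// Illustrative only: the v2 column selection shared by the attribute-value
// lookups after this commit. Stand-in types; not code from the repository.
package main

import "fmt"

type attributeKeyDataType string

const (
	dataTypeString  attributeKeyDataType = "string"
	dataTypeInt64   attributeKeyDataType = "int64"
	dataTypeFloat64 attributeKeyDataType = "float64"
)

// valueColumn maps an attribute data type to its v2 column. Both numeric
// types collapse onto number_value; only strings keep a dedicated column.
func valueColumn(dt attributeKeyDataType) string {
	switch dt {
	case dataTypeInt64, dataTypeFloat64:
		return "number_value"
	default:
		return "string_value"
	}
}

func main() {
	for _, dt := range []attributeKeyDataType{dataTypeString, dataTypeInt64, dataTypeFloat64} {
		col := valueColumn(dt)
		query := fmt.Sprintf(
			"SELECT DISTINCT %s FROM signoz_logs.distributed_tag_attributes_v2 WHERE tag_key=$1 AND tag_type=$2 LIMIT $3",
			col,
		)
		fmt.Printf("%-8s -> %s\n    %s\n", dt, col, query)
	}
}
```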
19 changes: 9 additions & 10 deletions pkg/query-service/tests/integration/filter_suggestions_test.go
@@ -199,9 +199,9 @@ func (tb *FilterSuggestionsTestBed) mockAttribKeysQueryResponse(
  attribsToReturn []v3.AttributeKey,
  ) {
  cols := []mockhouse.ColumnType{}
- cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tagKey"})
- cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tagType"})
- cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tagDataType"})
+ cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tag_key"})
+ cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tag_type"})
+ cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "tag_data_type"})

  values := [][]any{}
  for _, a := range attribsToReturn {
@@ -213,7 +213,7 @@ }
  }

  tb.mockClickhouse.ExpectQuery(
- "select.*from.*signoz_logs.distributed_tag_attributes.*",
+ "select.*from.*signoz_logs.distributed_tag_attributes_v2.*",
  ).WithArgs(
  constants.DefaultFilterSuggestionsAttributesLimit,
  ).WillReturnRows(
@@ -236,10 +236,9 @@ func (tb *FilterSuggestionsTestBed) mockAttribValuesQueryResponse(
  stringValuesToReturn [][]string,
  ) {
  resultCols := []mockhouse.ColumnType{
- {Type: "String", Name: "tagKey"},
- {Type: "String", Name: "stringTagValue"},
- {Type: "Nullable(Int64)", Name: "int64TagValue"},
- {Type: "Nullable(Float64)", Name: "float64TagValue"},
+ {Type: "String", Name: "tag_key"},
+ {Type: "String", Name: "string_value"},
+ {Type: "Nullable(Int64)", Name: "number_value"},
  }

  expectedAttribKeysInQuery := []any{}
@@ -248,13 +247,13 @@ func (tb *FilterSuggestionsTestBed) mockAttribValuesQueryResponse(
  expectedAttribKeysInQuery = append(expectedAttribKeysInQuery, attrib.Key)
  for _, stringTagValue := range stringValuesToReturn[idx] {
  mockResultRows = append(mockResultRows, []any{
- attrib.Key, stringTagValue, nil, nil,
+ attrib.Key, stringTagValue, nil,
  })
  }
  }

  tb.mockClickhouse.ExpectQuery(
- "select.*tagKey.*stringTagValue.*int64TagValue.*float64TagValue.*distributed_tag_attributes.*tagKey",
+ "select.*tag_key.*string_value.*number_value.*distributed_tag_attributes_v2.*tag_key",
  ).WithArgs(expectedAttribKeysInQuery...).WillReturnRows(mockhouse.NewRows(resultCols, mockResultRows))


Expand Down

0 comments on commit e676602

Please sign in to comment.