diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 4f1aa99dfcb..72fa61d7e80 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -2717,8 +2717,8 @@ func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Contex } // remove this after sometime -func removeUnderscoreDuplicateFields(fields []model.LogField) []model.LogField { - lookup := map[string]model.LogField{} +func removeUnderscoreDuplicateFields(fields []model.Field) []model.Field { + lookup := map[string]model.Field{} for _, v := range fields { lookup[v.Name+v.DataType] = v } @@ -2729,7 +2729,7 @@ func removeUnderscoreDuplicateFields(fields []model.LogField) []model.LogField { } } - updatedFields := []model.LogField{} + updatedFields := []model.Field{} for _, v := range lookup { updatedFields = append(updatedFields, v) } @@ -2740,11 +2740,11 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe // response will contain top level fields from the otel log model response := model.GetFieldsResponse{ Selected: constants.StaticSelectedLogFields, - Interesting: []model.LogField{}, + Interesting: []model.Field{}, } // get attribute keys - attributes := []model.LogField{} + attributes := []model.Field{} query := fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsAttributeKeys) err := r.db.Select(ctx, &attributes, query) if err != nil { @@ -2752,7 +2752,7 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe } // get resource keys - resources := []model.LogField{} + resources := []model.Field{} query = fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsResourceKeys) err = r.db.Select(ctx, &resources, query) if err != nil { @@ -2776,9 +2776,11 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe return &response, nil } -func (r *ClickHouseReader) extractSelectedAndInterestingFields(tableStatement string, fieldType string, fields *[]model.LogField, response *model.GetFieldsResponse) { +func (r *ClickHouseReader) extractSelectedAndInterestingFields(tableStatement string, overrideFieldType string, fields *[]model.Field, response *model.GetFieldsResponse) { for _, field := range *fields { - field.Type = fieldType + if overrideFieldType != "" { + field.Type = overrideFieldType + } // all static fields are assumed to be selected as we don't allow changing them if isColumn(r.useLogsNewSchema, tableStatement, field.Type, field.Name, field.DataType) { response.Selected = append(response.Selected, field) @@ -2968,6 +2970,129 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda return nil } +func (r *ClickHouseReader) GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) { + // response will contain top level fields from the otel log model + response := model.GetFieldsResponse{ + Selected: []model.Field{}, + Interesting: []model.Field{}, + } + + // get the top level selected fields + for _, field := range constants.NewStaticFieldsTraces { + response.Selected = append(response.Selected, model.Field{ + Name: field.Key, + DataType: field.DataType.String(), + Type: field.Type.String(), + }) + } + + // get attribute keys + attributes := []model.Field{} + query := fmt.Sprintf("SELECT DISTINCT tagKey, tagType, dataType from %s.%s group by tagKey, tagType, dataType", r.TraceDB, r.spanAttributesKeysTable) + fmt.Println(query) + rows, err := r.db.Query(ctx, query) + if err != nil { + return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + defer rows.Close() + + var tagKey string + var dataType string + var tagType string + for rows.Next() { + if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil { + return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + attributes = append(attributes, model.Field{ + Name: tagKey, + DataType: dataType, + Type: tagType, + }) + } + + statements := []model.ShowCreateTableStatement{} + query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceLocalTableName) + err = r.db.Select(ctx, &statements, query) + if err != nil { + return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + + r.extractSelectedAndInterestingFields(statements[0].Statement, "", &attributes, &response) + + return &response, nil + +} + +func (r *ClickHouseReader) UpdateTraceField(ctx context.Context, field *model.UpdateField) *model.ApiError { + if !field.Selected { + return model.ForbiddenError(errors.New("removing a selected field is not allowed, please reach out to support.")) + } + + colname := utils.GetClickhouseColumnNameV2(field.Type, field.DataType, field.Name) + + dataType := strings.ToLower(field.DataType) + if dataType == "int64" || dataType == "float64" { + dataType = "number" + } + // if dataType == "string" { + // dataType = "String" + // } + attrColName := fmt.Sprintf("%s_%s", field.Type, dataType) + for _, table := range []string{r.traceLocalTableName, r.traceTableName} { + q := "ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s` %s DEFAULT %s['%s'] CODEC(ZSTD(1))" + query := fmt.Sprintf(q, + r.TraceDB, table, + r.cluster, + colname, field.DataType, + attrColName, + field.Name, + ) + fmt.Println(query) + err := r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + + query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s_exists` bool DEFAULT if(mapContains(%s, '%s') != 0, true, false) CODEC(ZSTD(1))", + r.TraceDB, table, + r.cluster, + colname, + attrColName, + field.Name, + ) + err = r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + } + + // create the index + if strings.ToLower(field.DataType) == "bool" { + // there is no point in creating index for bool attributes as the cardinality is just 2 + return nil + } + + if field.IndexType == "" { + field.IndexType = constants.DefaultLogSkipIndexType + } + if field.IndexGranularity == 0 { + field.IndexGranularity = constants.DefaultLogSkipIndexGranularity + } + query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS `%s_idx` (`%s`) TYPE %s GRANULARITY %d", + r.TraceDB, r.traceLocalTableName, + r.cluster, + colname, + colname, + field.IndexType, + field.IndexGranularity, + ) + err := r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + return nil +} + func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.SignozLog, *model.ApiError) { response := []model.SignozLog{} fields, apiErr := r.GetLogFields(ctx) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 3e25ab23c80..0611e0f050d 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -531,6 +531,9 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) { router.HandleFunc("/api/v1/settings/ingestion_key", am.AdminAccess(aH.insertIngestionKey)).Methods(http.MethodPost) router.HandleFunc("/api/v1/settings/ingestion_key", am.ViewAccess(aH.getIngestionKeys)).Methods(http.MethodGet) + router.HandleFunc("/api/v2/traces/fields", am.ViewAccess(aH.traceFields)).Methods(http.MethodGet) + router.HandleFunc("/api/v2/traces/fields", am.EditAccess(aH.updateTraceField)).Methods(http.MethodPost) + router.HandleFunc("/api/v1/version", am.OpenAccess(aH.getVersion)).Methods(http.MethodGet) router.HandleFunc("/api/v1/featureFlags", am.OpenAccess(aH.getFeatureFlags)).Methods(http.MethodGet) router.HandleFunc("/api/v1/configs", am.OpenAccess(aH.getConfigs)).Methods(http.MethodGet) @@ -4897,3 +4900,35 @@ func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) { aH.queryRangeV4(r.Context(), queryRangeParams, w, r) } + +func (aH *APIHandler) traceFields(w http.ResponseWriter, r *http.Request) { + fields, apiErr := aH.reader.GetTraceFields(r.Context()) + if apiErr != nil { + RespondError(w, apiErr, "Failed to fetch fields from the DB") + return + } + aH.WriteJSON(w, r, fields) +} + +func (aH *APIHandler) updateTraceField(w http.ResponseWriter, r *http.Request) { + field := model.UpdateField{} + if err := json.NewDecoder(r.Body).Decode(&field); err != nil { + apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErr, "Failed to decode payload") + return + } + + err := logs.ValidateUpdateFieldPayload(&field) + if err != nil { + apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErr, "Incorrect payload") + return + } + + apiErr := aH.reader.UpdateTraceField(r.Context(), &field) + if apiErr != nil { + RespondError(w, apiErr, "Failed to update field in the DB") + return + } + aH.WriteJSON(w, r, field) +} diff --git a/pkg/query-service/app/logs/parser.go b/pkg/query-service/app/logs/parser.go index 855a0235288..be524d00da9 100644 --- a/pkg/query-service/app/logs/parser.go +++ b/pkg/query-service/app/logs/parser.go @@ -228,8 +228,8 @@ func parseColumn(s string) (*string, error) { return &colName, nil } -func arrayToMap(fields []model.LogField) map[string]model.LogField { - res := map[string]model.LogField{} +func arrayToMap(fields []model.Field) map[string]model.Field { + res := map[string]model.Field{} for _, field := range fields { res[field.Name] = field } @@ -251,7 +251,7 @@ func replaceInterestingFields(allFields *model.GetFieldsResponse, queryTokens [] return queryTokens, nil } -func replaceFieldInToken(queryToken string, selectedFieldsLookup map[string]model.LogField, interestingFieldLookup map[string]model.LogField) (string, error) { +func replaceFieldInToken(queryToken string, selectedFieldsLookup map[string]model.Field, interestingFieldLookup map[string]model.Field) (string, error) { op := strings.TrimSpace(operatorRegex.FindString(queryToken)) opLower := strings.ToLower(op) @@ -283,7 +283,7 @@ func replaceFieldInToken(queryToken string, selectedFieldsLookup map[string]mode } } else { // creating the query token here as we have the metadata - field := model.LogField{} + field := model.Field{} if sfield, ok := selectedFieldsLookup[sqlColName]; ok { field = sfield diff --git a/pkg/query-service/app/logs/parser_test.go b/pkg/query-service/app/logs/parser_test.go index 6397738437d..f894fcaecd5 100644 --- a/pkg/query-service/app/logs/parser_test.go +++ b/pkg/query-service/app/logs/parser_test.go @@ -238,14 +238,14 @@ func TestParseColumn(t *testing.T) { func TestReplaceInterestingFields(t *testing.T) { queryTokens := []string{"id.userid IN (100) ", "and id_key >= 50 ", `AND body ILIKE '%searchstring%'`} allFields := model.GetFieldsResponse{ - Selected: []model.LogField{ + Selected: []model.Field{ { Name: "id_key", DataType: "int64", Type: "attributes", }, }, - Interesting: []model.LogField{ + Interesting: []model.Field{ { Name: "id.userid", DataType: "int64", @@ -326,7 +326,7 @@ func TestCheckIfPrevousPaginateAndModifyOrder(t *testing.T) { } var generateSQLQueryFields = model.GetFieldsResponse{ - Selected: []model.LogField{ + Selected: []model.Field{ { Name: "field1", DataType: "int64", @@ -348,7 +348,7 @@ var generateSQLQueryFields = model.GetFieldsResponse{ Type: "static", }, }, - Interesting: []model.LogField{ + Interesting: []model.Field{ { Name: "FielD1", DataType: "int64", diff --git a/pkg/query-service/app/logs/validator.go b/pkg/query-service/app/logs/validator.go index d4a1e422346..69c60b71643 100644 --- a/pkg/query-service/app/logs/validator.go +++ b/pkg/query-service/app/logs/validator.go @@ -6,6 +6,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) func ValidateUpdateFieldPayload(field *model.UpdateField) error { @@ -19,7 +20,9 @@ func ValidateUpdateFieldPayload(field *model.UpdateField) error { return fmt.Errorf("dataType cannot be empty") } - matched, err := regexp.MatchString(fmt.Sprintf("^(%s|%s|%s)$", constants.Static, constants.Attributes, constants.Resources), field.Type) + // the logs api uses the old names i.e attributes and resources while traces use tag and attribute. + // update log api to use tag and attribute. + matched, err := regexp.MatchString(fmt.Sprintf("^(%s|%s|%s|%s|%s)$", constants.Static, constants.Attributes, constants.Resources, v3.AttributeKeyTypeTag, v3.AttributeKeyTypeResource), field.Type) if err != nil { return err } diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 7d6f087188f..242b2cd4a39 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -290,7 +290,7 @@ const ( UINT8 = "Uint8" ) -var StaticSelectedLogFields = []model.LogField{ +var StaticSelectedLogFields = []model.Field{ { Name: "timestamp", DataType: UINT32, diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index a2acd8c6c93..ac4ab91f9eb 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -109,6 +109,10 @@ type Reader interface { SubscribeToQueryProgress(queryId string) (<-chan model.QueryProgress, func(), *model.ApiError) GetCountOfThings(ctx context.Context, query string) (uint64, error) + + //trace + GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) + UpdateTraceField(ctx context.Context, field *model.UpdateField) *model.ApiError } type Querier interface { diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index c4e743ce928..281b606a89a 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -509,15 +509,15 @@ type ShowCreateTableStatement struct { Statement string `json:"statement" ch:"statement"` } -type LogField struct { +type Field struct { Name string `json:"name" ch:"name"` DataType string `json:"dataType" ch:"datatype"` Type string `json:"type"` } type GetFieldsResponse struct { - Selected []LogField `json:"selected"` - Interesting []LogField `json:"interesting"` + Selected []Field `json:"selected"` + Interesting []Field `json:"interesting"` } // Represents a log record in query service requests and responses.