Skip to content

Commit

Permalink
Feat: integrations v0 metrics connection status (#4715)
Browse files Browse the repository at this point in the history
* chore: add test expectations for integration metrics connection status

* chore: reorg logs connection status calculation for parallelization

* chore: add interface for reader.GetMetricLastReceivedTsMillis

* chore: add plumbing for calculating integration metrics connection status

* chore: impl and test mocks for reader.GetMetricReceivedLatest

* chore: wrap things up and get test passing

* chore: some cleanup

* chore: some more cleanup

* chore: use prom metric names for integration connection test
  • Loading branch information
raj-k-singh authored Mar 18, 2024
1 parent 4c21749 commit 43f9830
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 46 deletions.
62 changes: 62 additions & 0 deletions pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const (
signozSampleLocalTableName = "samples_v2"
signozSampleTableName = "distributed_samples_v2"
signozTSTableName = "distributed_time_series_v2"
signozTSTableNameV4 = "distributed_time_series_v4"
signozTSTableNameV41Day = "distributed_time_series_v4_1day"

minTimespanForProgressiveSearch = time.Hour
Expand Down Expand Up @@ -4271,6 +4272,67 @@ func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, metricName, se
}, nil
}

func (r *ClickHouseReader) GetLatestReceivedMetric(
ctx context.Context, metricNames []string,
) (*model.MetricStatus, *model.ApiError) {
if len(metricNames) < 1 {
return nil, nil
}

quotedMetricNames := []string{}
for _, m := range metricNames {
quotedMetricNames = append(quotedMetricNames, fmt.Sprintf(`'%s'`, m))
}
commaSeparatedMetricNames := strings.Join(quotedMetricNames, ", ")

query := fmt.Sprintf(`
SELECT metric_name, labels, unix_milli
from %s.%s
where metric_name in (
%s
)
order by unix_milli desc
limit 1
`, signozMetricDBName, signozTSTableNameV4, commaSeparatedMetricNames,
)

rows, err := r.db.Query(ctx, query)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't query clickhouse for received metrics status: %w", err,
))
}
defer rows.Close()

var result *model.MetricStatus

if rows.Next() {

result = &model.MetricStatus{}
var labelsJson string

err := rows.Scan(
&result.MetricName,
&labelsJson,
&result.LastReceivedTsMillis,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't scan metric status row: %w", err,
))
}

err = json.Unmarshal([]byte(labelsJson), &result.LastReceivedLabels)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't unmarshal metric labels json: %w", err,
))
}
}

return result, nil
}

func isColumn(tableStatement, attrType, field, datType string) bool {
// value of attrType will be `resource` or `tag`, if `tag` change it to `attribute`
name := utils.GetClickhouseColumnName(attrType, datType, field)
Expand Down
179 changes: 134 additions & 45 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2481,11 +2481,25 @@ func (ah *APIHandler) GetIntegrationConnectionStatus(
w http.ResponseWriter, r *http.Request,
) {
integrationId := mux.Vars(r)["integrationId"]
isInstalled, apiErr := ah.IntegrationsController.IsIntegrationInstalled(
r.Context(), integrationId,
)
if apiErr != nil {
RespondError(w, apiErr, "failed to check if integration is installed")
return
}

// Do not spend resources calculating connection status unless installed.
if !isInstalled {
ah.Respond(w, &integrations.IntegrationConnectionStatus{})
return
}

connectionTests, apiErr := ah.IntegrationsController.GetIntegrationConnectionTests(
r.Context(), integrationId,
)
if apiErr != nil {
RespondError(w, apiErr, "Failed to fetch integration connection tests")
RespondError(w, apiErr, "failed to fetch integration connection tests")
return
}

Expand All @@ -2511,65 +2525,140 @@ func (ah *APIHandler) calculateConnectionStatus(
connectionTests *integrations.IntegrationConnectionTests,
lookbackSeconds int64,
) (*integrations.IntegrationConnectionStatus, *model.ApiError) {
// Calculate connection status for signals in parallel

result := &integrations.IntegrationConnectionStatus{}
errors := []*model.ApiError{}
var resultLock sync.Mutex

if connectionTests.Logs != nil {
qrParams := &v3.QueryRangeParamsV3{
Start: time.Now().UnixMilli() - (lookbackSeconds * 1000),
End: time.Now().UnixMilli(),
CompositeQuery: &v3.CompositeQuery{
PanelType: v3.PanelTypeList,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
PageSize: 1,
Filters: connectionTests.Logs,
QueryName: "A",
DataSource: v3.DataSourceLogs,
Expression: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
},
},
},
}
queryRes, err, _ := ah.querier.QueryRange(
ctx, qrParams, map[string]v3.AttributeKey{},
var wg sync.WaitGroup

// Calculate logs connection status
wg.Add(1)
go func() {
defer wg.Done()

logsConnStatus, apiErr := ah.calculateLogsConnectionStatus(
ctx, connectionTests.Logs, lookbackSeconds,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"could not query for integration connection status: %w", err,
))

resultLock.Lock()
defer resultLock.Unlock()

if apiErr != nil {
errors = append(errors, apiErr)
} else {
result.Logs = logsConnStatus
}
}()

// Calculate metrics connection status
wg.Add(1)
go func() {
defer wg.Done()

if connectionTests.Metrics == nil || len(connectionTests.Metrics) < 1 {
return
}
if len(queryRes) > 0 && queryRes[0].List != nil && len(queryRes[0].List) > 0 {
lastLog := queryRes[0].List[0]

statusForLastReceivedMetric, apiErr := ah.reader.GetLatestReceivedMetric(
ctx, connectionTests.Metrics,
)

resultLock.Lock()
defer resultLock.Unlock()

if apiErr != nil {
errors = append(errors, apiErr)

} else if statusForLastReceivedMetric != nil {
resourceSummaryParts := []string{}
lastLogResourceAttribs := lastLog.Data["resources_string"]
if lastLogResourceAttribs != nil {
resourceAttribs, ok := lastLogResourceAttribs.(*map[string]string)
if !ok {
return nil, model.InternalError(fmt.Errorf(
"could not cast log resource attribs",
))
}
for k, v := range *resourceAttribs {
resourceSummaryParts = append(resourceSummaryParts, fmt.Sprintf(
"%s=%s", k, v,
))
}
for k, v := range statusForLastReceivedMetric.LastReceivedLabels {
resourceSummaryParts = append(resourceSummaryParts, fmt.Sprintf(
"%s=%s", k, v,
))
}
lastLogResourceSummary := strings.Join(resourceSummaryParts, ", ")

result.Logs = &integrations.SignalConnectionStatus{
LastReceivedTsMillis: lastLog.Timestamp.UnixMilli(),
LastReceivedFrom: lastLogResourceSummary,
result.Metrics = &integrations.SignalConnectionStatus{
LastReceivedFrom: strings.Join(resourceSummaryParts, ", "),
LastReceivedTsMillis: statusForLastReceivedMetric.LastReceivedTsMillis,
}
}
}()

wg.Wait()

if len(errors) > 0 {
return nil, errors[0]
}

return result, nil
}

func (ah *APIHandler) calculateLogsConnectionStatus(
ctx context.Context,
logsConnectionTest *v3.FilterSet,
lookbackSeconds int64,
) (*integrations.SignalConnectionStatus, *model.ApiError) {
if logsConnectionTest == nil {
return nil, nil
}

qrParams := &v3.QueryRangeParamsV3{
Start: time.Now().UnixMilli() - (lookbackSeconds * 1000),
End: time.Now().UnixMilli(),
CompositeQuery: &v3.CompositeQuery{
PanelType: v3.PanelTypeList,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
PageSize: 1,
Filters: logsConnectionTest,
QueryName: "A",
DataSource: v3.DataSourceLogs,
Expression: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
},
},
},
}
queryRes, err, _ := ah.querier.QueryRange(
ctx, qrParams, map[string]v3.AttributeKey{},
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"could not query for integration connection status: %w", err,
))
}
if len(queryRes) > 0 && queryRes[0].List != nil && len(queryRes[0].List) > 0 {
lastLog := queryRes[0].List[0]

resourceSummaryParts := []string{}
lastLogResourceAttribs := lastLog.Data["resources_string"]
if lastLogResourceAttribs != nil {
resourceAttribs, ok := lastLogResourceAttribs.(*map[string]string)
if !ok {
return nil, model.InternalError(fmt.Errorf(
"could not cast log resource attribs",
))
}
for k, v := range *resourceAttribs {
resourceSummaryParts = append(resourceSummaryParts, fmt.Sprintf(
"%s=%s", k, v,
))
}
}
lastLogResourceSummary := strings.Join(resourceSummaryParts, ", ")

return &integrations.SignalConnectionStatus{
LastReceivedTsMillis: lastLog.Timestamp.UnixMilli(),
LastReceivedFrom: lastLogResourceSummary,
}, nil
}

return nil, nil
}

func (ah *APIHandler) InstallIntegration(
w http.ResponseWriter, r *http.Request,
) {
Expand Down
40 changes: 40 additions & 0 deletions pkg/query-service/app/integrations/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"embed"
"strings"
"unicode"

"encoding/base64"
"encoding/json"
Expand Down Expand Up @@ -133,6 +134,14 @@ func readBuiltInIntegration(dirpath string) (
}

integration.Id = "builtin-" + integration.Id
if len(integration.DataCollected.Metrics) > 0 {
metricsForConnTest := []string{}
for _, collectedMetric := range integration.DataCollected.Metrics {
promName := toPromMetricName(collectedMetric.Name)
metricsForConnTest = append(metricsForConnTest, promName)
}
integration.ConnectionTests.Metrics = metricsForConnTest
}

return &integration, nil
}
Expand Down Expand Up @@ -223,3 +232,34 @@ func readFileIfUri(maybeFileUri string, basedir string) (interface{}, error) {

return nil, fmt.Errorf("unsupported file type %s", maybeFileUri)
}

// copied from signoz clickhouse exporter's `sanitize` which
// in turn is copied from prometheus-go-metric-exporter
//
// replaces non-alphanumeric characters with underscores in s.
func toPromMetricName(s string) string {
if len(s) == 0 {
return s
}

// Note: No length limit for label keys because Prometheus doesn't
// define a length limit, thus we should NOT be truncating label keys.
// See https://github.com/orijtech/prometheus-go-metrics-exporter/issues/4.

s = strings.Map(func(r rune) rune {
// sanitizeRune converts anything that is not a letter or digit to an underscore
if unicode.IsLetter(r) || unicode.IsDigit(r) {
return r
}
// Everything else turns into an underscore
return '_'
}, s)

if unicode.IsDigit(rune(s[0])) {
s = "key" + "_" + s
}
if s[0] == '_' {
s = "key" + s
}
return s
}
12 changes: 12 additions & 0 deletions pkg/query-service/app/integrations/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,18 @@ func (c *Controller) GetIntegration(
return c.mgr.GetIntegration(ctx, integrationId)
}

func (c *Controller) IsIntegrationInstalled(
ctx context.Context,
integrationId string,
) (bool, *model.ApiError) {
installation, apiErr := c.mgr.getInstalledIntegration(ctx, integrationId)
if apiErr != nil {
return false, apiErr
}
isInstalled := installation != nil
return isInstalled, nil
}

func (c *Controller) GetIntegrationConnectionTests(
ctx context.Context, integrationId string,
) (*IntegrationConnectionTests, *model.ApiError) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/query-service/app/integrations/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ type IntegrationConnectionStatus struct {
}

type IntegrationConnectionTests struct {
// Filter to use for finding logs for the integration.
Logs *v3.FilterSet `json:"logs"`

// TODO(Raj): Add connection tests for other signals.
// Metric names expected to have been received for the integration.
Metrics []string `json:"metrics"`
}

type IntegrationDetails struct {
Expand Down
3 changes: 3 additions & 0 deletions pkg/query-service/interfaces/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type Reader interface {
GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)

// Returns `MetricStatus` for latest received metric among `metricNames`. Useful for status calculations
GetLatestReceivedMetric(ctx context.Context, metricNames []string) (*model.MetricStatus, *model.ApiError)

// QB V3 metrics/traces/logs
GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error)
GetListResultV3(ctx context.Context, query string) ([]*v3.Row, error)
Expand Down
Loading

0 comments on commit 43f9830

Please sign in to comment.