diff --git a/enterprise/reporting/error_index/types.go b/enterprise/reporting/error_index/types.go
index 30f364bb98..48d354eb9f 100644
--- a/enterprise/reporting/error_index/types.go
+++ b/enterprise/reporting/error_index/types.go
@@ -34,8 +34,8 @@ type payload struct {
 	FailedStage       string `json:"failedStage" parquet:"name=failed_stage, type=BYTE_ARRAY, convertedtype=UTF8, encoding=RLE_DICTIONARY"`
 	EventType         string `json:"eventType" parquet:"name=event_type, type=BYTE_ARRAY, convertedtype=UTF8, encoding=RLE_DICTIONARY"`
 	EventName         string `json:"eventName" parquet:"name=event_name, type=BYTE_ARRAY, convertedtype=UTF8, encoding=RLE_DICTIONARY"`
-	ReceivedAt        int64  `json:"receivedAt" parquet:"name=received_at, type=INT64, convertedtype=TIMESTAMP_MICROS, encoding=DELTA_BINARY_PACKED"` // In Microseconds
-	FailedAt          int64  `json:"failedAt" parquet:"name=failed_at, type=INT64, convertedtype=TIMESTAMP_MICROS, encoding=DELTA_BINARY_PACKED"` // In Microseconds
+	ReceivedAt        int64  `json:"receivedAt" parquet:"name=received_at, type=INT64, encoding=DELTA_BINARY_PACKED"` // In Microseconds
+	FailedAt          int64  `json:"failedAt" parquet:"name=failed_at, type=INT64, encoding=DELTA_BINARY_PACKED"` // In Microseconds
 }
 
 func (p *payload) SetReceivedAt(t time.Time) {
diff --git a/enterprise/reporting/error_index/worker_test.go b/enterprise/reporting/error_index/worker_test.go
index dade052022..6fef94424e 100644
--- a/enterprise/reporting/error_index/worker_test.go
+++ b/enterprise/reporting/error_index/worker_test.go
@@ -134,18 +134,18 @@ func TestWorkerWriter(t *testing.T) {
 
 		t.Run("count all", func(t *testing.T) {
 			var count int64
-			err := duckDB(t).QueryRowContext(ctx, fmt.Sprintf("SELECT count(*) FROM read_parquet('%s');", filePath)).Scan(&count)
+			err := duckDB(t).QueryRowContext(ctx, "SELECT count(*) FROM read_parquet($1);", filePath).Scan(&count)
 			require.NoError(t, err)
 			require.EqualValues(t, len(payloads), count)
 		})
 		t.Run("count for sourceId, destinationId", func(t *testing.T) {
 			var count int64
-			err := duckDB(t).QueryRowContext(ctx, fmt.Sprintf("SELECT count(*) FROM read_parquet('%s') WHERE source_id = $1 AND destination_id = $2;", filePath), "sourceId3", "destinationId3").Scan(&count)
+			err := duckDB(t).QueryRowContext(ctx, "SELECT count(*) FROM read_parquet($1) WHERE source_id = $2 AND destination_id = $3;", filePath, "sourceId3", "destinationId3").Scan(&count)
 			require.NoError(t, err)
 			require.EqualValues(t, 10, count)
 		})
 		t.Run("select all", func(t *testing.T) {
-			failedMessages := failedMessagesUsingDuckDB(t, ctx, nil, fmt.Sprintf("SELECT * FROM read_parquet('%s') ORDER BY failed_at DESC;", filePath))
+			failedMessages := failedMessagesUsingDuckDB(t, ctx, nil, "SELECT * FROM read_parquet($1) ORDER BY failed_at DESC;", []interface{}{filePath})
 
 			for i, failedMessage := range failedMessages {
 				require.Equal(t, payloads[i].MessageID, failedMessage.MessageID)
@@ -156,8 +156,8 @@ func TestWorkerWriter(t *testing.T) {
 				require.Equal(t, payloads[i].FailedStage, failedMessage.FailedStage)
 				require.Equal(t, payloads[i].EventType, failedMessage.EventType)
 				require.Equal(t, payloads[i].EventName, failedMessage.EventName)
-				require.EqualValues(t, payloads[i].ReceivedAt, failedMessage.ReceivedAt)
-				require.EqualValues(t, payloads[i].FailedAt, failedMessage.FailedAt)
+				require.Equal(t, payloads[i].ReceivedAt, failedMessage.ReceivedAt)
+				require.Equal(t, payloads[i].FailedAt, failedMessage.FailedAt)
 			}
 		})
 	})
@@ -264,8 +264,7 @@ func TestWorkerWriter(t *testing.T) {
 				lastFailedAt.Unix(),
 				instanceID,
 			)
-			query := fmt.Sprintf("SELECT * FROM read_parquet('%s') ORDER BY failed_at ASC;", filePath)
-			failedMessages := failedMessagesUsingDuckDB(t, ctx, minioResource, query)
+			failedMessages := failedMessagesUsingDuckDB(t, ctx, minioResource, "SELECT * FROM read_parquet($1) WHERE failed_at >= $2 AND failed_at <= $3 ORDER BY failed_at ASC;", []interface{}{filePath, failedAt.UTC().UnixMicro(), lastFailedAt.UTC().UnixMicro()})
 			require.Len(t, failedMessages, len(jobs))
 			require.EqualValues(t, payloads, failedMessages)
 
@@ -277,7 +276,7 @@ func TestWorkerWriter(t *testing.T) {
 				lastFailedAt.Unix(),
 				instanceID,
 			)
-			s3SelectQuery := fmt.Sprint("SELECT message_id, source_id, destination_id, transformation_id, tracking_plan_id, failed_stage, event_type, event_name, received_at, failed_at FROM S3Object")
+			s3SelectQuery := fmt.Sprintf("SELECT message_id, source_id, destination_id, transformation_id, tracking_plan_id, failed_stage, event_type, event_name, received_at, failed_at FROM S3Object WHERE failed_at >= %d AND failed_at <= %d", failedAt.UTC().UnixMicro(), lastFailedAt.UTC().UnixMicro())
 			failedMessagesUsing3Select := failedMessagesUsingMinioS3Select(t, ctx, minioResource, s3SelectPath, s3SelectQuery)
 			slices.SortFunc(failedMessagesUsing3Select, func(a, b payload) int {
 				return a.FailedAtTime().Compare(b.FailedAtTime())
@@ -382,7 +381,7 @@ func TestWorkerWriter(t *testing.T) {
 		for i := 0; i < count; i++ {
 			failedAt := failedAt.Add(time.Duration(i) * time.Hour)
 
-			query := fmt.Sprintf("SELECT * FROM read_parquet('%s') ORDER BY failed_at ASC;", fmt.Sprintf("s3://%s/%s/%s/%s/%d_%d_%s.parquet",
+			filePath := fmt.Sprintf("s3://%s/%s/%s/%s/%d_%d_%s.parquet",
 				minioResource.BucketName,
 				w.sourceID,
 				failedAt.Format("2006-01-02"),
@@ -390,9 +389,8 @@ func TestWorkerWriter(t *testing.T) {
 				failedAt.Unix(),
 				failedAt.Unix(),
 				instanceID,
-			))
-
-			failedMessages := failedMessagesUsingDuckDB(t, ctx, minioResource, query)
+			)
+			failedMessages := failedMessagesUsingDuckDB(t, ctx, minioResource, "SELECT * FROM read_parquet($1) WHERE failed_at >= $2 AND failed_at <= $3 ORDER BY failed_at ASC;", []interface{}{filePath, failedAt.UTC().UnixMicro(), failedAt.UTC().UnixMicro()})
 
 			require.EqualValues(t, []payload{payloads[i]}, failedMessages)
 		}
@@ -578,7 +576,7 @@ func failedMessagesUsingMinioS3Select(t testing.TB, ctx context.Context, mr *res
 	return payloads
 }
 
-func failedMessagesUsingDuckDB(t testing.TB, ctx context.Context, mr *resource.MinioResource, query string) []payload {
+func failedMessagesUsingDuckDB(t testing.TB, ctx context.Context, mr *resource.MinioResource, query string, queryArgs []interface{}) []payload {
 	t.Helper()
 
 	db := duckDB(t)
@@ -593,23 +591,19 @@ func failedMessagesUsingDuckDB(t testing.TB, ctx context.Context, mr *resource.M
 		require.NoError(t, err)
 	}
 
-	rows, err := db.QueryContext(ctx, query)
+	rows, err := db.QueryContext(ctx, query, queryArgs...)
 	require.NoError(t, err)
 	defer func() { _ = rows.Close() }()
 
 	var expectedPayloads []payload
 	for rows.Next() {
 		var p payload
-		var receivedAt time.Time
-		var failedAt time.Time
 		require.NoError(t, rows.Scan(
 			&p.MessageID, &p.SourceID, &p.DestinationID, &p.TransformationID,
 			&p.TrackingPlanID, &p.FailedStage,
-			&p.EventType, &p.EventName, &receivedAt,
-			&failedAt,
+			&p.EventType, &p.EventName, &p.ReceivedAt,
+			&p.FailedAt,
 		))
-		p.SetReceivedAt(receivedAt)
-		p.SetFailedAt(failedAt)
 		expectedPayloads = append(expectedPayloads, p)
 	}
 	require.NoError(t, rows.Err())
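Below is a minimal, self-contained sketch (not part of the diff) of the parameterized DuckDB query pattern the tests move to: the parquet path and the failed_at bounds are bound as placeholders instead of being interpolated into the SQL with fmt.Sprintf. It assumes the github.com/marcboeker/go-duckdb database/sql driver registered as "duckdb"; the file name and bounds are hypothetical.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/marcboeker/go-duckdb" // assumed DuckDB database/sql driver
)

func main() {
	ctx := context.Background()

	// An empty DSN opens an in-memory DuckDB instance, similar to what
	// duckDB(t) provides in the tests.
	db, err := sql.Open("duckdb", "")
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = db.Close() }()

	// Hypothetical local parquet file and failed_at bounds (microseconds).
	filePath := "failed_messages.parquet"
	from, to := int64(0), int64(1<<62)

	// Bind the file path and bounds as placeholders rather than building the
	// SQL string with fmt.Sprintf, mirroring the change in worker_test.go.
	var count int64
	err = db.QueryRowContext(ctx,
		"SELECT count(*) FROM read_parquet($1) WHERE failed_at >= $2 AND failed_at <= $3;",
		filePath, from, to,
	).Scan(&count)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("matching rows:", count)
}
```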