
Merge branch 'master' into refactor.source-job
achettyiitr authored Oct 31, 2023
2 parents 691c655 + 17590a6 commit b810b26
Showing 2 changed files with 16 additions and 22 deletions.
4 changes: 2 additions & 2 deletions enterprise/reporting/error_index/types.go
@@ -34,8 +34,8 @@ type payload struct {
FailedStage string `json:"failedStage" parquet:"name=failed_stage, type=BYTE_ARRAY, convertedtype=UTF8, encoding=RLE_DICTIONARY"`
EventType string `json:"eventType" parquet:"name=event_type, type=BYTE_ARRAY, convertedtype=UTF8, encoding=RLE_DICTIONARY"`
EventName string `json:"eventName" parquet:"name=event_name, type=BYTE_ARRAY, convertedtype=UTF8, encoding=RLE_DICTIONARY"`
- ReceivedAt int64 `json:"receivedAt" parquet:"name=received_at, type=INT64, convertedtype=TIMESTAMP_MICROS, encoding=DELTA_BINARY_PACKED"` // In Microseconds
- FailedAt int64 `json:"failedAt" parquet:"name=failed_at, type=INT64, convertedtype=TIMESTAMP_MICROS, encoding=DELTA_BINARY_PACKED"` // In Microseconds
+ ReceivedAt int64 `json:"receivedAt" parquet:"name=received_at, type=INT64, encoding=DELTA_BINARY_PACKED"` // In Microseconds
+ FailedAt int64 `json:"failedAt" parquet:"name=failed_at, type=INT64, encoding=DELTA_BINARY_PACKED"` // In Microseconds
}

func (p *payload) SetReceivedAt(t time.Time) {
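With convertedtype=TIMESTAMP_MICROS dropped, parquet readers such as DuckDB see received_at and failed_at as plain INT64 columns of epoch microseconds rather than timestamps, which is why the test changes below scan them straight into the struct's int64 fields. A minimal sketch of the resulting round trip; the concrete time value is purely illustrative:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Writing: store the wall clock as microseconds since the Unix epoch,
	// matching the "// In Microseconds" comment on the struct fields.
	failedAt := time.Date(2023, 10, 31, 12, 0, 0, 0, time.UTC) // illustrative value
	stored := failedAt.UnixMicro()                             // int64, what the parquet column now holds

	// Reading: without the TIMESTAMP_MICROS converted type the column comes
	// back as a raw int64, so callers convert explicitly.
	restored := time.UnixMicro(stored).UTC()
	fmt.Println(stored, restored.Equal(failedAt)) // prints the raw micros and "true"
}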
34 changes: 14 additions & 20 deletions enterprise/reporting/error_index/worker_test.go
@@ -134,18 +134,18 @@ func TestWorkerWriter(t *testing.T) {

t.Run("count all", func(t *testing.T) {
var count int64
- err := duckDB(t).QueryRowContext(ctx, fmt.Sprintf("SELECT count(*) FROM read_parquet('%s');", filePath)).Scan(&count)
+ err := duckDB(t).QueryRowContext(ctx, "SELECT count(*) FROM read_parquet($1);", filePath).Scan(&count)
require.NoError(t, err)
require.EqualValues(t, len(payloads), count)
})
t.Run("count for sourceId, destinationId", func(t *testing.T) {
var count int64
- err := duckDB(t).QueryRowContext(ctx, fmt.Sprintf("SELECT count(*) FROM read_parquet('%s') WHERE source_id = $1 AND destination_id = $2;", filePath), "sourceId3", "destinationId3").Scan(&count)
+ err := duckDB(t).QueryRowContext(ctx, "SELECT count(*) FROM read_parquet($1) WHERE source_id = $2 AND destination_id = $3;", filePath, "sourceId3", "destinationId3").Scan(&count)
require.NoError(t, err)
require.EqualValues(t, 10, count)
})
t.Run("select all", func(t *testing.T) {
- failedMessages := failedMessagesUsingDuckDB(t, ctx, nil, fmt.Sprintf("SELECT * FROM read_parquet('%s') ORDER BY failed_at DESC;", filePath))
+ failedMessages := failedMessagesUsingDuckDB(t, ctx, nil, "SELECT * FROM read_parquet($1) ORDER BY failed_at DESC;", []interface{}{filePath})

for i, failedMessage := range failedMessages {
require.Equal(t, payloads[i].MessageID, failedMessage.MessageID)
@@ -156,8 +156,8 @@ func TestWorkerWriter(t *testing.T) {
require.Equal(t, payloads[i].FailedStage, failedMessage.FailedStage)
require.Equal(t, payloads[i].EventType, failedMessage.EventType)
require.Equal(t, payloads[i].EventName, failedMessage.EventName)
- require.EqualValues(t, payloads[i].ReceivedAt, failedMessage.ReceivedAt)
- require.EqualValues(t, payloads[i].FailedAt, failedMessage.FailedAt)
+ require.Equal(t, payloads[i].ReceivedAt, failedMessage.ReceivedAt)
+ require.Equal(t, payloads[i].FailedAt, failedMessage.FailedAt)
}
})
})
@@ -264,8 +264,7 @@ func TestWorkerWriter(t *testing.T) {
lastFailedAt.Unix(),
instanceID,
)
- query := fmt.Sprintf("SELECT * FROM read_parquet('%s') ORDER BY failed_at ASC;", filePath)
- failedMessages := failedMessagesUsingDuckDB(t, ctx, minioResource, query)
+ failedMessages := failedMessagesUsingDuckDB(t, ctx, minioResource, "SELECT * FROM read_parquet($1) WHERE failed_at >= $2 AND failed_at <= $3 ORDER BY failed_at ASC;", []interface{}{filePath, failedAt.UTC().UnixMicro(), lastFailedAt.UTC().UnixMicro()})
require.Len(t, failedMessages, len(jobs))
require.EqualValues(t, payloads, failedMessages)

@@ -277,7 +276,7 @@ func TestWorkerWriter(t *testing.T) {
lastFailedAt.Unix(),
instanceID,
)
- s3SelectQuery := fmt.Sprint("SELECT message_id, source_id, destination_id, transformation_id, tracking_plan_id, failed_stage, event_type, event_name, received_at, failed_at FROM S3Object")
+ s3SelectQuery := fmt.Sprintf("SELECT message_id, source_id, destination_id, transformation_id, tracking_plan_id, failed_stage, event_type, event_name, received_at, failed_at FROM S3Object WHERE failed_at >= %d AND failed_at <= %d", failedAt.UTC().UnixMicro(), lastFailedAt.UTC().UnixMicro())
failedMessagesUsing3Select := failedMessagesUsingMinioS3Select(t, ctx, minioResource, s3SelectPath, s3SelectQuery)
slices.SortFunc(failedMessagesUsing3Select, func(a, b payload) int {
return a.FailedAtTime().Compare(b.FailedAtTime())
@@ -382,17 +381,16 @@ func TestWorkerWriter(t *testing.T) {

for i := 0; i < count; i++ {
failedAt := failedAt.Add(time.Duration(i) * time.Hour)
- query := fmt.Sprintf("SELECT * FROM read_parquet('%s') ORDER BY failed_at ASC;", fmt.Sprintf("s3://%s/%s/%s/%s/%d_%d_%s.parquet",
+ filePath := fmt.Sprintf("s3://%s/%s/%s/%s/%d_%d_%s.parquet",
minioResource.BucketName,
w.sourceID,
failedAt.Format("2006-01-02"),
strconv.Itoa(failedAt.Hour()),
failedAt.Unix(),
failedAt.Unix(),
instanceID,
- ))
-
- failedMessages := failedMessagesUsingDuckDB(t, ctx, minioResource, query)
+ )
+ failedMessages := failedMessagesUsingDuckDB(t, ctx, minioResource, "SELECT * FROM read_parquet($1) WHERE failed_at >= $2 AND failed_at <= $3 ORDER BY failed_at ASC;", []interface{}{filePath, failedAt.UTC().UnixMicro(), failedAt.UTC().UnixMicro()})
require.EqualValues(t, []payload{payloads[i]}, failedMessages)
}

@@ -578,7 +576,7 @@ func failedMessagesUsingMinioS3Select(t testing.TB, ctx context.Context, mr *res
return payloads
}

- func failedMessagesUsingDuckDB(t testing.TB, ctx context.Context, mr *resource.MinioResource, query string) []payload {
+ func failedMessagesUsingDuckDB(t testing.TB, ctx context.Context, mr *resource.MinioResource, query string, queryArgs []interface{}) []payload {
t.Helper()

db := duckDB(t)
@@ -593,23 +591,19 @@ func failedMessagesUsingDuckDB(t testing.TB, ctx context.Context, mr *resource.M
require.NoError(t, err)
}

- rows, err := db.QueryContext(ctx, query)
+ rows, err := db.QueryContext(ctx, query, queryArgs...)
require.NoError(t, err)
defer func() { _ = rows.Close() }()

var expectedPayloads []payload
for rows.Next() {
var p payload
- var receivedAt time.Time
- var failedAt time.Time
require.NoError(t, rows.Scan(
&p.MessageID, &p.SourceID, &p.DestinationID,
&p.TransformationID, &p.TrackingPlanID, &p.FailedStage,
- &p.EventType, &p.EventName, &receivedAt,
- &failedAt,
+ &p.EventType, &p.EventName, &p.ReceivedAt,
+ &p.FailedAt,
))
- p.SetReceivedAt(receivedAt)
- p.SetFailedAt(failedAt)
expectedPayloads = append(expectedPayloads, p)
}
require.NoError(t, rows.Err())
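The helper now takes the SQL text and its arguments separately, so the parquet path and filter values are bound as DuckDB placeholders ($1, $2, ...) instead of being spliced into the query string with fmt.Sprintf. A standalone sketch of the pattern, assuming the github.com/marcboeker/go-duckdb database/sql driver (the duckDB(t) helper is not shown in this diff) and a hypothetical parquet path:

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/marcboeker/go-duckdb" // assumed driver; duckDB(t) is not shown in the diff
)

func main() {
	db, err := sql.Open("duckdb", "") // empty DSN: in-memory database
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	ctx := context.Background()

	// Placeholders replace string interpolation: the parquet path and the
	// filter values travel as bound parameters, not as spliced-in SQL text.
	var count int64
	err = db.QueryRowContext(ctx,
		"SELECT count(*) FROM read_parquet($1) WHERE source_id = $2 AND destination_id = $3;",
		"/tmp/errors.parquet", "sourceId3", "destinationId3", // hypothetical path; ids as in the test
	).Scan(&count)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("matching rows:", count)
}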
