Skip to content

Commit

Permalink
chore: sync release v1.35.2 to main branch (#5172)
Browse files Browse the repository at this point in the history
  • Loading branch information
devops-github-rudderstack authored Oct 4, 2024
1 parent 3d4ebf8 commit 2fa0fa0
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 33 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## [1.35.2](https://github.com/rudderlabs/rudder-server/compare/v1.35.1...v1.35.2) (2024-10-04)


### Bug Fixes

* bigquery validations for partition column and type ([#5168](https://github.com/rudderlabs/rudder-server/issues/5168)) ([a2af47a](https://github.com/rudderlabs/rudder-server/commit/a2af47afe735210dd8355f141a8d38d298bfb2b5))
* change retl check to source category ([#5167](https://github.com/rudderlabs/rudder-server/issues/5167)) ([3ed1302](https://github.com/rudderlabs/rudder-server/commit/3ed1302abaebc255ed194da51aa16ffed1a7c191))

## [1.35.1](https://github.com/rudderlabs/rudder-server/compare/v1.35.0...v1.35.1) (2024-10-03)


Expand Down
2 changes: 1 addition & 1 deletion router/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (w *worker) workLoop() {
DestinationID: parameters.DestinationID,
}]
w.rt.destinationsMapMu.RUnlock()
if !destOK || (parameters.SourceJobRunID != "" && !connOK) {
if !destOK || (parameters.SourceCategory == "warehouse" && !connOK) {
continue
}
destination := batchDestination.Destination
Expand Down
13 changes: 12 additions & 1 deletion warehouse/integrations/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,18 @@ func (bq *BigQuery) LoadTestTable(ctx context.Context, location, tableName strin
gcsRef.MaxBadRecords = 0
gcsRef.IgnoreUnknownValues = false

outputTable := partitionedTable(tableName, bq.now().Format("2006-01-02"))
partitionDate, err := bq.partitionDate()
if err != nil {
return fmt.Errorf("partition date: %w", err)
}

var outputTable string
if bq.avoidPartitionDecorator() {
outputTable = tableName
} else {
outputTable = partitionedTable(tableName, partitionDate)
}

loader := bq.db.Dataset(bq.namespace).Table(outputTable).LoaderFrom(gcsRef)

job, err := loader.Run(ctx)
Expand Down
108 changes: 77 additions & 31 deletions warehouse/integrations/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,40 +641,86 @@ func TestIntegration(t *testing.T) {
})

t.Run("Validations", func(t *testing.T) {
ctx := context.Background()
namespace := whth.RandSchema(destType)

db, err := bigquery.NewClient(ctx,
credentials.ProjectID,
option.WithCredentialsJSON([]byte(credentials.Credentials)),
)
require.NoError(t, err)
t.Cleanup(func() { _ = db.Close() })
t.Cleanup(func() {
dropSchema(t, db, namespace)
})

dest := backendconfig.DestinationT{
ID: "test_destination_id",
Config: map[string]interface{}{
"project": credentials.ProjectID,
"location": credentials.Location,
"bucketName": credentials.BucketName,
"credentials": credentials.Credentials,
"prefix": "",
"namespace": namespace,
"syncFrequency": "30",
testCases := []struct {
name string
configOverride map[string]any
}{
{
name: "default partitionColumn and partitionType",
},
{
name: "partitionColumn: _PARTITIONTIME, partitionType: day",
configOverride: map[string]any{
"partitionColumn": "_PARTITIONTIME",
"partitionType": "day",
},
},
{
name: "partitionColumn: _PARTITIONTIME, partitionType: hour",
configOverride: map[string]any{
"partitionColumn": "_PARTITIONTIME",
"partitionType": "hour",
},
},
{
name: "partitionColumn: received_at, partitionType: hour",
configOverride: map[string]any{
"partitionColumn": "received_at",
"partitionType": "hour",
},
},
DestinationDefinition: backendconfig.DestinationDefinitionT{
ID: "1UmeD7xhVGHsPDEHoCiSPEGytS3",
Name: "BQ",
DisplayName: "BigQuery",
{
name: "partitionColumn: received_at, partitionType: day",
configOverride: map[string]any{
"partitionColumn": "loaded_at",
"partitionType": "day",
},
},
Name: "bigquery-integration",
Enabled: true,
RevisionID: "test_destination_id",
}
whth.VerifyConfigurationTest(t, dest)

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
namespace := whth.RandSchema(destType)

db, err := bigquery.NewClient(ctx,
credentials.ProjectID,
option.WithCredentialsJSON([]byte(credentials.Credentials)),
)
require.NoError(t, err)
t.Cleanup(func() { _ = db.Close() })
t.Cleanup(func() {
dropSchema(t, db, namespace)
})

conf := map[string]interface{}{
"project": credentials.ProjectID,
"location": credentials.Location,
"bucketName": credentials.BucketName,
"credentials": credentials.Credentials,
"prefix": "",
"namespace": namespace,
"syncFrequency": "30",
}
for k, v := range tc.configOverride {
conf[k] = v
}

dest := backendconfig.DestinationT{
ID: "test_destination_id",
Config: conf,
DestinationDefinition: backendconfig.DestinationDefinitionT{
ID: "1UmeD7xhVGHsPDEHoCiSPEGytS3",
Name: "BQ",
DisplayName: "BigQuery",
},
Name: "bigquery-integration",
Enabled: true,
RevisionID: "test_destination_id",
}
whth.VerifyConfigurationTest(t, dest)
})
}
})

t.Run("Load Table", func(t *testing.T) {
Expand Down

0 comments on commit 2fa0fa0

Please sign in to comment.