chore: clone testhelper
fracasula committed Oct 25, 2023
1 parent fb482dd commit 552bc92
Showing 5 changed files with 231 additions and 266 deletions.
20 changes: 20 additions & 0 deletions testhelper/clone.go
@@ -0,0 +1,20 @@
+package testhelper
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func Clone[T any](t testing.TB, v T) T {
+	t.Helper()
+
+	buf, err := json.Marshal(v)
+	require.NoError(t, err)
+
+	var clone T
+	require.NoError(t, json.Unmarshal(buf, &clone))
+
+	return clone
+}
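
A minimal usage sketch (not part of this commit): Clone deep-copies any JSON-serializable value by round-tripping it through encoding/json, failing the test if either step errors. The test below is hypothetical; note that unexported struct fields and anything encoding/json cannot represent are silently dropped by the round trip.

package testhelper_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	th "github.com/rudderlabs/rudder-server/testhelper"
)

// TestCloneUsage is a hypothetical example: mutate the clone and
// verify the original value is left untouched.
func TestCloneUsage(t *testing.T) {
	original := map[string]any{"enableMerge": false}

	clone := th.Clone(t, original) // deep copy via JSON round trip
	clone["enableMerge"] = true    // mutate only the copy

	require.False(t, original["enableMerge"].(bool)) // original unchanged
	require.True(t, clone["enableMerge"].(bool))
}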
106 changes: 48 additions & 58 deletions warehouse/integrations/bigquery/bigquery_test.go
@@ -25,13 +25,14 @@ import (
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/runner"
th "github.com/rudderlabs/rudder-server/testhelper"
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/testhelper/workspaceConfig"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse/client"
whbigquery "github.com/rudderlabs/rudder-server/warehouse/integrations/bigquery"
bqHelper "github.com/rudderlabs/rudder-server/warehouse/integrations/bigquery/testhelper"
"github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper"
whth "github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper"
mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
@@ -66,8 +67,8 @@ func TestIntegration(t *testing.T) {
sourcesDestinationID := warehouseutils.RandHex()
sourcesWriteKey := warehouseutils.RandHex()
destType := warehouseutils.BQ
- namespace := testhelper.RandSchema(destType)
- sourcesNamespace := testhelper.RandSchema(destType)
+ namespace := whth.RandSchema(destType)
+ sourcesNamespace := whth.RandSchema(destType)

bqTestCredentials, err := bqHelper.GetBQTestCredentials()
require.NoError(t, err)
@@ -96,7 +97,7 @@ func TestIntegration(t *testing.T) {
}
workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations)

- testhelper.EnhanceWithDefaultEnvs(t)
+ whth.EnhanceWithDefaultEnvs(t)
t.Setenv("JOBS_DB_PORT", strconv.Itoa(jobsDBPort))
t.Setenv("WAREHOUSE_JOBS_DB_PORT", strconv.Itoa(jobsDBPort))
t.Setenv("RSERVER_WAREHOUSE_BIGQUERY_MAX_PARALLEL_LOADS", "8")
@@ -133,7 +134,7 @@ func TestIntegration(t *testing.T) {
}

t.Run("Event flow", func(t *testing.T) {
- jobsDB := testhelper.JobsDB(t, jobsDBPort)
+ jobsDB := whth.JobsDB(t, jobsDBPort)

testcase := []struct {
name string
@@ -142,11 +143,11 @@ func TestIntegration(t *testing.T) {
sourceID string
destinationID string
tables []string
- stagingFilesEventsMap testhelper.EventsCountMap
- stagingFilesModifiedEventsMap testhelper.EventsCountMap
- loadFilesEventsMap testhelper.EventsCountMap
- tableUploadsEventsMap testhelper.EventsCountMap
- warehouseEventsMap testhelper.EventsCountMap
+ stagingFilesEventsMap whth.EventsCountMap
+ stagingFilesModifiedEventsMap whth.EventsCountMap
+ loadFilesEventsMap whth.EventsCountMap
+ tableUploadsEventsMap whth.EventsCountMap
+ warehouseEventsMap whth.EventsCountMap
asyncJob bool
skipModifiedEvents bool
prerequisite func(context.Context, testing.TB, *bigquery.Client)
@@ -182,15 +183,15 @@ func TestIntegration(t *testing.T) {
destinationID: sourcesDestinationID,
schema: sourcesNamespace,
tables: []string{"tracks", "google_sheet"},
- stagingFilesEventsMap: testhelper.EventsCountMap{
+ stagingFilesEventsMap: whth.EventsCountMap{
"wh_staging_files": 9, // 8 + 1 (merge events because of ID resolution)
},
- stagingFilesModifiedEventsMap: testhelper.EventsCountMap{
+ stagingFilesModifiedEventsMap: whth.EventsCountMap{
"wh_staging_files": 8, // 8 (de-duped by encounteredMergeRuleMap)
},
- loadFilesEventsMap: testhelper.SourcesLoadFilesEventsMap(),
- tableUploadsEventsMap: testhelper.SourcesTableUploadsEventsMap(),
- warehouseEventsMap: testhelper.SourcesWarehouseEventsMap(),
+ loadFilesEventsMap: whth.SourcesLoadFilesEventsMap(),
+ tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
+ warehouseEventsMap: whth.SourcesWarehouseEventsMap(),
asyncJob: true,
enableMerge: false,
prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
@@ -308,7 +309,7 @@ func TestIntegration(t *testing.T) {
}

t.Log("verifying test case 1")
- ts1 := testhelper.TestConfig{
+ ts1 := whth.TestConfig{
WriteKey: tc.writeKey,
Schema: tc.schema,
Tables: tc.tables,
Expand All @@ -327,7 +328,7 @@ func TestIntegration(t *testing.T) {
JobRunID: misc.FastUUID().String(),
TaskRunID: misc.FastUUID().String(),
StagingFilePath: tc.stagingFilePrefix + ".staging-1.json",
- UserID: testhelper.GetUserId(destType),
+ UserID: whth.GetUserId(destType),
}
ts1.VerifyEvents(t)

Expand All @@ -336,7 +337,7 @@ func TestIntegration(t *testing.T) {
}

t.Log("verifying test case 2")
- ts2 := testhelper.TestConfig{
+ ts2 := whth.TestConfig{
WriteKey: tc.writeKey,
Schema: tc.schema,
Tables: tc.tables,
Expand All @@ -356,7 +357,7 @@ func TestIntegration(t *testing.T) {
JobRunID: misc.FastUUID().String(),
TaskRunID: misc.FastUUID().String(),
StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
- UserID: testhelper.GetUserId(destType),
+ UserID: whth.GetUserId(destType),
}
if tc.asyncJob {
ts2.UserID = ts1.UserID
@@ -408,7 +409,7 @@ func TestIntegration(t *testing.T) {
Enabled: true,
RevisionID: destinationID,
}
- testhelper.VerifyConfigurationTest(t, dest)
+ whth.VerifyConfigurationTest(t, dest)
})

t.Run("Load Table", func(t *testing.T) {
Expand All @@ -418,7 +419,7 @@ func TestIntegration(t *testing.T) {
workspaceID = "test_workspace_id"
)

- namespace := testhelper.RandSchema(destType)
+ namespace := whth.RandSchema(destType)

ctx := context.Background()
db, err := bigquery.NewClient(ctx,
@@ -502,7 +503,7 @@ func TestIntegration(t *testing.T) {
t.Run("schema does not exist", func(t *testing.T) {
tableName := "schema_not_exists_test_table"

- uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)
+ uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)

loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
Expand All @@ -518,7 +519,7 @@ func TestIntegration(t *testing.T) {
t.Run("table does not exist", func(t *testing.T) {
tableName := "table_not_exists_test_table"

- uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)
+ uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)

loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
Expand All @@ -536,12 +537,12 @@ func TestIntegration(t *testing.T) {
})
t.Run("merge with dedup", func(t *testing.T) {
tableName := "merge_with_dedup_test_table"
- uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)
+ uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)

loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)

- dedupWarehouse := cloneWarehouse(t, warehouse)
+ dedupWarehouse := th.Clone(t, warehouse)
dedupWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true

c := config.New()
@@ -580,11 +581,11 @@ func TestIntegration(t *testing.T) {
fmt.Sprintf("`%s`.`%s`", namespace, tableName),
),
)
- require.Equal(t, records, testhelper.SampleTestRecords())
+ require.Equal(t, records, whth.SampleTestRecords())
})
t.Run("merge without dedup", func(t *testing.T) {
tableName := "merge_without_dedup_test_table"
- uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/dedup.json.gz", tableName)
+ uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/dedup.json.gz", tableName)

loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
Expand Down Expand Up @@ -619,7 +620,7 @@ func TestIntegration(t *testing.T) {
fmt.Sprintf("`%s`.`%s`", namespace, tableName),
)
records := bqHelper.RetrieveRecordsFromWarehouse(t, db, retrieveRecordsSQL)
- require.Equal(t, records, testhelper.DedupTestRecords())
+ require.Equal(t, records, whth.DedupTestRecords())

loadTableStat, err = bq.LoadTable(ctx, tableName)
require.NoError(t, err)
Expand All @@ -632,7 +633,7 @@ func TestIntegration(t *testing.T) {
t.Run("append", func(t *testing.T) {
tableName := "append_test_table"

- uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)
+ uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)

loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
@@ -676,7 +677,7 @@ func TestIntegration(t *testing.T) {
time.Now().Add(+24*time.Hour).Format("2006-01-02"),
),
)
- require.Equal(t, records, testhelper.AppendTestRecords())
+ require.Equal(t, records, whth.AppendTestRecords())
})
t.Run("load file does not exists", func(t *testing.T) {
tableName := "load_file_not_exists_test_table"
Expand All @@ -703,7 +704,7 @@ func TestIntegration(t *testing.T) {
t.Run("mismatch in number of columns", func(t *testing.T) {
tableName := "mismatch_columns_test_table"

- uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/mismatch-columns.json.gz", tableName)
+ uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/mismatch-columns.json.gz", tableName)

loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
Expand All @@ -725,7 +726,7 @@ func TestIntegration(t *testing.T) {
t.Run("mismatch in schema", func(t *testing.T) {
tableName := "mismatch_schema_test_table"

- uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/mismatch-schema.json.gz", tableName)
+ uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/mismatch-schema.json.gz", tableName)

loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
Expand All @@ -747,7 +748,7 @@ func TestIntegration(t *testing.T) {
t.Run("discards", func(t *testing.T) {
tableName := warehouseutils.DiscardsTable

- uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/discards.json.gz", tableName)
+ uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/discards.json.gz", tableName)

loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, warehouseutils.DiscardsSchema, warehouseutils.DiscardsSchema)
@@ -781,12 +782,12 @@ func TestIntegration(t *testing.T) {
fmt.Sprintf("`%s`.`%s`", namespace, tableName),
),
)
- require.Equal(t, records, testhelper.DiscardTestRecords())
+ require.Equal(t, records, whth.DiscardTestRecords())
})
t.Run("custom partition", func(t *testing.T) {
tableName := "partition_test_table"

- uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)
+ uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)

loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(
@@ -832,7 +833,7 @@ func TestIntegration(t *testing.T) {
time.Now().Add(+24*time.Hour).Format("2006-01-02"),
),
)
- require.Equal(t, records, testhelper.SampleTestRecords())
+ require.Equal(t, records, whth.SampleTestRecords())
})
})

Expand All @@ -845,7 +846,7 @@ func TestIntegration(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { _ = db.Close() })

- namespace := testhelper.RandSchema(warehouseutils.BQ)
+ namespace := whth.RandSchema(warehouseutils.BQ)
t.Cleanup(func() {
require.Eventually(t,
func() bool {
@@ -985,8 +986,8 @@ func newMockUploader(
return mockUploader
}

- func loadFilesEventsMap() testhelper.EventsCountMap {
- return testhelper.EventsCountMap{
+ func loadFilesEventsMap() whth.EventsCountMap {
+ return whth.EventsCountMap{
"identifies": 4,
"users": 4,
"tracks": 4,
Expand All @@ -999,8 +1000,8 @@ func loadFilesEventsMap() testhelper.EventsCountMap {
}
}

- func tableUploadsEventsMap() testhelper.EventsCountMap {
- return testhelper.EventsCountMap{
+ func tableUploadsEventsMap() whth.EventsCountMap {
+ return whth.EventsCountMap{
"identifies": 4,
"users": 4,
"tracks": 4,
Expand All @@ -1013,14 +1014,14 @@ func tableUploadsEventsMap() testhelper.EventsCountMap {
}
}

- func stagingFilesEventsMap() testhelper.EventsCountMap {
- return testhelper.EventsCountMap{
+ func stagingFilesEventsMap() whth.EventsCountMap {
+ return whth.EventsCountMap{
"wh_staging_files": 34, // Since extra 2 merge events because of ID resolution
}
}

- func mergeEventsMap() testhelper.EventsCountMap {
- return testhelper.EventsCountMap{
+ func mergeEventsMap() whth.EventsCountMap {
+ return whth.EventsCountMap{
"identifies": 1,
"users": 1,
"tracks": 1,
Expand All @@ -1033,8 +1034,8 @@ func mergeEventsMap() testhelper.EventsCountMap {
}
}

- func appendEventsMap() testhelper.EventsCountMap {
- return testhelper.EventsCountMap{
+ func appendEventsMap() whth.EventsCountMap {
+ return whth.EventsCountMap{
"identifies": 4,
"users": 1,
"tracks": 4,
Expand All @@ -1057,14 +1058,3 @@ func TestUnsupportedCredentials(t *testing.T) {
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "client_credentials.json file is not supported")
}

-func cloneWarehouse(t *testing.T, wh model.Warehouse) model.Warehouse {
-	t.Helper()
-	buf, err := json.Marshal(wh)
-	require.NoError(t, err)
-
-	var clone model.Warehouse
-	require.NoError(t, json.Unmarshal(buf, &clone))
-
-	return clone
-}
