diff --git a/testhelper/clone.go b/testhelper/clone.go
new file mode 100644
index 00000000000..8c37b67529a
--- /dev/null
+++ b/testhelper/clone.go
@@ -0,0 +1,20 @@
+package testhelper
+
+import (
+    "encoding/json"
+    "testing"
+
+    "github.com/stretchr/testify/require"
+)
+
+func Clone[T any](t testing.TB, v T) T {
+    t.Helper()
+
+    buf, err := json.Marshal(v)
+    require.NoError(t, err)
+
+    var clone T
+    require.NoError(t, json.Unmarshal(buf, &clone))
+
+    return clone
+}
diff --git a/warehouse/integrations/bigquery/bigquery_test.go b/warehouse/integrations/bigquery/bigquery_test.go
index b3277ea13c9..64341c0092d 100644
--- a/warehouse/integrations/bigquery/bigquery_test.go
+++ b/warehouse/integrations/bigquery/bigquery_test.go
@@ -25,13 +25,14 @@ import (
     kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
     backendconfig "github.com/rudderlabs/rudder-server/backend-config"
     "github.com/rudderlabs/rudder-server/runner"
+    th "github.com/rudderlabs/rudder-server/testhelper"
     "github.com/rudderlabs/rudder-server/testhelper/health"
     "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig"
     "github.com/rudderlabs/rudder-server/utils/misc"
     "github.com/rudderlabs/rudder-server/warehouse/client"
     whbigquery "github.com/rudderlabs/rudder-server/warehouse/integrations/bigquery"
     bqHelper "github.com/rudderlabs/rudder-server/warehouse/integrations/bigquery/testhelper"
-    "github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper"
+    whth "github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper"
     mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils"
     "github.com/rudderlabs/rudder-server/warehouse/internal/model"
     warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
@@ -66,8 +67,8 @@ func TestIntegration(t *testing.T) {
     sourcesDestinationID := warehouseutils.RandHex()
     sourcesWriteKey := warehouseutils.RandHex()
     destType := warehouseutils.BQ
-    namespace := testhelper.RandSchema(destType)
-    sourcesNamespace := testhelper.RandSchema(destType)
+    namespace := whth.RandSchema(destType)
+    sourcesNamespace := whth.RandSchema(destType)
 
     bqTestCredentials, err := bqHelper.GetBQTestCredentials()
     require.NoError(t, err)
@@ -96,7 +97,7 @@ func TestIntegration(t *testing.T) {
     }
     workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations)
 
-    testhelper.EnhanceWithDefaultEnvs(t)
+    whth.EnhanceWithDefaultEnvs(t)
     t.Setenv("JOBS_DB_PORT", strconv.Itoa(jobsDBPort))
     t.Setenv("WAREHOUSE_JOBS_DB_PORT", strconv.Itoa(jobsDBPort))
     t.Setenv("RSERVER_WAREHOUSE_BIGQUERY_MAX_PARALLEL_LOADS", "8")
@@ -133,7 +134,7 @@ func TestIntegration(t *testing.T) {
     }
 
     t.Run("Event flow", func(t *testing.T) {
-        jobsDB := testhelper.JobsDB(t, jobsDBPort)
+        jobsDB := whth.JobsDB(t, jobsDBPort)
 
         testcase := []struct {
             name                          string
             writeKey                      string
             schema                        string
             sourceID                      string
             destinationID                 string
             tables                        []string
-            stagingFilesEventsMap         testhelper.EventsCountMap
-            stagingFilesModifiedEventsMap testhelper.EventsCountMap
-            loadFilesEventsMap            testhelper.EventsCountMap
-            tableUploadsEventsMap         testhelper.EventsCountMap
-            warehouseEventsMap            testhelper.EventsCountMap
+            stagingFilesEventsMap         whth.EventsCountMap
+            stagingFilesModifiedEventsMap whth.EventsCountMap
+            loadFilesEventsMap            whth.EventsCountMap
+            tableUploadsEventsMap         whth.EventsCountMap
+            warehouseEventsMap            whth.EventsCountMap
             asyncJob                      bool
             skipModifiedEvents            bool
             prerequisite                  func(context.Context, testing.TB, *bigquery.Client)
@@ -182,15 +183,15 @@ func TestIntegration(t *testing.T) {
             destinationID: sourcesDestinationID,
             schema:        sourcesNamespace,
             tables:        []string{"tracks", "google_sheet"},
-            stagingFilesEventsMap: testhelper.EventsCountMap{
+            stagingFilesEventsMap: whth.EventsCountMap{
                 "wh_staging_files": 9, // 8 + 1 (merge events because of ID resolution)
             },
-            stagingFilesModifiedEventsMap: testhelper.EventsCountMap{
+            stagingFilesModifiedEventsMap: whth.EventsCountMap{
                 "wh_staging_files": 8, // 8 (de-duped by encounteredMergeRuleMap)
             },
-            loadFilesEventsMap:    testhelper.SourcesLoadFilesEventsMap(),
-            tableUploadsEventsMap: testhelper.SourcesTableUploadsEventsMap(),
-            warehouseEventsMap:    testhelper.SourcesWarehouseEventsMap(),
+            loadFilesEventsMap:    whth.SourcesLoadFilesEventsMap(),
+            tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
+            warehouseEventsMap:    whth.SourcesWarehouseEventsMap(),
             asyncJob:    true,
             enableMerge: false,
             prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
@@ -308,7 +309,7 @@ func TestIntegration(t *testing.T) {
             }
 
             t.Log("verifying test case 1")
-            ts1 := testhelper.TestConfig{
+            ts1 := whth.TestConfig{
                 WriteKey: tc.writeKey,
                 Schema:   tc.schema,
                 Tables:   tc.tables,
@@ -327,7 +328,7 @@ func TestIntegration(t *testing.T) {
                 JobRunID:        misc.FastUUID().String(),
                 TaskRunID:       misc.FastUUID().String(),
                 StagingFilePath: tc.stagingFilePrefix + ".staging-1.json",
-                UserID:          testhelper.GetUserId(destType),
+                UserID:          whth.GetUserId(destType),
             }
             ts1.VerifyEvents(t)
 
@@ -336,7 +337,7 @@ func TestIntegration(t *testing.T) {
             }
 
             t.Log("verifying test case 2")
-            ts2 := testhelper.TestConfig{
+            ts2 := whth.TestConfig{
                 WriteKey: tc.writeKey,
                 Schema:   tc.schema,
                 Tables:   tc.tables,
@@ -356,7 +357,7 @@ func TestIntegration(t *testing.T) {
                 JobRunID:        misc.FastUUID().String(),
                 TaskRunID:       misc.FastUUID().String(),
                 StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
-                UserID:          testhelper.GetUserId(destType),
+                UserID:          whth.GetUserId(destType),
             }
             if tc.asyncJob {
                 ts2.UserID = ts1.UserID
@@ -408,7 +409,7 @@ func TestIntegration(t *testing.T) {
             Enabled:    true,
             RevisionID: destinationID,
         }
-        testhelper.VerifyConfigurationTest(t, dest)
+        whth.VerifyConfigurationTest(t, dest)
     })
 
     t.Run("Load Table", func(t *testing.T) {
@@ -418,7 +419,7 @@ func TestIntegration(t *testing.T) {
             workspaceID = "test_workspace_id"
         )
 
-        namespace := testhelper.RandSchema(destType)
+        namespace := whth.RandSchema(destType)
 
         ctx := context.Background()
         db, err := bigquery.NewClient(ctx,
@@ -502,7 +503,7 @@ func TestIntegration(t *testing.T) {
         t.Run("schema does not exist", func(t *testing.T) {
             tableName := "schema_not_exists_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
@@ -518,7 +519,7 @@ func TestIntegration(t *testing.T) {
         t.Run("table does not exist", func(t *testing.T) {
             tableName := "table_not_exists_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
@@ -536,12 +537,12 @@ func TestIntegration(t *testing.T) {
         })
 
         t.Run("merge with dedup", func(t *testing.T) {
             tableName := "merge_with_dedup_test_table"
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
 
-            dedupWarehouse := cloneWarehouse(t, warehouse)
+            dedupWarehouse := th.Clone(t, warehouse)
             dedupWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
 
             c := config.New()
@@ -580,11 +581,11 @@ func TestIntegration(t *testing.T) {
                     fmt.Sprintf("`%s`.`%s`", namespace, tableName),
                 ),
             )
-            require.Equal(t, records, testhelper.SampleTestRecords())
+            require.Equal(t, records, whth.SampleTestRecords())
         })
         t.Run("merge without dedup", func(t *testing.T) {
             tableName := "merge_without_dedup_test_table"
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/dedup.json.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/dedup.json.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
@@ -619,7 +620,7 @@ func TestIntegration(t *testing.T) {
                 fmt.Sprintf("`%s`.`%s`", namespace, tableName),
             )
             records := bqHelper.RetrieveRecordsFromWarehouse(t, db, retrieveRecordsSQL)
-            require.Equal(t, records, testhelper.DedupTestRecords())
+            require.Equal(t, records, whth.DedupTestRecords())
 
             loadTableStat, err = bq.LoadTable(ctx, tableName)
             require.NoError(t, err)
@@ -632,7 +633,7 @@ func TestIntegration(t *testing.T) {
         t.Run("append", func(t *testing.T) {
             tableName := "append_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
@@ -676,7 +677,7 @@ func TestIntegration(t *testing.T) {
                     time.Now().Add(+24*time.Hour).Format("2006-01-02"),
                 ),
             )
-            require.Equal(t, records, testhelper.AppendTestRecords())
+            require.Equal(t, records, whth.AppendTestRecords())
         })
         t.Run("load file does not exists", func(t *testing.T) {
             tableName := "load_file_not_exists_test_table"
@@ -703,7 +704,7 @@ func TestIntegration(t *testing.T) {
         t.Run("mismatch in number of columns", func(t *testing.T) {
             tableName := "mismatch_columns_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/mismatch-columns.json.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/mismatch-columns.json.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
@@ -725,7 +726,7 @@ func TestIntegration(t *testing.T) {
         t.Run("mismatch in schema", func(t *testing.T) {
             tableName := "mismatch_schema_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/mismatch-schema.json.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/mismatch-schema.json.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
@@ -747,7 +748,7 @@ func TestIntegration(t *testing.T) {
         t.Run("discards", func(t *testing.T) {
             tableName := warehouseutils.DiscardsTable
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/discards.json.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/discards.json.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, warehouseutils.DiscardsSchema, warehouseutils.DiscardsSchema)
@@ -781,12 +782,12 @@ func TestIntegration(t *testing.T) {
                     fmt.Sprintf("`%s`.`%s`", namespace, tableName),
                 ),
             )
-            require.Equal(t, records, testhelper.DiscardTestRecords())
+            require.Equal(t, records, whth.DiscardTestRecords())
         })
 
         t.Run("custom partition", func(t *testing.T) {
             tableName := "partition_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.json.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(
@@ -832,7 +833,7 @@ func TestIntegration(t *testing.T) {
                     time.Now().Add(+24*time.Hour).Format("2006-01-02"),
                 ),
             )
-            require.Equal(t, records, testhelper.SampleTestRecords())
+            require.Equal(t, records, whth.SampleTestRecords())
         })
     })
 
@@ -845,7 +846,7 @@ func TestIntegration(t *testing.T) {
         require.NoError(t, err)
         t.Cleanup(func() { _ = db.Close() })
 
-        namespace := testhelper.RandSchema(warehouseutils.BQ)
+        namespace := whth.RandSchema(warehouseutils.BQ)
 
         t.Cleanup(func() {
             require.Eventually(t, func() bool {
@@ -985,8 +986,8 @@ func newMockUploader(
     return mockUploader
 }
 
-func loadFilesEventsMap() testhelper.EventsCountMap {
-    return testhelper.EventsCountMap{
+func loadFilesEventsMap() whth.EventsCountMap {
+    return whth.EventsCountMap{
         "identifies": 4,
         "users":      4,
         "tracks":     4,
@@ -999,8 +1000,8 @@ func loadFilesEventsMap() testhelper.EventsCountMap {
     }
 }
 
-func tableUploadsEventsMap() testhelper.EventsCountMap {
-    return testhelper.EventsCountMap{
+func tableUploadsEventsMap() whth.EventsCountMap {
+    return whth.EventsCountMap{
         "identifies": 4,
         "users":      4,
         "tracks":     4,
@@ -1013,14 +1014,14 @@ func tableUploadsEventsMap() testhelper.EventsCountMap {
     }
 }
 
-func stagingFilesEventsMap() testhelper.EventsCountMap {
-    return testhelper.EventsCountMap{
+func stagingFilesEventsMap() whth.EventsCountMap {
+    return whth.EventsCountMap{
         "wh_staging_files": 34, // Since extra 2 merge events because of ID resolution
     }
 }
 
-func mergeEventsMap() testhelper.EventsCountMap {
-    return testhelper.EventsCountMap{
+func mergeEventsMap() whth.EventsCountMap {
+    return whth.EventsCountMap{
         "identifies": 1,
         "users":      1,
         "tracks":     1,
@@ -1033,8 +1034,8 @@ func mergeEventsMap() testhelper.EventsCountMap {
     }
 }
 
-func appendEventsMap() testhelper.EventsCountMap {
-    return testhelper.EventsCountMap{
+func appendEventsMap() whth.EventsCountMap {
+    return whth.EventsCountMap{
         "identifies": 4,
         "users":      1,
         "tracks":     4,
@@ -1057,14 +1058,3 @@ func TestUnsupportedCredentials(t *testing.T) {
     assert.NotNil(t, err)
     assert.Contains(t, err.Error(), "client_credentials.json file is not supported")
 }
-
-func cloneWarehouse(t *testing.T, wh model.Warehouse) model.Warehouse {
-    t.Helper()
-    buf, err := json.Marshal(wh)
-    require.NoError(t, err)
-
-    var clone model.Warehouse
-    require.NoError(t, json.Unmarshal(buf, &clone))
-
-    return clone
-}
diff --git a/warehouse/integrations/deltalake/deltalake_test.go b/warehouse/integrations/deltalake/deltalake_test.go
index eb63c225671..838c3522a70 100644
--- a/warehouse/integrations/deltalake/deltalake_test.go
+++ b/warehouse/integrations/deltalake/deltalake_test.go
@@ -26,12 +26,13 @@ import (
     kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
     backendconfig "github.com/rudderlabs/rudder-server/backend-config"
     "github.com/rudderlabs/rudder-server/runner"
+    th "github.com/rudderlabs/rudder-server/testhelper"
     "github.com/rudderlabs/rudder-server/testhelper/health"
     "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig"
     "github.com/rudderlabs/rudder-server/utils/misc"
     warehouseclient "github.com/rudderlabs/rudder-server/warehouse/client"
     "github.com/rudderlabs/rudder-server/warehouse/integrations/deltalake"
-    "github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper"
+    whth "github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper"
     mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils"
     "github.com/rudderlabs/rudder-server/warehouse/internal/model"
     warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
@@ -94,7 +95,7 @@ func TestIntegration(t *testing.T) {
     destinationID := warehouseutils.RandHex()
     writeKey := warehouseutils.RandHex()
     destType := warehouseutils.DELTALAKE
-    namespace := testhelper.RandSchema(destType)
+    namespace := whth.RandSchema(destType)
 
     deltaLakeCredentials, err := deltaLakeTestCredentials()
     require.NoError(t, err)
@@ -134,7 +135,7 @@ func TestIntegration(t *testing.T) {
     }
     workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations)
 
-    testhelper.EnhanceWithDefaultEnvs(t)
+    whth.EnhanceWithDefaultEnvs(t)
     t.Setenv("JOBS_DB_PORT", strconv.Itoa(jobsDBPort))
     t.Setenv("WAREHOUSE_JOBS_DB_PORT", strconv.Itoa(jobsDBPort))
     t.Setenv("RSERVER_WAREHOUSE_DELTALAKE_MAX_PARALLEL_LOADS", "8")
@@ -161,7 +162,7 @@ func TestIntegration(t *testing.T) {
     }
 
     t.Run("Event flow", func(t *testing.T) {
-        jobsDB := testhelper.JobsDB(t, jobsDBPort)
+        jobsDB := whth.JobsDB(t, jobsDBPort)
 
         t.Cleanup(func() {
             require.Eventually(t,
@@ -184,7 +185,7 @@ func TestIntegration(t *testing.T) {
             sourceID            string
             destinationID       string
             messageID           string
-            warehouseEventsMap  testhelper.EventsCountMap
+            warehouseEventsMap  whth.EventsCountMap
             enableMerge         bool
             useParquetLoadFiles bool
             stagingFilePrefix   string
@@ -253,14 +254,14 @@ func TestIntegration(t *testing.T) {
             tables := []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "groups"}
 
             t.Log("verifying test case 1")
-            ts1 := testhelper.TestConfig{
+            ts1 := whth.TestConfig{
                 WriteKey:      writeKey,
                 Schema:        tc.schema,
                 Tables:        tables,
                 SourceID:      tc.sourceID,
                 DestinationID: tc.destinationID,
                 JobRunID:      tc.jobRunID,
-                WarehouseEventsMap: testhelper.EventsCountMap{
+                WarehouseEventsMap: whth.EventsCountMap{
                     "identifies": 1,
                     "users":      1,
                     "tracks":     1,
@@ -277,12 +278,12 @@ func TestIntegration(t *testing.T) {
                 HTTPPort:        httpPort,
                 Client:          sqlClient,
                 StagingFilePath: tc.stagingFilePrefix + ".staging-1.json",
-                UserID:          testhelper.GetUserId(destType),
+                UserID:          whth.GetUserId(destType),
             }
             ts1.VerifyEvents(t)
 
             t.Log("verifying test case 2")
-            ts2 := testhelper.TestConfig{
+            ts2 := whth.TestConfig{
                 WriteKey: writeKey,
                 Schema:   tc.schema,
                 Tables:   tables,
@@ -383,7 +384,7 @@ func TestIntegration(t *testing.T) {
                 dest.Config[k] = v
             }
 
-            testhelper.VerifyConfigurationTest(t, dest)
+            whth.VerifyConfigurationTest(t, dest)
         })
     }
 })
@@ -396,7 +397,7 @@ func TestIntegration(t *testing.T) {
         )
 
         ctx := context.Background()
-        namespace := testhelper.RandSchema(destType)
+        namespace := whth.RandSchema(destType)
 
         cleanupSchema := func() {
             require.Eventually(t, func() bool {
@@ -475,7 +476,7 @@ func TestIntegration(t *testing.T) {
         t.Run("schema does not exists", func(t *testing.T) {
             tableName := "schema_not_exists_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
@@ -491,7 +492,7 @@ func TestIntegration(t *testing.T) {
         t.Run("table does not exists", func(t *testing.T) {
             tableName := "table_not_exists_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
@@ -512,7 +513,7 @@ func TestIntegration(t *testing.T) {
             tableName := "merge_test_table"
 
             t.Run("without dedup", func(t *testing.T) {
-                uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
+                uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
                 loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
                 mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
@@ -538,7 +539,7 @@ func TestIntegration(t *testing.T) {
                 require.Equal(t, loadTableStat.RowsInserted, int64(0))
                 require.Equal(t, loadTableStat.RowsUpdated, int64(14))
 
-                records := testhelper.RetrieveRecordsFromWarehouse(t, d.DB.DB,
+                records := whth.RetrieveRecordsFromWarehouse(t, d.DB.DB,
                     fmt.Sprintf(`
                         SELECT
                           id,
@@ -554,15 +555,15 @@ func TestIntegration(t *testing.T) {
                         tableName,
                     ),
                 )
-                require.Equal(t, records, testhelper.SampleTestRecords())
+                require.Equal(t, records, whth.SampleTestRecords())
             })
             t.Run("with dedup use new record", func(t *testing.T) {
-                uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
+                uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
 
                 loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
                 mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, true, true, "2022-12-15T06:53:49.640Z")
 
-                mergeWarehouse := cloneWarehouse(t, warehouse)
+                mergeWarehouse := th.Clone(t, warehouse)
                 mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
 
                 d := deltalake.New(config.New(), logger.NOP, stats.Default)
@@ -595,24 +596,24 @@ func TestIntegration(t *testing.T) {
                     namespace,
                     tableName,
                 )
-                records := testhelper.RetrieveRecordsFromWarehouse(t, d.DB.DB, retrieveRecordsSQL)
-                require.Equal(t, records, testhelper.DedupTestRecords())
+                records := whth.RetrieveRecordsFromWarehouse(t, d.DB.DB, retrieveRecordsSQL)
+                require.Equal(t, records, whth.DedupTestRecords())
 
                 loadTableStat, err = d.LoadTable(ctx, tableName)
                 require.NoError(t, err)
                 require.Equal(t, loadTableStat.RowsInserted, int64(0))
                 require.Equal(t, loadTableStat.RowsUpdated, int64(14))
 
-                records = testhelper.RetrieveRecordsFromWarehouse(t, d.DB.DB, retrieveRecordsSQL)
-                require.Equal(t, records, testhelper.DedupTestRecords())
+                records = whth.RetrieveRecordsFromWarehouse(t, d.DB.DB, retrieveRecordsSQL)
+                require.Equal(t, records, whth.DedupTestRecords())
             })
             t.Run("with no overlapping partition", func(t *testing.T) {
-                uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
+                uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
 
                 loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
                 mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, false, false, "2022-11-15T06:53:49.640Z")
 
-                mergeWarehouse := cloneWarehouse(t, warehouse)
+                mergeWarehouse := th.Clone(t, warehouse)
                 mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
 
                 d := deltalake.New(config.New(), logger.NOP, stats.Default)
@@ -636,7 +637,7 @@ func TestIntegration(t *testing.T) {
                 require.Equal(t, loadTableStat.RowsInserted, int64(14))
                 require.Equal(t, loadTableStat.RowsUpdated, int64(0))
 
-                records := testhelper.RetrieveRecordsFromWarehouse(t, d.DB.DB,
+                records := whth.RetrieveRecordsFromWarehouse(t, d.DB.DB,
                     fmt.Sprintf(
                         `SELECT
                           id,
@@ -652,13 +653,13 @@ func TestIntegration(t *testing.T) {
                         tableName,
                     ),
                 )
-                require.Equal(t, records, testhelper.DedupTwiceTestRecords())
+                require.Equal(t, records, whth.DedupTwiceTestRecords())
             })
         })
         t.Run("append", func(t *testing.T) {
             tableName := "append_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, true, false, "2022-12-15T06:53:49.640Z")
@@ -684,7 +685,7 @@ func TestIntegration(t *testing.T) {
             require.Equal(t, loadTableStat.RowsInserted, int64(14))
             require.Equal(t, loadTableStat.RowsUpdated, int64(0))
 
-            records := testhelper.RetrieveRecordsFromWarehouse(t, d.DB.DB,
+            records := whth.RetrieveRecordsFromWarehouse(t, d.DB.DB,
                 fmt.Sprintf(
                     `SELECT
                       id,
@@ -700,7 +701,7 @@ func TestIntegration(t *testing.T) {
                     tableName,
                 ),
             )
-            require.Equal(t, records, testhelper.AppendTestRecords())
+            require.Equal(t, records, whth.AppendTestRecords())
         })
         t.Run("load file does not exists", func(t *testing.T) {
             tableName := "load_file_not_exists_test_table"
@@ -728,7 +729,7 @@ func TestIntegration(t *testing.T) {
         t.Run("mismatch in number of columns", func(t *testing.T) {
             tableName := "mismatch_columns_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/mismatch-columns.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/mismatch-columns.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
@@ -749,7 +750,7 @@ func TestIntegration(t *testing.T) {
             require.Equal(t, loadTableStat.RowsInserted, int64(14))
             require.Equal(t, loadTableStat.RowsUpdated, int64(0))
 
-            records := testhelper.RetrieveRecordsFromWarehouse(t, d.DB.DB,
+            records := whth.RetrieveRecordsFromWarehouse(t, d.DB.DB,
                 fmt.Sprintf(
                     `SELECT
                       id,
@@ -765,12 +766,12 @@ func TestIntegration(t *testing.T) {
                    tableName,
                 ),
             )
-            require.Equal(t, records, testhelper.SampleTestRecords())
+            require.Equal(t, records, whth.SampleTestRecords())
         })
         t.Run("mismatch in schema", func(t *testing.T) {
             tableName := "mismatch_schema_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/mismatch-schema.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/mismatch-schema.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
@@ -791,7 +792,7 @@ func TestIntegration(t *testing.T) {
             require.Equal(t, loadTableStat.RowsInserted, int64(14))
             require.Equal(t, loadTableStat.RowsUpdated, int64(0))
 
-            records := testhelper.RetrieveRecordsFromWarehouse(t, d.DB.DB,
+            records := whth.RetrieveRecordsFromWarehouse(t, d.DB.DB,
                 fmt.Sprintf(
                     `SELECT
                       id,
@@ -807,12 +808,12 @@ func TestIntegration(t *testing.T) {
                     tableName,
                 ),
             )
-            require.Equal(t, records, testhelper.MismatchSchemaTestRecords())
+            require.Equal(t, records, whth.MismatchSchemaTestRecords())
         })
         t.Run("discards", func(t *testing.T) {
             tableName := warehouseutils.DiscardsTable
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/discards.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/discards.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, warehouseutils.DiscardsSchema, warehouseutils.DiscardsSchema, warehouseutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
@@ -833,7 +834,7 @@ func TestIntegration(t *testing.T) {
             require.Equal(t, loadTableStat.RowsInserted, int64(6))
             require.Equal(t, loadTableStat.RowsUpdated, int64(0))
 
-            records := testhelper.RetrieveRecordsFromWarehouse(t, d.DB.DB,
+            records := whth.RetrieveRecordsFromWarehouse(t, d.DB.DB,
                 fmt.Sprintf(
                     `SELECT
                       column_name,
@@ -848,12 +849,12 @@ func TestIntegration(t *testing.T) {
                     tableName,
                 ),
             )
-            require.Equal(t, records, testhelper.DiscardTestRecords())
+            require.Equal(t, records, whth.DiscardTestRecords())
         })
         t.Run("parquet", func(t *testing.T) {
             tableName := "parquet_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.parquet", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.parquet", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeParquet, false, false, "2022-12-15T06:53:49.640Z")
@@ -874,7 +875,7 @@ func TestIntegration(t *testing.T) {
             require.Equal(t, loadTableStat.RowsInserted, int64(14))
             require.Equal(t, loadTableStat.RowsUpdated, int64(0))
 
-            records := testhelper.RetrieveRecordsFromWarehouse(t, d.DB.DB,
+            records := whth.RetrieveRecordsFromWarehouse(t, d.DB.DB,
                 fmt.Sprintf(
                     `SELECT
                       id,
@@ -890,13 +891,13 @@ func TestIntegration(t *testing.T) {
                     tableName,
                 ),
             )
-            require.Equal(t, records, testhelper.SampleTestRecords())
+            require.Equal(t, records, whth.SampleTestRecords())
         })
         t.Run("partition pruning", func(t *testing.T) {
             t.Run("not partitioned", func(t *testing.T) {
                 tableName := "not_partitioned_test_table"
 
-                uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
+                uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
                 loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
                 mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
@@ -934,7 +935,7 @@ func TestIntegration(t *testing.T) {
                 require.Equal(t, loadTableStat.RowsInserted, int64(14))
                 require.Equal(t, loadTableStat.RowsUpdated, int64(0))
 
-                records := testhelper.RetrieveRecordsFromWarehouse(t, d.DB.DB,
+                records := whth.RetrieveRecordsFromWarehouse(t, d.DB.DB,
                     fmt.Sprintf(
                         `SELECT
                           id,
@@ -950,12 +951,12 @@ func TestIntegration(t *testing.T) {
                         tableName,
                     ),
                 )
-                require.Equal(t, records, testhelper.SampleTestRecords())
+                require.Equal(t, records, whth.SampleTestRecords())
             })
             t.Run("event_date is not in partition", func(t *testing.T) {
                 tableName := "not_event_date_partition_test_table"
 
-                uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
+                uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
                 loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
                 mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
@@ -993,7 +994,7 @@ func TestIntegration(t *testing.T) {
                 require.Equal(t, loadTableStat.RowsInserted, int64(14))
                 require.Equal(t, loadTableStat.RowsUpdated, int64(0))
 
-                records := testhelper.RetrieveRecordsFromWarehouse(t, d.DB.DB,
+                records := whth.RetrieveRecordsFromWarehouse(t, d.DB.DB,
                     fmt.Sprintf(
                         `SELECT
                           id,
@@ -1009,7 +1010,7 @@ func TestIntegration(t *testing.T) {
                         tableName,
                     ),
                 )
-                require.Equal(t, records, testhelper.SampleTestRecords())
+                require.Equal(t, records, whth.SampleTestRecords())
             })
         })
     })
@@ -1147,8 +1148,8 @@ func newMockUploader(
     return mockUploader
 }
 
-func mergeEventsMap() testhelper.EventsCountMap {
-    return testhelper.EventsCountMap{
+func mergeEventsMap() whth.EventsCountMap {
+    return whth.EventsCountMap{
         "identifies": 1,
         "users":      1,
         "tracks":     1,
@@ -1160,8 +1161,8 @@ func mergeEventsMap() testhelper.EventsCountMap {
     }
 }
 
-func appendEventsMap() testhelper.EventsCountMap {
-    return testhelper.EventsCountMap{
+func appendEventsMap() whth.EventsCountMap {
+    return whth.EventsCountMap{
         "identifies": 2,
         "users":      2,
         "tracks":     2,
@@ -1172,14 +1173,3 @@ func appendEventsMap() testhelper.EventsCountMap {
         "groups": 2,
     }
 }
-
-func cloneWarehouse(t *testing.T, wh model.Warehouse) model.Warehouse {
-    t.Helper()
-    buf, err := json.Marshal(wh)
-    require.NoError(t, err)
-
-    var clone model.Warehouse
-    require.NoError(t, json.Unmarshal(buf, &clone))
-
-    return clone
-}
diff --git a/warehouse/integrations/postgres/postgres_test.go b/warehouse/integrations/postgres/postgres_test.go
index aa23652d527..a6dda55831a 100644
--- a/warehouse/integrations/postgres/postgres_test.go
+++ b/warehouse/integrations/postgres/postgres_test.go
@@ -3,7 +3,6 @@ package postgres_test
 import (
     "context"
     "database/sql"
-    "encoding/json"
     "fmt"
     "os"
     "strconv"
@@ -23,12 +22,13 @@ import (
     kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
     backendconfig "github.com/rudderlabs/rudder-server/backend-config"
     "github.com/rudderlabs/rudder-server/runner"
+    th "github.com/rudderlabs/rudder-server/testhelper"
     "github.com/rudderlabs/rudder-server/testhelper/health"
     "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig"
     "github.com/rudderlabs/rudder-server/utils/misc"
     "github.com/rudderlabs/rudder-server/warehouse/client"
     "github.com/rudderlabs/rudder-server/warehouse/integrations/postgres"
-    "github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper"
+    whth "github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper"
     mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils"
     "github.com/rudderlabs/rudder-server/warehouse/internal/model"
     "github.com/rudderlabs/rudder-server/warehouse/tunnelling"
@@ -74,9 +74,9 @@ func TestIntegration(t *testing.T) {
     destType := warehouseutils.POSTGRES
 
-    namespace := testhelper.RandSchema(destType)
-    sourcesNamespace := testhelper.RandSchema(destType)
-    tunnelledNamespace := testhelper.RandSchema(destType)
+    namespace := whth.RandSchema(destType)
+    sourcesNamespace := whth.RandSchema(destType)
+    tunnelledNamespace := whth.RandSchema(destType)
 
     host := "localhost"
     database := "rudderdb"
@@ -135,7 +135,7 @@ func TestIntegration(t *testing.T) {
     }
     workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations)
 
-    testhelper.EnhanceWithDefaultEnvs(t)
+    whth.EnhanceWithDefaultEnvs(t)
     t.Setenv("JOBS_DB_PORT", strconv.Itoa(jobsDBPort))
     t.Setenv("WAREHOUSE_JOBS_DB_PORT", strconv.Itoa(jobsDBPort))
     t.Setenv("MINIO_ACCESS_KEY_ID", accessKeyID)
@@ -176,7 +176,7 @@ func TestIntegration(t *testing.T) {
         require.NoError(t, err)
         require.NoError(t, db.Ping())
 
-        jobsDB := testhelper.JobsDB(t, jobsDBPort)
+        jobsDB := whth.JobsDB(t, jobsDBPort)
 
         testCases := []struct {
             name string
@@ -185,10 +185,10 @@ func TestIntegration(t *testing.T) {
             sourceID              string
             destinationID         string
             tables                []string
-            stagingFilesEventsMap testhelper.EventsCountMap
-            loadFilesEventsMap    testhelper.EventsCountMap
-            tableUploadsEventsMap testhelper.EventsCountMap
-            warehouseEventsMap    testhelper.EventsCountMap
+            stagingFilesEventsMap whth.EventsCountMap
+            loadFilesEventsMap    whth.EventsCountMap
+            tableUploadsEventsMap whth.EventsCountMap
+            warehouseEventsMap    whth.EventsCountMap
             asyncJob              bool
             stagingFilePrefix     string
         }{
@@ -210,10 +210,10 @@ func TestIntegration(t *testing.T) {
                 tables:                []string{"tracks", "google_sheet"},
                 sourceID:              sourcesSourceID,
                 destinationID:         sourcesDestinationID,
-                stagingFilesEventsMap: testhelper.SourcesStagingFilesEventsMap(),
-                loadFilesEventsMap:    testhelper.SourcesLoadFilesEventsMap(),
-                tableUploadsEventsMap: testhelper.SourcesTableUploadsEventsMap(),
-                warehouseEventsMap:    testhelper.SourcesWarehouseEventsMap(),
+                stagingFilesEventsMap: whth.SourcesStagingFilesEventsMap(),
+                loadFilesEventsMap:    whth.SourcesLoadFilesEventsMap(),
+                tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
+                warehouseEventsMap:    whth.SourcesWarehouseEventsMap(),
                 asyncJob:              true,
                 stagingFilePrefix:     "testdata/sources-job",
             },
@@ -241,7 +241,7 @@ func TestIntegration(t *testing.T) {
                 }
 
                 t.Log("verifying test case 1")
-                ts1 := testhelper.TestConfig{
+                ts1 := whth.TestConfig{
                     WriteKey: tc.writeKey,
                     Schema:   tc.schema,
                     Tables:   tc.tables,
@@ -260,12 +260,12 @@ func TestIntegration(t *testing.T) {
                     JobRunID:        misc.FastUUID().String(),
                     TaskRunID:       misc.FastUUID().String(),
                     StagingFilePath: tc.stagingFilePrefix + ".staging-1.json",
-                    UserID:          testhelper.GetUserId(destType),
+                    UserID:          whth.GetUserId(destType),
                 }
                 ts1.VerifyEvents(t)
 
                 t.Log("verifying test case 2")
-                ts2 := testhelper.TestConfig{
+                ts2 := whth.TestConfig{
                     WriteKey: tc.writeKey,
                     Schema:   tc.schema,
                     Tables:   tc.tables,
@@ -285,7 +285,7 @@ func TestIntegration(t *testing.T) {
                     JobRunID:        misc.FastUUID().String(),
                    TaskRunID:       misc.FastUUID().String(),
                     StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
-                    UserID:          testhelper.GetUserId(destType),
+                    UserID:          whth.GetUserId(destType),
                 }
                 if tc.asyncJob {
                     ts2.UserID = ts1.UserID
@@ -322,7 +322,7 @@ func TestIntegration(t *testing.T) {
         require.NoError(t, err)
         require.NoError(t, db.Ping())
 
-        jobsDB := testhelper.JobsDB(t, jobsDBPort)
+        jobsDB := whth.JobsDB(t, jobsDBPort)
 
         testcases := []struct {
             name string
@@ -331,10 +331,10 @@ func TestIntegration(t *testing.T) {
             sourceID              string
             destinationID         string
             tables                []string
-            stagingFilesEventsMap testhelper.EventsCountMap
-            loadFilesEventsMap    testhelper.EventsCountMap
-            tableUploadsEventsMap testhelper.EventsCountMap
-            warehouseEventsMap    testhelper.EventsCountMap
+            stagingFilesEventsMap whth.EventsCountMap
+            loadFilesEventsMap    whth.EventsCountMap
+            tableUploadsEventsMap whth.EventsCountMap
+            warehouseEventsMap    whth.EventsCountMap
             stagingFilePrefix     string
         }{
             {
@@ -372,7 +372,7 @@ func TestIntegration(t *testing.T) {
             }
 
             t.Log("verifying test case 1")
-            ts1 := testhelper.TestConfig{
+            ts1 := whth.TestConfig{
                 WriteKey: tc.writeKey,
                 Schema:   tc.schema,
                 SourceID: tc.sourceID,
@@ -391,12 +391,12 @@ func TestIntegration(t *testing.T) {
                 JobRunID:        misc.FastUUID().String(),
                 TaskRunID:       misc.FastUUID().String(),
                 StagingFilePath: tc.stagingFilePrefix + ".staging-1.json",
-                UserID:          testhelper.GetUserId(destType),
+                UserID:          whth.GetUserId(destType),
             }
             ts1.VerifyEvents(t)
 
             t.Log("verifying test case 2")
-            ts2 := testhelper.TestConfig{
+            ts2 := whth.TestConfig{
                 WriteKey: tc.writeKey,
                 Schema:   tc.schema,
                 SourceID: tc.sourceID,
@@ -415,7 +415,7 @@ func TestIntegration(t *testing.T) {
                 JobRunID:        misc.FastUUID().String(),
                 TaskRunID:       misc.FastUUID().String(),
                 StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
-                UserID:          testhelper.GetUserId(destType),
+                UserID:          whth.GetUserId(destType),
             }
             ts2.VerifyEvents(t)
         })
@@ -451,7 +451,7 @@ func TestIntegration(t *testing.T) {
             Enabled:    true,
             RevisionID: "29eeuu9kywWsRAybaXcxcnTVEl8",
         }
-        testhelper.VerifyConfigurationTest(t, dest)
+        whth.VerifyConfigurationTest(t, dest)
     })
 
     t.Run("Load Table", func(t *testing.T) {
@@ -461,7 +461,7 @@ func TestIntegration(t *testing.T) {
             workspaceID = "test_workspace_id"
         )
 
-        namespace := testhelper.RandSchema(destType)
+        namespace := whth.RandSchema(destType)
 
         schemaInUpload := model.TableSchema{
             "test_bool": "boolean",
@@ -538,7 +538,7 @@ func TestIntegration(t *testing.T) {
         t.Run("schema does not exists", func(t *testing.T) {
             tableName := "schema_not_exists_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := mockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
@@ -554,7 +554,7 @@ func TestIntegration(t *testing.T) {
         t.Run("table does not exists", func(t *testing.T) {
             tableName := "table_not_exists_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := mockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
@@ -574,7 +574,7 @@ func TestIntegration(t *testing.T) {
             tableName := "merge_test_table"
 
             t.Run("without dedup", func(t *testing.T) {
-                uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
+                uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
                 loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
                 mockUploader := mockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
@@ -582,7 +582,7 @@ func TestIntegration(t *testing.T) {
                 c := config.New()
                 c.Set("Warehouse.postgres.EnableSQLStatementExecutionPlanWorkspaceIDs", workspaceID)
 
-                mergeWarehouse := cloneWarehouse(t, warehouse)
+                mergeWarehouse := th.Clone(t, warehouse)
                 mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
 
                 pg := postgres.New(c, logger.NOP, stats.Default)
@@ -605,7 +605,7 @@ func TestIntegration(t *testing.T) {
                 require.Equal(t, loadTableStat.RowsInserted, int64(0))
                 require.Equal(t, loadTableStat.RowsUpdated, int64(14))
 
-                records := testhelper.RetrieveRecordsFromWarehouse(t, pg.DB.DB,
+                records := whth.RetrieveRecordsFromWarehouse(t, pg.DB.DB,
                     fmt.Sprintf(`
                         SELECT
                           id,
@@ -624,10 +624,10 @@ func TestIntegration(t *testing.T) {
                         tableName,
                     ),
                 )
-                require.Equal(t, records, testhelper.SampleTestRecords())
+                require.Equal(t, records, whth.SampleTestRecords())
             })
             t.Run("with dedup", func(t *testing.T) {
-                uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
+                uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
 
                 loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
                 mockUploader := mockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
@@ -635,7 +635,7 @@ func TestIntegration(t *testing.T) {
                 c := config.New()
                 c.Set("Warehouse.postgres.EnableSQLStatementExecutionPlanWorkspaceIDs", workspaceID)
 
-                mergeWarehouse := cloneWarehouse(t, warehouse)
+                mergeWarehouse := th.Clone(t, warehouse)
                 mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
 
                 pg := postgres.New(config.Default, logger.NOP, stats.Default)
@@ -653,7 +653,7 @@ func TestIntegration(t *testing.T) {
                 require.Equal(t, loadTableStat.RowsInserted, int64(0))
                 require.Equal(t, loadTableStat.RowsUpdated, int64(14))
 
-                records := testhelper.RetrieveRecordsFromWarehouse(t, pg.DB.DB,
+                records := whth.RetrieveRecordsFromWarehouse(t, pg.DB.DB,
                     fmt.Sprintf(`
                        SELECT
                           id,
@@ -672,13 +672,13 @@ func TestIntegration(t *testing.T) {
                         tableName,
                     ),
                 )
-                require.Equal(t, records, testhelper.DedupTestRecords())
+                require.Equal(t, records, whth.DedupTestRecords())
             })
         })
         t.Run("append", func(t *testing.T) {
             tableName := "append_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := mockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
@@ -706,7 +706,7 @@ func TestIntegration(t *testing.T) {
             require.Equal(t, loadTableStat.RowsInserted, int64(14))
             require.Equal(t, loadTableStat.RowsUpdated, int64(0))
 
-            records := testhelper.RetrieveRecordsFromWarehouse(t, pg.DB.DB,
+            records := whth.RetrieveRecordsFromWarehouse(t, pg.DB.DB,
                 fmt.Sprintf(`
                     SELECT
                       id,
@@ -725,7 +725,7 @@ func TestIntegration(t *testing.T) {
                     tableName,
                 ),
             )
-            require.Equal(t, records, testhelper.AppendTestRecords())
+            require.Equal(t, records, whth.AppendTestRecords())
         })
         t.Run("load file does not exists", func(t *testing.T) {
             tableName := "load_file_not_exists_test_table"
@@ -752,7 +752,7 @@ func TestIntegration(t *testing.T) {
         t.Run("mismatch in number of columns", func(t *testing.T) {
             tableName := "mismatch_columns_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/mismatch-columns.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/mismatch-columns.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := mockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
@@ -774,7 +774,7 @@ func TestIntegration(t *testing.T) {
         t.Run("mismatch in schema", func(t *testing.T) {
             tableName := "mismatch_schema_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/mismatch-schema.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/mismatch-schema.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := mockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
@@ -796,7 +796,7 @@ func TestIntegration(t *testing.T) {
         t.Run("discards", func(t *testing.T) {
             tableName := warehouseutils.DiscardsTable
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/discards.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/discards.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := mockUploader(t, loadFiles, tableName, warehouseutils.DiscardsSchema, warehouseutils.DiscardsSchema)
@@ -816,7 +816,7 @@ func TestIntegration(t *testing.T) {
             require.Equal(t, loadTableStat.RowsInserted, int64(6))
             require.Equal(t, loadTableStat.RowsUpdated, int64(0))
 
-            records := testhelper.RetrieveRecordsFromWarehouse(t, pg.DB.DB,
+            records := whth.RetrieveRecordsFromWarehouse(t, pg.DB.DB,
                 fmt.Sprintf(`
                     SELECT
                       column_name,
@@ -833,7 +833,7 @@ func TestIntegration(t *testing.T) {
                     tableName,
                 ),
             )
-            require.Equal(t, records, testhelper.DiscardTestRecords())
+            require.Equal(t, records, whth.DiscardTestRecords())
         })
     })
 }
@@ -857,14 +857,3 @@ func mockUploader(
 
     return mockUploader
 }
-
-func cloneWarehouse(t *testing.T, wh model.Warehouse) model.Warehouse {
-    t.Helper()
-    buf, err := json.Marshal(wh)
-    require.NoError(t, err)
-
-    var clone model.Warehouse
-    require.NoError(t, json.Unmarshal(buf, &clone))
-
-    return clone
-}
diff --git a/warehouse/integrations/redshift/redshift_test.go b/warehouse/integrations/redshift/redshift_test.go
index aff7793bece..423b733001f 100644
--- a/warehouse/integrations/redshift/redshift_test.go
+++ b/warehouse/integrations/redshift/redshift_test.go
@@ -12,47 +12,34 @@ import (
     "testing"
     "time"
 
-    "golang.org/x/exp/slices"
-
     "github.com/golang/mock/gomock"
-
-    "github.com/rudderlabs/rudder-go-kit/filemanager"
-    mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils"
-    "github.com/rudderlabs/rudder-server/warehouse/internal/model"
-
-    "github.com/rudderlabs/rudder-go-kit/logger"
-    "github.com/rudderlabs/rudder-go-kit/stats"
-
-    "github.com/rudderlabs/compose-test/compose"
-
-    "github.com/rudderlabs/rudder-server/warehouse/integrations/redshift"
-
-    "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig"
-
-    sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
-
     "github.com/lib/pq"
     "github.com/ory/dockertest/v3"
+    "github.com/stretchr/testify/require"
+    "golang.org/x/exp/slices"
 
+    "github.com/rudderlabs/compose-test/compose"
     "github.com/rudderlabs/compose-test/testcompose"
+    "github.com/rudderlabs/rudder-go-kit/config"
+    "github.com/rudderlabs/rudder-go-kit/filemanager"
+    "github.com/rudderlabs/rudder-go-kit/logger"
+    "github.com/rudderlabs/rudder-go-kit/stats"
     kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
+    "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
+    backendconfig "github.com/rudderlabs/rudder-server/backend-config"
     "github.com/rudderlabs/rudder-server/runner"
+    th "github.com/rudderlabs/rudder-server/testhelper"
     "github.com/rudderlabs/rudder-server/testhelper/health"
-
-    "github.com/rudderlabs/rudder-go-kit/config"
-    "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
-
-    "github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper"
-
+    "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig"
     "github.com/rudderlabs/rudder-server/utils/misc"
-    "github.com/rudderlabs/rudder-server/warehouse/validations"
-
-    backendconfig "github.com/rudderlabs/rudder-server/backend-config"
-
-    "github.com/stretchr/testify/require"
-
     "github.com/rudderlabs/rudder-server/warehouse/client"
+    sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
+    "github.com/rudderlabs/rudder-server/warehouse/integrations/redshift"
+    whth "github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper"
+    mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils"
+    "github.com/rudderlabs/rudder-server/warehouse/internal/model"
     warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
+    "github.com/rudderlabs/rudder-server/warehouse/validations"
 )
 
 type testCredentials struct {
@@ -117,8 +104,8 @@ func TestIntegration(t *testing.T) {
     destType := warehouseutils.RS
 
-    namespace := testhelper.RandSchema(destType)
-    sourcesNamespace := testhelper.RandSchema(destType)
+    namespace := whth.RandSchema(destType)
+    sourcesNamespace := whth.RandSchema(destType)
 
     rsTestCredentials, err := rsTestCredentials()
     require.NoError(t, err)
@@ -144,7 +131,7 @@ func TestIntegration(t *testing.T) {
     }
     workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations)
 
-    testhelper.EnhanceWithDefaultEnvs(t)
+    whth.EnhanceWithDefaultEnvs(t)
     t.Setenv("JOBS_DB_PORT", strconv.Itoa(jobsDBPort))
     t.Setenv("WAREHOUSE_JOBS_DB_PORT", strconv.Itoa(jobsDBPort))
     t.Setenv("RSERVER_WAREHOUSE_WEB_PORT", strconv.Itoa(httpPort))
@@ -185,7 +172,7 @@ func TestIntegration(t *testing.T) {
         t.Setenv("RSERVER_WAREHOUSE_REDSHIFT_DEDUP_WINDOW", "true")
         t.Setenv("RSERVER_WAREHOUSE_REDSHIFT_DEDUP_WINDOW_IN_HOURS", "5")
 
-        jobsDB := testhelper.JobsDB(t, jobsDBPort)
+        jobsDB := whth.JobsDB(t, jobsDBPort)
 
         testcase := []struct {
             name string
@@ -194,10 +181,10 @@ func TestIntegration(t *testing.T) {
             sourceID              string
             destinationID         string
             tables                []string
-            stagingFilesEventsMap testhelper.EventsCountMap
-            loadFilesEventsMap    testhelper.EventsCountMap
-            tableUploadsEventsMap testhelper.EventsCountMap
-            warehouseEventsMap    testhelper.EventsCountMap
+            stagingFilesEventsMap whth.EventsCountMap
+            loadFilesEventsMap    whth.EventsCountMap
+            tableUploadsEventsMap whth.EventsCountMap
+            warehouseEventsMap    whth.EventsCountMap
             asyncJob              bool
             stagingFilePrefix     string
         }{
@@ -217,10 +204,10 @@ func TestIntegration(t *testing.T) {
                 tables:                []string{"tracks", "google_sheet"},
                 sourceID:              sourcesSourceID,
                 destinationID:         sourcesDestinationID,
-                stagingFilesEventsMap: testhelper.SourcesStagingFilesEventsMap(),
-                loadFilesEventsMap:    testhelper.SourcesLoadFilesEventsMap(),
-                tableUploadsEventsMap: testhelper.SourcesTableUploadsEventsMap(),
-                warehouseEventsMap:    testhelper.SourcesWarehouseEventsMap(),
+                stagingFilesEventsMap: whth.SourcesStagingFilesEventsMap(),
+                loadFilesEventsMap:    whth.SourcesLoadFilesEventsMap(),
+                tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
+                warehouseEventsMap:    whth.SourcesWarehouseEventsMap(),
                 asyncJob:              true,
                 stagingFilePrefix:     "testdata/sources-job",
             },
@@ -259,7 +246,7 @@ func TestIntegration(t *testing.T) {
                 }
 
                 t.Log("verifying test case 1")
-                ts1 := testhelper.TestConfig{
+                ts1 := whth.TestConfig{
                     WriteKey: tc.writeKey,
                     Schema:   tc.schema,
                     Tables:   tc.tables,
@@ -278,12 +265,12 @@ func TestIntegration(t *testing.T) {
                     JobRunID:        misc.FastUUID().String(),
                     TaskRunID:       misc.FastUUID().String(),
                     StagingFilePath: tc.stagingFilePrefix + ".staging-1.json",
-                    UserID:          testhelper.GetUserId(destType),
+                    UserID:          whth.GetUserId(destType),
                 }
                 ts1.VerifyEvents(t)
 
                 t.Log("verifying test case 2")
-                ts2 := testhelper.TestConfig{
+                ts2 := whth.TestConfig{
                     WriteKey: tc.writeKey,
                     Schema:   tc.schema,
                     Tables:   tc.tables,
@@ -303,7 +290,7 @@ func TestIntegration(t *testing.T) {
                     JobRunID:        misc.FastUUID().String(),
                     TaskRunID:       misc.FastUUID().String(),
                     StagingFilePath: tc.stagingFilePrefix + ".staging-1.json",
-                    UserID:          testhelper.GetUserId(destType),
+                    UserID:          whth.GetUserId(destType),
                 }
                 if tc.asyncJob {
                     ts2.UserID = ts1.UserID
@@ -352,7 +339,7 @@ func TestIntegration(t *testing.T) {
             Enabled:    true,
             RevisionID: "29HgOWobrn0RYZLpaSwPIbN2987",
         }
-        testhelper.VerifyConfigurationTest(t, dest)
+        whth.VerifyConfigurationTest(t, dest)
     })
 
     t.Run("Load Table", func(t *testing.T) {
@@ -362,7 +349,7 @@ func TestIntegration(t *testing.T) {
             workspaceID = "test_workspace_id"
         )
 
-        namespace := testhelper.RandSchema(destType)
+        namespace := whth.RandSchema(destType)
 
         t.Cleanup(func() {
             require.Eventually(t, func() bool {
@@ -443,7 +430,7 @@ func TestIntegration(t *testing.T) {
         t.Run("schema does not exists", func(t *testing.T) {
             tableName := "schema_not_exists_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv)
@@ -459,7 +446,7 @@ func TestIntegration(t *testing.T) {
         t.Run("table does not exists", func(t *testing.T) {
             tableName := "table_not_exists_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv)
@@ -478,7 +465,7 @@ func TestIntegration(t *testing.T) {
         t.Run("merge", func(t *testing.T) {
             t.Run("without dedup", func(t *testing.T) {
                 tableName := "merge_without_dedup_test_table"
-                uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
+                uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
 
                 loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
                 mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv)
@@ -503,7 +490,7 @@ func TestIntegration(t *testing.T) {
                 require.Equal(t, loadTableStat.RowsInserted, int64(14))
                 require.Equal(t, loadTableStat.RowsUpdated, int64(0))
 
-                records := testhelper.RetrieveRecordsFromWarehouse(t, d.DB.DB,
+                records := whth.RetrieveRecordsFromWarehouse(t, d.DB.DB,
                     fmt.Sprintf(
                         `SELECT
                           id,
@@ -519,16 +506,16 @@ func TestIntegration(t *testing.T) {
                         tableName,
                     ),
                 )
-                require.Equal(t, testhelper.DedupTwiceTestRecords(), records)
+                require.Equal(t, whth.DedupTwiceTestRecords(), records)
             })
             t.Run("with dedup", func(t *testing.T) {
                 tableName := "merge_with_dedup_test_table"
-                uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
+                uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
 
                 loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
                 mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv)
 
-                mergeWarehouse := cloneWarehouse(t, warehouse)
+                mergeWarehouse := th.Clone(t, warehouse)
                 mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
 
                 d := redshift.New(config.New(), logger.NOP, stats.Default)
@@ -551,7 +538,7 @@ func TestIntegration(t *testing.T) {
                 require.Equal(t, loadTableStat.RowsInserted, int64(0))
                 require.Equal(t, loadTableStat.RowsUpdated, int64(14))
 
-                records := testhelper.RetrieveRecordsFromWarehouse(t, d.DB.DB,
+                records := whth.RetrieveRecordsFromWarehouse(t, d.DB.DB,
                     fmt.Sprintf(
                         `SELECT
                           id,
@@ -567,16 +554,16 @@ func TestIntegration(t *testing.T) {
                         tableName,
                     ),
                 )
-                require.Equal(t, testhelper.DedupTestRecords(), records)
+                require.Equal(t, whth.DedupTestRecords(), records)
             })
             t.Run("with dedup window", func(t *testing.T) {
                 tableName := "merge_with_dedup_window_test_table"
-                uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
+                uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
 
                 loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
                 mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv)
 
-                mergeWarehouse := cloneWarehouse(t, warehouse)
+                mergeWarehouse := th.Clone(t, warehouse)
                 mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
 
                 c := config.New()
@@ -603,7 +590,7 @@ func TestIntegration(t *testing.T) {
                 require.Equal(t, loadTableStat.RowsInserted, int64(0))
                 require.Equal(t, loadTableStat.RowsUpdated, int64(14))
 
-                records := testhelper.RetrieveRecordsFromWarehouse(t, d.DB.DB,
+                records := whth.RetrieveRecordsFromWarehouse(t, d.DB.DB,
                     fmt.Sprintf(
                         `SELECT
                           id,
@@ -619,16 +606,16 @@ func TestIntegration(t *testing.T) {
                         tableName,
                     ),
                 )
-                require.Equal(t, testhelper.DedupTestRecords(), records)
+                require.Equal(t, whth.DedupTestRecords(), records)
             })
             t.Run("with short dedup window", func(t *testing.T) {
                 tableName := "merge_with_short_dedup_window_test_table"
-                uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
+                uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
 
                 loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
                 mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv)
 
-                mergeWarehouse := cloneWarehouse(t, warehouse)
+                mergeWarehouse := th.Clone(t, warehouse)
                 mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
 
                 c := config.New()
@@ -655,7 +642,7 @@ func TestIntegration(t *testing.T) {
                 require.Equal(t, loadTableStat.RowsInserted, int64(14))
                 require.Equal(t, loadTableStat.RowsUpdated, int64(0))
 
-                records := testhelper.RetrieveRecordsFromWarehouse(t, d.DB.DB,
+                records := whth.RetrieveRecordsFromWarehouse(t, d.DB.DB,
                     fmt.Sprintf(
                         `SELECT
                           id,
@@ -671,13 +658,13 @@ func TestIntegration(t *testing.T) {
                         tableName,
                     ),
                 )
-                require.Equal(t, testhelper.DedupTwiceTestRecords(), records)
+                require.Equal(t, whth.DedupTwiceTestRecords(), records)
             })
         })
         t.Run("append", func(t *testing.T) {
             tableName := "append_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv)
@@ -705,7 +692,7 @@ func TestIntegration(t *testing.T) {
             require.Equal(t, loadTableStat.RowsInserted, int64(14))
             require.Equal(t, loadTableStat.RowsUpdated, int64(0))
 
-            records := testhelper.RetrieveRecordsFromWarehouse(t, rs.DB.DB,
+            records := whth.RetrieveRecordsFromWarehouse(t, rs.DB.DB,
                 fmt.Sprintf(`
                     SELECT
                       id,
@@ -724,7 +711,7 @@ func TestIntegration(t *testing.T) {
                     tableName,
                 ),
             )
-            require.Equal(t, records, testhelper.AppendTestRecords())
+            require.Equal(t, records, whth.AppendTestRecords())
         })
         t.Run("load file does not exists", func(t *testing.T) {
             tableName := "load_file_not_exists_test_table"
@@ -751,7 +738,7 @@ func TestIntegration(t *testing.T) {
         t.Run("mismatch in number of columns", func(t *testing.T) {
             tableName := "mismatch_columns_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/mismatch-columns.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/mismatch-columns.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv)
@@ -773,7 +760,7 @@ func TestIntegration(t *testing.T) {
         t.Run("mismatch in schema", func(t *testing.T) {
             tableName := "mismatch_schema_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/mismatch-schema.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/mismatch-schema.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv)
@@ -795,7 +782,7 @@ func TestIntegration(t *testing.T) {
         t.Run("discards", func(t *testing.T) {
             tableName := warehouseutils.DiscardsTable
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/discards.csv.gz", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/discards.csv.gz", tableName)
 
             loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
             mockUploader := newMockUploader(t, loadFiles, tableName, warehouseutils.DiscardsSchema, warehouseutils.DiscardsSchema, warehouseutils.LoadFileTypeCsv)
@@ -815,7 +802,7 @@ func TestIntegration(t *testing.T) {
             require.Equal(t, loadTableStat.RowsInserted, int64(6))
             require.Equal(t, loadTableStat.RowsUpdated, int64(0))
 
-            records := testhelper.RetrieveRecordsFromWarehouse(t, rs.DB.DB,
+            records := whth.RetrieveRecordsFromWarehouse(t, rs.DB.DB,
                 fmt.Sprintf(`
                     SELECT
                       column_name,
@@ -832,12 +819,12 @@ func TestIntegration(t *testing.T) {
                     tableName,
                 ),
             )
-            require.Equal(t, records, testhelper.DiscardTestRecords())
+            require.Equal(t, records, whth.DiscardTestRecords())
         })
         t.Run("parquet", func(t *testing.T) {
             tableName := "parquet_test_table"
 
-            uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.parquet", tableName)
+            uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.parquet", tableName)
 
             fileStat, err := os.Stat("../testdata/load.parquet")
             require.NoError(t, err)
@@ -863,7 +850,7 @@ func TestIntegration(t *testing.T) {
             require.Equal(t, loadTableStat.RowsInserted, int64(14))
             require.Equal(t, loadTableStat.RowsUpdated, int64(0))
 
-            records := testhelper.RetrieveRecordsFromWarehouse(t, rs.DB.DB,
+            records := whth.RetrieveRecordsFromWarehouse(t, rs.DB.DB,
                 fmt.Sprintf(`
                     SELECT
                       id,
@@ -882,7 +869,7 @@ func TestIntegration(t *testing.T) {
                     tableName,
                 ),
             )
-            require.Equal(t, records, testhelper.SampleTestRecords())
+            require.Equal(t, records, whth.SampleTestRecords())
         })
     })
 }
@@ -1055,14 +1042,3 @@ func newMockUploader(
 
     return mockUploader
 }
-
-func cloneWarehouse(t *testing.T, wh model.Warehouse) model.Warehouse {
-    t.Helper()
-    buf, err := json.Marshal(wh)
-    require.NoError(t, err)
-
-    var clone model.Warehouse
-    require.NoError(t, json.Unmarshal(buf, &clone))
-
-    return clone
-}
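Note on the new helper: `testhelper.Clone` deep-copies any JSON-serializable value by marshalling it and unmarshalling into a fresh `T`, which is why the call sites above can flip `Destination.Config[string(model.EnableMergeSetting)]` on the clone without mutating the shared `warehouse` fixture. A minimal sketch of that property, assuming the behavior shown in clone.go (the `warehouseFixture` type below is a hypothetical stand-in for `model.Warehouse`, trimmed to the one nested field the tests mutate):

package testhelper_test

import (
    "testing"

    "github.com/stretchr/testify/require"

    th "github.com/rudderlabs/rudder-server/testhelper"
)

// warehouseFixture is a hypothetical stand-in for model.Warehouse,
// reduced to the nested map the integration tests mutate.
type warehouseFixture struct {
    Destination struct {
        Config map[string]interface{}
    }
}

func TestCloneIsDeep(t *testing.T) {
    var warehouse warehouseFixture
    warehouse.Destination.Config = map[string]interface{}{"enableMerge": false}

    // The JSON round-trip copies maps and slices instead of aliasing them,
    // so mutating the clone leaves the original untouched.
    clone := th.Clone(t, warehouse)
    clone.Destination.Config["enableMerge"] = true

    require.Equal(t, false, warehouse.Destination.Config["enableMerge"])
    require.Equal(t, true, clone.Destination.Config["enableMerge"])
}

One caveat of the JSON round-trip: unexported fields are dropped and numeric values decode as float64, so `Clone` is only suitable for fixtures that survive JSON serialization, which `model.Warehouse` already did under the four per-package `cloneWarehouse` helpers this change consolidates.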