From dc25273ecc6e1128a75f6fb6a648968e52787a5a Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Mon, 6 Nov 2023 15:29:52 +0100 Subject: [PATCH 01/11] chore: enableMerge default true --- warehouse/integrations/bigquery/bigquery.go | 2 +- warehouse/integrations/deltalake/deltalake.go | 2 +- warehouse/integrations/postgres/load.go | 2 +- warehouse/integrations/redshift/redshift.go | 2 +- warehouse/integrations/snowflake/snowflake.go | 2 +- warehouse/internal/model/warehouse.go | 9 +++++++++ 6 files changed, 14 insertions(+), 5 deletions(-) diff --git a/warehouse/integrations/bigquery/bigquery.go b/warehouse/integrations/bigquery/bigquery.go index d252f17bec..695ec633e1 100644 --- a/warehouse/integrations/bigquery/bigquery.go +++ b/warehouse/integrations/bigquery/bigquery.go @@ -867,7 +867,7 @@ func (bq *BigQuery) connect(ctx context.Context, cred BQCredentials) (*bigquery. // * the server config says we allow merging // * the user opted in to merging func (bq *BigQuery) shouldMerge() bool { - return bq.config.allowMerge && bq.warehouse.GetBoolDestinationConfig(model.EnableMergeSetting) + return bq.config.allowMerge && bq.warehouse.GetEnableMergeSetting() } func (bq *BigQuery) CrashRecover(ctx context.Context) { diff --git a/warehouse/integrations/deltalake/deltalake.go b/warehouse/integrations/deltalake/deltalake.go index eb6b02e122..18d0f65246 100644 --- a/warehouse/integrations/deltalake/deltalake.go +++ b/warehouse/integrations/deltalake/deltalake.go @@ -1387,5 +1387,5 @@ func (*Deltalake) DeleteBy(context.Context, []string, warehouseutils.DeleteByPar // * the user opted in to merging and we allow merging func (d *Deltalake) ShouldMerge() bool { return !d.Uploader.CanAppend() || - (d.config.allowMerge && d.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting)) + (d.config.allowMerge && d.Warehouse.GetEnableMergeSetting()) } diff --git a/warehouse/integrations/postgres/load.go b/warehouse/integrations/postgres/load.go index da76cfedff..af1797a080 100644 --- a/warehouse/integrations/postgres/load.go +++ b/warehouse/integrations/postgres/load.go @@ -556,6 +556,6 @@ func (pg *Postgres) loadUsersTable( func (pg *Postgres) shouldMerge() bool { return !pg.Uploader.CanAppend() || (pg.config.allowMerge && - pg.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting) && + pg.Warehouse.GetEnableMergeSetting() && !slices.Contains(pg.config.skipDedupDestinationIDs, pg.Warehouse.Destination.ID)) } diff --git a/warehouse/integrations/redshift/redshift.go b/warehouse/integrations/redshift/redshift.go index fedf6a1529..9bd48c3a68 100644 --- a/warehouse/integrations/redshift/redshift.go +++ b/warehouse/integrations/redshift/redshift.go @@ -1329,7 +1329,7 @@ func (rs *Redshift) SetConnectionTimeout(timeout time.Duration) { func (rs *Redshift) shouldMerge() bool { return !rs.Uploader.CanAppend() || (rs.config.allowMerge && - rs.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting) && + rs.Warehouse.GetEnableMergeSetting() && !slices.Contains(rs.config.skipDedupDestinationIDs, rs.Warehouse.Destination.ID)) } diff --git a/warehouse/integrations/snowflake/snowflake.go b/warehouse/integrations/snowflake/snowflake.go index d1c707a002..d755cfea85 100644 --- a/warehouse/integrations/snowflake/snowflake.go +++ b/warehouse/integrations/snowflake/snowflake.go @@ -829,7 +829,7 @@ func (sf *Snowflake) LoadIdentityMappingsTable(ctx context.Context) error { // * the user opted-in func (sf *Snowflake) ShouldMerge() bool { return !sf.Uploader.CanAppend() || - (sf.config.allowMerge && sf.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting)) + (sf.config.allowMerge && sf.Warehouse.GetEnableMergeSetting()) } func (sf *Snowflake) LoadUserTables(ctx context.Context) map[string]error { diff --git a/warehouse/internal/model/warehouse.go b/warehouse/internal/model/warehouse.go index f398960e5e..528dc4afe2 100644 --- a/warehouse/internal/model/warehouse.go +++ b/warehouse/internal/model/warehouse.go @@ -31,3 +31,12 @@ func (w *Warehouse) GetBoolDestinationConfig(key DestinationConfigSetting) bool } return false } + +func (w *Warehouse) GetEnableMergeSetting() bool { + destConfig := w.Destination.Config + value, ok := destConfig[EnableMergeSetting.string()].(bool) + if !ok { + return true // default value for backwards compatibility + } + return value +} From 7e44d0ba68bcd80cc109c563432505cbef363bcd Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Tue, 7 Nov 2023 15:36:45 +0100 Subject: [PATCH 02/11] chore: preferAppend snowflake --- warehouse/integrations/snowflake/snowflake.go | 2 +- .../integrations/snowflake/snowflake_test.go | 40 +++++++++---------- warehouse/internal/model/warehouse.go | 6 +-- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/warehouse/integrations/snowflake/snowflake.go b/warehouse/integrations/snowflake/snowflake.go index d755cfea85..fddb7945f6 100644 --- a/warehouse/integrations/snowflake/snowflake.go +++ b/warehouse/integrations/snowflake/snowflake.go @@ -829,7 +829,7 @@ func (sf *Snowflake) LoadIdentityMappingsTable(ctx context.Context) error { // * the user opted-in func (sf *Snowflake) ShouldMerge() bool { return !sf.Uploader.CanAppend() || - (sf.config.allowMerge && sf.Warehouse.GetEnableMergeSetting()) + (sf.config.allowMerge && !sf.Warehouse.GetPreferAppendSetting()) } func (sf *Snowflake) LoadUserTables(ctx context.Context) map[string]error { diff --git a/warehouse/integrations/snowflake/snowflake_test.go b/warehouse/integrations/snowflake/snowflake_test.go index d053c9359e..d7f41c1392 100644 --- a/warehouse/integrations/snowflake/snowflake_test.go +++ b/warehouse/integrations/snowflake/snowflake_test.go @@ -130,7 +130,7 @@ func TestIntegration(t *testing.T) { rbacCredentials, err := getSnowflakeTestCredentials(testRBACKey) require.NoError(t, err) - bootstrapSvc := func(t testing.TB, enableMerge bool) { + bootstrapSvc := func(t testing.TB, preferAppend bool) { templateConfigurations := map[string]any{ "workspaceID": workspaceID, "sourceID": sourceID, @@ -168,7 +168,7 @@ func TestIntegration(t *testing.T) { "rbacBucketName": rbacCredentials.BucketName, "rbacAccessKeyID": rbacCredentials.AccessKeyID, "rbacAccessKey": rbacCredentials.AccessKey, - "enableMerge": enableMerge, + "preferAppend": preferAppend, } workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations) @@ -227,7 +227,7 @@ func TestIntegration(t *testing.T) { sourceJob bool stagingFilePrefix string emptyJobRunID bool - enableMerge bool + preferAppend bool customUserID string }{ { @@ -248,7 +248,7 @@ func TestIntegration(t *testing.T) { "wh_staging_files": 34, // 32 + 2 (merge events because of ID resolution) }, stagingFilePrefix: "testdata/upload-job", - enableMerge: true, + preferAppend: false, }, { name: "Upload Job with Role", @@ -268,7 +268,7 @@ func TestIntegration(t *testing.T) { "wh_staging_files": 34, // 32 + 2 (merge events because of ID resolution) }, stagingFilePrefix: "testdata/upload-job-with-role", - enableMerge: true, + preferAppend: false, }, { name: "Upload Job with Case Sensitive Database", @@ -288,7 +288,7 @@ func TestIntegration(t *testing.T) { "wh_staging_files": 34, // 32 + 2 (merge events because of ID resolution) }, stagingFilePrefix: "testdata/upload-job-case-sensitive", - enableMerge: true, + preferAppend: false, }, { name: "Source Job with Sources", @@ -310,7 +310,7 @@ func TestIntegration(t *testing.T) { warehouseEventsMap: testhelper.SourcesWarehouseEventsMap(), sourceJob: true, stagingFilePrefix: "testdata/sources-job", - enableMerge: true, + preferAppend: false, }, { name: "Upload Job in append mode", @@ -331,7 +331,7 @@ func TestIntegration(t *testing.T) { // an empty jobRunID means that the source is not an ETL one // see Uploader.CanAppend() emptyJobRunID: true, - enableMerge: false, + preferAppend: true, customUserID: testhelper.GetUserId("append_test"), }, } @@ -339,7 +339,7 @@ func TestIntegration(t *testing.T) { for _, tc := range testcase { tc := tc t.Run(tc.name, func(t *testing.T) { - bootstrapSvc(t, tc.enableMerge) + bootstrapSvc(t, tc.preferAppend) urlConfig := sfdb.Config{ Account: tc.cred.Account, @@ -501,7 +501,7 @@ func TestIntegration(t *testing.T) { "syncFrequency": "30", "enableSSE": false, "useRudderStorage": false, - "enableMerge": true, + "preferAppend": false, }, DestinationDefinition: backendconfig.DestinationDefinitionT{ ID: "1XjvXnzw34UMAz1YOuKqL1kwzh6", @@ -949,35 +949,35 @@ func TestIntegration(t *testing.T) { func TestSnowflake_ShouldMerge(t *testing.T) { testCases := []struct { name string - enableMerge bool + preferAppend bool uploaderCanAppend bool uploaderExpectedCalls int expected bool }{ { - name: "uploader says we can append and merge is not enabled", - enableMerge: false, + name: "uploader says we can append and user prefers append", + preferAppend: true, uploaderCanAppend: true, uploaderExpectedCalls: 1, expected: false, }, { - name: "uploader says we cannot append and merge is not enabled", - enableMerge: false, + name: "uploader says we cannot append and user prefers append", + preferAppend: true, uploaderCanAppend: false, uploaderExpectedCalls: 1, expected: true, }, { - name: "uploader says we can append and merge is enabled", - enableMerge: true, + name: "uploader says we can append and user prefers not to append", + preferAppend: false, uploaderCanAppend: true, uploaderExpectedCalls: 1, expected: true, }, { - name: "uploader says we cannot append and we are in merge mode", - enableMerge: true, + name: "uploader says we cannot append and user prefers not to append", + preferAppend: false, uploaderCanAppend: false, uploaderExpectedCalls: 1, expected: true, @@ -992,7 +992,7 @@ func TestSnowflake_ShouldMerge(t *testing.T) { sf.Warehouse = model.Warehouse{ Destination: backendconfig.DestinationT{ Config: map[string]any{ - string(model.EnableMergeSetting): tc.enableMerge, + string(model.PreferAppendSetting): tc.preferAppend, }, }, } diff --git a/warehouse/internal/model/warehouse.go b/warehouse/internal/model/warehouse.go index 528dc4afe2..b18a2072ec 100644 --- a/warehouse/internal/model/warehouse.go +++ b/warehouse/internal/model/warehouse.go @@ -9,7 +9,7 @@ type destConfSetting string func (s destConfSetting) string() string { return string(s) } const ( - EnableMergeSetting destConfSetting = "enableMerge" + PreferAppendSetting destConfSetting = "preferAppend" UseRudderStorageSetting destConfSetting = "useRudderStorage" ) @@ -32,9 +32,9 @@ func (w *Warehouse) GetBoolDestinationConfig(key DestinationConfigSetting) bool return false } -func (w *Warehouse) GetEnableMergeSetting() bool { +func (w *Warehouse) GetPreferAppendSetting() bool { destConfig := w.Destination.Config - value, ok := destConfig[EnableMergeSetting.string()].(bool) + value, ok := destConfig[PreferAppendSetting.string()].(bool) if !ok { return true // default value for backwards compatibility } From 5757c7abd8058ce8d0d3eb9537724151d7bce8a6 Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Tue, 7 Nov 2023 15:53:41 +0100 Subject: [PATCH 03/11] chore: updating the rest --- warehouse/integrations/bigquery/bigquery.go | 2 +- .../integrations/bigquery/bigquery_test.go | 29 +++++++------- warehouse/integrations/deltalake/deltalake.go | 2 +- .../integrations/deltalake/deltalake_test.go | 38 +++++++++---------- warehouse/integrations/postgres/load.go | 2 +- .../integrations/postgres/postgres_test.go | 11 ++---- warehouse/integrations/redshift/redshift.go | 2 +- .../integrations/redshift/redshift_test.go | 23 +++++------ warehouse/internal/model/warehouse.go | 2 +- 9 files changed, 52 insertions(+), 59 deletions(-) diff --git a/warehouse/integrations/bigquery/bigquery.go b/warehouse/integrations/bigquery/bigquery.go index 695ec633e1..5650acd307 100644 --- a/warehouse/integrations/bigquery/bigquery.go +++ b/warehouse/integrations/bigquery/bigquery.go @@ -867,7 +867,7 @@ func (bq *BigQuery) connect(ctx context.Context, cred BQCredentials) (*bigquery. // * the server config says we allow merging // * the user opted in to merging func (bq *BigQuery) shouldMerge() bool { - return bq.config.allowMerge && bq.warehouse.GetEnableMergeSetting() + return bq.config.allowMerge && !bq.warehouse.GetPreferAppendSetting() } func (bq *BigQuery) CrashRecover(ctx context.Context) { diff --git a/warehouse/integrations/bigquery/bigquery_test.go b/warehouse/integrations/bigquery/bigquery_test.go index 43595b63e8..d567a2a3bd 100644 --- a/warehouse/integrations/bigquery/bigquery_test.go +++ b/warehouse/integrations/bigquery/bigquery_test.go @@ -11,6 +11,8 @@ import ( "testing" "time" + th "github.com/rudderlabs/rudder-server/testhelper" + "cloud.google.com/go/bigquery" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -25,7 +27,6 @@ import ( kithelper "github.com/rudderlabs/rudder-go-kit/testhelper" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/runner" - th "github.com/rudderlabs/rudder-server/testhelper" "github.com/rudderlabs/rudder-server/testhelper/health" "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig" "github.com/rudderlabs/rudder-server/utils/misc" @@ -78,7 +79,7 @@ func TestIntegration(t *testing.T) { escapedCredentialsTrimmedStr := strings.Trim(string(escapedCredentials), `"`) - bootstrapSvc := func(t *testing.T, enableMerge bool) *bigquery.Client { + bootstrapSvc := func(t *testing.T, preferAppend bool) *bigquery.Client { templateConfigurations := map[string]any{ "workspaceID": workspaceID, "sourceID": sourceID, @@ -93,7 +94,7 @@ func TestIntegration(t *testing.T) { "bucketName": bqTestCredentials.BucketName, "credentials": escapedCredentialsTrimmedStr, "sourcesNamespace": sourcesNamespace, - "enableMerge": enableMerge, + "preferAppend": preferAppend, } workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations) @@ -151,7 +152,7 @@ func TestIntegration(t *testing.T) { sourceJob bool skipModifiedEvents bool prerequisite func(context.Context, testing.TB, *bigquery.Client) - enableMerge bool + preferAppend bool customPartitionsEnabledWorkspaceIDs string stagingFilePrefix string }{ @@ -169,7 +170,7 @@ func TestIntegration(t *testing.T) { loadFilesEventsMap: loadFilesEventsMap(), tableUploadsEventsMap: tableUploadsEventsMap(), warehouseEventsMap: mergeEventsMap(), - enableMerge: true, + preferAppend: false, prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) { t.Helper() _ = db.Dataset(namespace).DeleteWithContents(ctx) @@ -193,7 +194,7 @@ func TestIntegration(t *testing.T) { tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(), warehouseEventsMap: whth.SourcesWarehouseEventsMap(), sourceJob: true, - enableMerge: false, + preferAppend: true, prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) { t.Helper() _ = db.Dataset(namespace).DeleteWithContents(ctx) @@ -215,7 +216,7 @@ func TestIntegration(t *testing.T) { tableUploadsEventsMap: tableUploadsEventsMap(), warehouseEventsMap: appendEventsMap(), skipModifiedEvents: true, - enableMerge: false, + preferAppend: true, prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) { t.Helper() _ = db.Dataset(namespace).DeleteWithContents(ctx) @@ -237,7 +238,7 @@ func TestIntegration(t *testing.T) { tableUploadsEventsMap: tableUploadsEventsMap(), warehouseEventsMap: appendEventsMap(), skipModifiedEvents: true, - enableMerge: false, + preferAppend: true, customPartitionsEnabledWorkspaceIDs: workspaceID, prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) { t.Helper() @@ -274,7 +275,7 @@ func TestIntegration(t *testing.T) { "RSERVER_WAREHOUSE_BIGQUERY_CUSTOM_PARTITIONS_ENABLED_WORKSPACE_IDS", tc.customPartitionsEnabledWorkspaceIDs, ) - db := bootstrapSvc(t, tc.enableMerge) + db := bootstrapSvc(t, tc.preferAppend) t.Cleanup(func() { for _, dataset := range []string{tc.schema} { @@ -542,12 +543,9 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse) - dedupWarehouse := th.Clone(t, warehouse) - dedupWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true - c := config.New() bq := whbigquery.New(c, logger.NOP) - err := bq.Setup(ctx, dedupWarehouse, mockUploader) + err := bq.Setup(ctx, warehouse, mockUploader) require.NoError(t, err) err = bq.CreateSchema(ctx) @@ -638,8 +636,11 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse) + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + bq := whbigquery.New(config.New(), logger.NOP) - err := bq.Setup(ctx, warehouse, mockUploader) + err := bq.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = bq.CreateSchema(ctx) diff --git a/warehouse/integrations/deltalake/deltalake.go b/warehouse/integrations/deltalake/deltalake.go index 18d0f65246..8b3342fd85 100644 --- a/warehouse/integrations/deltalake/deltalake.go +++ b/warehouse/integrations/deltalake/deltalake.go @@ -1387,5 +1387,5 @@ func (*Deltalake) DeleteBy(context.Context, []string, warehouseutils.DeleteByPar // * the user opted in to merging and we allow merging func (d *Deltalake) ShouldMerge() bool { return !d.Uploader.CanAppend() || - (d.config.allowMerge && d.Warehouse.GetEnableMergeSetting()) + (d.config.allowMerge && !d.Warehouse.GetPreferAppendSetting()) } diff --git a/warehouse/integrations/deltalake/deltalake_test.go b/warehouse/integrations/deltalake/deltalake_test.go index 2fc4ea20ab..7577217f90 100644 --- a/warehouse/integrations/deltalake/deltalake_test.go +++ b/warehouse/integrations/deltalake/deltalake_test.go @@ -117,7 +117,7 @@ func TestIntegration(t *testing.T) { db := sql.OpenDB(connector) require.NoError(t, db.Ping()) - bootstrapSvc := func(t *testing.T, enableMerge bool) { + bootstrapSvc := func(t *testing.T, preferAppend bool) { templateConfigurations := map[string]any{ "workspaceID": workspaceID, "sourceID": sourceID, @@ -131,7 +131,7 @@ func TestIntegration(t *testing.T) { "containerName": deltaLakeCredentials.ContainerName, "accountName": deltaLakeCredentials.AccountName, "accountKey": deltaLakeCredentials.AccountKey, - "enableMerge": enableMerge, + "preferAppend": preferAppend, } workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations) @@ -186,7 +186,7 @@ func TestIntegration(t *testing.T) { destinationID string messageID string warehouseEventsMap whth.EventsCountMap - enableMerge bool + preferAppend bool useParquetLoadFiles bool stagingFilePrefix string jobRunID string @@ -198,7 +198,7 @@ func TestIntegration(t *testing.T) { sourceID: sourceID, destinationID: destinationID, warehouseEventsMap: mergeEventsMap(), - enableMerge: true, + preferAppend: false, useParquetLoadFiles: false, stagingFilePrefix: "testdata/upload-job-merge-mode", jobRunID: misc.FastUUID().String(), @@ -210,7 +210,7 @@ func TestIntegration(t *testing.T) { sourceID: sourceID, destinationID: destinationID, warehouseEventsMap: appendEventsMap(), - enableMerge: false, + preferAppend: true, useParquetLoadFiles: false, stagingFilePrefix: "testdata/upload-job-append-mode", // an empty jobRunID means that the source is not an ETL one @@ -224,7 +224,7 @@ func TestIntegration(t *testing.T) { sourceID: sourceID, destinationID: destinationID, warehouseEventsMap: mergeEventsMap(), - enableMerge: true, + preferAppend: false, useParquetLoadFiles: true, stagingFilePrefix: "testdata/upload-job-parquet", jobRunID: misc.FastUUID().String(), @@ -234,7 +234,7 @@ func TestIntegration(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { - bootstrapSvc(t, tc.enableMerge) + bootstrapSvc(t, tc.preferAppend) t.Setenv("RSERVER_WAREHOUSE_DELTALAKE_USE_PARQUET_LOAD_FILES", strconv.FormatBool(tc.useParquetLoadFiles)) sqlClient := &warehouseclient.Client{ @@ -564,7 +564,7 @@ func TestIntegration(t *testing.T) { mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, true, true, "2022-12-15T06:53:49.640Z") mergeWarehouse := th.Clone(t, warehouse) - mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true + mergeWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true d := deltalake.New(config.New(), logger.NOP, memstats.New()) err := d.Setup(ctx, mergeWarehouse, mockUploader) @@ -614,7 +614,7 @@ func TestIntegration(t *testing.T) { mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, false, false, "2022-11-15T06:53:49.640Z") mergeWarehouse := th.Clone(t, warehouse) - mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true + mergeWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true d := deltalake.New(config.New(), logger.NOP, memstats.New()) err := d.Setup(ctx, mergeWarehouse, mockUploader) @@ -1057,35 +1057,35 @@ func TestDeltalake_TrimErrorMessage(t *testing.T) { func TestDeltalake_ShouldMerge(t *testing.T) { testCases := []struct { name string - enableMerge bool + preferAppend bool uploaderCanAppend bool uploaderExpectedCalls int expected bool }{ { - name: "uploader says we can append and merge is not enabled", - enableMerge: false, + name: "uploader says we can append and user prefers to append", + preferAppend: true, uploaderCanAppend: true, uploaderExpectedCalls: 1, expected: false, }, { - name: "uploader says we can append and merge is enabled", - enableMerge: true, + name: "uploader says we can append and users prefers not to append", + preferAppend: false, uploaderCanAppend: true, uploaderExpectedCalls: 1, expected: true, }, { - name: "uploader says we cannot append so enableMerge false is ignored", - enableMerge: false, + name: "uploader says we cannot append and user prefers to append", + preferAppend: true, uploaderCanAppend: false, uploaderExpectedCalls: 1, expected: true, }, { - name: "uploader says we cannot append so enableMerge true is ignored", - enableMerge: true, + name: "uploader says we cannot append and users prefers not to append", + preferAppend: false, uploaderCanAppend: false, uploaderExpectedCalls: 1, expected: true, @@ -1098,7 +1098,7 @@ func TestDeltalake_ShouldMerge(t *testing.T) { d.Warehouse = model.Warehouse{ Destination: backendconfig.DestinationT{ Config: map[string]any{ - string(model.EnableMergeSetting): tc.enableMerge, + string(model.PreferAppendSetting): tc.preferAppend, }, }, } diff --git a/warehouse/integrations/postgres/load.go b/warehouse/integrations/postgres/load.go index af1797a080..3b02ba0f44 100644 --- a/warehouse/integrations/postgres/load.go +++ b/warehouse/integrations/postgres/load.go @@ -556,6 +556,6 @@ func (pg *Postgres) loadUsersTable( func (pg *Postgres) shouldMerge() bool { return !pg.Uploader.CanAppend() || (pg.config.allowMerge && - pg.Warehouse.GetEnableMergeSetting() && + !pg.Warehouse.GetPreferAppendSetting() && !slices.Contains(pg.config.skipDedupDestinationIDs, pg.Warehouse.Destination.ID)) } diff --git a/warehouse/integrations/postgres/postgres_test.go b/warehouse/integrations/postgres/postgres_test.go index c225bba767..c06f3589a4 100644 --- a/warehouse/integrations/postgres/postgres_test.go +++ b/warehouse/integrations/postgres/postgres_test.go @@ -582,11 +582,11 @@ func TestIntegration(t *testing.T) { c := config.New() c.Set("Warehouse.postgres.EnableSQLStatementExecutionPlanWorkspaceIDs", workspaceID) - mergeWarehouse := th.Clone(t, warehouse) - mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true pg := postgres.New(c, logger.NOP, memstats.New()) - err := pg.Setup(ctx, mergeWarehouse, mockUploader) + err := pg.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = pg.CreateSchema(ctx) @@ -635,11 +635,8 @@ func TestIntegration(t *testing.T) { c := config.New() c.Set("Warehouse.postgres.EnableSQLStatementExecutionPlanWorkspaceIDs", workspaceID) - mergeWarehouse := th.Clone(t, warehouse) - mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true - pg := postgres.New(config.New(), logger.NOP, memstats.New()) - err := pg.Setup(ctx, mergeWarehouse, mockUploader) + err := pg.Setup(ctx, warehouse, mockUploader) require.NoError(t, err) err = pg.CreateSchema(ctx) diff --git a/warehouse/integrations/redshift/redshift.go b/warehouse/integrations/redshift/redshift.go index 9bd48c3a68..263dab06ff 100644 --- a/warehouse/integrations/redshift/redshift.go +++ b/warehouse/integrations/redshift/redshift.go @@ -1329,7 +1329,7 @@ func (rs *Redshift) SetConnectionTimeout(timeout time.Duration) { func (rs *Redshift) shouldMerge() bool { return !rs.Uploader.CanAppend() || (rs.config.allowMerge && - rs.Warehouse.GetEnableMergeSetting() && + !rs.Warehouse.GetPreferAppendSetting() && !slices.Contains(rs.config.skipDedupDestinationIDs, rs.Warehouse.Destination.ID)) } diff --git a/warehouse/integrations/redshift/redshift_test.go b/warehouse/integrations/redshift/redshift_test.go index 3de39467ba..ea48623b78 100644 --- a/warehouse/integrations/redshift/redshift_test.go +++ b/warehouse/integrations/redshift/redshift_test.go @@ -13,6 +13,8 @@ import ( "testing" "time" + th "github.com/rudderlabs/rudder-server/testhelper" + "github.com/golang/mock/gomock" "github.com/lib/pq" "github.com/ory/dockertest/v3" @@ -28,7 +30,6 @@ import ( "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/runner" - th "github.com/rudderlabs/rudder-server/testhelper" "github.com/rudderlabs/rudder-server/testhelper/health" "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig" "github.com/rudderlabs/rudder-server/utils/misc" @@ -515,11 +516,8 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv) - mergeWarehouse := th.Clone(t, warehouse) - mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true - d := redshift.New(config.New(), logger.NOP, memstats.New()) - err := d.Setup(ctx, mergeWarehouse, mockUploader) + err := d.Setup(ctx, warehouse, mockUploader) require.NoError(t, err) err = d.CreateSchema(ctx) @@ -563,15 +561,12 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv) - mergeWarehouse := th.Clone(t, warehouse) - mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true - c := config.New() c.Set("Warehouse.redshift.dedupWindow", true) c.Set("Warehouse.redshift.dedupWindowInHours", 999999) d := redshift.New(c, logger.NOP, memstats.New()) - err := d.Setup(ctx, mergeWarehouse, mockUploader) + err := d.Setup(ctx, warehouse, mockUploader) require.NoError(t, err) err = d.CreateSchema(ctx) @@ -615,15 +610,12 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv) - mergeWarehouse := th.Clone(t, warehouse) - mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true - c := config.New() c.Set("Warehouse.redshift.dedupWindow", true) c.Set("Warehouse.redshift.dedupWindowInHours", 0) d := redshift.New(c, logger.NOP, memstats.New()) - err := d.Setup(ctx, mergeWarehouse, mockUploader) + err := d.Setup(ctx, warehouse, mockUploader) require.NoError(t, err) err = d.CreateSchema(ctx) @@ -672,8 +664,11 @@ func TestIntegration(t *testing.T) { c := config.New() c.Set("Warehouse.redshift.skipDedupDestinationIDs", []string{destinationID}) + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + rs := redshift.New(c, logger.NOP, memstats.New()) - err := rs.Setup(ctx, warehouse, mockUploader) + err := rs.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = rs.CreateSchema(ctx) diff --git a/warehouse/internal/model/warehouse.go b/warehouse/internal/model/warehouse.go index b18a2072ec..4488923785 100644 --- a/warehouse/internal/model/warehouse.go +++ b/warehouse/internal/model/warehouse.go @@ -36,7 +36,7 @@ func (w *Warehouse) GetPreferAppendSetting() bool { destConfig := w.Destination.Config value, ok := destConfig[PreferAppendSetting.string()].(bool) if !ok { - return true // default value for backwards compatibility + return false // default value for backwards compatibility } return value } From 10ec77928ea4b6c1db5b76f25a480586c503861e Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Tue, 7 Nov 2023 16:10:18 +0100 Subject: [PATCH 04/11] chore: updating sf test --- warehouse/integrations/snowflake/snowflake_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/warehouse/integrations/snowflake/snowflake_test.go b/warehouse/integrations/snowflake/snowflake_test.go index d7f41c1392..45d81c6d41 100644 --- a/warehouse/integrations/snowflake/snowflake_test.go +++ b/warehouse/integrations/snowflake/snowflake_test.go @@ -27,6 +27,7 @@ import ( kithelper "github.com/rudderlabs/rudder-go-kit/testhelper" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/runner" + th "github.com/rudderlabs/rudder-server/testhelper" "github.com/rudderlabs/rudder-server/testhelper/health" "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig" "github.com/rudderlabs/rudder-server/utils/misc" @@ -662,9 +663,12 @@ func TestIntegration(t *testing.T) { loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, true, false) + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + sf, err := snowflake.New(config.New(), logger.NOP, memstats.New()) require.NoError(t, err) - err = sf.Setup(ctx, warehouse, mockUploader) + err = sf.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = sf.CreateSchema(ctx) From b4d3255783aa7cfabc2af2fe00f2a92a1a27ab9c Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Tue, 7 Nov 2023 16:27:20 +0100 Subject: [PATCH 05/11] chore: updating pg test --- .../integrations/postgres/postgres_test.go | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/warehouse/integrations/postgres/postgres_test.go b/warehouse/integrations/postgres/postgres_test.go index c06f3589a4..6895a546ae 100644 --- a/warehouse/integrations/postgres/postgres_test.go +++ b/warehouse/integrations/postgres/postgres_test.go @@ -571,9 +571,8 @@ func TestIntegration(t *testing.T) { require.Nil(t, loadTableStat) }) t.Run("merge", func(t *testing.T) { - tableName := "merge_test_table" - t.Run("without dedup", func(t *testing.T) { + tableName := "merge_without_dedup_test_table" uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName) loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} @@ -602,8 +601,8 @@ func TestIntegration(t *testing.T) { loadTableStat, err = pg.LoadTable(ctx, tableName) require.NoError(t, err) - require.Equal(t, loadTableStat.RowsInserted, int64(0)) - require.Equal(t, loadTableStat.RowsUpdated, int64(14)) + require.Equal(t, loadTableStat.RowsInserted, int64(14)) + require.Equal(t, loadTableStat.RowsUpdated, int64(0)) records := whth.RetrieveRecordsFromWarehouse(t, pg.DB.DB, fmt.Sprintf(` @@ -624,9 +623,10 @@ func TestIntegration(t *testing.T) { tableName, ), ) - require.Equal(t, records, whth.SampleTestRecords()) + require.Equal(t, records, whth.AppendTestRecords()) }) t.Run("with dedup", func(t *testing.T) { + tableName := "merge_with_dedup_test_table" uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName) loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} @@ -647,6 +647,11 @@ func TestIntegration(t *testing.T) { loadTableStat, err := pg.LoadTable(ctx, tableName) require.NoError(t, err) + require.Equal(t, loadTableStat.RowsInserted, int64(14)) + require.Equal(t, loadTableStat.RowsUpdated, int64(0)) + + loadTableStat, err = pg.LoadTable(ctx, tableName) + require.NoError(t, err) require.Equal(t, loadTableStat.RowsInserted, int64(0)) require.Equal(t, loadTableStat.RowsUpdated, int64(14)) @@ -683,8 +688,11 @@ func TestIntegration(t *testing.T) { c := config.New() c.Set("Warehouse.postgres.skipDedupDestinationIDs", destinationID) + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + pg := postgres.New(c, logger.NOP, memstats.New()) - err := pg.Setup(ctx, warehouse, mockUploader) + err := pg.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = pg.CreateSchema(ctx) From 94c88efcba83ce26b737d1137a423ec1dbe5fa98 Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Tue, 7 Nov 2023 16:43:57 +0100 Subject: [PATCH 06/11] chore: updating redshift test --- warehouse/integrations/redshift/redshift_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/warehouse/integrations/redshift/redshift_test.go b/warehouse/integrations/redshift/redshift_test.go index ea48623b78..c5130ccf1d 100644 --- a/warehouse/integrations/redshift/redshift_test.go +++ b/warehouse/integrations/redshift/redshift_test.go @@ -471,8 +471,11 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv) + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + d := redshift.New(config.New(), logger.NOP, memstats.New()) - err := d.Setup(ctx, warehouse, mockUploader) + err := d.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = d.CreateSchema(ctx) From 6f565fe0bc39591e2dc75c27ec810038cbabd346 Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Tue, 7 Nov 2023 16:47:52 +0100 Subject: [PATCH 07/11] chore: updating bigquery test --- warehouse/integrations/bigquery/bigquery_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/warehouse/integrations/bigquery/bigquery_test.go b/warehouse/integrations/bigquery/bigquery_test.go index d567a2a3bd..21b9128cfb 100644 --- a/warehouse/integrations/bigquery/bigquery_test.go +++ b/warehouse/integrations/bigquery/bigquery_test.go @@ -588,9 +588,12 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse) + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + c := config.New() bq := whbigquery.New(c, logger.NOP) - err := bq.Setup(ctx, warehouse, mockUploader) + err := bq.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = bq.CreateSchema(ctx) From 823459d6b3c862cd264268b1caa74893e2c82f1a Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Tue, 7 Nov 2023 16:55:29 +0100 Subject: [PATCH 08/11] chore: updating deltalake test --- warehouse/integrations/deltalake/deltalake_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/warehouse/integrations/deltalake/deltalake_test.go b/warehouse/integrations/deltalake/deltalake_test.go index 7577217f90..ff893e2ea5 100644 --- a/warehouse/integrations/deltalake/deltalake_test.go +++ b/warehouse/integrations/deltalake/deltalake_test.go @@ -563,11 +563,8 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, true, true, "2022-12-15T06:53:49.640Z") - mergeWarehouse := th.Clone(t, warehouse) - mergeWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true - d := deltalake.New(config.New(), logger.NOP, memstats.New()) - err := d.Setup(ctx, mergeWarehouse, mockUploader) + err := d.Setup(ctx, warehouse, mockUploader) require.NoError(t, err) err = d.CreateSchema(ctx) From c30c15c5606dbca5aee97121212e53e00c4000ba Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Tue, 7 Nov 2023 17:19:38 +0100 Subject: [PATCH 09/11] chore: updating templates --- warehouse/integrations/bigquery/testdata/template.json | 4 ++-- warehouse/integrations/deltalake/testdata/template.json | 2 +- warehouse/integrations/snowflake/testdata/template.json | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/warehouse/integrations/bigquery/testdata/template.json b/warehouse/integrations/bigquery/testdata/template.json index c6eac2a526..fda8d690c1 100644 --- a/warehouse/integrations/bigquery/testdata/template.json +++ b/warehouse/integrations/bigquery/testdata/template.json @@ -32,7 +32,7 @@ "prefix": "", "namespace": "{{.namespace}}", "syncFrequency": "30", - "enableMerge": {{.enableMerge}} + "preferAppend": {{.preferAppend}} }, "liveEventsConfig": {}, "secretConfig": {}, @@ -161,7 +161,7 @@ "prefix": "", "namespace": "{{.sourcesNamespace}}", "syncFrequency": "30", - "enableMerge": {{.enableMerge}} + "preferAppend": {{.preferAppend}} }, "liveEventsConfig": {}, "secretConfig": {}, diff --git a/warehouse/integrations/deltalake/testdata/template.json b/warehouse/integrations/deltalake/testdata/template.json index 2b796aa448..8531dd273a 100644 --- a/warehouse/integrations/deltalake/testdata/template.json +++ b/warehouse/integrations/deltalake/testdata/template.json @@ -39,7 +39,7 @@ "syncFrequency": "30", "eventDelivery": false, "eventDeliveryTS": 1648195480174, - "enableMerge": {{.enableMerge}} + "preferAppend": {{.preferAppend}} }, "liveEventsConfig": { "eventDelivery": false, diff --git a/warehouse/integrations/snowflake/testdata/template.json b/warehouse/integrations/snowflake/testdata/template.json index 73d113e764..861c612ead 100644 --- a/warehouse/integrations/snowflake/testdata/template.json +++ b/warehouse/integrations/snowflake/testdata/template.json @@ -40,7 +40,7 @@ "syncFrequency": "30", "enableSSE": false, "useRudderStorage": false, - "enableMerge": {{.enableMerge}} + "preferAppend": {{.preferAppend}} }, "liveEventsConfig": {}, "secretConfig": {}, @@ -165,7 +165,7 @@ "syncFrequency": "30", "enableSSE": false, "useRudderStorage": false, - "enableMerge": {{.enableMerge}} + "preferAppend": {{.preferAppend}} }, "liveEventsConfig": {}, "secretConfig": {}, @@ -291,7 +291,7 @@ "syncFrequency": "30", "enableSSE": false, "useRudderStorage": false, - "enableMerge": {{.enableMerge}} + "preferAppend": {{.preferAppend}} }, "liveEventsConfig": {}, "secretConfig": {}, @@ -442,7 +442,7 @@ "syncFrequency": "30", "enableSSE": false, "useRudderStorage": false, - "enableMerge": {{.enableMerge}} + "preferAppend": {{.preferAppend}} }, "liveEventsConfig": {}, "secretConfig": {}, From 3b4ca9536a42536600119e7f730edf850bb144bd Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Tue, 7 Nov 2023 17:26:58 +0100 Subject: [PATCH 10/11] chore: updating sf tests --- warehouse/integrations/snowflake/snowflake_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/warehouse/integrations/snowflake/snowflake_test.go b/warehouse/integrations/snowflake/snowflake_test.go index 45d81c6d41..628f49022f 100644 --- a/warehouse/integrations/snowflake/snowflake_test.go +++ b/warehouse/integrations/snowflake/snowflake_test.go @@ -760,9 +760,12 @@ func TestIntegration(t *testing.T) { loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, true, false) + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + sf, err := snowflake.New(config.New(), logger.NOP, memstats.New()) require.NoError(t, err) - err = sf.Setup(ctx, warehouse, mockUploader) + err = sf.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = sf.CreateSchema(ctx) From 14cadbe857d59b53305c3dc1326ee0475acb8620 Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Tue, 7 Nov 2023 17:30:08 +0100 Subject: [PATCH 11/11] chore: updating deltalake tests --- warehouse/integrations/deltalake/deltalake_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/warehouse/integrations/deltalake/deltalake_test.go b/warehouse/integrations/deltalake/deltalake_test.go index ff893e2ea5..98febf1b1c 100644 --- a/warehouse/integrations/deltalake/deltalake_test.go +++ b/warehouse/integrations/deltalake/deltalake_test.go @@ -661,8 +661,11 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, true, false, "2022-12-15T06:53:49.640Z") + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + d := deltalake.New(config.New(), logger.NOP, memstats.New()) - err := d.Setup(ctx, warehouse, mockUploader) + err := d.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = d.CreateSchema(ctx)