diff --git a/CHANGELOG.md b/CHANGELOG.md index d78485c679..c7f3e65a41 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,84 @@ # Changelog +## [1.16.2](https://github.com/rudderlabs/rudder-server/compare/v1.16.1...v1.16.2) (2023-11-08) + + +### Bug Fixes + +* preferAppend defaults to false if not defined ([#4098](https://github.com/rudderlabs/rudder-server/issues/4098)) ([941caba](https://github.com/rudderlabs/rudder-server/commit/941cabad94b95da64f1045f119a64fc014263698)) + +## [1.16.1](https://github.com/rudderlabs/rudder-server/compare/v1.16.0...v1.16.1) (2023-11-06) + + +### Bug Fixes + +* sorting key for error indexing during parquet ([#4078](https://github.com/rudderlabs/rudder-server/issues/4078)) ([a029a81](https://github.com/rudderlabs/rudder-server/commit/a029a81b76e910990a8af89b559be8ede2c1ed9c)) + +## [1.16.0](https://github.com/rudderlabs/rudder-server/compare/v1.15.1...v1.16.0) (2023-10-31) + + +### Features + +* error index reporting implementation ([#3948](https://github.com/rudderlabs/rudder-server/issues/3948)) ([51220da](https://github.com/rudderlabs/rudder-server/commit/51220da8fb3bb461e92584ec78304393bf26c563)) +* grpc middleware for stats ([#4030](https://github.com/rudderlabs/rudder-server/issues/4030)) ([a524cbc](https://github.com/rudderlabs/rudder-server/commit/a524cbcc2d3bb7b7eb5322b2c15fc9975df20844)) +* introduce pagination in failed-keys endpoint ([#3967](https://github.com/rudderlabs/rudder-server/issues/3967)) ([aa8044a](https://github.com/rudderlabs/rudder-server/commit/aa8044ac4159a3af2dc4dc280f71a6138ff57b83)) +* introducing chi middleware for warehouse ([#4010](https://github.com/rudderlabs/rudder-server/issues/4010)) ([4d9f9b3](https://github.com/rudderlabs/rudder-server/commit/4d9f9b3cec78a858490def23e8f18b30ad255e95)) +* oauth access denied handling ([#3960](https://github.com/rudderlabs/rudder-server/issues/3960)) ([a53a127](https://github.com/rudderlabs/rudder-server/commit/a53a127cb63498575fa9f1c06375444c4a01e71f)) +* 
**processor:** added ability for geolocation enrichment during pipeline processing ([#3866](https://github.com/rudderlabs/rudder-server/issues/3866)) ([28497cf](https://github.com/rudderlabs/rudder-server/commit/28497cf690e285406987d6abb7d9cb0f168c0408)) +* push error index metadata ([#4002](https://github.com/rudderlabs/rudder-server/issues/4002)) ([72423dd](https://github.com/rudderlabs/rudder-server/commit/72423dd45277ff44f593b7673c12ffa48c9e68fa)) +* report failed messages in processor, router and batchrouter ([#3914](https://github.com/rudderlabs/rudder-server/issues/3914)) ([51220da](https://github.com/rudderlabs/rudder-server/commit/51220da8fb3bb461e92584ec78304393bf26c563)) +* use append vs merge option from backend config ([#3965](https://github.com/rudderlabs/rudder-server/issues/3965)) ([6d2db45](https://github.com/rudderlabs/rudder-server/commit/6d2db454218ac53cd87b43c21fcfbedf37758e50)) + + +### Bug Fixes + +* add error log for reporting metrics ([#3978](https://github.com/rudderlabs/rudder-server/issues/3978)) ([0963193](https://github.com/rudderlabs/rudder-server/commit/0963193913b0426acca2d53637094352a880eb8c)) +* config for stash payload limit ([#4001](https://github.com/rudderlabs/rudder-server/issues/4001)) ([f4c93ce](https://github.com/rudderlabs/rudder-server/commit/f4c93ce9547b09a8db1932379fc6ae75d08ea56b)) +* corrupted rsources stats captured by processor for dropped jobs ([#3999](https://github.com/rudderlabs/rudder-server/issues/3999)) ([e74cd7d](https://github.com/rudderlabs/rudder-server/commit/e74cd7d65fedb619e716051435cc9315570ec0c3)) +* error index filtering for timestamp fields use int64 ([#4062](https://github.com/rudderlabs/rudder-server/issues/4062)) ([17590a6](https://github.com/rudderlabs/rudder-server/commit/17590a60210e2fbeeed4673d64602daa8ff7aee7)) +* flaky validations tests ([#4012](https://github.com/rudderlabs/rudder-server/issues/4012)) 
([3b157e3](https://github.com/rudderlabs/rudder-server/commit/3b157e32f6244d78df7e9ec6767e164d08da07bb)) +* invalid memory address or nil pointer dereference in googlecloudfunction ([#4003](https://github.com/rudderlabs/rudder-server/issues/4003)) ([37690ed](https://github.com/rudderlabs/rudder-server/commit/37690ede22bc4bfabdcb942375bbf653cbd9a405)) +* merge error blocks in gcf ([#4004](https://github.com/rudderlabs/rudder-server/issues/4004)) ([03a4c26](https://github.com/rudderlabs/rudder-server/commit/03a4c269bf083e5937fdd0ee49e6d329621529ba)) +* update error parsing of eloqua ([#3996](https://github.com/rudderlabs/rudder-server/issues/3996)) ([e74cd7d](https://github.com/rudderlabs/rudder-server/commit/e74cd7d65fedb619e716051435cc9315570ec0c3)) +* validations tests ([3b157e3](https://github.com/rudderlabs/rudder-server/commit/3b157e32f6244d78df7e9ec6767e164d08da07bb)) + + +### Miscellaneous + +* add workspaceID to router discarded stats ([#3977](https://github.com/rudderlabs/rudder-server/issues/3977)) ([51220da](https://github.com/rudderlabs/rudder-server/commit/51220da8fb3bb461e92584ec78304393bf26c563)) +* advertise gzip support to transformer through X-Feature-Gzip-Support header ([#3990](https://github.com/rudderlabs/rudder-server/issues/3990)) ([10c0ffe](https://github.com/rudderlabs/rudder-server/commit/10c0ffe63c7d472112e3316bd05157b85a2dccc5)) +* avoid using global conf during tests ([#4046](https://github.com/rudderlabs/rudder-server/issues/4046)) ([4e3d477](https://github.com/rudderlabs/rudder-server/commit/4e3d477029ec0f93c1ce0965e0c74d3e52b33325)) +* bump rudder-go-kit to 1.16.2 ([#4026](https://github.com/rudderlabs/rudder-server/issues/4026)) ([be29d5b](https://github.com/rudderlabs/rudder-server/commit/be29d5b8e39c0193e133e1a15fd7621e490dadb9)) +* collect only drained failed keys at router ([#3930](https://github.com/rudderlabs/rudder-server/issues/3930)) 
([51220da](https://github.com/rudderlabs/rudder-server/commit/51220da8fb3bb461e92584ec78304393bf26c563)) +* **deps:** bump github.com/confluentinc/confluent-kafka-go/v2 from 2.2.0 to 2.3.0 ([#4024](https://github.com/rudderlabs/rudder-server/issues/4024)) ([5192a09](https://github.com/rudderlabs/rudder-server/commit/5192a093429620101e6eb4e3c9cc25eb729c7d22)) +* **deps:** bump github.com/google/uuid from 1.3.1 to 1.4.0 ([#4022](https://github.com/rudderlabs/rudder-server/issues/4022)) ([a4dd910](https://github.com/rudderlabs/rudder-server/commit/a4dd910e39c5982de4f748fe671474783aca73e1)) +* **deps:** bump github.com/hashicorp/golang-lru/v2 from 2.0.6 to 2.0.7 ([#4023](https://github.com/rudderlabs/rudder-server/issues/4023)) ([8e0796a](https://github.com/rudderlabs/rudder-server/commit/8e0796a7b32dd9bf0be3508453e7cf141417a6d7)) +* **deps:** bump github.com/rs/cors from 1.10.0 to 1.10.1 ([#4017](https://github.com/rudderlabs/rudder-server/issues/4017)) ([4010776](https://github.com/rudderlabs/rudder-server/commit/4010776763f8712265e758c538ed5f6322e65209)) +* **deps:** bump github.com/snowflakedb/gosnowflake from 1.6.24 to 1.6.25 ([#4025](https://github.com/rudderlabs/rudder-server/issues/4025)) ([43add27](https://github.com/rudderlabs/rudder-server/commit/43add27fd1d059a64adca25b47801e4de0042020)) +* **deps:** bump github.com/trinodb/trino-go-client from 0.312.0 to 0.313.0 ([#4016](https://github.com/rudderlabs/rudder-server/issues/4016)) ([1ee9f56](https://github.com/rudderlabs/rudder-server/commit/1ee9f56badacdda32bbe840cafe098cc79026aa7)) +* **deps:** bump go.uber.org/goleak from 1.2.1 to 1.3.0 ([#4019](https://github.com/rudderlabs/rudder-server/issues/4019)) ([f9c9615](https://github.com/rudderlabs/rudder-server/commit/f9c961594c8130d40ff9e17c6af0eb565a1364e7)) +* **deps:** bump google.golang.org/grpc from 1.58.2 to 1.58.3 ([#4011](https://github.com/rudderlabs/rudder-server/issues/4011)) 
([3840612](https://github.com/rudderlabs/rudder-server/commit/3840612009152612c9f34d1a5640829e846d4f6a)) +* enable errcheck and unparam linters for warehouse ([#3970](https://github.com/rudderlabs/rudder-server/issues/3970)) ([51220da](https://github.com/rudderlabs/rudder-server/commit/51220da8fb3bb461e92584ec78304393bf26c563)) +* error index reporter improvements ([#3994](https://github.com/rudderlabs/rudder-server/issues/3994)) ([55f0737](https://github.com/rudderlabs/rudder-server/commit/55f0737b25f0084141b879f5d038851b9c295c01)) +* fix remove warehouse jobs panic ([#3982](https://github.com/rudderlabs/rudder-server/issues/3982)) ([6e0729c](https://github.com/rudderlabs/rudder-server/commit/6e0729c571c05f0d0ecdc5eff405e640756094e3)) +* flaky error index report test ([#3988](https://github.com/rudderlabs/rudder-server/issues/3988)) ([9b7157f](https://github.com/rudderlabs/rudder-server/commit/9b7157f2abfe0f5c6f121dc6ce1b0750bd598ade)) +* flaky tests ([#3989](https://github.com/rudderlabs/rudder-server/issues/3989)) ([4db35e9](https://github.com/rudderlabs/rudder-server/commit/4db35e986223a27f532f0990b515305d8293d129)) +* flaky validations test ([#4027](https://github.com/rudderlabs/rudder-server/issues/4027)) ([3ee4c7b](https://github.com/rudderlabs/rudder-server/commit/3ee4c7b471943dbe4b551db2cd7c26ae6fccd7ea)) +* go kit v1.16.0 ([#4014](https://github.com/rudderlabs/rudder-server/issues/4014)) ([f200683](https://github.com/rudderlabs/rudder-server/commit/f200683a81ed195916aea94b78709b2ff8120d8c)) +* increase archiver postgres shm size ([#4040](https://github.com/rudderlabs/rudder-server/issues/4040)) ([bd855f6](https://github.com/rudderlabs/rudder-server/commit/bd855f67c0d366c940faab8e4372816e2d44fa7d)) +* migrate to minio resource from rudder-go kit ([#4028](https://github.com/rudderlabs/rudder-server/issues/4028)) ([3ba0260](https://github.com/rudderlabs/rudder-server/commit/3ba026085338ebfe60f1444c87844fa9d932b4e1)) +* minor scheduler cleanup 
([#4032](https://github.com/rudderlabs/rudder-server/issues/4032)) ([2cc9470](https://github.com/rudderlabs/rudder-server/commit/2cc9470b46e5110b1fcceaabe33f8f761d2cd815)) +* minor tunneling cleanup ([#4034](https://github.com/rudderlabs/rudder-server/issues/4034)) ([877eb70](https://github.com/rudderlabs/rudder-server/commit/877eb70b0f60501e4c995937bba60629dd22282e)) +* replace golang.org/x/exp/slices to slices ([#4031](https://github.com/rudderlabs/rudder-server/issues/4031)) ([f014c01](https://github.com/rudderlabs/rudder-server/commit/f014c01acc772dab8724365fd78caea10e5eb424)) +* replace varcheck and deadcode with unused linter ([#3968](https://github.com/rudderlabs/rudder-server/issues/3968)) ([51220da](https://github.com/rudderlabs/rudder-server/commit/51220da8fb3bb461e92584ec78304393bf26c563)) +* scheduler cleanup: avoid globals ([2cc9470](https://github.com/rudderlabs/rudder-server/commit/2cc9470b46e5110b1fcceaabe33f8f761d2cd815)) +* track long-running transformations in processor ([#3976](https://github.com/rudderlabs/rudder-server/issues/3976)) ([51220da](https://github.com/rudderlabs/rudder-server/commit/51220da8fb3bb461e92584ec78304393bf26c563)) +* tunneling cleanup ([877eb70](https://github.com/rudderlabs/rudder-server/commit/877eb70b0f60501e4c995937bba60629dd22282e)) +* upgrade go version to 1.21.3 ([#3973](https://github.com/rudderlabs/rudder-server/issues/3973)) ([11c3f28](https://github.com/rudderlabs/rudder-server/commit/11c3f28127467c7de67881ae5c257b1ee6e53524)) +* upgrade golangci-lint ([#4029](https://github.com/rudderlabs/rudder-server/issues/4029)) ([554a04a](https://github.com/rudderlabs/rudder-server/commit/554a04abc342f500c0284c4a1b6fbb0535d7708a)) +* upgrade urfave/cli v2 for rudder-cli ([#3980](https://github.com/rudderlabs/rudder-server/issues/3980)) ([d0d99bc](https://github.com/rudderlabs/rudder-server/commit/d0d99bcba31286a780dc734dbe924d6d86945bba)) +* use a normalised data model for storing failed keys 
([#3961](https://github.com/rudderlabs/rudder-server/issues/3961)) ([aa8044a](https://github.com/rudderlabs/rudder-server/commit/aa8044ac4159a3af2dc4dc280f71a6138ff57b83)) + ## [1.15.4](https://github.com/rudderlabs/rudder-server/compare/v1.15.3...v1.15.4) (2023-10-23) diff --git a/enterprise/reporting/error_index/worker.go b/enterprise/reporting/error_index/worker.go index a2797be901..4dabdbe315 100644 --- a/enterprise/reporting/error_index/worker.go +++ b/enterprise/reporting/error_index/worker.go @@ -263,7 +263,7 @@ func (w *worker) encodeToParquet(wr io.Writer, payloads []payload) error { pw.CompressionType = parquet.CompressionCodec_SNAPPY sort.Slice(payloads, func(i, j int) bool { - return payloads[i].FailedAt > payloads[j].FailedAt + return payloads[i].SortingKey() < payloads[j].SortingKey() }) for _, payload := range payloads { diff --git a/enterprise/reporting/error_index/worker_test.go b/enterprise/reporting/error_index/worker_test.go index 6fef94424e..9a5b799961 100644 --- a/enterprise/reporting/error_index/worker_test.go +++ b/enterprise/reporting/error_index/worker_test.go @@ -80,6 +80,8 @@ func TestWorkerWriter(t *testing.T) { payloads = append(payloads, p) } + toEncode := make([]payload, len(payloads)) + copy(toEncode, payloads) t.Run("writes", func(t *testing.T) { buf := bytes.NewBuffer(make([]byte, 0, 1024)) @@ -89,7 +91,7 @@ func TestWorkerWriter(t *testing.T) { w.config.parquetPageSize = misc.SingleValueLoader(8 * bytesize.KB) w.config.parquetParallelWriters = misc.SingleValueLoader(int64(8)) - require.NoError(t, w.encodeToParquet(buf, payloads)) + require.NoError(t, w.encodeToParquet(buf, toEncode)) pr, err := reader.NewParquetReader(buffer.NewBufferFileFromBytes(buf.Bytes()), new(payload), 8) require.NoError(t, err) @@ -130,7 +132,7 @@ func TestWorkerWriter(t *testing.T) { w.config.parquetPageSize = misc.SingleValueLoader(8 * bytesize.KB) w.config.parquetParallelWriters = misc.SingleValueLoader(int64(8)) - require.NoError(t, 
w.encodeToParquet(fw, payloads)) + require.NoError(t, w.encodeToParquet(fw, toEncode)) t.Run("count all", func(t *testing.T) { var count int64 @@ -145,7 +147,7 @@ func TestWorkerWriter(t *testing.T) { require.EqualValues(t, 10, count) }) t.Run("select all", func(t *testing.T) { - failedMessages := failedMessagesUsingDuckDB(t, ctx, nil, "SELECT * FROM read_parquet($1) ORDER BY failed_at DESC;", []interface{}{filePath}) + failedMessages := failedMessagesUsingDuckDB(t, ctx, nil, "SELECT * FROM read_parquet($1) ORDER BY failed_at ASC;", []interface{}{filePath}) for i, failedMessage := range failedMessages { require.Equal(t, payloads[i].MessageID, failedMessage.MessageID) diff --git a/warehouse/integrations/bigquery/bigquery.go b/warehouse/integrations/bigquery/bigquery.go index d252f17bec..5650acd307 100644 --- a/warehouse/integrations/bigquery/bigquery.go +++ b/warehouse/integrations/bigquery/bigquery.go @@ -867,7 +867,7 @@ func (bq *BigQuery) connect(ctx context.Context, cred BQCredentials) (*bigquery. 
// * the server config says we allow merging - // * the user opted in to merging + // * the user has not opted to prefer append func (bq *BigQuery) shouldMerge() bool { - return bq.config.allowMerge && bq.warehouse.GetBoolDestinationConfig(model.EnableMergeSetting) + return bq.config.allowMerge && !bq.warehouse.GetPreferAppendSetting() } func (bq *BigQuery) CrashRecover(ctx context.Context) { diff --git a/warehouse/integrations/bigquery/bigquery_test.go b/warehouse/integrations/bigquery/bigquery_test.go index 43595b63e8..3fcd214e09 100644 --- a/warehouse/integrations/bigquery/bigquery_test.go +++ b/warehouse/integrations/bigquery/bigquery_test.go @@ -78,7 +78,7 @@ func TestIntegration(t *testing.T) { escapedCredentialsTrimmedStr := strings.Trim(string(escapedCredentials), `"`) - bootstrapSvc := func(t *testing.T, enableMerge bool) *bigquery.Client { + bootstrapSvc := func(t *testing.T, preferAppend bool) *bigquery.Client { templateConfigurations := map[string]any{ "workspaceID": workspaceID, "sourceID": sourceID, @@ -93,7 +93,7 @@ func TestIntegration(t *testing.T) { "bucketName": bqTestCredentials.BucketName, "credentials": escapedCredentialsTrimmedStr, "sourcesNamespace": sourcesNamespace, - "enableMerge": enableMerge, + "preferAppend": preferAppend, } workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations) @@ -151,7 +151,7 @@ func TestIntegration(t *testing.T) { sourceJob bool skipModifiedEvents bool prerequisite func(context.Context, testing.TB, *bigquery.Client) - enableMerge bool + preferAppend bool customPartitionsEnabledWorkspaceIDs string stagingFilePrefix string }{ @@ -169,7 +169,7 @@ func TestIntegration(t *testing.T) { loadFilesEventsMap: loadFilesEventsMap(), tableUploadsEventsMap: tableUploadsEventsMap(), warehouseEventsMap: mergeEventsMap(), - enableMerge: true, + preferAppend: false, prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) { t.Helper() _ = db.Dataset(namespace).DeleteWithContents(ctx) @@ -193,7 +193,7 @@ func 
TestIntegration(t *testing.T) { tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(), warehouseEventsMap: whth.SourcesWarehouseEventsMap(), sourceJob: true, - enableMerge: false, + preferAppend: true, prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) { t.Helper() _ = db.Dataset(namespace).DeleteWithContents(ctx) @@ -215,7 +215,7 @@ func TestIntegration(t *testing.T) { tableUploadsEventsMap: tableUploadsEventsMap(), warehouseEventsMap: appendEventsMap(), skipModifiedEvents: true, - enableMerge: false, + preferAppend: true, prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) { t.Helper() _ = db.Dataset(namespace).DeleteWithContents(ctx) @@ -237,7 +237,7 @@ func TestIntegration(t *testing.T) { tableUploadsEventsMap: tableUploadsEventsMap(), warehouseEventsMap: appendEventsMap(), skipModifiedEvents: true, - enableMerge: false, + preferAppend: true, customPartitionsEnabledWorkspaceIDs: workspaceID, prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) { t.Helper() @@ -274,7 +274,7 @@ func TestIntegration(t *testing.T) { "RSERVER_WAREHOUSE_BIGQUERY_CUSTOM_PARTITIONS_ENABLED_WORKSPACE_IDS", tc.customPartitionsEnabledWorkspaceIDs, ) - db := bootstrapSvc(t, tc.enableMerge) + db := bootstrapSvc(t, tc.preferAppend) t.Cleanup(func() { for _, dataset := range []string{tc.schema} { @@ -542,12 +542,9 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse) - dedupWarehouse := th.Clone(t, warehouse) - dedupWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true - c := config.New() bq := whbigquery.New(c, logger.NOP) - err := bq.Setup(ctx, dedupWarehouse, mockUploader) + err := bq.Setup(ctx, warehouse, mockUploader) require.NoError(t, err) err = bq.CreateSchema(ctx) @@ -590,9 +587,12 @@ func TestIntegration(t *testing.T) { loadFiles := 
[]warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse) + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + c := config.New() bq := whbigquery.New(c, logger.NOP) - err := bq.Setup(ctx, warehouse, mockUploader) + err := bq.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = bq.CreateSchema(ctx) @@ -638,8 +638,11 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse) + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + bq := whbigquery.New(config.New(), logger.NOP) - err := bq.Setup(ctx, warehouse, mockUploader) + err := bq.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = bq.CreateSchema(ctx) diff --git a/warehouse/integrations/bigquery/testdata/template.json b/warehouse/integrations/bigquery/testdata/template.json index c6eac2a526..fda8d690c1 100644 --- a/warehouse/integrations/bigquery/testdata/template.json +++ b/warehouse/integrations/bigquery/testdata/template.json @@ -32,7 +32,7 @@ "prefix": "", "namespace": "{{.namespace}}", "syncFrequency": "30", - "enableMerge": {{.enableMerge}} + "preferAppend": {{.preferAppend}} }, "liveEventsConfig": {}, "secretConfig": {}, @@ -161,7 +161,7 @@ "prefix": "", "namespace": "{{.sourcesNamespace}}", "syncFrequency": "30", - "enableMerge": {{.enableMerge}} + "preferAppend": {{.preferAppend}} }, "liveEventsConfig": {}, "secretConfig": {}, diff --git a/warehouse/integrations/deltalake/deltalake.go b/warehouse/integrations/deltalake/deltalake.go index eb6b02e122..8b3342fd85 100644 --- a/warehouse/integrations/deltalake/deltalake.go +++ b/warehouse/integrations/deltalake/deltalake.go @@ -1387,5 +1387,5 
@@ func (*Deltalake) DeleteBy(context.Context, []string, warehouseutils.DeleteByPar - // * the user opted in to merging and we allow merging + // * the user has not opted to prefer append and we allow merging func (d *Deltalake) ShouldMerge() bool { return !d.Uploader.CanAppend() || - (d.config.allowMerge && d.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting)) + (d.config.allowMerge && !d.Warehouse.GetPreferAppendSetting()) } diff --git a/warehouse/integrations/deltalake/deltalake_test.go b/warehouse/integrations/deltalake/deltalake_test.go index 2fc4ea20ab..c36b9ba239 100644 --- a/warehouse/integrations/deltalake/deltalake_test.go +++ b/warehouse/integrations/deltalake/deltalake_test.go @@ -117,7 +117,7 @@ func TestIntegration(t *testing.T) { db := sql.OpenDB(connector) require.NoError(t, db.Ping()) - bootstrapSvc := func(t *testing.T, enableMerge bool) { + bootstrapSvc := func(t *testing.T, preferAppend bool) { templateConfigurations := map[string]any{ "workspaceID": workspaceID, "sourceID": sourceID, @@ -131,7 +131,7 @@ func TestIntegration(t *testing.T) { "containerName": deltaLakeCredentials.ContainerName, "accountName": deltaLakeCredentials.AccountName, "accountKey": deltaLakeCredentials.AccountKey, - "enableMerge": enableMerge, + "preferAppend": preferAppend, } workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations) @@ -186,7 +186,7 @@ func TestIntegration(t *testing.T) { destinationID string messageID string warehouseEventsMap whth.EventsCountMap - enableMerge bool + preferAppend bool useParquetLoadFiles bool stagingFilePrefix string jobRunID string @@ -198,7 +198,7 @@ func TestIntegration(t *testing.T) { sourceID: sourceID, destinationID: destinationID, warehouseEventsMap: mergeEventsMap(), - enableMerge: true, + preferAppend: false, useParquetLoadFiles: false, stagingFilePrefix: "testdata/upload-job-merge-mode", jobRunID: misc.FastUUID().String(), @@ -210,7 +210,7 @@ func TestIntegration(t *testing.T) { sourceID: sourceID, destinationID: destinationID, 
warehouseEventsMap: appendEventsMap(), - enableMerge: false, + preferAppend: true, useParquetLoadFiles: false, stagingFilePrefix: "testdata/upload-job-append-mode", // an empty jobRunID means that the source is not an ETL one @@ -224,7 +224,7 @@ func TestIntegration(t *testing.T) { sourceID: sourceID, destinationID: destinationID, warehouseEventsMap: mergeEventsMap(), - enableMerge: true, + preferAppend: false, useParquetLoadFiles: true, stagingFilePrefix: "testdata/upload-job-parquet", jobRunID: misc.FastUUID().String(), @@ -234,7 +234,7 @@ func TestIntegration(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { - bootstrapSvc(t, tc.enableMerge) + bootstrapSvc(t, tc.preferAppend) t.Setenv("RSERVER_WAREHOUSE_DELTALAKE_USE_PARQUET_LOAD_FILES", strconv.FormatBool(tc.useParquetLoadFiles)) sqlClient := &warehouseclient.Client{ @@ -563,11 +563,8 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, true, true, "2022-12-15T06:53:49.640Z") - mergeWarehouse := th.Clone(t, warehouse) - mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true - d := deltalake.New(config.New(), logger.NOP, memstats.New()) - err := d.Setup(ctx, mergeWarehouse, mockUploader) + err := d.Setup(ctx, warehouse, mockUploader) require.NoError(t, err) err = d.CreateSchema(ctx) @@ -613,11 +610,11 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, false, false, "2022-11-15T06:53:49.640Z") - mergeWarehouse := th.Clone(t, warehouse) - mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true + appendWarehouse := th.Clone(t, warehouse) + 
appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true d := deltalake.New(config.New(), logger.NOP, memstats.New()) - err := d.Setup(ctx, mergeWarehouse, mockUploader) + err := d.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = d.CreateSchema(ctx) @@ -664,8 +661,11 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, true, false, "2022-12-15T06:53:49.640Z") + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + d := deltalake.New(config.New(), logger.NOP, memstats.New()) - err := d.Setup(ctx, warehouse, mockUploader) + err := d.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = d.CreateSchema(ctx) @@ -1057,35 +1057,35 @@ func TestDeltalake_TrimErrorMessage(t *testing.T) { func TestDeltalake_ShouldMerge(t *testing.T) { testCases := []struct { name string - enableMerge bool + preferAppend bool uploaderCanAppend bool uploaderExpectedCalls int expected bool }{ { - name: "uploader says we can append and merge is not enabled", - enableMerge: false, + name: "uploader says we can append and user prefers to append", + preferAppend: true, uploaderCanAppend: true, uploaderExpectedCalls: 1, expected: false, }, { - name: "uploader says we can append and merge is enabled", - enableMerge: true, + name: "uploader says we can append and users prefers not to append", + preferAppend: false, uploaderCanAppend: true, uploaderExpectedCalls: 1, expected: true, }, { - name: "uploader says we cannot append so enableMerge false is ignored", - enableMerge: false, + name: "uploader says we cannot append and user prefers to append", + preferAppend: true, uploaderCanAppend: false, uploaderExpectedCalls: 1, expected: true, }, { - name: "uploader says we cannot append so 
enableMerge true is ignored", - enableMerge: true, + name: "uploader says we cannot append and users prefers not to append", + preferAppend: false, uploaderCanAppend: false, uploaderExpectedCalls: 1, expected: true, @@ -1098,7 +1098,7 @@ func TestDeltalake_ShouldMerge(t *testing.T) { d.Warehouse = model.Warehouse{ Destination: backendconfig.DestinationT{ Config: map[string]any{ - string(model.EnableMergeSetting): tc.enableMerge, + string(model.PreferAppendSetting): tc.preferAppend, }, }, } diff --git a/warehouse/integrations/deltalake/testdata/template.json b/warehouse/integrations/deltalake/testdata/template.json index 2b796aa448..8531dd273a 100644 --- a/warehouse/integrations/deltalake/testdata/template.json +++ b/warehouse/integrations/deltalake/testdata/template.json @@ -39,7 +39,7 @@ "syncFrequency": "30", "eventDelivery": false, "eventDeliveryTS": 1648195480174, - "enableMerge": {{.enableMerge}} + "preferAppend": {{.preferAppend}} }, "liveEventsConfig": { "eventDelivery": false, diff --git a/warehouse/integrations/postgres/load.go b/warehouse/integrations/postgres/load.go index da76cfedff..3b02ba0f44 100644 --- a/warehouse/integrations/postgres/load.go +++ b/warehouse/integrations/postgres/load.go @@ -556,6 +556,6 @@ func (pg *Postgres) loadUsersTable( func (pg *Postgres) shouldMerge() bool { return !pg.Uploader.CanAppend() || (pg.config.allowMerge && - pg.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting) && + !pg.Warehouse.GetPreferAppendSetting() && !slices.Contains(pg.config.skipDedupDestinationIDs, pg.Warehouse.Destination.ID)) } diff --git a/warehouse/integrations/postgres/postgres_test.go b/warehouse/integrations/postgres/postgres_test.go index c225bba767..6895a546ae 100644 --- a/warehouse/integrations/postgres/postgres_test.go +++ b/warehouse/integrations/postgres/postgres_test.go @@ -571,9 +571,8 @@ func TestIntegration(t *testing.T) { require.Nil(t, loadTableStat) }) t.Run("merge", func(t *testing.T) { - tableName := "merge_test_table" - 
t.Run("without dedup", func(t *testing.T) { + tableName := "merge_without_dedup_test_table" uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName) loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} @@ -582,11 +581,11 @@ func TestIntegration(t *testing.T) { c := config.New() c.Set("Warehouse.postgres.EnableSQLStatementExecutionPlanWorkspaceIDs", workspaceID) - mergeWarehouse := th.Clone(t, warehouse) - mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true pg := postgres.New(c, logger.NOP, memstats.New()) - err := pg.Setup(ctx, mergeWarehouse, mockUploader) + err := pg.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = pg.CreateSchema(ctx) @@ -602,8 +601,8 @@ func TestIntegration(t *testing.T) { loadTableStat, err = pg.LoadTable(ctx, tableName) require.NoError(t, err) - require.Equal(t, loadTableStat.RowsInserted, int64(0)) - require.Equal(t, loadTableStat.RowsUpdated, int64(14)) + require.Equal(t, loadTableStat.RowsInserted, int64(14)) + require.Equal(t, loadTableStat.RowsUpdated, int64(0)) records := whth.RetrieveRecordsFromWarehouse(t, pg.DB.DB, fmt.Sprintf(` @@ -624,9 +623,10 @@ func TestIntegration(t *testing.T) { tableName, ), ) - require.Equal(t, records, whth.SampleTestRecords()) + require.Equal(t, records, whth.AppendTestRecords()) }) t.Run("with dedup", func(t *testing.T) { + tableName := "merge_with_dedup_test_table" uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName) loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} @@ -635,11 +635,8 @@ func TestIntegration(t *testing.T) { c := config.New() c.Set("Warehouse.postgres.EnableSQLStatementExecutionPlanWorkspaceIDs", workspaceID) - mergeWarehouse := th.Clone(t, warehouse) - mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true 
- pg := postgres.New(config.New(), logger.NOP, memstats.New()) - err := pg.Setup(ctx, mergeWarehouse, mockUploader) + err := pg.Setup(ctx, warehouse, mockUploader) require.NoError(t, err) err = pg.CreateSchema(ctx) @@ -650,6 +647,11 @@ func TestIntegration(t *testing.T) { loadTableStat, err := pg.LoadTable(ctx, tableName) require.NoError(t, err) + require.Equal(t, loadTableStat.RowsInserted, int64(14)) + require.Equal(t, loadTableStat.RowsUpdated, int64(0)) + + loadTableStat, err = pg.LoadTable(ctx, tableName) + require.NoError(t, err) require.Equal(t, loadTableStat.RowsInserted, int64(0)) require.Equal(t, loadTableStat.RowsUpdated, int64(14)) @@ -686,8 +688,11 @@ func TestIntegration(t *testing.T) { c := config.New() c.Set("Warehouse.postgres.skipDedupDestinationIDs", destinationID) + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + pg := postgres.New(c, logger.NOP, memstats.New()) - err := pg.Setup(ctx, warehouse, mockUploader) + err := pg.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = pg.CreateSchema(ctx) diff --git a/warehouse/integrations/redshift/redshift.go b/warehouse/integrations/redshift/redshift.go index fedf6a1529..263dab06ff 100644 --- a/warehouse/integrations/redshift/redshift.go +++ b/warehouse/integrations/redshift/redshift.go @@ -1329,7 +1329,7 @@ func (rs *Redshift) SetConnectionTimeout(timeout time.Duration) { func (rs *Redshift) shouldMerge() bool { return !rs.Uploader.CanAppend() || (rs.config.allowMerge && - rs.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting) && + !rs.Warehouse.GetPreferAppendSetting() && !slices.Contains(rs.config.skipDedupDestinationIDs, rs.Warehouse.Destination.ID)) } diff --git a/warehouse/integrations/redshift/redshift_test.go b/warehouse/integrations/redshift/redshift_test.go index 3de39467ba..c5130ccf1d 100644 --- a/warehouse/integrations/redshift/redshift_test.go +++ 
b/warehouse/integrations/redshift/redshift_test.go @@ -13,6 +13,8 @@ import ( "testing" "time" + th "github.com/rudderlabs/rudder-server/testhelper" + "github.com/golang/mock/gomock" "github.com/lib/pq" "github.com/ory/dockertest/v3" @@ -28,7 +30,6 @@ import ( "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/runner" - th "github.com/rudderlabs/rudder-server/testhelper" "github.com/rudderlabs/rudder-server/testhelper/health" "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig" "github.com/rudderlabs/rudder-server/utils/misc" @@ -470,8 +471,11 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv) + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + d := redshift.New(config.New(), logger.NOP, memstats.New()) - err := d.Setup(ctx, warehouse, mockUploader) + err := d.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = d.CreateSchema(ctx) @@ -515,11 +519,8 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv) - mergeWarehouse := th.Clone(t, warehouse) - mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true - d := redshift.New(config.New(), logger.NOP, memstats.New()) - err := d.Setup(ctx, mergeWarehouse, mockUploader) + err := d.Setup(ctx, warehouse, mockUploader) require.NoError(t, err) err = d.CreateSchema(ctx) @@ -563,15 +564,12 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := 
newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv) - mergeWarehouse := th.Clone(t, warehouse) - mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true - c := config.New() c.Set("Warehouse.redshift.dedupWindow", true) c.Set("Warehouse.redshift.dedupWindowInHours", 999999) d := redshift.New(c, logger.NOP, memstats.New()) - err := d.Setup(ctx, mergeWarehouse, mockUploader) + err := d.Setup(ctx, warehouse, mockUploader) require.NoError(t, err) err = d.CreateSchema(ctx) @@ -615,15 +613,12 @@ func TestIntegration(t *testing.T) { loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv) - mergeWarehouse := th.Clone(t, warehouse) - mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true - c := config.New() c.Set("Warehouse.redshift.dedupWindow", true) c.Set("Warehouse.redshift.dedupWindowInHours", 0) d := redshift.New(c, logger.NOP, memstats.New()) - err := d.Setup(ctx, mergeWarehouse, mockUploader) + err := d.Setup(ctx, warehouse, mockUploader) require.NoError(t, err) err = d.CreateSchema(ctx) @@ -672,8 +667,11 @@ func TestIntegration(t *testing.T) { c := config.New() c.Set("Warehouse.redshift.skipDedupDestinationIDs", []string{destinationID}) + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + rs := redshift.New(c, logger.NOP, memstats.New()) - err := rs.Setup(ctx, warehouse, mockUploader) + err := rs.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = rs.CreateSchema(ctx) diff --git a/warehouse/integrations/snowflake/snowflake.go b/warehouse/integrations/snowflake/snowflake.go index d1c707a002..fddb7945f6 100644 --- a/warehouse/integrations/snowflake/snowflake.go +++ b/warehouse/integrations/snowflake/snowflake.go @@ -829,7 +829,7 @@ func (sf 
*Snowflake) LoadIdentityMappingsTable(ctx context.Context) error { // * the user opted-in func (sf *Snowflake) ShouldMerge() bool { return !sf.Uploader.CanAppend() || - (sf.config.allowMerge && sf.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting)) + (sf.config.allowMerge && !sf.Warehouse.GetPreferAppendSetting()) } func (sf *Snowflake) LoadUserTables(ctx context.Context) map[string]error { diff --git a/warehouse/integrations/snowflake/snowflake_test.go b/warehouse/integrations/snowflake/snowflake_test.go index d053c9359e..628f49022f 100644 --- a/warehouse/integrations/snowflake/snowflake_test.go +++ b/warehouse/integrations/snowflake/snowflake_test.go @@ -27,6 +27,7 @@ import ( kithelper "github.com/rudderlabs/rudder-go-kit/testhelper" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/runner" + th "github.com/rudderlabs/rudder-server/testhelper" "github.com/rudderlabs/rudder-server/testhelper/health" "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig" "github.com/rudderlabs/rudder-server/utils/misc" @@ -130,7 +131,7 @@ func TestIntegration(t *testing.T) { rbacCredentials, err := getSnowflakeTestCredentials(testRBACKey) require.NoError(t, err) - bootstrapSvc := func(t testing.TB, enableMerge bool) { + bootstrapSvc := func(t testing.TB, preferAppend bool) { templateConfigurations := map[string]any{ "workspaceID": workspaceID, "sourceID": sourceID, @@ -168,7 +169,7 @@ func TestIntegration(t *testing.T) { "rbacBucketName": rbacCredentials.BucketName, "rbacAccessKeyID": rbacCredentials.AccessKeyID, "rbacAccessKey": rbacCredentials.AccessKey, - "enableMerge": enableMerge, + "preferAppend": preferAppend, } workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations) @@ -227,7 +228,7 @@ func TestIntegration(t *testing.T) { sourceJob bool stagingFilePrefix string emptyJobRunID bool - enableMerge bool + preferAppend bool customUserID string }{ { @@ 
-248,7 +249,7 @@ func TestIntegration(t *testing.T) { "wh_staging_files": 34, // 32 + 2 (merge events because of ID resolution) }, stagingFilePrefix: "testdata/upload-job", - enableMerge: true, + preferAppend: false, }, { name: "Upload Job with Role", @@ -268,7 +269,7 @@ func TestIntegration(t *testing.T) { "wh_staging_files": 34, // 32 + 2 (merge events because of ID resolution) }, stagingFilePrefix: "testdata/upload-job-with-role", - enableMerge: true, + preferAppend: false, }, { name: "Upload Job with Case Sensitive Database", @@ -288,7 +289,7 @@ func TestIntegration(t *testing.T) { "wh_staging_files": 34, // 32 + 2 (merge events because of ID resolution) }, stagingFilePrefix: "testdata/upload-job-case-sensitive", - enableMerge: true, + preferAppend: false, }, { name: "Source Job with Sources", @@ -310,7 +311,7 @@ func TestIntegration(t *testing.T) { warehouseEventsMap: testhelper.SourcesWarehouseEventsMap(), sourceJob: true, stagingFilePrefix: "testdata/sources-job", - enableMerge: true, + preferAppend: false, }, { name: "Upload Job in append mode", @@ -331,7 +332,7 @@ func TestIntegration(t *testing.T) { // an empty jobRunID means that the source is not an ETL one // see Uploader.CanAppend() emptyJobRunID: true, - enableMerge: false, + preferAppend: true, customUserID: testhelper.GetUserId("append_test"), }, } @@ -339,7 +340,7 @@ func TestIntegration(t *testing.T) { for _, tc := range testcase { tc := tc t.Run(tc.name, func(t *testing.T) { - bootstrapSvc(t, tc.enableMerge) + bootstrapSvc(t, tc.preferAppend) urlConfig := sfdb.Config{ Account: tc.cred.Account, @@ -501,7 +502,7 @@ func TestIntegration(t *testing.T) { "syncFrequency": "30", "enableSSE": false, "useRudderStorage": false, - "enableMerge": true, + "preferAppend": false, }, DestinationDefinition: backendconfig.DestinationDefinitionT{ ID: "1XjvXnzw34UMAz1YOuKqL1kwzh6", @@ -662,9 +663,12 @@ func TestIntegration(t *testing.T) { loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}} 
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, true, false) + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + sf, err := snowflake.New(config.New(), logger.NOP, memstats.New()) require.NoError(t, err) - err = sf.Setup(ctx, warehouse, mockUploader) + err = sf.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = sf.CreateSchema(ctx) @@ -756,9 +760,12 @@ func TestIntegration(t *testing.T) { loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}} mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, true, false) + appendWarehouse := th.Clone(t, warehouse) + appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true + sf, err := snowflake.New(config.New(), logger.NOP, memstats.New()) require.NoError(t, err) - err = sf.Setup(ctx, warehouse, mockUploader) + err = sf.Setup(ctx, appendWarehouse, mockUploader) require.NoError(t, err) err = sf.CreateSchema(ctx) @@ -949,35 +956,35 @@ func TestIntegration(t *testing.T) { func TestSnowflake_ShouldMerge(t *testing.T) { testCases := []struct { name string - enableMerge bool + preferAppend bool uploaderCanAppend bool uploaderExpectedCalls int expected bool }{ { - name: "uploader says we can append and merge is not enabled", - enableMerge: false, + name: "uploader says we can append and user prefers append", + preferAppend: true, uploaderCanAppend: true, uploaderExpectedCalls: 1, expected: false, }, { - name: "uploader says we cannot append and merge is not enabled", - enableMerge: false, + name: "uploader says we cannot append and user prefers append", + preferAppend: true, uploaderCanAppend: false, uploaderExpectedCalls: 1, expected: true, }, { - name: "uploader says we can append and merge is enabled", - enableMerge: true, + name: "uploader says we can append and user prefers not to append", + preferAppend: false, 
uploaderCanAppend: true, uploaderExpectedCalls: 1, expected: true, }, { - name: "uploader says we cannot append and we are in merge mode", - enableMerge: true, + name: "uploader says we cannot append and user prefers not to append", + preferAppend: false, uploaderCanAppend: false, uploaderExpectedCalls: 1, expected: true, @@ -992,7 +999,7 @@ func TestSnowflake_ShouldMerge(t *testing.T) { sf.Warehouse = model.Warehouse{ Destination: backendconfig.DestinationT{ Config: map[string]any{ - string(model.EnableMergeSetting): tc.enableMerge, + string(model.PreferAppendSetting): tc.preferAppend, }, }, } diff --git a/warehouse/integrations/snowflake/testdata/template.json b/warehouse/integrations/snowflake/testdata/template.json index 73d113e764..861c612ead 100644 --- a/warehouse/integrations/snowflake/testdata/template.json +++ b/warehouse/integrations/snowflake/testdata/template.json @@ -40,7 +40,7 @@ "syncFrequency": "30", "enableSSE": false, "useRudderStorage": false, - "enableMerge": {{.enableMerge}} + "preferAppend": {{.preferAppend}} }, "liveEventsConfig": {}, "secretConfig": {}, @@ -165,7 +165,7 @@ "syncFrequency": "30", "enableSSE": false, "useRudderStorage": false, - "enableMerge": {{.enableMerge}} + "preferAppend": {{.preferAppend}} }, "liveEventsConfig": {}, "secretConfig": {}, @@ -291,7 +291,7 @@ "syncFrequency": "30", "enableSSE": false, "useRudderStorage": false, - "enableMerge": {{.enableMerge}} + "preferAppend": {{.preferAppend}} }, "liveEventsConfig": {}, "secretConfig": {}, @@ -442,7 +442,7 @@ "syncFrequency": "30", "enableSSE": false, "useRudderStorage": false, - "enableMerge": {{.enableMerge}} + "preferAppend": {{.preferAppend}} }, "liveEventsConfig": {}, "secretConfig": {}, diff --git a/warehouse/internal/model/warehouse.go b/warehouse/internal/model/warehouse.go index f398960e5e..ab8bcc53c2 100644 --- a/warehouse/internal/model/warehouse.go +++ b/warehouse/internal/model/warehouse.go @@ -9,7 +9,7 @@ type destConfSetting string func (s destConfSetting) 
string() string { return string(s) } const ( - EnableMergeSetting destConfSetting = "enableMerge" + PreferAppendSetting destConfSetting = "preferAppend" UseRudderStorageSetting destConfSetting = "useRudderStorage" ) @@ -31,3 +31,10 @@ func (w *Warehouse) GetBoolDestinationConfig(key DestinationConfigSetting) bool } return false } + +func (w *Warehouse) GetPreferAppendSetting() bool { + destConfig := w.Destination.Config + // defaulting to false if not defined for backwards compatibility with previous behaviour + value, _ := destConfig[PreferAppendSetting.string()].(bool) + return value +}