
chore: preferAppend defaults to false if not defined #4088

Closed · wants to merge 12 commits
2 changes: 1 addition & 1 deletion warehouse/integrations/bigquery/bigquery.go
@@ -867,7 +867,7 @@ func (bq *BigQuery) connect(ctx context.Context, cred BQCredentials) (*bigquery.
// * the server config says we allow merging
// * the user opted in to merging
func (bq *BigQuery) shouldMerge() bool {
- return bq.config.allowMerge && bq.warehouse.GetBoolDestinationConfig(model.EnableMergeSetting)
+ return bq.config.allowMerge && !bq.warehouse.GetPreferAppendSetting()
}

func (bq *BigQuery) CrashRecover(ctx context.Context) {
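The direction of the default flips here: the old code merged only when `enableMerge` was explicitly true, while the new code merges unless `preferAppend` is explicitly true, so a destination with no setting at all now defaults to merge mode (the "preferAppend defaults to false if not defined" of the PR title). A minimal runnable sketch of the assumed semantics; the helper body below is illustrative, not the repository's actual `GetPreferAppendSetting` implementation:

```go
package main

import "fmt"

// warehouse is an illustrative stand-in for the destination config carried
// by a model.Warehouse; it is not the repository's type.
type warehouse struct {
	config map[string]any
}

// getPreferAppendSetting assumes the setting is stored as a plain bool and
// falls back to false when the key is absent or mistyped.
func (w warehouse) getPreferAppendSetting() bool {
	v, _ := w.config["preferAppend"].(bool)
	return v
}

// shouldMerge mirrors the new check: merge when the server allows it and
// the user has not opted in to append-only loads.
func shouldMerge(allowMerge bool, w warehouse) bool {
	return allowMerge && !w.getPreferAppendSetting()
}

func main() {
	unset := warehouse{config: map[string]any{}}
	appendOn := warehouse{config: map[string]any{"preferAppend": true}}
	fmt.Println(shouldMerge(true, unset))    // true: missing key defaults to merging
	fmt.Println(shouldMerge(true, appendOn)) // false: user prefers appending
}
```

Reading the missing key as Go's zero value keeps merge-by-default behavior for destinations that predate the new setting.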
34 changes: 19 additions & 15 deletions warehouse/integrations/bigquery/bigquery_test.go
@@ -11,6 +11,8 @@ import (
"testing"
"time"

th "github.com/rudderlabs/rudder-server/testhelper"

"cloud.google.com/go/bigquery"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
@@ -25,7 +27,6 @@ import (
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/runner"
th "github.com/rudderlabs/rudder-server/testhelper"
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/testhelper/workspaceConfig"
"github.com/rudderlabs/rudder-server/utils/misc"
@@ -78,7 +79,7 @@ func TestIntegration(t *testing.T) {

escapedCredentialsTrimmedStr := strings.Trim(string(escapedCredentials), `"`)

- bootstrapSvc := func(t *testing.T, enableMerge bool) *bigquery.Client {
+ bootstrapSvc := func(t *testing.T, preferAppend bool) *bigquery.Client {
templateConfigurations := map[string]any{
"workspaceID": workspaceID,
"sourceID": sourceID,
@@ -93,7 +94,7 @@ func TestIntegration(t *testing.T) {
"bucketName": bqTestCredentials.BucketName,
"credentials": escapedCredentialsTrimmedStr,
"sourcesNamespace": sourcesNamespace,
"enableMerge": enableMerge,
"preferAppend": preferAppend,
}
workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations)

@@ -151,7 +152,7 @@ func TestIntegration(t *testing.T) {
sourceJob bool
skipModifiedEvents bool
prerequisite func(context.Context, testing.TB, *bigquery.Client)
- enableMerge bool
+ preferAppend bool
customPartitionsEnabledWorkspaceIDs string
stagingFilePrefix string
}{
@@ -169,7 +170,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: loadFilesEventsMap(),
tableUploadsEventsMap: tableUploadsEventsMap(),
warehouseEventsMap: mergeEventsMap(),
- enableMerge: true,
+ preferAppend: false,
prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
t.Helper()
_ = db.Dataset(namespace).DeleteWithContents(ctx)
@@ -193,7 +194,7 @@ func TestIntegration(t *testing.T) {
tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
warehouseEventsMap: whth.SourcesWarehouseEventsMap(),
sourceJob: true,
- enableMerge: false,
+ preferAppend: true,
prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
t.Helper()
_ = db.Dataset(namespace).DeleteWithContents(ctx)
@@ -215,7 +216,7 @@ func TestIntegration(t *testing.T) {
tableUploadsEventsMap: tableUploadsEventsMap(),
warehouseEventsMap: appendEventsMap(),
skipModifiedEvents: true,
- enableMerge: false,
+ preferAppend: true,
prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
t.Helper()
_ = db.Dataset(namespace).DeleteWithContents(ctx)
@@ -237,7 +238,7 @@ func TestIntegration(t *testing.T) {
tableUploadsEventsMap: tableUploadsEventsMap(),
warehouseEventsMap: appendEventsMap(),
skipModifiedEvents: true,
- enableMerge: false,
+ preferAppend: true,
customPartitionsEnabledWorkspaceIDs: workspaceID,
prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
t.Helper()
@@ -274,7 +275,7 @@ func TestIntegration(t *testing.T) {
"RSERVER_WAREHOUSE_BIGQUERY_CUSTOM_PARTITIONS_ENABLED_WORKSPACE_IDS",
tc.customPartitionsEnabledWorkspaceIDs,
)
- db := bootstrapSvc(t, tc.enableMerge)
+ db := bootstrapSvc(t, tc.preferAppend)

t.Cleanup(func() {
for _, dataset := range []string{tc.schema} {
@@ -542,12 +543,9 @@ func TestIntegration(t *testing.T) {
loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)

- dedupWarehouse := th.Clone(t, warehouse)
- dedupWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
-
c := config.New()
bq := whbigquery.New(c, logger.NOP)
- err := bq.Setup(ctx, dedupWarehouse, mockUploader)
+ err := bq.Setup(ctx, warehouse, mockUploader)
require.NoError(t, err)

err = bq.CreateSchema(ctx)
@@ -590,9 +588,12 @@ func TestIntegration(t *testing.T) {
loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)

+ appendWarehouse := th.Clone(t, warehouse)
+ appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true
+
c := config.New()
bq := whbigquery.New(c, logger.NOP)
- err := bq.Setup(ctx, warehouse, mockUploader)
+ err := bq.Setup(ctx, appendWarehouse, mockUploader)
require.NoError(t, err)

err = bq.CreateSchema(ctx)
@@ -638,8 +639,11 @@ func TestIntegration(t *testing.T) {
loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)

+ appendWarehouse := th.Clone(t, warehouse)
+ appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true
+
bq := whbigquery.New(config.New(), logger.NOP)
- err := bq.Setup(ctx, warehouse, mockUploader)
+ err := bq.Setup(ctx, appendWarehouse, mockUploader)
require.NoError(t, err)

err = bq.CreateSchema(ctx)
4 changes: 2 additions & 2 deletions warehouse/integrations/bigquery/testdata/template.json
@@ -32,7 +32,7 @@
"prefix": "",
"namespace": "{{.namespace}}",
"syncFrequency": "30",
"enableMerge": {{.enableMerge}}
"preferAppend": {{.preferAppend}}
},
"liveEventsConfig": {},
"secretConfig": {},
@@ -161,7 +161,7 @@
"prefix": "",
"namespace": "{{.sourcesNamespace}}",
"syncFrequency": "30",
"enableMerge": {{.enableMerge}}
"preferAppend": {{.preferAppend}}
},
"liveEventsConfig": {},
"secretConfig": {},
2 changes: 1 addition & 1 deletion warehouse/integrations/deltalake/deltalake.go
@@ -1387,5 +1387,5 @@ func (*Deltalake) DeleteBy(context.Context, []string, warehouseutils.DeleteByPar
// * the user opted in to merging and we allow merging
func (d *Deltalake) ShouldMerge() bool {
return !d.Uploader.CanAppend() ||
- (d.config.allowMerge && d.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting))
+ (d.config.allowMerge && !d.Warehouse.GetPreferAppendSetting())
}
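Deltalake differs from BigQuery in that it consults the uploader first: if appending is impossible, the load merges no matter what the user prefers. A small sketch of that truth table, assuming the same default-false reading of `preferAppend`; it mirrors the `TestDeltalake_ShouldMerge` cases further down:

```go
package main

import "fmt"

// shouldMerge sketches the Deltalake decision; preferAppend is assumed to
// default to false when the destination config omits it.
func shouldMerge(canAppend, allowMerge, preferAppend bool) bool {
	// A non-appendable uploader forces a merge regardless of preference.
	return !canAppend || (allowMerge && !preferAppend)
}

func main() {
	for _, tc := range []struct{ canAppend, preferAppend bool }{
		{true, true},   // append possible, user prefers append  -> no merge
		{true, false},  // append possible, user prefers merge   -> merge
		{false, true},  // append impossible, preference ignored -> merge
		{false, false}, // append impossible                     -> merge
	} {
		fmt.Printf("canAppend=%v preferAppend=%v -> merge=%v\n",
			tc.canAppend, tc.preferAppend, shouldMerge(tc.canAppend, true, tc.preferAppend))
	}
}
```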
46 changes: 23 additions & 23 deletions warehouse/integrations/deltalake/deltalake_test.go
@@ -117,7 +117,7 @@ func TestIntegration(t *testing.T) {
db := sql.OpenDB(connector)
require.NoError(t, db.Ping())

- bootstrapSvc := func(t *testing.T, enableMerge bool) {
+ bootstrapSvc := func(t *testing.T, preferAppend bool) {
templateConfigurations := map[string]any{
"workspaceID": workspaceID,
"sourceID": sourceID,
@@ -131,7 +131,7 @@ func TestIntegration(t *testing.T) {
"containerName": deltaLakeCredentials.ContainerName,
"accountName": deltaLakeCredentials.AccountName,
"accountKey": deltaLakeCredentials.AccountKey,
"enableMerge": enableMerge,
"preferAppend": preferAppend,
}
workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations)

@@ -186,7 +186,7 @@ func TestIntegration(t *testing.T) {
destinationID string
messageID string
warehouseEventsMap whth.EventsCountMap
- enableMerge bool
+ preferAppend bool
useParquetLoadFiles bool
stagingFilePrefix string
jobRunID string
@@ -198,7 +198,7 @@ func TestIntegration(t *testing.T) {
sourceID: sourceID,
destinationID: destinationID,
warehouseEventsMap: mergeEventsMap(),
- enableMerge: true,
+ preferAppend: false,
useParquetLoadFiles: false,
stagingFilePrefix: "testdata/upload-job-merge-mode",
jobRunID: misc.FastUUID().String(),
@@ -210,7 +210,7 @@ func TestIntegration(t *testing.T) {
sourceID: sourceID,
destinationID: destinationID,
warehouseEventsMap: appendEventsMap(),
- enableMerge: false,
+ preferAppend: true,
useParquetLoadFiles: false,
stagingFilePrefix: "testdata/upload-job-append-mode",
// an empty jobRunID means that the source is not an ETL one
@@ -224,7 +224,7 @@ func TestIntegration(t *testing.T) {
sourceID: sourceID,
destinationID: destinationID,
warehouseEventsMap: mergeEventsMap(),
- enableMerge: true,
+ preferAppend: false,
useParquetLoadFiles: true,
stagingFilePrefix: "testdata/upload-job-parquet",
jobRunID: misc.FastUUID().String(),
@@ -234,7 +234,7 @@ func TestIntegration(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
- bootstrapSvc(t, tc.enableMerge)
+ bootstrapSvc(t, tc.preferAppend)
t.Setenv("RSERVER_WAREHOUSE_DELTALAKE_USE_PARQUET_LOAD_FILES", strconv.FormatBool(tc.useParquetLoadFiles))

sqlClient := &warehouseclient.Client{
@@ -563,11 +563,8 @@ func TestIntegration(t *testing.T) {
loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, true, true, "2022-12-15T06:53:49.640Z")

- mergeWarehouse := th.Clone(t, warehouse)
- mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
-
d := deltalake.New(config.New(), logger.NOP, memstats.New())
- err := d.Setup(ctx, mergeWarehouse, mockUploader)
+ err := d.Setup(ctx, warehouse, mockUploader)
require.NoError(t, err)

err = d.CreateSchema(ctx)
@@ -614,7 +611,7 @@ func TestIntegration(t *testing.T) {
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, false, false, "2022-11-15T06:53:49.640Z")

mergeWarehouse := th.Clone(t, warehouse)
- mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
+ mergeWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true

d := deltalake.New(config.New(), logger.NOP, memstats.New())
err := d.Setup(ctx, mergeWarehouse, mockUploader)
@@ -664,8 +661,11 @@ func TestIntegration(t *testing.T) {
loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, true, false, "2022-12-15T06:53:49.640Z")

+ appendWarehouse := th.Clone(t, warehouse)
+ appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true
+
d := deltalake.New(config.New(), logger.NOP, memstats.New())
- err := d.Setup(ctx, warehouse, mockUploader)
+ err := d.Setup(ctx, appendWarehouse, mockUploader)
require.NoError(t, err)

err = d.CreateSchema(ctx)
@@ -1057,35 +1057,35 @@ func TestDeltalake_TrimErrorMessage(t *testing.T) {
func TestDeltalake_ShouldMerge(t *testing.T) {
testCases := []struct {
name string
- enableMerge bool
+ preferAppend bool
uploaderCanAppend bool
uploaderExpectedCalls int
expected bool
}{
{
- name: "uploader says we can append and merge is not enabled",
- enableMerge: false,
+ name: "uploader says we can append and user prefers to append",
+ preferAppend: true,
uploaderCanAppend: true,
uploaderExpectedCalls: 1,
expected: false,
},
{
- name: "uploader says we can append and merge is enabled",
- enableMerge: true,
+ name: "uploader says we can append and user prefers not to append",
+ preferAppend: false,
uploaderCanAppend: true,
uploaderExpectedCalls: 1,
expected: true,
},
{
- name: "uploader says we cannot append so enableMerge false is ignored",
- enableMerge: false,
+ name: "uploader says we cannot append and user prefers to append",
+ preferAppend: true,
uploaderCanAppend: false,
uploaderExpectedCalls: 1,
expected: true,
},
{
- name: "uploader says we cannot append so enableMerge true is ignored",
- enableMerge: true,
+ name: "uploader says we cannot append and user prefers not to append",
+ preferAppend: false,
uploaderCanAppend: false,
uploaderExpectedCalls: 1,
expected: true,
@@ -1098,7 +1098,7 @@ func TestDeltalake_ShouldMerge(t *testing.T) {
d.Warehouse = model.Warehouse{
Destination: backendconfig.DestinationT{
Config: map[string]any{
- string(model.EnableMergeSetting): tc.enableMerge,
+ string(model.PreferAppendSetting): tc.preferAppend,
},
},
}
2 changes: 1 addition & 1 deletion warehouse/integrations/deltalake/testdata/template.json
@@ -39,7 +39,7 @@
"syncFrequency": "30",
"eventDelivery": false,
"eventDeliveryTS": 1648195480174,
"enableMerge": {{.enableMerge}}
"preferAppend": {{.preferAppend}}
},
"liveEventsConfig": {
"eventDelivery": false,
2 changes: 1 addition & 1 deletion warehouse/integrations/postgres/load.go
@@ -556,6 +556,6 @@ func (pg *Postgres) loadUsersTable(
func (pg *Postgres) shouldMerge() bool {
return !pg.Uploader.CanAppend() ||
(pg.config.allowMerge &&
- pg.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting) &&
+ !pg.Warehouse.GetPreferAppendSetting() &&
!slices.Contains(pg.config.skipDedupDestinationIDs, pg.Warehouse.Destination.ID))
}
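Postgres applies the same rule with one extra escape hatch: destinations listed in `skipDedupDestinationIDs` skip the merge path even when merging would otherwise win. A hedged sketch; the parameter wiring here is an assumption, only the boolean expression comes from the diff:

```go
package main

import (
	"fmt"
	"slices"
)

// shouldMerge sketches the Postgres variant: merge is forced when append is
// impossible, and suppressed for destinations in skipDedupDestinationIDs.
func shouldMerge(canAppend, allowMerge, preferAppend bool, skipDedupIDs []string, destID string) bool {
	return !canAppend ||
		(allowMerge && !preferAppend && !slices.Contains(skipDedupIDs, destID))
}

func main() {
	// An allow-listed destination skips dedup even though merge would win.
	fmt.Println(shouldMerge(true, true, false, []string{"dest-1"}, "dest-1")) // false
	// Default case: preferAppend unset (false) means merge.
	fmt.Println(shouldMerge(true, true, false, nil, "dest-2")) // true
}
```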