Commit 1406702

chore: revert "fix: preferAppend defaults to false if not defined (#4098)"
fracasula committed Nov 13, 2023
1 parent f4686aa commit 1406702
Showing 14 changed files with 108 additions and 128 deletions.
2 changes: 1 addition & 1 deletion warehouse/integrations/bigquery/bigquery.go
@@ -867,7 +867,7 @@ func (bq *BigQuery) connect(ctx context.Context, cred BQCredentials) (*bigquery.
// * the server config says we allow merging
// * the user opted in to merging
func (bq *BigQuery) shouldMerge() bool {
- return bq.config.allowMerge && !bq.warehouse.GetPreferAppendSetting()
+ return bq.config.allowMerge && bq.warehouse.GetBoolDestinationConfig(model.EnableMergeSetting)
}

func (bq *BigQuery) CrashRecover(ctx context.Context) {
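The restored rule makes merging a double opt-in for BigQuery: the server-side config must allow it and the destination must enable it. A minimal sketch of the decision, with plain booleans standing in for bq.config.allowMerge and the destination setting (names here are illustrative, not the repository's):

// shouldMerge mirrors the restored BigQuery rule: de-duplicate only when
// the server allows merging AND the destination opted in via enableMerge.
func shouldMerge(allowMerge, enableMerge bool) bool {
	return allowMerge && enableMerge
}

// shouldMerge(true, true)   == true  -> loads de-duplicate (merge)
// shouldMerge(true, false)  == false -> loads append
// shouldMerge(false, true)  == false -> the server-side config wins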
41 changes: 19 additions & 22 deletions warehouse/integrations/bigquery/bigquery_test.go
@@ -78,7 +78,7 @@ func TestIntegration(t *testing.T) {

escapedCredentialsTrimmedStr := strings.Trim(string(escapedCredentials), `"`)

- bootstrapSvc := func(t *testing.T, preferAppend bool) *bigquery.Client {
+ bootstrapSvc := func(t *testing.T, enableMerge bool) *bigquery.Client {
templateConfigurations := map[string]any{
"workspaceID": workspaceID,
"sourceID": sourceID,
@@ -93,7 +93,7 @@ func TestIntegration(t *testing.T) {
"bucketName": bqTestCredentials.BucketName,
"credentials": escapedCredentialsTrimmedStr,
"sourcesNamespace": sourcesNamespace,
"preferAppend": preferAppend,
"enableMerge": enableMerge,
}
workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations)

@@ -148,10 +148,10 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap whth.EventsCountMap
tableUploadsEventsMap whth.EventsCountMap
warehouseEventsMap whth.EventsCountMap
- sourceJob bool
+ asyncJob bool
skipModifiedEvents bool
prerequisite func(context.Context, testing.TB, *bigquery.Client)
- preferAppend bool
+ enableMerge bool
customPartitionsEnabledWorkspaceIDs string
stagingFilePrefix string
}{
@@ -169,15 +169,15 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: loadFilesEventsMap(),
tableUploadsEventsMap: tableUploadsEventsMap(),
warehouseEventsMap: mergeEventsMap(),
- preferAppend: false,
+ enableMerge: true,
prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
t.Helper()
_ = db.Dataset(namespace).DeleteWithContents(ctx)
},
stagingFilePrefix: "testdata/upload-job-merge-mode",
},
{
- name: "Source Job",
+ name: "Async Job",
writeKey: sourcesWriteKey,
sourceID: sourcesSourceID,
destinationID: sourcesDestinationID,
@@ -192,8 +192,8 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: whth.SourcesLoadFilesEventsMap(),
tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
warehouseEventsMap: whth.SourcesWarehouseEventsMap(),
- sourceJob: true,
- preferAppend: true,
+ asyncJob: true,
+ enableMerge: false,
prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
t.Helper()
_ = db.Dataset(namespace).DeleteWithContents(ctx)
@@ -215,7 +215,7 @@ func TestIntegration(t *testing.T) {
tableUploadsEventsMap: tableUploadsEventsMap(),
warehouseEventsMap: appendEventsMap(),
skipModifiedEvents: true,
- preferAppend: true,
+ enableMerge: false,
prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
t.Helper()
_ = db.Dataset(namespace).DeleteWithContents(ctx)
@@ -237,7 +237,7 @@ func TestIntegration(t *testing.T) {
tableUploadsEventsMap: tableUploadsEventsMap(),
warehouseEventsMap: appendEventsMap(),
skipModifiedEvents: true,
- preferAppend: true,
+ enableMerge: false,
customPartitionsEnabledWorkspaceIDs: workspaceID,
prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
t.Helper()
@@ -274,7 +274,7 @@ func TestIntegration(t *testing.T) {
"RSERVER_WAREHOUSE_BIGQUERY_CUSTOM_PARTITIONS_ENABLED_WORKSPACE_IDS",
tc.customPartitionsEnabledWorkspaceIDs,
)
- db := bootstrapSvc(t, tc.preferAppend)
+ db := bootstrapSvc(t, tc.enableMerge)

t.Cleanup(func() {
for _, dataset := range []string{tc.schema} {
@@ -347,7 +347,7 @@ func TestIntegration(t *testing.T) {
LoadFilesEventsMap: tc.loadFilesEventsMap,
TableUploadsEventsMap: tc.tableUploadsEventsMap,
WarehouseEventsMap: tc.warehouseEventsMap,
- SourceJob: tc.sourceJob,
+ AsyncJob: tc.asyncJob,
Config: conf,
WorkspaceID: workspaceID,
DestinationType: destType,
Expand All @@ -359,7 +359,7 @@ func TestIntegration(t *testing.T) {
StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
UserID: whth.GetUserId(destType),
}
- if tc.sourceJob {
+ if tc.asyncJob {
ts2.UserID = ts1.UserID
}
ts2.VerifyEvents(t)
@@ -542,9 +542,12 @@ func TestIntegration(t *testing.T) {
loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)

+ dedupWarehouse := th.Clone(t, warehouse)
+ dedupWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
+
c := config.New()
bq := whbigquery.New(c, logger.NOP)
- err := bq.Setup(ctx, warehouse, mockUploader)
+ err := bq.Setup(ctx, dedupWarehouse, mockUploader)
require.NoError(t, err)

err = bq.CreateSchema(ctx)
@@ -587,12 +590,9 @@ func TestIntegration(t *testing.T) {
loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)

- appendWarehouse := th.Clone(t, warehouse)
- appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true
-
c := config.New()
bq := whbigquery.New(c, logger.NOP)
- err := bq.Setup(ctx, appendWarehouse, mockUploader)
+ err := bq.Setup(ctx, warehouse, mockUploader)
require.NoError(t, err)

err = bq.CreateSchema(ctx)
@@ -638,11 +638,8 @@ func TestIntegration(t *testing.T) {
loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)

- appendWarehouse := th.Clone(t, warehouse)
- appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true
-
bq := whbigquery.New(config.New(), logger.NOP)
- err := bq.Setup(ctx, appendWarehouse, mockUploader)
+ err := bq.Setup(ctx, warehouse, mockUploader)
require.NoError(t, err)

err = bq.CreateSchema(ctx)
4 changes: 2 additions & 2 deletions warehouse/integrations/bigquery/testdata/template.json
@@ -32,7 +32,7 @@
"prefix": "",
"namespace": "{{.namespace}}",
"syncFrequency": "30",
"preferAppend": {{.preferAppend}}
"enableMerge": {{.enableMerge}}
},
"liveEventsConfig": {},
"secretConfig": {},
@@ -161,7 +161,7 @@
"prefix": "",
"namespace": "{{.sourcesNamespace}}",
"syncFrequency": "30",
"preferAppend": {{.preferAppend}}
"enableMerge": {{.enableMerge}}
},
"liveEventsConfig": {},
"secretConfig": {},
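In the template above, enableMerge is rendered into the destination's JSON config. Assuming the setting is read back as a plain boolean lookup, so that a missing or non-boolean value counts as false (append mode), the behaviour can be sketched like this; the helper below is illustrative, not the repository's actual parser:

package main

import (
	"encoding/json"
	"fmt"
)

// enableMerge reads the flag out of a rendered destination config.
// Missing or non-boolean values count as false, i.e. append mode.
func enableMerge(rawConfig []byte) bool {
	var cfg map[string]any
	if err := json.Unmarshal(rawConfig, &cfg); err != nil {
		return false // unreadable config falls back to append mode
	}
	v, _ := cfg["enableMerge"].(bool)
	return v
}

func main() {
	fmt.Println(enableMerge([]byte(`{"namespace":"ns","enableMerge":true}`))) // true
	fmt.Println(enableMerge([]byte(`{"namespace":"ns"}`)))                    // false
}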
2 changes: 1 addition & 1 deletion warehouse/integrations/deltalake/deltalake.go
@@ -1387,5 +1387,5 @@ func (*Deltalake) DeleteBy(context.Context, []string, warehouseutils.DeleteByPar
// * the user opted in to merging and we allow merging
func (d *Deltalake) ShouldMerge() bool {
return !d.Uploader.CanAppend() ||
- (d.config.allowMerge && !d.Warehouse.GetPreferAppendSetting())
+ (d.config.allowMerge && d.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting))
}
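Delta Lake restores the same setting with one override: if the uploader cannot safely append, merge is forced regardless of configuration. A sketch of that precedence (illustrative names; the real method reads the setting from the warehouse config as shown above):

// shouldMergeDeltalake mirrors the precedence restored above: an uploader
// that cannot append forces merge; otherwise merging requires both the
// server-side allowMerge and the destination's enableMerge setting.
func shouldMergeDeltalake(canAppend, allowMerge, enableMerge bool) bool {
	return !canAppend || (allowMerge && enableMerge)
}

The TestDeltalake_ShouldMerge cases further down enumerate exactly this truth table.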
50 changes: 25 additions & 25 deletions warehouse/integrations/deltalake/deltalake_test.go
@@ -117,7 +117,7 @@ func TestIntegration(t *testing.T) {
db := sql.OpenDB(connector)
require.NoError(t, db.Ping())

- bootstrapSvc := func(t *testing.T, preferAppend bool) {
+ bootstrapSvc := func(t *testing.T, enableMerge bool) {
templateConfigurations := map[string]any{
"workspaceID": workspaceID,
"sourceID": sourceID,
@@ -131,7 +131,7 @@ func TestIntegration(t *testing.T) {
"containerName": deltaLakeCredentials.ContainerName,
"accountName": deltaLakeCredentials.AccountName,
"accountKey": deltaLakeCredentials.AccountKey,
"preferAppend": preferAppend,
"enableMerge": enableMerge,
}
workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations)

@@ -186,7 +186,7 @@ func TestIntegration(t *testing.T) {
destinationID string
messageID string
warehouseEventsMap whth.EventsCountMap
- preferAppend bool
+ enableMerge bool
useParquetLoadFiles bool
stagingFilePrefix string
jobRunID string
@@ -198,7 +198,7 @@ func TestIntegration(t *testing.T) {
sourceID: sourceID,
destinationID: destinationID,
warehouseEventsMap: mergeEventsMap(),
- preferAppend: false,
+ enableMerge: true,
useParquetLoadFiles: false,
stagingFilePrefix: "testdata/upload-job-merge-mode",
jobRunID: misc.FastUUID().String(),
@@ -210,7 +210,7 @@ func TestIntegration(t *testing.T) {
sourceID: sourceID,
destinationID: destinationID,
warehouseEventsMap: appendEventsMap(),
- preferAppend: true,
+ enableMerge: false,
useParquetLoadFiles: false,
stagingFilePrefix: "testdata/upload-job-append-mode",
// an empty jobRunID means that the source is not an ETL one
@@ -224,7 +224,7 @@ func TestIntegration(t *testing.T) {
sourceID: sourceID,
destinationID: destinationID,
warehouseEventsMap: mergeEventsMap(),
- preferAppend: false,
+ enableMerge: true,
useParquetLoadFiles: true,
stagingFilePrefix: "testdata/upload-job-parquet",
jobRunID: misc.FastUUID().String(),
@@ -234,7 +234,7 @@ func TestIntegration(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
- bootstrapSvc(t, tc.preferAppend)
+ bootstrapSvc(t, tc.enableMerge)
t.Setenv("RSERVER_WAREHOUSE_DELTALAKE_USE_PARQUET_LOAD_FILES", strconv.FormatBool(tc.useParquetLoadFiles))

sqlClient := &warehouseclient.Client{
@@ -563,8 +563,11 @@ func TestIntegration(t *testing.T) {
loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, true, true, "2022-12-15T06:53:49.640Z")

+ mergeWarehouse := th.Clone(t, warehouse)
+ mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
+
d := deltalake.New(config.New(), logger.NOP, memstats.New())
- err := d.Setup(ctx, warehouse, mockUploader)
+ err := d.Setup(ctx, mergeWarehouse, mockUploader)
require.NoError(t, err)

err = d.CreateSchema(ctx)
@@ -610,11 +613,11 @@ func TestIntegration(t *testing.T) {
loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, false, false, "2022-11-15T06:53:49.640Z")

- appendWarehouse := th.Clone(t, warehouse)
- appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true
+ mergeWarehouse := th.Clone(t, warehouse)
+ mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true

d := deltalake.New(config.New(), logger.NOP, memstats.New())
- err := d.Setup(ctx, appendWarehouse, mockUploader)
+ err := d.Setup(ctx, mergeWarehouse, mockUploader)
require.NoError(t, err)

err = d.CreateSchema(ctx)
@@ -661,11 +664,8 @@ func TestIntegration(t *testing.T) {
loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, true, false, "2022-12-15T06:53:49.640Z")

- appendWarehouse := th.Clone(t, warehouse)
- appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true
-
d := deltalake.New(config.New(), logger.NOP, memstats.New())
- err := d.Setup(ctx, appendWarehouse, mockUploader)
+ err := d.Setup(ctx, warehouse, mockUploader)
require.NoError(t, err)

err = d.CreateSchema(ctx)
@@ -1057,35 +1057,35 @@ func TestDeltalake_TrimErrorMessage(t *testing.T) {
func TestDeltalake_ShouldMerge(t *testing.T) {
testCases := []struct {
name string
- preferAppend bool
+ enableMerge bool
uploaderCanAppend bool
uploaderExpectedCalls int
expected bool
}{
{
name: "uploader says we can append and user prefers to append",
preferAppend: true,
name: "uploader says we can append and merge is not enabled",
enableMerge: false,
uploaderCanAppend: true,
uploaderExpectedCalls: 1,
expected: false,
},
{
name: "uploader says we can append and users prefers not to append",
preferAppend: false,
name: "uploader says we can append and merge is enabled",
enableMerge: true,
uploaderCanAppend: true,
uploaderExpectedCalls: 1,
expected: true,
},
{
name: "uploader says we cannot append and user prefers to append",
preferAppend: true,
name: "uploader says we cannot append so enableMerge false is ignored",
enableMerge: false,
uploaderCanAppend: false,
uploaderExpectedCalls: 1,
expected: true,
},
{
name: "uploader says we cannot append and users prefers not to append",
preferAppend: false,
name: "uploader says we cannot append so enableMerge true is ignored",
enableMerge: true,
uploaderCanAppend: false,
uploaderExpectedCalls: 1,
expected: true,
@@ -1098,7 +1098,7 @@ func TestDeltalake_ShouldMerge(t *testing.T) {
d.Warehouse = model.Warehouse{
Destination: backendconfig.DestinationT{
Config: map[string]any{
- string(model.PreferAppendSetting): tc.preferAppend,
+ string(model.EnableMergeSetting): tc.enableMerge,
},
},
}
2 changes: 1 addition & 1 deletion warehouse/integrations/deltalake/testdata/template.json
@@ -39,7 +39,7 @@
"syncFrequency": "30",
"eventDelivery": false,
"eventDeliveryTS": 1648195480174,
"preferAppend": {{.preferAppend}}
"enableMerge": {{.enableMerge}}
},
"liveEventsConfig": {
"eventDelivery": false,
2 changes: 1 addition & 1 deletion warehouse/integrations/postgres/load.go
@@ -556,6 +556,6 @@ func (pg *Postgres) loadUsersTable(
func (pg *Postgres) shouldMerge() bool {
return !pg.Uploader.CanAppend() ||
(pg.config.allowMerge &&
- !pg.Warehouse.GetPreferAppendSetting() &&
+ pg.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting) &&
!slices.Contains(pg.config.skipDedupDestinationIDs, pg.Warehouse.Destination.ID))
}
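Postgres layers one more escape hatch onto the Delta Lake rule: destinations listed in skipDedupDestinationIDs never merge, even with enableMerge set. A sketch of the combined check under the same illustrative naming (slices.Contains is the standard-library helper used in the diff):

import "slices"

// shouldMergePostgres mirrors the restored Postgres rule: merge is forced
// when appending is unsafe; otherwise it requires the server to allow it,
// the destination to opt in, and the destination not to be skip-listed.
func shouldMergePostgres(canAppend, allowMerge, enableMerge bool, skipDedupIDs []string, destID string) bool {
	return !canAppend ||
		(allowMerge && enableMerge && !slices.Contains(skipDedupIDs, destID))
}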
