fix: preferAppend defaults to false if not defined (#4098)
* fix: preferAppend defaults to false if not defined

* chore: adding comment and removing if

* fix: minio healthcheck
fracasula authored Nov 8, 2023
1 parent 94717f5 commit 941caba
Showing 15 changed files with 124 additions and 104 deletions.
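The behavioural change, in short: the user-facing flag flips polarity from enableMerge (opt in to merging) to preferAppend (opt out of merging). Because a boolean destination-config key that is not defined reads as Go's zero value, false, merging remains the default, which is what the commit title means. The sketch below only illustrates that zero-value behaviour with hypothetical names (destination, getPreferAppend); it is not the repository's actual accessor, whose GetPreferAppendSetting signature appears in the diffs that follow.

```go
package main

import "fmt"

// destination stands in for a warehouse destination with a free-form config
// map; the name and shape are hypothetical, for illustration only.
type destination struct {
	config map[string]any
}

// getPreferAppend models what a GetPreferAppendSetting-style accessor is
// assumed to do: return the stored bool, or false when the key is absent
// (or not a bool at all). A missing key therefore means "do not prefer
// append", i.e. keep merging.
func (d destination) getPreferAppend() bool {
	v, ok := d.config["preferAppend"].(bool)
	return ok && v
}

// shouldMerge mirrors the BigQuery rule in the first diff below.
func shouldMerge(allowMerge bool, d destination) bool {
	return allowMerge && !d.getPreferAppend()
}

func main() {
	undefined := destination{config: map[string]any{}}
	optedOut := destination{config: map[string]any{"preferAppend": true}}

	fmt.Println(shouldMerge(true, undefined)) // true: merge is the default
	fmt.Println(shouldMerge(true, optedOut))  // false: user prefers append
}
```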
warehouse/integrations/bigquery/bigquery.go (1 addition, 1 deletion)
@@ -867,7 +867,7 @@ func (bq *BigQuery) connect(ctx context.Context, cred BQCredentials) (*bigquery.
 // * the server config says we allow merging
 // * the user opted in to merging
 func (bq *BigQuery) shouldMerge() bool {
-	return bq.config.allowMerge && bq.warehouse.GetBoolDestinationConfig(model.EnableMergeSetting)
+	return bq.config.allowMerge && !bq.warehouse.GetPreferAppendSetting()
 }

 func (bq *BigQuery) CrashRecover(ctx context.Context) {
warehouse/integrations/bigquery/bigquery_test.go (17 additions, 14 deletions)
@@ -78,7 +78,7 @@ func TestIntegration(t *testing.T) {

 	escapedCredentialsTrimmedStr := strings.Trim(string(escapedCredentials), `"`)

-	bootstrapSvc := func(t *testing.T, enableMerge bool) *bigquery.Client {
+	bootstrapSvc := func(t *testing.T, preferAppend bool) *bigquery.Client {
 		templateConfigurations := map[string]any{
 			"workspaceID": workspaceID,
 			"sourceID":    sourceID,
@@ -93,7 +93,7 @@ func TestIntegration(t *testing.T) {
 			"bucketName":       bqTestCredentials.BucketName,
 			"credentials":      escapedCredentialsTrimmedStr,
 			"sourcesNamespace": sourcesNamespace,
-			"enableMerge":      enableMerge,
+			"preferAppend":     preferAppend,
 		}
 		workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations)

@@ -151,7 +151,7 @@ func TestIntegration(t *testing.T) {
 		asyncJob                            bool
 		skipModifiedEvents                  bool
 		prerequisite                        func(context.Context, testing.TB, *bigquery.Client)
-		enableMerge                         bool
+		preferAppend                        bool
 		customPartitionsEnabledWorkspaceIDs string
 		stagingFilePrefix                   string
 	}{
@@ -169,7 +169,7 @@ func TestIntegration(t *testing.T) {
 			loadFilesEventsMap:    loadFilesEventsMap(),
 			tableUploadsEventsMap: tableUploadsEventsMap(),
 			warehouseEventsMap:    mergeEventsMap(),
-			enableMerge:           true,
+			preferAppend:          false,
 			prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
 				t.Helper()
 				_ = db.Dataset(namespace).DeleteWithContents(ctx)
@@ -193,7 +193,7 @@ func TestIntegration(t *testing.T) {
 			tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
 			warehouseEventsMap:    whth.SourcesWarehouseEventsMap(),
 			asyncJob:              true,
-			enableMerge:           false,
+			preferAppend:          true,
 			prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
 				t.Helper()
 				_ = db.Dataset(namespace).DeleteWithContents(ctx)
@@ -215,7 +215,7 @@ func TestIntegration(t *testing.T) {
 			tableUploadsEventsMap: tableUploadsEventsMap(),
 			warehouseEventsMap:    appendEventsMap(),
 			skipModifiedEvents:    true,
-			enableMerge:           false,
+			preferAppend:          true,
 			prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
 				t.Helper()
 				_ = db.Dataset(namespace).DeleteWithContents(ctx)
@@ -237,7 +237,7 @@ func TestIntegration(t *testing.T) {
 			tableUploadsEventsMap: tableUploadsEventsMap(),
 			warehouseEventsMap:    appendEventsMap(),
 			skipModifiedEvents:    true,
-			enableMerge:           false,
+			preferAppend:          true,
 			customPartitionsEnabledWorkspaceIDs: workspaceID,
 			prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
 				t.Helper()
@@ -274,7 +274,7 @@ func TestIntegration(t *testing.T) {
 				"RSERVER_WAREHOUSE_BIGQUERY_CUSTOM_PARTITIONS_ENABLED_WORKSPACE_IDS",
 				tc.customPartitionsEnabledWorkspaceIDs,
 			)
-			db := bootstrapSvc(t, tc.enableMerge)
+			db := bootstrapSvc(t, tc.preferAppend)

 			t.Cleanup(func() {
 				for _, dataset := range []string{tc.schema} {
@@ -542,12 +542,9 @@ func TestIntegration(t *testing.T) {
 		loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
 		mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)

-		dedupWarehouse := th.Clone(t, warehouse)
-		dedupWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
-
 		c := config.New()
 		bq := whbigquery.New(c, logger.NOP)
-		err := bq.Setup(ctx, dedupWarehouse, mockUploader)
+		err := bq.Setup(ctx, warehouse, mockUploader)
 		require.NoError(t, err)

 		err = bq.CreateSchema(ctx)
@@ -590,9 +587,12 @@ func TestIntegration(t *testing.T) {
 		loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
 		mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)

+		appendWarehouse := th.Clone(t, warehouse)
+		appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true
+
 		c := config.New()
 		bq := whbigquery.New(c, logger.NOP)
-		err := bq.Setup(ctx, warehouse, mockUploader)
+		err := bq.Setup(ctx, appendWarehouse, mockUploader)
 		require.NoError(t, err)

 		err = bq.CreateSchema(ctx)
@@ -638,8 +638,11 @@ func TestIntegration(t *testing.T) {
 		loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
 		mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)

+		appendWarehouse := th.Clone(t, warehouse)
+		appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true
+
 		bq := whbigquery.New(config.New(), logger.NOP)
-		err := bq.Setup(ctx, warehouse, mockUploader)
+		err := bq.Setup(ctx, appendWarehouse, mockUploader)
 		require.NoError(t, err)

 		err = bq.CreateSchema(ctx)
warehouse/integrations/bigquery/testdata/template.json (2 additions, 2 deletions)
@@ -32,7 +32,7 @@
 				"prefix": "",
 				"namespace": "{{.namespace}}",
 				"syncFrequency": "30",
-				"enableMerge": {{.enableMerge}}
+				"preferAppend": {{.preferAppend}}
 			},
 			"liveEventsConfig": {},
 			"secretConfig": {},
@@ -161,7 +161,7 @@
 				"prefix": "",
 				"namespace": "{{.sourcesNamespace}}",
 				"syncFrequency": "30",
-				"enableMerge": {{.enableMerge}}
+				"preferAppend": {{.preferAppend}}
 			},
 			"liveEventsConfig": {},
 			"secretConfig": {},
warehouse/integrations/deltalake/deltalake.go (1 addition, 1 deletion)
@@ -1387,5 +1387,5 @@ func (*Deltalake) DeleteBy(context.Context, []string, warehouseutils.DeleteByPar
 // * the user opted in to merging and we allow merging
 func (d *Deltalake) ShouldMerge() bool {
 	return !d.Uploader.CanAppend() ||
-		(d.config.allowMerge && d.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting))
+		(d.config.allowMerge && !d.Warehouse.GetPreferAppendSetting())
 }
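Deltalake's rule adds one input that BigQuery's does not have: whether the uploader can append at all. The snippet below is a free-standing paraphrase, not the repository's code, and fixes allowMerge to true in its examples; the four cases in TestDeltalake_ShouldMerge further down enumerate the same truth table.

```go
package main

import "fmt"

// shouldMergeDeltalake paraphrases ShouldMerge above: when the uploader
// cannot append, merging is forced regardless of the user's preference;
// otherwise merging requires the server to allow it and the user not to
// prefer appending.
func shouldMergeDeltalake(canAppend, allowMerge, preferAppend bool) bool {
	return !canAppend || (allowMerge && !preferAppend)
}

func main() {
	// With allowMerge fixed to true, these mirror the four cases in
	// TestDeltalake_ShouldMerge below.
	fmt.Println(shouldMergeDeltalake(true, true, true))   // false: append wins
	fmt.Println(shouldMergeDeltalake(true, true, false))  // true: merge
	fmt.Println(shouldMergeDeltalake(false, true, true))  // true: merge forced
	fmt.Println(shouldMergeDeltalake(false, true, false)) // true: merge forced
}
```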
warehouse/integrations/deltalake/deltalake_test.go (25 additions, 25 deletions)
@@ -117,7 +117,7 @@ func TestIntegration(t *testing.T) {
 	db := sql.OpenDB(connector)
 	require.NoError(t, db.Ping())

-	bootstrapSvc := func(t *testing.T, enableMerge bool) {
+	bootstrapSvc := func(t *testing.T, preferAppend bool) {
 		templateConfigurations := map[string]any{
 			"workspaceID": workspaceID,
 			"sourceID":    sourceID,
@@ -131,7 +131,7 @@ func TestIntegration(t *testing.T) {
 			"containerName": deltaLakeCredentials.ContainerName,
 			"accountName":   deltaLakeCredentials.AccountName,
 			"accountKey":    deltaLakeCredentials.AccountKey,
-			"enableMerge":   enableMerge,
+			"preferAppend":  preferAppend,
 		}
 		workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/template.json", templateConfigurations)

@@ -186,7 +186,7 @@ func TestIntegration(t *testing.T) {
 		destinationID       string
 		messageID           string
 		warehouseEventsMap  whth.EventsCountMap
-		enableMerge         bool
+		preferAppend        bool
 		useParquetLoadFiles bool
 		stagingFilePrefix   string
 		jobRunID            string
@@ -198,7 +198,7 @@ func TestIntegration(t *testing.T) {
 			sourceID:            sourceID,
 			destinationID:       destinationID,
 			warehouseEventsMap:  mergeEventsMap(),
-			enableMerge:         true,
+			preferAppend:        false,
 			useParquetLoadFiles: false,
 			stagingFilePrefix:   "testdata/upload-job-merge-mode",
 			jobRunID:            misc.FastUUID().String(),
@@ -210,7 +210,7 @@ func TestIntegration(t *testing.T) {
 			sourceID:            sourceID,
 			destinationID:       destinationID,
 			warehouseEventsMap:  appendEventsMap(),
-			enableMerge:         false,
+			preferAppend:        true,
 			useParquetLoadFiles: false,
 			stagingFilePrefix:   "testdata/upload-job-append-mode",
 			// an empty jobRunID means that the source is not an ETL one
@@ -224,7 +224,7 @@ func TestIntegration(t *testing.T) {
 			sourceID:            sourceID,
 			destinationID:       destinationID,
 			warehouseEventsMap:  mergeEventsMap(),
-			enableMerge:         true,
+			preferAppend:        false,
 			useParquetLoadFiles: true,
 			stagingFilePrefix:   "testdata/upload-job-parquet",
 			jobRunID:            misc.FastUUID().String(),
@@ -234,7 +234,7 @@ func TestIntegration(t *testing.T) {
 	for _, tc := range testCases {
 		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
-			bootstrapSvc(t, tc.enableMerge)
+			bootstrapSvc(t, tc.preferAppend)
 			t.Setenv("RSERVER_WAREHOUSE_DELTALAKE_USE_PARQUET_LOAD_FILES", strconv.FormatBool(tc.useParquetLoadFiles))

 			sqlClient := &warehouseclient.Client{
@@ -563,11 +563,8 @@ func TestIntegration(t *testing.T) {
 		loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
 		mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, true, true, "2022-12-15T06:53:49.640Z")

-		mergeWarehouse := th.Clone(t, warehouse)
-		mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
-
 		d := deltalake.New(config.New(), logger.NOP, memstats.New())
-		err := d.Setup(ctx, mergeWarehouse, mockUploader)
+		err := d.Setup(ctx, warehouse, mockUploader)
 		require.NoError(t, err)

 		err = d.CreateSchema(ctx)
@@ -613,11 +610,11 @@ func TestIntegration(t *testing.T) {
 		loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
 		mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, false, false, "2022-11-15T06:53:49.640Z")

-		mergeWarehouse := th.Clone(t, warehouse)
-		mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
+		appendWarehouse := th.Clone(t, warehouse)
+		appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true

 		d := deltalake.New(config.New(), logger.NOP, memstats.New())
-		err := d.Setup(ctx, mergeWarehouse, mockUploader)
+		err := d.Setup(ctx, appendWarehouse, mockUploader)
 		require.NoError(t, err)

 		err = d.CreateSchema(ctx)
@@ -664,8 +661,11 @@ func TestIntegration(t *testing.T) {
 		loadFiles := []warehouseutils.LoadFile{{Location: uploadOutput.Location}}
 		mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, warehouseutils.LoadFileTypeCsv, true, false, "2022-12-15T06:53:49.640Z")

+		appendWarehouse := th.Clone(t, warehouse)
+		appendWarehouse.Destination.Config[string(model.PreferAppendSetting)] = true
+
 		d := deltalake.New(config.New(), logger.NOP, memstats.New())
-		err := d.Setup(ctx, warehouse, mockUploader)
+		err := d.Setup(ctx, appendWarehouse, mockUploader)
 		require.NoError(t, err)

 		err = d.CreateSchema(ctx)
@@ -1057,35 +1057,35 @@ func TestDeltalake_TrimErrorMessage(t *testing.T) {
 func TestDeltalake_ShouldMerge(t *testing.T) {
 	testCases := []struct {
 		name                  string
-		enableMerge           bool
+		preferAppend          bool
 		uploaderCanAppend     bool
 		uploaderExpectedCalls int
 		expected              bool
 	}{
 		{
-			name:                  "uploader says we can append and merge is not enabled",
-			enableMerge:           false,
+			name:                  "uploader says we can append and user prefers to append",
+			preferAppend:          true,
 			uploaderCanAppend:     true,
 			uploaderExpectedCalls: 1,
 			expected:              false,
 		},
 		{
-			name:                  "uploader says we can append and merge is enabled",
-			enableMerge:           true,
+			name:                  "uploader says we can append and users prefers not to append",
+			preferAppend:          false,
 			uploaderCanAppend:     true,
 			uploaderExpectedCalls: 1,
 			expected:              true,
 		},
 		{
-			name:                  "uploader says we cannot append so enableMerge false is ignored",
-			enableMerge:           false,
+			name:                  "uploader says we cannot append and user prefers to append",
+			preferAppend:          true,
 			uploaderCanAppend:     false,
 			uploaderExpectedCalls: 1,
 			expected:              true,
 		},
 		{
-			name:                  "uploader says we cannot append so enableMerge true is ignored",
-			enableMerge:           true,
+			name:                  "uploader says we cannot append and users prefers not to append",
+			preferAppend:          false,
 			uploaderCanAppend:     false,
 			uploaderExpectedCalls: 1,
 			expected:              true,
@@ -1098,7 +1098,7 @@ func TestDeltalake_ShouldMerge(t *testing.T) {
 			d.Warehouse = model.Warehouse{
 				Destination: backendconfig.DestinationT{
 					Config: map[string]any{
-						string(model.EnableMergeSetting): tc.enableMerge,
+						string(model.PreferAppendSetting): tc.preferAppend,
 					},
 				},
 			}
warehouse/integrations/deltalake/testdata/template.json (1 addition, 1 deletion)
@@ -39,7 +39,7 @@
 				"syncFrequency": "30",
 				"eventDelivery": false,
 				"eventDeliveryTS": 1648195480174,
-				"enableMerge": {{.enableMerge}}
+				"preferAppend": {{.preferAppend}}
 			},
 			"liveEventsConfig": {
 				"eventDelivery": false,
warehouse/integrations/postgres/load.go (1 addition, 1 deletion)
@@ -556,6 +556,6 @@ func (pg *Postgres) loadUsersTable(
 func (pg *Postgres) shouldMerge() bool {
 	return !pg.Uploader.CanAppend() ||
 		(pg.config.allowMerge &&
-			pg.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting) &&
+			!pg.Warehouse.GetPreferAppendSetting() &&
 			!slices.Contains(pg.config.skipDedupDestinationIDs, pg.Warehouse.Destination.ID))
 }
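Postgres composes the same rule with a server-side escape hatch: destinations listed in skipDedupDestinationIDs never merge, even when merging is allowed and the user did not opt in to appending. Again a hedged, free-standing paraphrase rather than the repository's code:

```go
package main

import (
	"fmt"
	"slices"
)

// shouldMergePostgres paraphrases the Postgres rule above: merging is forced
// when appending is impossible; otherwise it additionally requires that the
// destination is not on the server-side dedup skip list.
func shouldMergePostgres(canAppend, allowMerge, preferAppend bool, skipDedupIDs []string, destinationID string) bool {
	return !canAppend ||
		(allowMerge && !preferAppend && !slices.Contains(skipDedupIDs, destinationID))
}

func main() {
	// A skip-listed destination appends even though merge would otherwise
	// apply; the same destination off the list merges.
	fmt.Println(shouldMergePostgres(true, true, false, []string{"dest-1"}, "dest-1")) // false
	fmt.Println(shouldMergePostgres(true, true, false, nil, "dest-1"))                // true
}
```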
(Diffs for the remaining changed files are not included in this view.)