Skip to content

Commit

Permalink
materialize-{snowflake,databricks}: make deletions idempotent
Browse files Browse the repository at this point in the history
The Snowflake and Databricks materializations rely on idempotent merge queries.
Previously a repeated merge query with a root document field having a `"delete"`
sentinel value would cause that row to be re-added to the table after having
first been deleted, in rare cases where the merge query is run a second time
before the staged files are deleted and before the runtime acknowledgement is sent.

This fixes that scenario by not inserting rows if the root document field is
`"delete"` in merge queries.
  • Loading branch information
williamhbaker committed Oct 4, 2024
1 parent 190652f commit 49874ce
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 88 deletions.
28 changes: 1 addition & 27 deletions materialize-databricks/.snapshots/TestSQLGeneration
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ SELECT 0, `a-schema`.key_value.flow_document
DELETE
WHEN MATCHED THEN
UPDATE SET l.array = r.array, l.binary = r.binary, l.boolean = r.boolean, l.flow_published_at = r.flow_published_at, l.integer = r.integer, l.`integerGt64Bit` = r.`integerGt64Bit`, l.`integerWithUserDDL` = r.`integerWithUserDDL`, l.multiple = r.multiple, l.number = r.number, l.`numberCastToString` = r.`numberCastToString`, l.object = r.object, l.string = r.string, l.`stringInteger` = r.`stringInteger`, l.`stringInteger39Chars` = r.`stringInteger39Chars`, l.`stringInteger66Chars` = r.`stringInteger66Chars`, l.`stringNumber` = r.`stringNumber`, l.flow_document = r.flow_document
WHEN NOT MATCHED THEN
WHEN NOT MATCHED AND r.flow_document!='"delete"' THEN
INSERT (key1, key2, `key!binary`, array, binary, boolean, flow_published_at, integer, `integerGt64Bit`, `integerWithUserDDL`, multiple, number, `numberCastToString`, object, string, `stringInteger`, `stringInteger39Chars`, `stringInteger66Chars`, `stringNumber`, flow_document)
VALUES (r.key1, r.key2, r.`key!binary`, r.array, r.binary, r.boolean, r.flow_published_at, r.integer, r.`integerGt64Bit`, r.`integerWithUserDDL`, r.multiple, r.number, r.`numberCastToString`, r.object, r.string, r.`stringInteger`, r.`stringInteger39Chars`, r.`stringInteger66Chars`, r.`stringNumber`, r.flow_document);
--- End `a-schema`.key_value mergeInto ---
Expand All @@ -100,32 +100,6 @@ SELECT 0, `a-schema`.key_value.flow_document
;
--- End `a-schema`.key_value copyIntoDirect ---

--- Begin `a-schema`.delta_updates loadQuery ---
SELECT -1, ""
--- End `a-schema`.delta_updates loadQuery ---

--- Begin `a-schema`.delta_updates mergeInto ---
MERGE INTO `a-schema`.delta_updates AS l
USING (
(
SELECT
`theKey`::STRING, `aValue`::LONG, flow_published_at::TIMESTAMP
FROM json.`file1`
)
UNION ALL (
SELECT
`theKey`::STRING, `aValue`::LONG, flow_published_at::TIMESTAMP
FROM json.`file2`
)
) AS r
ON l.`theKey` = r.`theKey`
WHEN MATCHED THEN
UPDATE SET l.`aValue` = r.`aValue`, l.flow_published_at = r.flow_published_at
WHEN NOT MATCHED THEN
INSERT (`theKey`, `aValue`, flow_published_at)
VALUES (r.`theKey`, r.`aValue`, r.flow_published_at);
--- End `a-schema`.delta_updates mergeInto ---

--- Begin `a-schema`.delta_updates copyIntoDirect ---
COPY INTO `a-schema`.delta_updates FROM (
SELECT
Expand Down
2 changes: 1 addition & 1 deletion materialize-databricks/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ SELECT -1, ""
{{- if $.Table.Document -}}
{{ if $.Table.Values }}, {{ end }}l.{{ $.Table.Document.Identifier}} = r.{{ $.Table.Document.Identifier }}
{{- end }}
WHEN NOT MATCHED THEN
WHEN NOT MATCHED AND r.{{ $.Table.Document.Identifier }}!='"delete"' THEN
INSERT (
{{- range $ind, $key := $.Table.Columns }}
{{- if $ind }}, {{ end -}}
Expand Down
42 changes: 29 additions & 13 deletions materialize-databricks/sqlgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,35 @@ func TestSQLGeneration(t *testing.T) {
},
)

for _, tbl := range tables {
for _, tpl := range []*template.Template{
tplLoadQuery,
tplMergeInto,
tplCopyIntoDirect,
} {
var testcase = tbl.Identifier + " " + tpl.Name()

var tplData = tableWithFiles{Table: &tbl, StagingPath: "test-staging-path", Files: []string{"file1", "file2"}}
snap.WriteString("--- Begin " + testcase + " ---")
require.NoError(t, tpl.Execute(snap, &tplData))
snap.WriteString("--- End " + testcase + " ---\n\n")
}
for _, tpl := range []*template.Template{
tplLoadQuery,
tplMergeInto,
tplCopyIntoDirect,
} {
tbl := tables[0]
require.False(t, tbl.DeltaUpdates)

var testcase = tbl.Identifier + " " + tpl.Name()

var tplData = tableWithFiles{Table: &tbl, StagingPath: "test-staging-path", Files: []string{"file1", "file2"}}
snap.WriteString("--- Begin " + testcase + " ---")
require.NoError(t, tpl.Execute(snap, &tplData))
snap.WriteString("--- End " + testcase + " ---\n\n")
}

for _, tpl := range []*template.Template{
tplCopyIntoDirect,
} {
tbl := tables[1]
require.True(t, tbl.DeltaUpdates)
require.Nil(t, tbl.Document)

var testcase = tbl.Identifier + " " + tpl.Name()

var tplData = tableWithFiles{Table: &tbl, StagingPath: "test-staging-path", Files: []string{"file1", "file2"}}
snap.WriteString("--- Begin " + testcase + " ---")
require.NoError(t, tpl.Execute(snap, &tplData))
snap.WriteString("--- End " + testcase + " ---\n\n")
}

cupaloy.SnapshotT(t, snap.String())
Expand Down
20 changes: 1 addition & 19 deletions materialize-snowflake/.snapshots/TestSQLGeneration
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,11 @@ WHEN MATCHED AND r.flow_document='delete' THEN
DELETE
WHEN MATCHED THEN
UPDATE SET l.array = r.array, l.binary = r.binary, l.boolean = r.boolean, l.flow_published_at = r.flow_published_at, l.integer = r.integer, l.integerGt64Bit = r.integerGt64Bit, l.integerWithUserDDL = r.integerWithUserDDL, l.multiple = r.multiple, l.number = r.number, l.numberCastToString = r.numberCastToString, l.object = r.object, l.string = r.string, l.stringInteger = r.stringInteger, l.stringInteger39Chars = r.stringInteger39Chars, l.stringInteger66Chars = r.stringInteger66Chars, l.stringNumber = r.stringNumber, l.flow_document = r.flow_document
WHEN NOT MATCHED THEN
WHEN NOT MATCHED and r.flow_document!='delete' THEN
INSERT (key1, key2, "key!binary", array, binary, boolean, flow_published_at, integer, integerGt64Bit, integerWithUserDDL, multiple, number, numberCastToString, object, string, stringInteger, stringInteger39Chars, stringInteger66Chars, stringNumber, flow_document)
VALUES (r.key1, r.key2, r."key!binary", r.array, r.binary, r.boolean, r.flow_published_at, r.integer, r.integerGt64Bit, r.integerWithUserDDL, r.multiple, r.number, r.numberCastToString, r.object, r.string, r.stringInteger, r.stringInteger39Chars, r.stringInteger66Chars, r.stringNumber, r.flow_document);
--- End "a-schema".key_value mergeInto ---

--- Begin "a-schema".delta_updates loadQuery ---
SELECT * FROM (SELECT -1, CAST(NULL AS VARIANT) LIMIT 0) as nodoc
--- End "a-schema".delta_updates loadQuery ---

--- Begin "a-schema".delta_updates copyInto ---
COPY INTO "a-schema".delta_updates (
theKey, aValue, flow_published_at
Expand All @@ -140,18 +136,4 @@ COPY INTO "a-schema".delta_updates (
);
--- End "a-schema".delta_updates copyInto ---

--- Begin "a-schema".delta_updates mergeInto ---
MERGE INTO "a-schema".delta_updates AS l
USING (
SELECT $1[0] AS theKey, $1[1] AS aValue, $1[2] AS flow_published_at
FROM test-file
) AS r
ON l.theKey = r.theKey
WHEN MATCHED THEN
UPDATE SET l.aValue = r.aValue, l.flow_published_at = r.flow_published_at
WHEN NOT MATCHED THEN
INSERT (theKey, aValue, flow_published_at)
VALUES (r.theKey, r.aValue, r.flow_published_at);
--- End "a-schema".delta_updates mergeInto ---


2 changes: 1 addition & 1 deletion materialize-snowflake/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ WHEN MATCHED THEN
{{- if $.Table.Document -}}
{{ if $.Table.Values }}, {{ end }}l.{{ $.Table.Document.Identifier}} = r.{{ $.Table.Document.Identifier }}
{{- end }}
WHEN NOT MATCHED THEN
WHEN NOT MATCHED and r.{{ $.Table.Document.Identifier }}!='delete' THEN
INSERT (
{{- range $ind, $key := $.Table.Columns }}
{{- if $ind }}, {{ end -}}
Expand Down
48 changes: 33 additions & 15 deletions materialize-snowflake/sqlgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,41 @@ func TestSQLGeneration(t *testing.T) {
},
)

for _, tbl := range tables {
for _, tpl := range []*template.Template{
templates.loadQuery,
templates.copyInto,
templates.mergeInto,
} {
var testcase = tbl.Identifier + " " + tpl.Name()

var tf = tableAndFile{
Table: tbl,
File: "test-file",
}
for _, tpl := range []*template.Template{
templates.loadQuery,
templates.copyInto,
templates.mergeInto,
} {
tbl := tables[0]
require.False(t, tbl.DeltaUpdates)
var testcase = tbl.Identifier + " " + tpl.Name()

snap.WriteString("--- Begin " + testcase + " ---")
require.NoError(t, tpl.Execute(snap, &tf))
snap.WriteString("--- End " + testcase + " ---\n\n")
var tf = tableAndFile{
Table: tbl,
File: "test-file",
}

snap.WriteString("--- Begin " + testcase + " ---")
require.NoError(t, tpl.Execute(snap, &tf))
snap.WriteString("--- End " + testcase + " ---\n\n")
}

for _, tpl := range []*template.Template{
templates.copyInto,
} {
tbl := tables[1]
require.True(t, tbl.DeltaUpdates)
require.Nil(t, tbl.Document)
var testcase = tbl.Identifier + " " + tpl.Name()

var tf = tableAndFile{
Table: tbl,
File: "test-file",
}

snap.WriteString("--- Begin " + testcase + " ---")
require.NoError(t, tpl.Execute(snap, &tf))
snap.WriteString("--- End " + testcase + " ---\n\n")
}

cupaloy.SnapshotT(t, snap.String())
Expand Down
12 changes: 6 additions & 6 deletions tests/materialize/materialize-databricks/snapshot.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
"updated": {
"some-schema%2Fdeletions": {
"Queries": [
"\n\tMERGE INTO `some-schema`.deletions AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, `_meta/op`::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.`_meta/op` = r.`_meta/op`, l.flow_published_at = r.flow_published_at, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED THEN\n\t\tINSERT (id, `_meta/op`, flow_published_at, flow_document)\n\t\tVALUES (r.id, r.`_meta/op`, r.flow_published_at, r.flow_document);\n"
"\n\tMERGE INTO `some-schema`.deletions AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, `_meta/op`::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.`_meta/op` = r.`_meta/op`, l.flow_published_at = r.flow_published_at, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, `_meta/op`, flow_published_at, flow_document)\n\t\tVALUES (r.id, r.`_meta/op`, r.flow_published_at, r.flow_document);\n"
],
"ToDelete": [
"/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>"
Expand All @@ -103,7 +103,7 @@
},
"some-schema%2Fduplicate_keys_standard": {
"Queries": [
"\n\tMERGE INTO `some-schema`.duplicate_keys_standard AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.flow_published_at = r.flow_published_at, l.int = r.int, l.str = r.str, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED THEN\n\t\tINSERT (id, flow_published_at, int, str, flow_document)\n\t\tVALUES (r.id, r.flow_published_at, r.int, r.str, r.flow_document);\n"
"\n\tMERGE INTO `some-schema`.duplicate_keys_standard AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.flow_published_at = r.flow_published_at, l.int = r.int, l.str = r.str, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, flow_published_at, int, str, flow_document)\n\t\tVALUES (r.id, r.flow_published_at, r.int, r.str, r.flow_document);\n"
],
"ToDelete": [
"/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>"
Expand All @@ -119,7 +119,7 @@
},
"some-schema%2Fmultiple_types": {
"Queries": [
"\n\tMERGE INTO `some-schema`.multiple_types AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, array_int::STRING, unbase64(binary_field)::BINARY as binary_field, bool_field::BOOLEAN, float_field::DOUBLE, flow_published_at::TIMESTAMP, multiple::STRING, nested::STRING, nullable_int::LONG, str_field::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.array_int = r.array_int, l.binary_field = r.binary_field, l.bool_field = r.bool_field, l.float_field = r.float_field, l.flow_published_at = r.flow_published_at, l.multiple = r.multiple, l.nested = r.nested, l.nullable_int = r.nullable_int, l.str_field = r.str_field, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED THEN\n\t\tINSERT (id, array_int, binary_field, bool_field, float_field, flow_published_at, multiple, nested, nullable_int, str_field, flow_document)\n\t\tVALUES (r.id, r.array_int, r.binary_field, r.bool_field, r.float_field, r.flow_published_at, r.multiple, r.nested, r.nullable_int, r.str_field, r.flow_document);\n"
"\n\tMERGE INTO `some-schema`.multiple_types AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, array_int::STRING, unbase64(binary_field)::BINARY as binary_field, bool_field::BOOLEAN, float_field::DOUBLE, flow_published_at::TIMESTAMP, multiple::STRING, nested::STRING, nullable_int::LONG, str_field::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.array_int = r.array_int, l.binary_field = r.binary_field, l.bool_field = r.bool_field, l.float_field = r.float_field, l.flow_published_at = r.flow_published_at, l.multiple = r.multiple, l.nested = r.nested, l.nullable_int = r.nullable_int, l.str_field = r.str_field, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, array_int, binary_field, bool_field, float_field, flow_published_at, multiple, nested, nullable_int, str_field, flow_document)\n\t\tVALUES (r.id, r.array_int, r.binary_field, r.bool_field, r.float_field, r.flow_published_at, r.multiple, r.nested, r.nullable_int, r.str_field, r.flow_document);\n"
],
"ToDelete": [
"/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>"
Expand All @@ -142,7 +142,7 @@
"updated": {
"some-schema%2Fdeletions": {
"Queries": [
"\n\tMERGE INTO `some-schema`.deletions AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, `_meta/op`::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.`_meta/op` = r.`_meta/op`, l.flow_published_at = r.flow_published_at, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED THEN\n\t\tINSERT (id, `_meta/op`, flow_published_at, flow_document)\n\t\tVALUES (r.id, r.`_meta/op`, r.flow_published_at, r.flow_document);\n"
"\n\tMERGE INTO `some-schema`.deletions AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, `_meta/op`::STRING, flow_published_at::TIMESTAMP, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.`_meta/op` = r.`_meta/op`, l.flow_published_at = r.flow_published_at, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, `_meta/op`, flow_published_at, flow_document)\n\t\tVALUES (r.id, r.`_meta/op`, r.flow_published_at, r.flow_document);\n"
],
"ToDelete": [
"/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>"
Expand All @@ -166,7 +166,7 @@
},
"some-schema%2Fduplicate_keys_standard": {
"Queries": [
"\n\tMERGE INTO `some-schema`.duplicate_keys_standard AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.flow_published_at = r.flow_published_at, l.int = r.int, l.str = r.str, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED THEN\n\t\tINSERT (id, flow_published_at, int, str, flow_document)\n\t\tVALUES (r.id, r.flow_published_at, r.int, r.str, r.flow_document);\n"
"\n\tMERGE INTO `some-schema`.duplicate_keys_standard AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, flow_published_at::TIMESTAMP, int::LONG, str::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.flow_published_at = r.flow_published_at, l.int = r.int, l.str = r.str, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, flow_published_at, int, str, flow_document)\n\t\tVALUES (r.id, r.flow_published_at, r.int, r.str, r.flow_document);\n"
],
"ToDelete": [
"/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>"
Expand All @@ -182,7 +182,7 @@
},
"some-schema%2Fmultiple_types": {
"Queries": [
"\n\tMERGE INTO `some-schema`.multiple_types AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, array_int::STRING, unbase64(binary_field)::BINARY as binary_field, bool_field::BOOLEAN, float_field::DOUBLE, flow_published_at::TIMESTAMP, multiple::STRING, nested::STRING, nullable_int::LONG, str_field::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.array_int = r.array_int, l.binary_field = r.binary_field, l.bool_field = r.bool_field, l.float_field = r.float_field, l.flow_published_at = r.flow_published_at, l.multiple = r.multiple, l.nested = r.nested, l.nullable_int = r.nullable_int, l.str_field = r.str_field, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED THEN\n\t\tINSERT (id, array_int, binary_field, bool_field, float_field, flow_published_at, multiple, nested, nullable_int, str_field, flow_document)\n\t\tVALUES (r.id, r.array_int, r.binary_field, r.bool_field, r.float_field, r.flow_published_at, r.multiple, r.nested, r.nullable_int, r.str_field, r.flow_document);\n"
"\n\tMERGE INTO `some-schema`.multiple_types AS l\n\tUSING (\n\t\t(\n\t\t\tSELECT\n\t\t\tid::LONG, array_int::STRING, unbase64(binary_field)::BINARY as binary_field, bool_field::BOOLEAN, float_field::DOUBLE, flow_published_at::TIMESTAMP, multiple::STRING, nested::STRING, nullable_int::LONG, str_field::STRING, flow_document::STRING\n\t\t\tFROM json.`/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>`\n\t\t)\n\t) AS r\n\tON l.id = r.id\n\tWHEN MATCHED AND r.flow_document='\"delete\"' THEN\n\t\tDELETE\n\tWHEN MATCHED THEN\n\t\tUPDATE SET l.array_int = r.array_int, l.binary_field = r.binary_field, l.bool_field = r.bool_field, l.float_field = r.float_field, l.flow_published_at = r.flow_published_at, l.multiple = r.multiple, l.nested = r.nested, l.nullable_int = r.nullable_int, l.str_field = r.str_field, l.flow_document = r.flow_document\n\tWHEN NOT MATCHED AND r.flow_document!='\"delete\"' THEN\n\t\tINSERT (id, array_int, binary_field, bool_field, float_field, flow_published_at, multiple, nested, nullable_int, str_field, flow_document)\n\t\tVALUES (r.id, r.array_int, r.binary_field, r.bool_field, r.float_field, r.flow_published_at, r.multiple, r.nested, r.nullable_int, r.str_field, r.flow_document);\n"
],
"ToDelete": [
"/Volumes/main/some-schema/flow_staging/flow_temp_tables/<uuid>"
Expand Down
Loading

0 comments on commit 49874ce

Please sign in to comment.