Skip to content

Commit

Permalink
Merge pull request #616 from mgirlich/rows_upsert
Browse files Browse the repository at this point in the history
Add `rows_upsert()` for DBI sources (#616, @mgirlich).
  • Loading branch information
krlmlr authored Sep 26, 2021
2 parents d528fa2 + 0496b24 commit 6fd3206
Show file tree
Hide file tree
Showing 6 changed files with 481 additions and 8 deletions.
7 changes: 7 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ S3method(rows_patch,tbl_dbi)
S3method(rows_truncate,data.frame)
S3method(rows_truncate,tbl_sql)
S3method(rows_update,tbl_dbi)
S3method(rows_upsert,tbl_dbi)
S3method(select,dm)
S3method(select,zoomed_dm)
S3method(semi_join,dm)
Expand Down Expand Up @@ -108,6 +109,12 @@ S3method(sql_rows_update,tbl_MariaDBConnection)
S3method(sql_rows_update,tbl_PqConnection)
S3method(sql_rows_update,tbl_SQLiteConnection)
S3method(sql_rows_update,tbl_sql)
S3method(sql_rows_upsert,"tbl_Microsoft SQL Server")
S3method(sql_rows_upsert,tbl_MariaDBConnection)
S3method(sql_rows_upsert,tbl_PqConnection)
S3method(sql_rows_upsert,tbl_SQLiteConnection)
S3method(sql_rows_upsert,tbl_duckdb_connection)
S3method(sql_rows_upsert,tbl_sql)
S3method(sql_schema_create,"Microsoft SQL Server")
S3method(sql_schema_create,PqConnection)
S3method(sql_schema_create,SQLiteConnection)
Expand Down
261 changes: 261 additions & 0 deletions R/rows-db.R
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,58 @@ rows_patch.tbl_dbi <- function(x, y, by = NULL, ...,
}
}

#' @export
#' @rdname rows-db
rows_upsert.tbl_dbi <- function(x, y, by = NULL, ...,
                                in_place = NULL, copy = FALSE, check = NULL,
                                returning = NULL) {
  # Resolve the `returning` selection against the columns of `x` up front.
  returning_cols <- eval_select_both(enquo(returning), colnames(x))$names
  check_returning_cols_possible(returning_cols, in_place)

  # Bring `y` to the same source as `x`, then settle on the key columns:
  # the key is derived from `y` first and afterwards looked up in `x`.
  y <- auto_copy(x, y, copy = copy)
  by <- names(db_key(y, by))
  # Result is not used; the call is kept for whatever checks db_key() performs.
  db_key(x, by)

  # Columns of `y` that are not part of the key, i.e. the values to write.
  value_columns <- setdiff(colnames(y), by)

  target <- target_table_name(x, in_place)

  if (is_null(target)) {
    # No target table: build a lazy query that describes the upserted table.

    # Checking optional, can rely on primary key constraint
    # FIXME: contrary to doc currently also checks if `in_place = FALSE`
    # NOTE(review): check_db_superset() is defined elsewhere; confirm that a
    # superset check is really intended for an upsert, where `y` is expected
    # to contain keys that are missing from `x`.
    if (is_null(check) || is_true(check)) {
      check_db_superset(x, y, by)
    }

    # Columns of `x` that keep their current values.
    kept_columns <- setdiff(colnames(x), value_columns)

    untouched <- anti_join(x, y, by = by)
    fresh <- anti_join(y, x, by = by)
    refreshed <- inner_join(select(x, !!!kept_columns), y, by = by)

    return(union_all(untouched, union_all(refreshed, fresh)))
  }

  # A target table is available: run the upsert on the database.

  # Checking optional, can rely on primary key constraint
  if (is_true(check)) {
    check_db_superset(x, y, by)
  }

  # NOTE(review): with key columns only, nothing is executed at all, so rows
  # with new keys are not inserted either -- confirm this is intended.
  if (is_empty(value_columns)) {
    return(invisible(x))
  }

  con <- dbplyr::remote_con(x)
  sql <- sql_rows_upsert(x, y, by, returning_cols = returning_cols)

  rows_get_or_execute(x, con, sql, returning_cols)
}

#' @export
#' @rdname rows-db
rows_delete.tbl_dbi <- function(x, y, by = NULL, ...,
Expand Down Expand Up @@ -453,6 +505,215 @@ sql_rows_update.tbl_PqConnection <- function(x, y, by, ..., returning_cols = NUL
glue::as_glue(sql)
}

# Pre-computes the quoted SQL fragments shared by the `sql_rows_update()`
# methods.
#
# x:  remote table that is updated; supplies the connection and table name.
# y:  remote table with the new values; only its column names are used here.
# by: character vector of key column names.
#
# Returns a tibble with the target table name, the quoted alias for `y`,
# comma-separated column lists (plain and qualified with the alias, each also
# as a list column holding the individual elements) and the join condition
# on the key columns.
sql_rows_update_prep <- function(x, y, by) {
  con <- dbplyr::remote_con(x)
  target <- dbplyr::remote_name(x)
  quote_id <- function(ids) DBI::dbQuoteIdentifier(con, ids)

  # https://stackoverflow.com/a/47753166/946850
  source_alias <- quote_id("...y")
  all_columns <- paste(quote_id(colnames(y)), collapse = ", ")

  # Non-key columns of `y`, bare and qualified with the alias of `y`.
  value_columns <- quote_id(setdiff(colnames(y), by))
  value_columns_qual <- paste0(source_alias, ".", value_columns)

  # Equality of every key column between `y` and the target table.
  key_columns <- quote_id(by)
  key_condition <- paste0(
    source_alias, ".", key_columns,
    " = ",
    target, ".", key_columns,
    collapse = " AND "
  )

  tibble(
    name = target,
    y_name = source_alias,
    y_columns_qq = all_columns,
    new_columns_qq = paste(value_columns, collapse = ", "),
    new_columns_qq_list = list(value_columns),
    new_columns_qual_qq = paste0(value_columns_qual, collapse = ", "),
    new_columns_qual_qq_list = list(value_columns_qual),
    compare_qual_qq = key_condition
  )
}

# S3 generic that produces the SQL statement for an in-place upsert of the
# rows of `y` into the table behind `x`.
#
# x:              remote table to modify; its class selects the method.
# y:              remote table holding the rows to insert or update.
# by:             character vector of key column names.
# ...:            passed on to methods.
# returning_cols: columns to return from the statement, or NULL.
sql_rows_upsert <- function(x, y, by, ..., returning_cols = NULL) {
# Flag arguments passed via `...` that no method ends up using.
ellipsis::check_dots_used()
# FIXME: check here same src for x and y? if not -> error.
UseMethod("sql_rows_upsert")
}

# nocov start
# exclude from coverage because no database in the workflow supports MERGE
#' @export
sql_rows_upsert.tbl_sql <- function(x, y, by, ..., returning_cols = NULL) {
  # Fallback method: builds a `MERGE` statement.
  #
  # x:              remote table to modify.
  # y:              remote table holding the rows to insert or update.
  # by:             character vector of key column names.
  # returning_cols: columns handed to sql_returning_cols(), or NULL.
  #
  # Returns the statement as a glue string.
  p <- sql_rows_upsert_prep(x, y, by)

  # In a MERGE the incoming row is addressed through the alias given in the
  # USING clause; `excluded` only exists for `INSERT ... ON CONFLICT`.
  update_clause <- paste0(
    unlist(p$new_columns_qq_list), " = ", unlist(p$new_columns_qual_qq_list),
    collapse = ",\n"
  )

  sql <- paste0(
    "MERGE INTO ", p$name, "\n",
    "USING (", dbplyr::sql_render(y), ") AS ", p$y_name, "\n",
    "ON (", p$compare_qual_qq, ")\n",
    "WHEN MATCHED THEN\n",
    # The trailing space keeps `SET` apart from the first column name.
    " UPDATE SET ", update_clause, "\n",
    "WHEN NOT MATCHED THEN\n",
    " INSERT (", p$y_columns_qq, ")\n",
    " VALUES (", p$y_columns_qual_qq, ")\n",
    sql_returning_cols(x, returning_cols)
  )

  glue::as_glue(sql)
}
# nocov end

# nocov start
# exclude from coverage because MERGE somehow doesn't work with MS SQL 2017
#' @export
`sql_rows_upsert.tbl_Microsoft SQL Server` <- function(x, y, by, ..., returning_cols = NULL) {
  # SQL Server method: builds a `MERGE` statement, with the output columns
  # placed after the WHEN clauses and a terminating semicolon.
  #
  # x:              remote table to modify.
  # y:              remote table holding the rows to insert or update.
  # by:             character vector of key column names.
  # returning_cols: columns handed to sql_output_cols(), or NULL.
  #
  # Returns the statement as a glue string.
  p <- sql_rows_upsert_prep(x, y, by)

  # In a MERGE the incoming row is addressed through the alias given in the
  # USING clause; `excluded` only exists for `INSERT ... ON CONFLICT`.
  update_clause <- paste0(
    unlist(p$new_columns_qq_list), " = ", unlist(p$new_columns_qual_qq_list),
    collapse = ",\n"
  )

  sql <- paste0(
    "MERGE INTO ", p$name, "\n",
    "USING (", dbplyr::sql_render(y), ") AS ", p$y_name, "\n",
    "ON (", p$compare_qual_qq, ")\n",
    "WHEN MATCHED THEN\n",
    # The trailing space keeps `SET` apart from the first column name.
    " UPDATE SET ", update_clause, "\n",
    "WHEN NOT MATCHED THEN\n",
    " INSERT (", p$y_columns_qq, ")\n",
    " VALUES (", p$y_columns_qual_qq, ")\n",
    sql_output_cols(x, returning_cols),
    ";"
  )

  glue::as_glue(sql)
}
# nocov end

#' @export
sql_rows_upsert.tbl_duckdb_connection <- function(x, y, by, ..., returning_cols = NULL) {
# DuckDB method: no statement is generated, the call always fails with the
# error below. All arguments are ignored.
abort("upsert is not supported for DuckDB")
}

#' @export
sql_rows_upsert.tbl_PqConnection <- function(x, y, by, ..., returning_cols = NULL) {
  # Postgres method: builds an `INSERT ... ON CONFLICT ... DO UPDATE`
  # statement that reads the new rows from a CTE wrapping the query of `y`.
  #
  # x:              remote table to modify.
  # y:              remote table holding the rows to insert or update.
  # by:             character vector of key column names.
  # returning_cols: columns handed to sql_returning_cols(), or NULL.
  #
  # Returns the statement as a glue string.
  p <- sql_rows_upsert_prep(x, y, by)

  # `excluded` refers to the row that was proposed for insertion.
  new_columns <- unlist(p$new_columns_qq_list)
  update_clause <- paste0(
    new_columns, " = ", "excluded.", new_columns,
    collapse = ",\n"
  )

  sql <- paste0(
    "WITH ", p$y_name, "(", p$y_columns_qq, ") AS (\n",
    dbplyr::sql_render(y),
    "\n)\n",
    "INSERT INTO ", p$name, " (", p$y_columns_qq, ")\n",
    "SELECT * FROM ", p$y_name, "\n",
    # NOTE(review): the always-true WHERE presumably separates the SELECT from
    # ON CONFLICT for parsers that would otherwise read `ON` as a join
    # constraint (this function is reused for SQLite) -- confirm before
    # removing.
    "WHERE true\n",
    "ON CONFLICT (", p$by_columns_qq, ")\n",
    "DO UPDATE\n",
    "SET ", update_clause, "\n",
    sql_returning_cols(x, returning_cols)
  )

  glue::as_glue(sql)
}

# SQLite uses the very same function object as the Postgres method, so both
# backends get the identical `INSERT ... ON CONFLICT` statement.
#' @export
sql_rows_upsert.tbl_SQLiteConnection <- sql_rows_upsert.tbl_PqConnection

#' @export
sql_rows_upsert.tbl_MariaDBConnection <- function(x, y, by, ..., returning_cols = NULL) {
  # MariaDB method: builds an `INSERT ... ON DUPLICATE KEY UPDATE` statement
  # that reads the new rows from a CTE wrapping the query of `y`.
  #
  # x:              remote table to modify.
  # y:              remote table holding the rows to insert or update.
  # by:             character vector of key column names.
  # returning_cols: columns handed to sql_returning_cols(), or NULL.
  #
  # Returns the statement as a glue string.
  p <- sql_rows_upsert_prep(x, y, by)

  # The list columns hold one vector each; unlist() them so that every column
  # gets its own `col = source.col` assignment, as in the sibling methods.
  update_clause <- paste0(
    unlist(p$new_columns_qq_list), " = ", unlist(p$new_columns_qual_qq_list),
    collapse = ",\n"
  )

  # MariaDB has the order: first INSERT then CTE
  # https://www.itjungle.com/2016/08/16/fhg081616-story02/
  sql <- paste0(
    # Explicit column list: `SELECT *` only yields the columns of `y`, which
    # need not be all columns of the target table nor in the same order.
    "INSERT INTO ", p$name, " (", p$y_columns_qq, ")\n",
    "WITH ", p$y_name, "(", p$y_columns_qq, ") AS (\n",
    dbplyr::sql_render(y),
    "\n)\n",
    "SELECT * FROM ", p$y_name, "\n",
    "WHERE true\n",
    "ON DUPLICATE KEY UPDATE\n",
    update_clause, "\n",
    sql_returning_cols(x, returning_cols)
  )

  glue::as_glue(sql)
}

# Pre-computes the quoted SQL fragments shared by the `sql_rows_upsert()`
# methods.
#
# x:  remote table that is modified; supplies the connection and table name.
# y:  remote table with the new rows; only its column names are used here.
# by: character vector of key column names.
#
# Returns a one-row tibble with
#   name                      target table name as reported by dbplyr
#   y_name                    quoted alias used for `y`
#   y_columns_qq              all columns of `y`, comma-separated
#   y_columns_qual_qq         same, each qualified with the alias of `y`
#   by_columns_qq             key columns, comma-separated
#   new_columns_qq(_list)     non-key columns, collapsed / as list column
#   new_columns_qual_qq(_list) non-key columns qualified with the alias
#   compare_qual_qq           equality condition on all key columns
sql_rows_upsert_prep <- function(x, y, by) {
  con <- dbplyr::remote_con(x)
  name <- dbplyr::remote_name(x)

  # https://stackoverflow.com/a/47753166/946850
  y_name <- DBI::dbQuoteIdentifier(con, "...y")
  y_columns_q <- DBI::dbQuoteIdentifier(con, colnames(y))
  y_columns_qq <- paste(y_columns_q, collapse = ", ")
  # paste0(): with paste() the default separator puts blanks around the dot.
  y_columns_qual_qq <- paste0(y_name, ".", y_columns_q, collapse = ", ")

  key_columns_q <- DBI::dbQuoteIdentifier(con, by)
  # Collapse to a single string so that a composite key neither breaks the
  # one-row shape of the result nor the `ON CONFLICT (...)` column list.
  by_columns_qq <- paste(key_columns_q, collapse = ", ")

  new_columns_q <- DBI::dbQuoteIdentifier(con, setdiff(colnames(y), by))
  new_columns_qq <- paste(new_columns_q, collapse = ", ")
  new_columns_qq_list <- list(new_columns_q)
  new_columns_qual_qq <- paste0(
    y_name, ".", new_columns_q,
    collapse = ", "
  )
  new_columns_qual_qq_list <- list(paste0(y_name, ".", new_columns_q))

  compare_qual_qq <- paste0(
    y_name, ".", key_columns_q,
    " = ",
    name, ".", key_columns_q,
    collapse = " AND "
  )

  tibble(
    name, y_name,
    y_columns_qq,
    y_columns_qual_qq,
    by_columns_qq,
    new_columns_qq, new_columns_qq_list,
    new_columns_qual_qq, new_columns_qual_qq_list,
    compare_qual_qq
  )
}

#' @export
#' @rdname rows-db
sql_rows_patch <- function(x, y, by, ..., returning_cols = NULL) {
Expand Down
12 changes: 12 additions & 0 deletions man/rows-db.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6fd3206

Please sign in to comment.