Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rows_upsert.tbl_dbi() #616

Merged
merged 13 commits into from
Sep 26, 2021
7 changes: 7 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ S3method(rows_patch,tbl_dbi)
S3method(rows_truncate,data.frame)
S3method(rows_truncate,tbl_sql)
S3method(rows_update,tbl_dbi)
S3method(rows_upsert,tbl_dbi)
S3method(select,dm)
S3method(select,zoomed_dm)
S3method(semi_join,dm)
Expand Down Expand Up @@ -108,6 +109,12 @@ S3method(sql_rows_update,tbl_MariaDBConnection)
S3method(sql_rows_update,tbl_PqConnection)
S3method(sql_rows_update,tbl_SQLiteConnection)
S3method(sql_rows_update,tbl_sql)
S3method(sql_rows_upsert,"tbl_Microsoft SQL Server")
S3method(sql_rows_upsert,tbl_MariaDBConnection)
S3method(sql_rows_upsert,tbl_PqConnection)
S3method(sql_rows_upsert,tbl_SQLiteConnection)
S3method(sql_rows_upsert,tbl_duckdb_connection)
S3method(sql_rows_upsert,tbl_sql)
S3method(sql_schema_create,"Microsoft SQL Server")
S3method(sql_schema_create,PqConnection)
S3method(sql_schema_create,SQLiteConnection)
Expand Down
261 changes: 261 additions & 0 deletions R/rows-db.R
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,58 @@ rows_patch.tbl_dbi <- function(x, y, by = NULL, ...,
}
}

#' @export
#' @rdname rows-db
rows_upsert.tbl_dbi <- function(x, y, by = NULL, ...,
in_place = NULL, copy = FALSE, check = NULL,
returning = NULL) {
returning_cols <- eval_select_both(enquo(returning), colnames(x))$names
check_returning_cols_possible(returning_cols, in_place)

y <- auto_copy(x, y, copy = copy)
y_key <- db_key(y, by)
by <- names(y_key)
x_key <- db_key(x, by)

new_columns <- setdiff(colnames(y), by)

name <- target_table_name(x, in_place)

if (!is_null(name)) {
# Checking optional, can rely on primary key constraint
if (is_true(check)) {
check_db_superset(x, y, by)
}

if (is_empty(new_columns)) {
return(invisible(x))
}

con <- dbplyr::remote_con(x)
sql <- sql_rows_upsert(x, y, by, returning_cols = returning_cols)

rows_get_or_execute(x, con, sql, returning_cols)
} else {
# Checking optional, can rely on primary key constraint
# FIXME: contrary to doc currently also checks if `in_place = FALSE`
if (is_null(check) || is_true(check)) {
check_db_superset(x, y, by)
}

existing_columns <- setdiff(colnames(x), new_columns)

unchanged <- anti_join(x, y, by = by)
inserted <- anti_join(y, x, by = by)
updated <-
x %>%
select(!!!existing_columns) %>%
inner_join(y, by = by)
upserted <- union_all(updated, inserted)

union_all(unchanged, upserted)
}
}

#' @export
#' @rdname rows-db
rows_delete.tbl_dbi <- function(x, y, by = NULL, ...,
Expand Down Expand Up @@ -453,6 +505,215 @@ sql_rows_update.tbl_PqConnection <- function(x, y, by, ..., returning_cols = NUL
glue::as_glue(sql)
}

sql_rows_update_prep <- function(x, y, by) {
con <- dbplyr::remote_con(x)
name <- dbplyr::remote_name(x)

# https://stackoverflow.com/a/47753166/946850
y_name <- DBI::dbQuoteIdentifier(con, "...y")
y_columns_qq <- paste(
DBI::dbQuoteIdentifier(con, colnames(y)),
collapse = ", "
)

new_columns_q <- DBI::dbQuoteIdentifier(con, setdiff(colnames(y), by))
new_columns_qq <- paste(new_columns_q, collapse = ", ")
new_columns_qq_list <- list(new_columns_q)
new_columns_qual_qq <- paste0(
y_name, ".", new_columns_q,
collapse = ", "
)
new_columns_qual_qq_list <- list(paste0(y_name, ".", new_columns_q))

key_columns_q <- DBI::dbQuoteIdentifier(con, by)
compare_qual_qq <- paste0(
y_name, ".", key_columns_q,
" = ",
name, ".", key_columns_q,
collapse = " AND "
)

tibble(
name, y_name,
y_columns_qq,
new_columns_qq, new_columns_qq_list,
new_columns_qual_qq, new_columns_qual_qq_list,
compare_qual_qq
)
}

sql_rows_upsert <- function(x, y, by, ..., returning_cols = NULL) {
ellipsis::check_dots_used()
# FIXME: check here same src for x and y? if not -> error.
UseMethod("sql_rows_upsert")
}

# nocov start
# exclude from coverage because no database in the workflow supports MERGE
#' @export
sql_rows_upsert.tbl_sql <- function(x, y, by, ..., returning_cols = NULL) {
con <- dbplyr::remote_con(x)

p <- sql_rows_upsert_prep(x, y, by)

update_clause <- paste0(
unlist(p$new_columns_qq_list), " = ", "excluded.", unlist(p$new_columns_qq_list),
collapse = ",\n"
)

sql <- paste0(
"MERGE INTO ", p$name, "\n",
"USING (", dbplyr::sql_render(y), ") AS ", p$y_name, "\n",
"ON (", p$compare_qual_qq, ")\n",
"WHEN MATCHED THEN\n",
" UPDATE SET", update_clause, "\n",
"WHEN NOT MATCHED THEN\n",
" INSERT (", p$y_columns_qq, ")\n",
" VALUES (", p$y_columns_qual_qq, ")\n",
sql_returning_cols(x, returning_cols)
)

glue::as_glue(sql)
}
# nocov end

# nocov start
# exclude from coverage because MERGE somehow doesn't work with MS SQL 2017
#' @export
`sql_rows_upsert.tbl_Microsoft SQL Server` <- function(x, y, by, ..., returning_cols = NULL) {
con <- dbplyr::remote_con(x)

p <- sql_rows_upsert_prep(x, y, by)

update_clause <- paste0(
unlist(p$new_columns_qq_list), " = ", "excluded.", unlist(p$new_columns_qq_list),
collapse = ",\n"
)

sql <- paste0(
"MERGE INTO ", p$name, "\n",
"USING (", dbplyr::sql_render(y), ") AS ", p$y_name, "\n",
"ON (", p$compare_qual_qq, ")\n",
"WHEN MATCHED THEN\n",
" UPDATE SET", update_clause, "\n",
"WHEN NOT MATCHED THEN\n",
" INSERT (", p$y_columns_qq, ")\n",
" VALUES (", p$y_columns_qual_qq, ")\n",
sql_output_cols(x, returning_cols),
";"
)

glue::as_glue(sql)
}
# nocov end

#' @export
sql_rows_upsert.tbl_duckdb_connection <- function(x, y, by, ..., returning_cols = NULL) {
abort("upsert is not supported for DuckDB")
}

#' @export
sql_rows_upsert.tbl_PqConnection <- function(x, y, by, ..., returning_cols = NULL) {
con <- dbplyr::remote_con(x)

p <- sql_rows_upsert_prep(x, y, by)

update_clause <- paste0(
unlist(p$new_columns_qq_list), " = ", "excluded.", unlist(p$new_columns_qq_list),
collapse = ",\n"
)

sql <- paste0(
"WITH ", p$y_name, "(", p$y_columns_qq, ") AS (\n",
dbplyr::sql_render(y),
"\n)\n",
"INSERT INTO ", p$name, " (", p$y_columns_qq, ")\n",
"SELECT * FROM ", p$y_name, "\n",
"WHERE true\n",
"ON CONFLICT (", p$by_columns_qq, ")\n",
"DO UPDATE\n",
"SET ", update_clause, "\n",
sql_returning_cols(x, returning_cols)
)

glue::as_glue(sql)
}

#' @export
sql_rows_upsert.tbl_SQLiteConnection <- sql_rows_upsert.tbl_PqConnection

#' @export
sql_rows_upsert.tbl_MariaDBConnection <- function(x, y, by, ..., returning_cols = NULL) {
con <- dbplyr::remote_con(x)

p <- sql_rows_upsert_prep(x, y, by)

update_clause <- paste0(
p$new_columns_qq_list, " = ", p$new_columns_qual_qq_list,
collapse = ",\n"
)

# MariaDB has the order: first INSERT then CTE
# https://www.itjungle.com/2016/08/16/fhg081616-story02/
sql <- paste0(
"INSERT INTO ", p$name, "\n",
"WITH ", p$y_name, "(", p$y_columns_qq, ") AS (\n",
dbplyr::sql_render(y),
"\n)\n",
"SELECT * FROM ", p$y_name, "\n",
"WHERE true\n",
"ON DUPLICATE KEY UPDATE\n",
update_clause, "\n",
sql_returning_cols(x, returning_cols)
)

glue::as_glue(sql)
}

sql_rows_upsert_prep <- function(x, y, by) {
krlmlr marked this conversation as resolved.
Show resolved Hide resolved
con <- dbplyr::remote_con(x)
name <- dbplyr::remote_name(x)

# https://stackoverflow.com/a/47753166/946850
y_name <- DBI::dbQuoteIdentifier(con, "...y")
y_columns_qq <- paste(
DBI::dbQuoteIdentifier(con, colnames(y)),
collapse = ", "
)
y_columns_qual_qq <- paste(
y_name, ".", DBI::dbQuoteIdentifier(con, colnames(y)),
collapse = ", "
)

by_columns_qq <- DBI::dbQuoteIdentifier(con, by)
new_columns_q <- DBI::dbQuoteIdentifier(con, setdiff(colnames(y), by))
new_columns_qq <- paste(new_columns_q, collapse = ", ")
new_columns_qq_list <- list(new_columns_q)
new_columns_qual_qq <- paste0(
y_name, ".", new_columns_q,
collapse = ", "
)
new_columns_qual_qq_list <- list(paste0(y_name, ".", new_columns_q))

key_columns_q <- DBI::dbQuoteIdentifier(con, by)
compare_qual_qq <- paste0(
y_name, ".", key_columns_q,
" = ",
name, ".", key_columns_q,
collapse = " AND "
)

tibble(
name, y_name,
y_columns_qq,
y_columns_qual_qq,
by_columns_qq,
new_columns_qq, new_columns_qq_list,
new_columns_qual_qq, new_columns_qual_qq_list,
compare_qual_qq
)
}

#' @export
#' @rdname rows-db
sql_rows_patch <- function(x, y, by, ..., returning_cols = NULL) {
Expand Down
12 changes: 12 additions & 0 deletions man/rows-db.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading