From 783f463d18236cb9331882e56ab85d8bddd9f111 Mon Sep 17 00:00:00 2001 From: Artem Klevtsov Date: Sun, 30 Aug 2020 18:45:22 +0700 Subject: [PATCH 1/7] dbWriteTable for CSV --- R/RcppExports.R | 4 ++ R/tables.R | 79 ++++++++++++++++++++++++++++++ man/postgres-tables.Rd | 38 ++++++++++++++ src/DbConnection.cpp | 44 +++++++++++++++++ src/DbConnection.h | 2 + src/RcppExports.cpp | 13 +++++ src/connection.cpp | 6 +++ tests/testthat/test-dbWriteTable.R | 10 +++- 8 files changed, 195 insertions(+), 1 deletion(-) diff --git a/R/RcppExports.R b/R/RcppExports.R index 38e7fddd..2f61a56f 100644 --- a/R/RcppExports.R +++ b/R/RcppExports.R @@ -41,6 +41,10 @@ connection_copy_data <- function(con, sql, df) { invisible(.Call(`_RPostgres_connection_copy_data`, con, sql, df)) } +connection_copy_file <- function(con, sql, file) { + invisible(.Call(`_RPostgres_connection_copy_file`, con, sql, file)) +} + encode_vector <- function(x) { .Call(`_RPostgres_encode_vector`, x) } diff --git a/R/tables.R b/R/tables.R index 0897817e..2208df1d 100644 --- a/R/tables.R +++ b/R/tables.R @@ -131,6 +131,85 @@ setMethod("dbWriteTable", c("PqConnection", "character", "data.frame"), ) +#' @param header is a logical indicating whether the first data line (but see +#' `skip`) has a header or not. If missing, it value is determined +#' following [read.table()] convention, namely, it is set to TRUE if +#' and only if the first row has one fewer field that the number of columns. +#' @param sep The field separator, defaults to `','`. +#' @param eol The end-of-line delimiter, defaults to `'\n'`. +#' @param skip number of lines to skip before reading the data. Defaults to 0. +#' @param nrows Number of rows to read to determine types. +#' @param colClasses Character vector of R type names, used to override +#' defaults when imputing classes from on-disk file. +#' @param na.strings a character vector of strings which are to be interpreted as NA values. +#' @export +#' @rdname postgres-tables +setMethod("dbWriteTable", c("PqConnection", "character", "character"), + function(conn, name, value, ..., field.types = NULL, overwrite = FALSE, + append = FALSE, header = TRUE, colClasses = NA, row.names = FALSE, + nrows = 50, sep = ",", na.strings = "NA", eol = "\n", skip = 0, temporary = FALSE) { + + if (!is.logical(overwrite) || length(overwrite) != 1L || is.na(overwrite)) { + stopc("`overwrite` must be a logical scalar") + } + if (!is.logical(append) || length(append) != 1L || is.na(append)) { + stopc("`append` must be a logical scalar") + } + if (!is.logical(temporary) || length(temporary) != 1L) { + stopc("`temporary` must be a logical scalar") + } + if (overwrite && append) { + stopc("overwrite and append cannot both be TRUE") + } + if (!is.null(field.types) && !(is.character(field.types) && !is.null(names(field.types)) && !anyDuplicated(names(field.types)))) { + stopc("`field.types` must be a named character vector with unique names, or NULL") + } + if (append && !is.null(field.types)) { + stopc("Cannot specify `field.types` with `append = TRUE`") + } + + found <- dbExistsTable(conn, name) + if (found && !overwrite && !append) { + stop("Table ", name, " exists in database, and both overwrite and", + " append are FALSE", call. = FALSE) + } + if (found && overwrite) { + dbRemoveTable(conn, name) + } + + if (!found || overwrite) { + if (is.null(field.types)) { + tmp_value <- utils::read.table( + value, sep = sep, header = header, skip = skip, nrows = nrows, + na.strings = na.strings, comment.char = "", colClasses = colClasses, + stringsAsFactors = FALSE) + field.types <- lapply(tmp_value, dbDataType, dbObj = conn) + } + + dbCreateTable( + conn = conn, + name = name, + fields = field.types, + temporary = temporary + ) + } + + value <- path.expand(value) + fields <- dbQuoteIdentifier(conn, names(field.types)) + + skip <- skip + as.integer(header) + sql <- paste0( + "COPY ", dbQuoteIdentifier(conn, name), + " (", paste(fields, collapse = ","), ") ", + "FROM STDIN ", "(FORMAT CSV, DELIMITER '", sep, "', HEADER '", header, "')" + ) + + connection_copy_file(conn@ptr, sql, value) + + invisible(TRUE) + } +) + #' @export #' @inheritParams DBI::sqlRownamesToColumn #' @param ... Ignored. diff --git a/man/postgres-tables.Rd b/man/postgres-tables.Rd index 4bea47d5..74b3897e 100644 --- a/man/postgres-tables.Rd +++ b/man/postgres-tables.Rd @@ -3,6 +3,7 @@ \name{postgres-tables} \alias{postgres-tables} \alias{dbWriteTable,PqConnection,character,data.frame-method} +\alias{dbWriteTable,PqConnection,character,character-method} \alias{sqlData,PqConnection-method} \alias{dbAppendTable,PqConnection-method} \alias{dbReadTable,PqConnection,character-method} @@ -28,6 +29,25 @@ copy = TRUE ) +\S4method{dbWriteTable}{PqConnection,character,character}( + conn, + name, + value, + ..., + field.types = NULL, + overwrite = FALSE, + append = FALSE, + header = TRUE, + colClasses = NA, + row.names = FALSE, + nrows = 50, + sep = ",", + na.strings = "NA", + eol = "\\n", + skip = 0, + temporary = FALSE +) + \S4method{sqlData}{PqConnection}(con, value, row.names = FALSE, ...) \S4method{dbAppendTable}{PqConnection}(conn, name, value, ..., row.names = NULL) @@ -88,6 +108,24 @@ and uses \verb{COPY name FROM stdin}. This is fast, but not supported by all postgres servers (e.g. Amazon's redshift). If \code{FALSE}, generates a single SQL string. This is slower, but always supported.} +\item{header}{is a logical indicating whether the first data line (but see +\code{skip}) has a header or not. If missing, it value is determined +following \code{\link[=read.table]{read.table()}} convention, namely, it is set to TRUE if +and only if the first row has one fewer field that the number of columns.} + +\item{colClasses}{Character vector of R type names, used to override +defaults when imputing classes from on-disk file.} + +\item{nrows}{Number of rows to read to determine types.} + +\item{sep}{The field separator, defaults to \code{','}.} + +\item{na.strings}{a character vector of strings which are to be interpreted as NA values.} + +\item{eol}{The end-of-line delimiter, defaults to \code{'\\n'}.} + +\item{skip}{number of lines to skip before reading the data. Defaults to 0.} + \item{con}{A database connection.} \item{check.names}{If \code{TRUE}, the default, column names will be diff --git a/src/DbConnection.cpp b/src/DbConnection.cpp index dd753740..0bf8088f 100644 --- a/src/DbConnection.cpp +++ b/src/DbConnection.cpp @@ -1,3 +1,4 @@ +#include #include "pch.h" #include "DbConnection.h" #include "encode.h" @@ -159,6 +160,49 @@ void DbConnection::copy_data(std::string sql, List df) { PQclear(pComplete); } +void DbConnection::copy_csv(std::string sql, std::string file) { + LOG_DEBUG << sql; + + if (file.size() == 0) + return; + + PGresult* pInit = PQexec(pConn_, sql.c_str()); + if (PQresultStatus(pInit) != PGRES_COPY_IN) { + PQclear(pInit); + conn_stop("Failed to initialise COPY"); + } + PQclear(pInit); + + + const size_t buffer_size = 1024 * 64; + std::string buffer; + buffer.reserve(buffer_size); + + std::ifstream fs(file, std::ios::binary); + if (!fs.is_open()) { + stop("Can not open file '%s'.", file); + } + + while (!fs.eof()) { + buffer.clear(); + fs.read(&buffer[0], buffer_size); + if (PQputCopyData(pConn_, buffer.data(), static_cast(fs.gcount())) != 1) { + conn_stop("Failed to put data"); + } + } + + if (PQputCopyEnd(pConn_, NULL) != 1) { + conn_stop("Failed to finish COPY"); + } + + PGresult* pComplete = PQgetResult(pConn_); + if (PQresultStatus(pComplete) != PGRES_COMMAND_OK) { + PQclear(pComplete); + conn_stop("COPY returned error"); + } + PQclear(pComplete); +} + void DbConnection::check_connection() { if (!pConn_) { stop("Disconnected"); diff --git a/src/DbConnection.h b/src/DbConnection.h index c2c0627d..c4bbb34a 100644 --- a/src/DbConnection.h +++ b/src/DbConnection.h @@ -35,6 +35,8 @@ class DbConnection : boost::noncopyable { void copy_data(std::string sql, List df); + void copy_csv(std::string sql, std::string file); + void check_connection(); List info(); diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp index 5ca11b7d..d934ca0d 100644 --- a/src/RcppExports.cpp +++ b/src/RcppExports.cpp @@ -119,6 +119,18 @@ BEGIN_RCPP return R_NilValue; END_RCPP } +// connection_copy_file +void connection_copy_file(DbConnection* con, std::string sql, std::string file); +RcppExport SEXP _RPostgres_connection_copy_file(SEXP conSEXP, SEXP sqlSEXP, SEXP fileSEXP) { +BEGIN_RCPP + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< DbConnection* >::type con(conSEXP); + Rcpp::traits::input_parameter< std::string >::type sql(sqlSEXP); + Rcpp::traits::input_parameter< std::string >::type file(fileSEXP); + connection_copy_file(con, sql, file); + return R_NilValue; +END_RCPP +} // encode_vector std::string encode_vector(RObject x); RcppExport SEXP _RPostgres_encode_vector(SEXP xSEXP) { @@ -275,6 +287,7 @@ static const R_CallMethodDef CallEntries[] = { {"_RPostgres_connection_is_transacting", (DL_FUNC) &_RPostgres_connection_is_transacting, 1}, {"_RPostgres_connection_set_transacting", (DL_FUNC) &_RPostgres_connection_set_transacting, 2}, {"_RPostgres_connection_copy_data", (DL_FUNC) &_RPostgres_connection_copy_data, 3}, + {"_RPostgres_connection_copy_file", (DL_FUNC) &_RPostgres_connection_copy_file, 3}, {"_RPostgres_encode_vector", (DL_FUNC) &_RPostgres_encode_vector, 1}, {"_RPostgres_encode_data_frame", (DL_FUNC) &_RPostgres_encode_data_frame, 1}, {"_RPostgres_encrypt_password", (DL_FUNC) &_RPostgres_encrypt_password, 2}, diff --git a/src/connection.cpp b/src/connection.cpp index e751522d..b5a583a9 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -99,6 +99,12 @@ void connection_copy_data(DbConnection* con, std::string sql, List df) { return con->copy_data(sql, df); } +// [[Rcpp::export]] +void connection_copy_file(DbConnection* con, std::string sql, std::string file) { + return con->copy_csv(sql, file); +} + + // as() override diff --git a/tests/testthat/test-dbWriteTable.R b/tests/testthat/test-dbWriteTable.R index 362d115d..d791bdb9 100644 --- a/tests/testthat/test-dbWriteTable.R +++ b/tests/testthat/test-dbWriteTable.R @@ -86,7 +86,7 @@ with_database_connection({ }) }) }) - + describe("Writing to the database with possible numeric precision issues", { # reference value value <- data.frame(x = -0.000064925595060641, y = -0.00006492559506064059) @@ -119,6 +119,14 @@ with_database_connection({ expect_equal(dbGetQuery(con, "SELECT * FROM xy"), value) }) }) + + test_that("Writing CSV to the database", { + with_table(con, "iris") + tmp <- tempfile() + write.csv(iris, tmp) + dbWriteTable(con, "iris", tmp, temporary = TRUE) + expect_equal(dbReadTable(con, "iris"), iris) + }) }) }) From 7068cb84ee614312caeb8bc9f04bd41a682e2c4f Mon Sep 17 00:00:00 2001 From: Artem Klevtsov Date: Sun, 30 Aug 2020 20:50:42 +0700 Subject: [PATCH 2/7] Fix with_table syntax --- tests/testthat/test-dbWriteTable.R | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/testthat/test-dbWriteTable.R b/tests/testthat/test-dbWriteTable.R index d791bdb9..47b78065 100644 --- a/tests/testthat/test-dbWriteTable.R +++ b/tests/testthat/test-dbWriteTable.R @@ -121,11 +121,12 @@ with_database_connection({ }) test_that("Writing CSV to the database", { - with_table(con, "iris") - tmp <- tempfile() - write.csv(iris, tmp) - dbWriteTable(con, "iris", tmp, temporary = TRUE) - expect_equal(dbReadTable(con, "iris"), iris) + with_table(con, "iris", { + tmp <- tempfile() + write.csv(iris, tmp) + dbWriteTable(con, "iris", tmp, temporary = TRUE) + expect_equal(dbReadTable(con, "iris"), iris) + }) }) }) }) From 7746d887b50edd5d72f85293fce3220110f2e896 Mon Sep 17 00:00:00 2001 From: Artem Klevtsov Date: Sun, 30 Aug 2020 22:03:35 +0700 Subject: [PATCH 3/7] Fix CSV test (due factor) --- tests/testthat/test-dbWriteTable.R | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/testthat/test-dbWriteTable.R b/tests/testthat/test-dbWriteTable.R index 47b78065..990b94a5 100644 --- a/tests/testthat/test-dbWriteTable.R +++ b/tests/testthat/test-dbWriteTable.R @@ -123,9 +123,10 @@ with_database_connection({ test_that("Writing CSV to the database", { with_table(con, "iris", { tmp <- tempfile() - write.csv(iris, tmp) + iris2 <- transform(iris, Species = as.character(Species)) + write.csv(iris2, tmp) dbWriteTable(con, "iris", tmp, temporary = TRUE) - expect_equal(dbReadTable(con, "iris"), iris) + expect_equal(dbReadTable(con, "iris"), iris2) }) }) }) From 3572cf9957999f37aa237444cfaa1eaf48be8677 Mon Sep 17 00:00:00 2001 From: Artem Klevtsov Date: Sun, 30 Aug 2020 22:28:06 +0700 Subject: [PATCH 4/7] Remove row.names from the CSV write test --- tests/testthat/test-dbWriteTable.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/testthat/test-dbWriteTable.R b/tests/testthat/test-dbWriteTable.R index 990b94a5..11f6c478 100644 --- a/tests/testthat/test-dbWriteTable.R +++ b/tests/testthat/test-dbWriteTable.R @@ -124,7 +124,7 @@ with_database_connection({ with_table(con, "iris", { tmp <- tempfile() iris2 <- transform(iris, Species = as.character(Species)) - write.csv(iris2, tmp) + write.csv(iris2, tmp, row.names = FALSE) dbWriteTable(con, "iris", tmp, temporary = TRUE) expect_equal(dbReadTable(con, "iris"), iris2) }) From 45e9e1e142cefb8907b6c94c4c00910b14644c69 Mon Sep 17 00:00:00 2001 From: Artem Klevtsov Date: Sun, 30 Aug 2020 23:38:18 +0700 Subject: [PATCH 5/7] Fix ifstream matching --- src/DbConnection.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/DbConnection.cpp b/src/DbConnection.cpp index 0bf8088f..ce5de0df 100644 --- a/src/DbConnection.cpp +++ b/src/DbConnection.cpp @@ -163,9 +163,6 @@ void DbConnection::copy_data(std::string sql, List df) { void DbConnection::copy_csv(std::string sql, std::string file) { LOG_DEBUG << sql; - if (file.size() == 0) - return; - PGresult* pInit = PQexec(pConn_, sql.c_str()); if (PQresultStatus(pInit) != PGRES_COPY_IN) { PQclear(pInit); @@ -178,7 +175,7 @@ void DbConnection::copy_csv(std::string sql, std::string file) { std::string buffer; buffer.reserve(buffer_size); - std::ifstream fs(file, std::ios::binary); + std::ifstream fs(file); if (!fs.is_open()) { stop("Can not open file '%s'.", file); } From fcad52e6c1990e70c0a7d7dfad5aa0b1aa2346cc Mon Sep 17 00:00:00 2001 From: Artem Klevtsov Date: Sun, 30 Aug 2020 23:51:23 +0700 Subject: [PATCH 6/7] Fix ifstream for the old compilers --- src/DbConnection.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/DbConnection.cpp b/src/DbConnection.cpp index ce5de0df..54997df8 100644 --- a/src/DbConnection.cpp +++ b/src/DbConnection.cpp @@ -163,6 +163,9 @@ void DbConnection::copy_data(std::string sql, List df) { void DbConnection::copy_csv(std::string sql, std::string file) { LOG_DEBUG << sql; + if (file.size() == 0) + return; + PGresult* pInit = PQexec(pConn_, sql.c_str()); if (PQresultStatus(pInit) != PGRES_COPY_IN) { PQclear(pInit); @@ -175,7 +178,7 @@ void DbConnection::copy_csv(std::string sql, std::string file) { std::string buffer; buffer.reserve(buffer_size); - std::ifstream fs(file); + std::ifstream fs(file.c_str(), std::ios::in); if (!fs.is_open()) { stop("Can not open file '%s'.", file); } From a9b4f70ccd7dca90643720317f74cca5c40a2da0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 5 Dec 2021 08:40:15 +0100 Subject: [PATCH 7/7] Apply suggestions from code review --- src/DbConnection.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DbConnection.cpp b/src/DbConnection.cpp index 803aa26e..ac2f2631 100644 --- a/src/DbConnection.cpp +++ b/src/DbConnection.cpp @@ -1,5 +1,5 @@ -#include #include "pch.h" +#include #include "DbConnection.h" #include "encode.h" #include "DbResult.h"