Skip to content

Commit

Permalink
fwrite buffered write. Bit faster. #580
Browse files Browse the repository at this point in the history
  • Loading branch information
mattdowle committed Apr 13, 2016
1 parent 971bbc9 commit 33fda74
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 87 deletions.
33 changes: 10 additions & 23 deletions R/fwrite.R
Original file line number Diff line number Diff line change
@@ -1,37 +1,24 @@
fwrite <- function(x, file.path, append = FALSE, quote = TRUE,
sep = ",", eol = "\n", na = "", col.names = TRUE, qmethod = "double") {
sep = ",", eol = "\n", na = "", col.names = TRUE, qmethod = "double", verbose=FALSE) {

# validate arguments
stopifnot(is.data.frame(x))
stopifnot(ncol(x) > 0)

stopifnot(length(quote) == 1 && class(quote) == "logical")
isFALSE = function(x)identical(FALSE,x) # it seems there is no isFALSE in R?
stopifnot(isTRUE(quote) || isFALSE(quote))
stopifnot(length(sep) == 1 && class(sep) == "character" && nchar(sep) == 1)
stopifnot(length(eol) == 1 && class(eol) == "character")
if (!eol %in% c("\n","\r\n")) warning('eol is not linux standard \\n or windows standard \\r\\n')
stopifnot(length(qmethod) == 1 && qmethod %in% c("double", "escape"))
stopifnot(length(col.names) == 1 && class(col.names) == "logical")
stopifnot(length(append) == 1 && class(append) == "logical")
stopifnot(isTRUE(col.names) || isFALSE(col.names))
stopifnot(isTRUE(append) || isFALSE(append))
stopifnot(isTRUE(verbose) || isFALSE(verbose))
if (append && missing(col.names)) col.names = FALSE # Otto's test 1658.16 checks this

# handle paths like "~/foo/bar"
file.path <- path.expand(file.path)

quoted_cols <- rep(quote, ncol(x))

# write header row separately for correct quoting of row names
if (col.names && !append) {
.Call(Cwritefile, as.list(names(x)), file.path, sep, eol, na, quoted_cols, qmethod == "escape", append)
append <- TRUE
}

# handle empty x
if (nrow(x) == 0) return()

# determine from column types, which ones should be quoted
if (quote) {
column_types <- sapply(x, class)
quoted_cols <- column_types %chin% c('character', 'factor')
}

.Call(Cwritefile, x, file.path, sep, eol, na, quoted_cols, qmethod == "escape", append)
.Call(Cwritefile, x, file.path, sep, eol, na, quote, qmethod == "escape", append, col.names, verbose)
invisible()
}

10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@

30. x's columns can be referred to in `j` using the prefix `x.` at all times. This is particularly useful when it is necessary to x's column that is *also a join column*. This is a patch addressing [#1615](https://github.com/Rdatatable/data.table/issues/1615).

31. New function `fwrite`. Fixes [#580](https://github.com/Rdatatable/data.table/issues/580). Thanks @oseiskar.
31. New function `fwrite` implements [#580](https://github.com/Rdatatable/data.table/issues/580). Thanks to Otto Seiskari for C code, R wrapper, manual page and extensive tests.

31. `on=.()` syntax is now posible, e.g., `X[Y, on=.(x==a, y==b)]`, [#1257](https://github.com/Rdatatable/data.table/issues/1257). Thanks @dselivanov.
32. `on=.()` syntax is now posible, e.g., `X[Y, on=.(x==a, y==b)]`, [#1257](https://github.com/Rdatatable/data.table/issues/1257). Thanks @dselivanov.

32. Non-equi joins are now possible using the familiar `on=` syntax. With this, the set of binary operators extend from just `==` to `>=`, `>`, `<=`, `<` and `==`. For e.g., `X[Y, on=.(a, b>b)]` looks for `X.a == Y.a` first and within those matching rows for rows where`X.b > Y.b`. Arguments `mult` and `nomatch` work as expected. `by=.EACHI` is not yet implemented. Partly addreses [#1452](https://github.com/Rdatatable/data.table/issues/1452).
33. Non-equi joins are now possible using the familiar `on=` syntax. With this, the set of binary operators extend from just `==` to `>=`, `>`, `<=`, `<` and `==`. For e.g., `X[Y, on=.(a, b>b)]` looks for `X.a == Y.a` first and within those matching rows for rows where`X.b > Y.b`. Arguments `mult` and `nomatch` work as expected. `by=.EACHI` is not yet implemented. Partly addreses [#1452](https://github.com/Rdatatable/data.table/issues/1452).

33. `%between%` is vectorised which means we can now do: `DT[x %between% list(y,z)]` which is equivalent to `DT[x >= y & x <= z]`, [#534](https://github.com/Rdatatable/data.table/issues/534). Thanks @MicheleCarriero for filing the issue and the idea.
34. `%between%` is vectorised which means we can now do: `DT[x %between% list(y,z)]` which is equivalent to `DT[x >= y & x <= z]`, [#534](https://github.com/Rdatatable/data.table/issues/534). Thanks @MicheleCarriero for filing the issue and the idea.

34. New functions `anywhere()` and `%anywhere%` are exported. `between()` answers the question: *"Is x[i] present in between `lower[i]` and `upper[i]`?"*. `anywhere()` on the other hand answers the question: *"Is x[i] present in any of the intervals specified by `lower, upper`?"*. This makes use of the recently implemented `non-equi` join to provide a convenient function to perform a *range join* [#679](https://github.com/Rdatatable/data.table/issues/679).
35. New functions `anywhere()` and `%anywhere%` are exported. `between()` answers the question: *"Is x[i] present in between `lower[i]` and `upper[i]`?"*. `anywhere()` on the other hand answers the question: *"Is x[i] present in any of the intervals specified by `lower, upper`?"*. This makes use of the recently implemented `non-equi` join to provide a convenient function to perform a *range join* [#679](https://github.com/Rdatatable/data.table/issues/679).

#### BUG FIXES

Expand Down
2 changes: 1 addition & 1 deletion inst/tests/tests.Rraw
Original file line number Diff line number Diff line change
Expand Up @@ -8578,7 +8578,7 @@ local({
a=c(NA, 1.2e-100, 3.01),
"other \"column"=c('foo bar', NA, 'quote" and \\ bs')),
f, quote=T, qmethod="double")
}, '"a","other ""column"\n,"foo bar"\n1.2e-100,\n3.01,"quote"" and \\ bs"\n')
}, '"a","other ""column"\n,"foo bar"\n1.2E-100,\n3.01,"quote"" and \\ bs"\n')

# changing sep
fwrite_test(1658.4, function(f) { fwrite(data.table(a="foo", b="ba\"r"), f, sep=";") },
Expand Down
3 changes: 2 additions & 1 deletion man/fwrite.Rd
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
}
\usage{
fwrite(x, file.path, append = FALSE, quote = TRUE, sep = ",", eol = "\n", na = "",
col.names = TRUE, qmethod = "double")
col.names = TRUE, qmethod = "double", verbose=FALSE)
}
\arguments{
\item{x}{The \code{data.table} or \code{data.frame} to write}
Expand All @@ -18,6 +18,7 @@ fwrite(x, file.path, append = FALSE, quote = TRUE, sep = ",", eol = "\n", na = "
\item{na}{The string to use for missing values in the data}
\item{col.names}{A logical value indicating if the column names (header row) should be written}
\item{qmethod}{A character string specifying how to deal with embedded double quote characters when quoting strings. Must be one of "escape", in which case the quote character (as well as the backslash character) is escaped in C style by a backslash, or "double" (default), in which case it is doubled.}
\item{verbose}{Be chatty and report timings?}
}
\details{
The speed-up compared to \code{write.csv} depends on the parameters and column types.
Expand Down
193 changes: 136 additions & 57 deletions src/fwrite.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,28 @@
#include <errno.h>
#include <Rinternals.h>
#include <unistd.h> // for access()
#include <fcntl.h>
#include <time.h>

#define QUOTE_FIELD \
*ch++ = QUOTE; \
for (const char *ch2 = CHAR(str); *ch2 != '\0'; ch2++) { \
if (*ch2 == QUOTE) *ch++ = ESCAPE_QUOTE; \
if (qmethod_escape && *ch2 == ESCAPE) *ch++ = ESCAPE; \
*ch++ = *ch2; \
} \
*ch++ = QUOTE

SEXP writefile(SEXP list_of_columns,
SEXP filenameArg,
SEXP col_sep_exp,
SEXP row_sep_exp,
SEXP na_exp,
SEXP quote_cols,
SEXP qmethod_escape_exp,
SEXP append)
SEXP col_sep_Arg,
SEXP row_sep_Arg,
SEXP na_Arg,
SEXP quoteArg, // TRUE|FALSE
SEXP qmethod_escapeArg, // TRUE|FALSE
SEXP append, // TRUE|FALSE
SEXP col_names, // TRUE|FALSE
SEXP verboseArg)
{
if (!isNewList(list_of_columns)) error("fwrite must be passed an object of type list, data.table or data.frame");
RLEN ncols = length(list_of_columns);
Expand All @@ -21,25 +34,25 @@ SEXP writefile(SEXP list_of_columns,
if (nrows != length(VECTOR_ELT(list_of_columns, i)))
error("Column %d's length (%d) is not the same as column 1's length (%d)", i+1, length(VECTOR_ELT(list_of_columns, i)), nrows);
}

//int error_number = 0;
int qmethod_escape = *LOGICAL(qmethod_escape_exp);
const Rboolean verbose = LOGICAL(verboseArg)[0];
const Rboolean quote = LOGICAL(quoteArg)[0];

errno = 0; /* clear flag possibly set by previous errors */
const char col_sep = *CHAR(STRING_ELT(col_sep_Arg, 0)); // DO NOT DO: allow multichar separator (bad idea)

char col_sep = *CHAR(STRING_ELT(col_sep_exp, 0));
const char *row_sep = CHAR(STRING_ELT(row_sep_exp, 0));
const char *na_str = CHAR(STRING_ELT(na_exp, 0));
const char QUOTE_CHAR = '"';
const char ESCAPE_CHAR = '\\';
const char *row_sep = CHAR(STRING_ELT(row_sep_Arg, 0));
const int row_sep_len = strlen(row_sep); // someone somewhere might want a trailer on every line
const char *na_str = CHAR(STRING_ELT(na_Arg, 0));
const int na_len = strlen(na_str);
const char QUOTE = '"';
const char ESCAPE = '\\';
const Rboolean qmethod_escape = LOGICAL(qmethod_escapeArg)[0];
const char ESCAPE_QUOTE = qmethod_escape ? ESCAPE : QUOTE;
const char *filename = CHAR(STRING_ELT(filenameArg, 0));

/* open input file in correct mode */
const char *open_mode = "wb"; // wt currently fails Windows tests as f* converts \n to \r\n on Windows.
if (LOGICAL(append)[0]) open_mode = "ab";
// TO DO: setup eol=\r\n for Windows but keep writing in binary mode rather than let f* do it
FILE *f = fopen(filename, open_mode);
if (f == NULL) {
// TODO: ensure Windows opens in O_BINARY and set row_sep='\r\n' -OR- leave O_TEXT and write() will convert \n to \r\n for us.
errno = 0; // clear flag possibly set by previous errors
int f = open(filename, O_WRONLY | O_CREAT | (LOGICAL(append)[0] ? O_APPEND : O_TRUNC), 0644);
if (f == -1) {
if( access( filename, F_OK ) != -1 )
error("File exists and failed to open for writing. Do you have write permission to it? Is this Windows and does another process such as Excel have it open? File: %s", filename);
else
Expand All @@ -54,63 +67,129 @@ SEXP writefile(SEXP list_of_columns,
levels[col_i] = isFactor(column) ? getAttrib(column, R_LevelsSymbol) : NULL;
}

for (RLEN row_i = 0; row_i < nrows; ++row_i) {
for (int col_i = 0; col_i < ncols; ++col_i) {

if (col_i > 0) fputc(col_sep, f);

char *buffer = Calloc(4*1024*1024, char); // 4MB buffer. Large enough to fit many lines. Small enough to fit in cache.
if (buffer == NULL) error("Unable to allocate 4MB buffer");
int writeTrigger = (int)(3.5*1024*1024); // When to write. Crash problems expected (for now) with lines of length
// apx 0.5 million bytes (0.5*1024*1024)

clock_t t0=clock(),t1,tformat=0,twrite=0;
// clock_t tt0,tSTR=0,tNUM=0;
SEXP str;
char *ch = buffer;
int numWrite=0;

if (LOGICAL(col_names)[0]) {
SEXP names = getAttrib(list_of_columns, R_NamesSymbol);
if (names!=NULL) {
if (LENGTH(names) != ncols) error("Internal error: length of column names is not equal to the number of columns. Please report.");
for (int col_i=0; col_i<ncols; col_i++) {
str = STRING_ELT(names, col_i);
if (str==NA_STRING) {
if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; }
break;
}
if (quote) {
QUOTE_FIELD;
} else {
memcpy(ch, CHAR(str), LENGTH(str)); // could have large fields. Doubt call overhead is much of an issue on small fields.
ch += LENGTH(str);
}
*ch++ = col_sep;
}
ch--; // backup onto the last col_sep after the last column
memcpy(ch, row_sep, row_sep_len); // replace it with the newline
ch += row_sep_len;
}
}

for (RLEN row_i = 0; row_i < nrows; row_i++) {
for (int col_i = 0; col_i < ncols; col_i++) {
SEXP column = VECTOR_ELT(list_of_columns, col_i);
SEXP str = NULL;
switch(TYPEOF(column)) {
case LGLSXP:
true_false = LOGICAL(column)[row_i];
fputs(true_false == NA_LOGICAL ? na_str : (true_false ? "TRUE" : "FALSE"), f);
if (true_false == NA_LOGICAL) {
if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; }
} else if (true_false) {
memcpy(ch,"TRUE",4); // Other than strings, field widths are limited which we check elsewhere here to ensure
ch += 4;
} else {
memcpy(ch,"FALSE",5);
ch += 5;
}
break;
case REALSXP:
if (ISNA(REAL(column)[row_i])) fputs(na_str, f);
else fprintf(f, "%.15g", REAL(column)[row_i]);
if (ISNA(REAL(column)[row_i])) {
if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; }
} else {
//tt0 = clock();
ch += sprintf(ch, "%.15G", REAL(column)[row_i]);
//tNUM += clock()-tt0;
}
break;
case INTSXP:
if (INTEGER(column)[row_i] == NA_INTEGER) {
fputs(na_str, f);
break;
}
if (levels[col_i] != NULL) { // isFactor(column) == TRUE
if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; }
} else if (levels[col_i] != NULL) { // isFactor(column) == TRUE
str = STRING_ELT(levels[col_i], INTEGER(column)[row_i]-1);
// fall through to STRSXP case
if (quote) {
QUOTE_FIELD;
} else {
memcpy(ch, CHAR(str), LENGTH(str));
ch += LENGTH(str);
}
} else {
fprintf(f, "%d", INTEGER(column)[row_i]);
break;
ch += sprintf(ch, "%d", INTEGER(column)[row_i]);
}
break;
case STRSXP:
if (str==NULL) str = STRING_ELT(column, row_i);
if (str==NA_STRING) fputs(na_str, f);
else {
int quote = LOGICAL(quote_cols)[col_i];
if (quote) fputc(QUOTE_CHAR, f);
for (const char *ch = CHAR(str); *ch != '\0'; ++ch) {
if (quote) {
if (*ch == QUOTE_CHAR) {
if (qmethod_escape) fputc(ESCAPE_CHAR, f);
else fputc(QUOTE_CHAR, f); /* qmethod = "double" */
}
if (qmethod_escape && *ch == ESCAPE_CHAR) fputc(ESCAPE_CHAR, f);
}
fputc(*ch, f);
}
if (quote) fputc(QUOTE_CHAR, f);
str = STRING_ELT(column, row_i);
if (str==NA_STRING) {
if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; }
} else if (quote) {
QUOTE_FIELD;
} else {
//tt0 = clock();
memcpy(ch, CHAR(str), LENGTH(str)); // could have large fields. Doubt call overhead is much of an issue on small fields.
ch += LENGTH(str);
//tSTR += clock()-tt0;
}
break;
default:
error("Column %d's type is '%s' - not yet implemented.", col_i+1,type2char(TYPEOF(column)) );
}
// TODO: Check that buffer has more than maximum CHARSXP length left. Saves checking every time above.
*ch++ = col_sep;
}
ch--; // backup onto the last col_sep after the last column
memcpy(ch, row_sep, row_sep_len); // replace it with the newline
ch += row_sep_len;
// Rprintf("Writing a line out length %d %10s\n", (int)(ch-buffer), buffer);
if ((ch-buffer)>writeTrigger) {
t1 = clock(); tformat += t1-t0; t0 = t1;
if (write(f, buffer, (int)(ch-buffer)) == -1) { close(f); error("Error writing to file: %s", filename); }
t1 = clock(); twrite += t1-t0; t0 = t1;
numWrite++;
ch = buffer;
}
fputs(row_sep, f);
}
if (f == NULL) error("File handle is NULL at the end.");
if (fflush(f)) error("Error flushing file before closing it. Is disk full?");
if (fclose(f)) error("Error closing file: %s", filename);
if (ch>buffer) {
// write last batch remaining in buffer
t1 = clock(); tformat += t1-t0; t0 = t1;
if (write(f, buffer, (int)(ch-buffer)) == -1) { close(f); error("Error writing to file: %s", filename); }
numWrite++;
t1 = clock(); twrite += t1-t0; t0 = t1;
}
if (close(f)) error("Error closing file: %s", filename);
Free(buffer);
if (verbose) {
Rprintf("%8.3fs (%3.0f%%) format\n", 1.0*tformat/CLOCKS_PER_SEC, 100.0*tformat/(tformat+twrite));
Rprintf("%8.3fs (%3.0f%%) write (%d calls)\n", 1.0*twrite/CLOCKS_PER_SEC, 100.0*twrite/(tformat+twrite), numWrite);
//Rprintf(" %8.3fs (%3.0f%%) STR\n", 1.0*tSTR/CLOCKS_PER_SEC, 100.0*tSTR/tformat);
//Rprintf(" %8.3fs (%3.0f%%) NUM\n", 1.0*tNUM/CLOCKS_PER_SEC, 100.0*tNUM/tformat);
}
return(R_NilValue); // must always return SEXP from C level otherwise hang on Windows
}



0 comments on commit 33fda74

Please sign in to comment.