Skip to content

Commit

Permalink
fwrite gains progress meter, #1664
Browse files Browse the repository at this point in the history
  • Loading branch information
mattdowle committed Nov 3, 2016
1 parent 9c89881 commit eb9f7ef
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 12 deletions.
14 changes: 10 additions & 4 deletions R/fwrite.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
fwrite <- function(x, file="", append=FALSE, quote="auto",
sep=",", eol=if (.Platform$OS.type=="windows") "\r\n" else "\n",
na="", dec=".", row.names=FALSE, col.names=TRUE,
qmethod=c("double","escape"), verbose=FALSE, ..turbo=TRUE) {
qmethod=c("double","escape"),
showProgress = getOption("datatable.showProgress"),
verbose=getOption("datatable.verbose"),
..turbo=TRUE) {
isLOGICAL = function(x) isTRUE(x) || identical(FALSE, x) # it seems there is no isFALSE in R?
na = as.character(na[1L]) # fix for #1725
if (missing(qmethod)) qmethod = qmethod[1L]
Expand All @@ -12,7 +15,8 @@ fwrite <- function(x, file="", append=FALSE, quote="auto",
length(sep) == 1L && class(sep) == "character" && nchar(sep) == 1L,
length(eol) == 1L && class(eol) == "character",
length(qmethod) == 1L && qmethod %in% c("double", "escape"),
isLOGICAL(col.names), isLOGICAL(append), isLOGICAL(verbose), isLOGICAL(row.names),
isLOGICAL(col.names), isLOGICAL(append), isLOGICAL(row.names),
isLOGICAL(verbose), isLOGICAL(showProgress),
length(na) == 1L, #1725, handles NULL or character(0) input
isLOGICAL(..turbo),
is.character(file) && length(file)==1 && !is.na(file),
Expand All @@ -23,8 +27,10 @@ fwrite <- function(x, file="", append=FALSE, quote="auto",
col.names = FALSE # test 1658.16 checks this
if (!..turbo) warning("The ..turbo=FALSE option will be removed in future. Please report any problems with ..turbo=TRUE.")
if (identical(quote,"auto")) quote=NA # logical NA
if (verbose || file=="") old=setDTthreads(1) # console output isn't thread safe
.Call(Cwritefile, x, file, sep, eol, na, dec, quote, qmethod=="escape", append, row.names, col.names, verbose, ..turbo)
if (verbose || file=="") old=setDTthreads(1) # console output isn't thread safe
if (file=="") showProgress=FALSE
.Call(Cwritefile, x, file, sep, eol, na, dec, quote, qmethod=="escape", append,
row.names, col.names, showProgress, verbose, ..turbo)
if (verbose) setDTthreads(old)
invisible()
}
Expand Down
2 changes: 1 addition & 1 deletion R/onLoad.R
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"datatable.warnredundantby"="TRUE", # not a function argument
"datatable.alloccol"="1024L", # argument 'n' of alloc.col. Over-allocate 1024 spare column slots
"datatable.integer64"="'integer64'", # datatable.<argument name> integer64|double|character
"datatable.showProgress"="1L", # in fread
"datatable.showProgress"="TRUE", # in fread and fwrite
"datatable.auto.index"="TRUE", # DT[col=="val"] to auto add index so 2nd time faster
"datatable.use.index"="TRUE", # global switch to address #1422
"datatable.fread.datatable"="TRUE",
Expand Down
21 changes: 21 additions & 0 deletions inst/tests/tests.Rraw
Original file line number Diff line number Diff line change
Expand Up @@ -9534,6 +9534,27 @@ test(1734.6, capture.output(fwrite(DF,row.names=TRUE,quote=FALSE)),
capture.output(write.csv(DF,quote=FALSE)))
test(1734.7, capture.output(fwrite(DF,row.names=TRUE,quote=TRUE)),
capture.output(write.csv(DF)))

# fwrite showProgress test. Turned off as too long for CRAN.
if (FALSE) {
N = 6e8 # apx 6GB
DT = data.table(C1=sample(100000,N,replace=TRUE), C2=sample(paste0(LETTERS,LETTERS,LETTERS), N, replace=TRUE))
gc()
setDTthreads(1) # to slow it down
system.time(fwrite(DT, "/tmp/test.txt"))
system("ls -lh /tmp/test.txt")

# ensure progress meter itself isn't taking time; e.g. too many calls to time() or clock()
system.time(fwrite(DT, "/tmp/test.txt", showProgress=FALSE))

setDTthreads(2) # make sure it works with several threads
system.time(fwrite(DT, "/tmp/test.txt"))
setDTthreads(4)
system.time(fwrite(DT, "/tmp/test.txt"))
setDTthreads(0)
system.time(fwrite(DT, "/tmp/test.txt"))
}


##########################

Expand Down
2 changes: 1 addition & 1 deletion man/fread.Rd
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ data.table=getOption("datatable.fread.datatable") # default: TRUE
\item{fill}{logical (default is \code{FALSE}). If \code{TRUE} then in case the rows have unequal length, blank fields are implicitly filled.}
\item{blank.lines.skip}{\code{logical}, default is \code{FALSE}. If \code{TRUE} blank lines in the input are ignored.}
\item{key}{Character vector of one or more column names which is passed to \code{\link{setkey}}. It may be a single comma separated string such as \code{key="x,y,z"}, or a vector of names such as \code{key=c("x","y","z")}. Only valid when argument \code{data.table=TRUE}.}
\item{showProgress}{ TRUE displays progress on the console using \code{\\r}. It is produced in fread's C code where the very nice (but R level) txtProgressBar and tkProgressBar are not easily available. }
\item{showProgress}{ \code{TRUE} displays progress on the console using \code{\\r}. It is produced in fread's C code where the very nice (but R level) txtProgressBar and tkProgressBar are not easily available. }
\item{data.table}{ TRUE returns a \code{data.table}. FALSE returns a \code{data.frame}. }
}
\details{
Expand Down
5 changes: 4 additions & 1 deletion man/fwrite.Rd
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ As \code{write.csv} and but \emph{much} faster (e.g. 1 minute versus 2 seconds)
fwrite(x, file = "", append = FALSE, quote = "auto", sep = ",",
eol = if (.Platform$OS.type=="windows") "\r\n" else "\n",
na = "", dec = ".", row.names = FALSE, col.names = TRUE, qmethod = c("double","escape"),
verbose=FALSE, ..turbo=TRUE)
showProgress = getOption("datatable.showProgress"),
verbose = getOption("datatable.verbose"),
..turbo=TRUE)
}
\arguments{
\item{x}{A \code{data.table} or \code{data.frame} to write.}
Expand All @@ -26,6 +28,7 @@ fwrite(x, file = "", append = FALSE, quote = "auto", sep = ",",
\item{"escape" - the quote character (as well as the backslash character) is escaped in C style by a backslash, or}
\item{"double" (default, same as \code{write.csv}), in which case the double quote is doubled with another one.}
}}
\item{showProgress}{ Display a progress meter on the console when \code{file!=""}. }
\item{verbose}{Be chatty and report timings?}
\item{..turbo}{Use specialized custom C code to format numeric, integer and integer64 columns. This reduces call overhead to the C library and avoids any use of memory buffers (copies) at all. Try with and without to see the difference it makes on your machine and please report any significant differences in output. If you do find cases where \code{..turbo=FALSE} is needed, please report them as bugs, since this option WILL BE REMOVED in future. Hence why it has the \code{__} prefix.}
}
Expand Down
38 changes: 33 additions & 5 deletions src/fwrite.c
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ SEXP writefile(SEXP list_of_columns,
SEXP append, // TRUE|FALSE
SEXP row_names, // TRUE|FALSE
SEXP col_names, // TRUE|FALSE
SEXP showProgressArg,
SEXP verboseArg,
SEXP turboArg)
{
Expand All @@ -307,7 +308,12 @@ SEXP writefile(SEXP list_of_columns,
#ifndef _OPENMP
Rprintf("Your platform/environment has not detected OpenMP support. fwrite() will still work, but slower in single threaded mode.\n");
// Rprintf rather than warning() because warning() would cause test.data.table() to error about the unexpected warnings
#endif
#endif

const Rboolean showProgress = LOGICAL(showProgressArg)[0];
time_t start = time(NULL);
time_t nexttime = start+2; // start printing progress meter in 2 sec if not completed by then

verbose = LOGICAL(verboseArg)[0];
const Rboolean turbo = LOGICAL(turboArg)[0];

Expand Down Expand Up @@ -444,12 +450,12 @@ SEXP writefile(SEXP list_of_columns,
if (lineLenMax > bufSize) bufSize = lineLenMax;
const int rowsPerBatch = bufSize/lineLenMax;
const int numBatches = (nrows-1)/rowsPerBatch + 1;
if (verbose) Rprintf("Writing data rows in %d batches of %d rows (each buffer size %.3fMB, turbo=%d) ... ",
numBatches, rowsPerBatch, 1.0*bufSize/(1024*1024), turbo);
if (verbose) Rprintf("Writing data rows in %d batches of %d rows (each buffer size %.3fMB, turbo=%d, showProgress=%d) ... ",
numBatches, rowsPerBatch, 1.0*bufSize/(1024*1024), turbo, showProgress);
t0 = clock();

int nth;
Rboolean failed=FALSE;
Rboolean failed=FALSE, hasPrinted=FALSE;
int failed_reason=0; // -1 for malloc fail, else write's errno (>=1)
#pragma omp parallel num_threads(getDTthreads())
{
Expand All @@ -469,6 +475,8 @@ SEXP writefile(SEXP list_of_columns,
{
nth = omp_get_num_threads();
}
int me = omp_get_thread_num();

#pragma omp for ordered schedule(dynamic)
for(RLEN start_row = 0; start_row < nrows; start_row += rowsPerBatch) {
if (failed) continue; // Not break. See comments above about #omp cancel
Expand Down Expand Up @@ -595,7 +603,7 @@ SEXP writefile(SEXP list_of_columns,
// Although this ordered section is one-at-a-time it seems that calling Rprintf() here, even with a
// R_FlushConsole() too, causes corruptions on Windows but not on Linux. At least, as observed so
// far using capture.output(). Perhaps Rprintf() updates some state or allocation that cannot be done
// by child threads, even when one-at-a-time. Anyway, made this single-threaded when output to console
// by slave threads, even when one-at-a-time. Anyway, made this single-threaded when output to console
// to be safe (setDTthreads(1) in fwrite.R) since output to console doesn't need to be fast.
} else {
if (!failed && WRITE(f, buffer, (int)(ch-buffer)) == -1) {
Expand All @@ -604,6 +612,21 @@ SEXP writefile(SEXP list_of_columns,
// The !failed is so the other threads that were waiting at this '#omp ordered' don't try
// and write after the first fail.

time_t now;
if (me==0 && showProgress && (now=time(NULL))>=nexttime) {
// See comments above inside the f==-1 clause.
// Not only is this ordered section one-at-a-time but we'll also Rprintf() here only from the
// master thread (me==0) and hopefully this will work on Windows. If not, user should set
// showProgress=FALSE until this can be fixed or removed.
int eta = (int)((nrows-upp)*(((double)(now-start))/upp));
if (hasPrinted || eta >= 2) {
Rprintf("\rWritten %.1f%% of %d rows in %d secs using %d thread%s. ETA %d secs.",
(100.0*upp)/nrows, nrows, (int)(now-start), nth, nth==1?"":"s", eta);
R_FlushConsole(); // for Windows
nexttime = now+1;
hasPrinted = TRUE;
}
}
// TODO: Adding a progress bar here with Rprintf() or similar should be possible in theory, but see
// the comment above about corruptions observed on windows. Could try and see. If the progress bar
// corruputed then there's not too much harm. It could be delayed and flushed, or, best just have
Expand All @@ -617,6 +640,11 @@ SEXP writefile(SEXP list_of_columns,
// If malloc() failed for me, free(NULL) is ok and does nothing.
}
// Finished parallel region and can call R API safely now.
if (hasPrinted) {
// clear the progress meter
Rprintf("\r \r");
R_FlushConsole(); // for Windows
}
if (failed && failed_reason==-1)
error("One or more threads failed to alloc or realloc their private buffer. Out of memory.\n");
if (f!=-1 && CLOSE(f) && !failed) error("Error closing file '%s': %s", filename, strerror(errno));
Expand Down

0 comments on commit eb9f7ef

Please sign in to comment.