Skip to content

Commit

Permalink
fwrite robust to very long line lengths (e.g. 100k columns). Test add…
Browse files Browse the repository at this point in the history
…ed. #580
  • Loading branch information
mattdowle committed Apr 14, 2016
1 parent eb9bb60 commit 029c141
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 12 deletions.
11 changes: 11 additions & 0 deletions inst/tests/tests.Rraw
Original file line number Diff line number Diff line change
Expand Up @@ -8729,6 +8729,17 @@ if (requireNamespace("xts", quietly = TRUE)) {
test(1663, dt.xts[1L], xts::xts(data.table(nav=100), order.by=as.Date("2014-12-31")))
}

# fwrite crash on very large number of columns (say 100k)
# Regression test: a 3-row by 100,000-column table of uniform random numbers is
# written twice to the same temporary file, then the resulting file size is checked.
# The seed is fixed so that the generated values are the same on every run.
set.seed(123)
m <- matrix(runif(3*100000), nrow = 3)
DT <- as.data.table(m)
f <- tempfile()
system.time(fwrite(DT, f, eol='\n')) # eol fixed to '\n' so the size test below also passes on Windows
system.time(fwrite(DT, f, eol='\n')) # run again to force seg fault (repeat call is intentional)
# Expected size of the written file, as reported by file.info(): 6288931
test(1664, file.info(f)$size, 6288931)
unlink(f)


##########################

# TODO: Tests involving GForce functions needs to be run with optimisation level 1 and 2, so that both functions are tested all the time.
Expand Down
90 changes: 78 additions & 12 deletions src/fwrite.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@
#define WRITE write
#define CLOSE close
#endif

static inline int maxStrLen(SEXP x, int na_len) {
  // Returns the largest LENGTH() of any element of the string vector x
  // (a character column or a factor's levels), but never less than na_len.
  // An empty x therefore yields na_len.
  int widest = na_len;
  const int n = length(x);
  int idx = 0;
  while (idx < n) {
    const int len = LENGTH(STRING_ELT(x, idx));
    widest = (len > widest) ? len : widest;  // keep the running maximum
    idx++;
  }
  return widest;
}

#define QUOTE_FIELD \
*ch++ = QUOTE; \
Expand Down Expand Up @@ -75,48 +85,103 @@ SEXP writefile(SEXP list_of_columns,
}
int true_false;

// prefetch levels of factor columns (if any) to save getAttrib on every field on every row of any factor column
clock_t t0=clock();
// i) prefetch levels of factor columns (if any) to save getAttrib on every field on every row of any factor column
// ii) calculate certain upper bound of line length
SEXP levels[ncols]; // on-stack vla
int lineLenMax = 2; // initialize with eol max width of \r\n on windows
for (int col_i=0; col_i<ncols; col_i++) {
SEXP column = VECTOR_ELT(list_of_columns, col_i);
levels[col_i] = isFactor(column) ? getAttrib(column, R_LevelsSymbol) : NULL;
switch(TYPEOF(column)) {
case LGLSXP:
lineLenMax+=5; // width of FALSE
break;
case REALSXP:
lineLenMax+=20; // 15 (+ 5 for safety)
break;
case INTSXP:
if (isFactor(column)) {
levels[col_i] = getAttrib(column, R_LevelsSymbol);
lineLenMax += maxStrLen(levels[col_i], na_len)*(1+quote) + quote*2;
// ^^^^^^^^^^ in case every character in the field is a quote, each to be escaped (!)
} else {
levels[col_i] = NULL;
lineLenMax+=11; // 11 + sign
}
break;
case STRSXP:
lineLenMax += maxStrLen(column, na_len)*(1+quote) + quote*2;
break;
default:
error("Column %d's type is '%s' - not yet implemented.", col_i+1,type2char(TYPEOF(column)) );
}
lineLenMax++; // column separator
}

char *buffer = Calloc(4*1024*1024, char); // 4MB buffer. Large enough to fit many lines. Small enough to fit in cache.
if (buffer == NULL) error("Unable to allocate 4MB buffer");
int writeTrigger = (int)(3.5*1024*1024); // When to write. Crash problems expected (for now) with lines of length
// apx 0.5 million bytes (0.5*1024*1024)
int bufSize = 4*1024*1024; // 4MB buffer. Large enough to fit many lines. Small enough to fit in cache.
if (lineLenMax > bufSize) bufSize = lineLenMax;
int writeTrigger = bufSize-lineLenMax-1; // When to write.
// TODO: int linesPerBuf = bufSize/lineLenMax
if (verbose) Rprintf("lineLenMax: %d\nbufSize: %d\nwriteTrigger: %d\n", lineLenMax, bufSize, writeTrigger);

char *buffer = Calloc(bufSize, char);
if (buffer == NULL) error("Unable to allocate %dMB buffer", bufSize/(1024*1024));

clock_t t0=clock(),t1,tformat=0,twrite=0;
clock_t tlineLenMax=clock()-t0;
t0=clock();
clock_t t1,tformat=0,twrite=0;
// clock_t tt0,tSTR=0,tNUM=0;
SEXP str;
char *ch = buffer;
int numWrite=0;

// Write the column names
if (LOGICAL(col_names)[0]) {
SEXP names = getAttrib(list_of_columns, R_NamesSymbol);
if (names!=NULL) {
if (LENGTH(names) != ncols) error("Internal error: length of column names is not equal to the number of columns. Please report.");
for (int col_i=0; col_i<ncols; col_i++) {
str = STRING_ELT(names, col_i);

// The lineLenMax was just for data rows, not the column names.
// Be robust to enormously long column names e.g. 100,000 character single column name which might not fit in the buffer
int maxLen = 2 + (str==NA_STRING ? na_len : LENGTH(str)*(1+quote)+quote*2);
// ^ to cover sep or windows eol
if (maxLen >= bufSize) error("The buffer isn't big enough to hold single column name %d", col_i+1);
if (ch-buffer+maxLen >= bufSize) {
if (ch == buffer) error("Internal error: ch>buffer at this point");
// write out the column names we've written to the buffer already and rewind the buffer
if (WRITE(f, buffer, (int)(ch-buffer)) == -1) { close(f); error("Error writing to file: %s", filename); }
numWrite++;
ch = buffer;
}
// now proceed safely to write the column name to the buffer

if (str==NA_STRING) {
if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; }
break;
}
if (quote) {
QUOTE_FIELD;
} else {
memcpy(ch, CHAR(str), LENGTH(str)); // could have large fields. Doubt call overhead is much of an issue on small fields.
memcpy(ch, CHAR(str), LENGTH(str));
ch += LENGTH(str);
}
*ch++ = col_sep;
}
ch--; // backup onto the last col_sep after the last column
memcpy(ch, row_sep, row_sep_len); // replace it with the newline
ch += row_sep_len;

// For gigantic data row lengths the buffer may only be able to hold one row at a time. So write the column
// names now and clear the buffer, just in case, ready for the first data row.
if (WRITE(f, buffer, (int)(ch-buffer)) == -1) { close(f); error("Error writing to file: %s", filename); }
numWrite++;
ch = buffer;
}
}

// Write the data rows
for (RLEN row_i = 0; row_i < nrows; row_i++) {
for (int col_i = 0; col_i < ncols; col_i++) {
SEXP column = VECTOR_ELT(list_of_columns, col_i);
Expand Down Expand Up @@ -173,11 +238,10 @@ SEXP writefile(SEXP list_of_columns,
default:
error("Column %d's type is '%s' - not yet implemented.", col_i+1,type2char(TYPEOF(column)) );
}
// TODO: Check that buffer has more than maximum CHARSXP length left. Saves checking every time above.
*ch++ = col_sep;
}
ch--; // backup onto the last col_sep after the last column
memcpy(ch, row_sep, row_sep_len); // replace it with the newline
memcpy(ch, row_sep, row_sep_len); // replace it with the newline. TODO: replace memcpy call with eol1 eol2 --eolLen
ch += row_sep_len;
// Rprintf("Writing a line out length %d %10s\n", (int)(ch-buffer), buffer);
if ((ch-buffer)>writeTrigger) {
Expand All @@ -197,9 +261,11 @@ SEXP writefile(SEXP list_of_columns,
}
if (CLOSE(f)) error("Error closing file: %s", filename);
Free(buffer);
clock_t total = tlineLenMax + tformat + twrite;
if (verbose) {
Rprintf("%8.3fs (%3.0f%%) format\n", 1.0*tformat/CLOCKS_PER_SEC, 100.0*tformat/(tformat+twrite));
Rprintf("%8.3fs (%3.0f%%) write (%d calls)\n", 1.0*twrite/CLOCKS_PER_SEC, 100.0*twrite/(tformat+twrite), numWrite);
Rprintf("%8.3fs (%3.0f%%) calc max line length\n", 1.0*tlineLenMax/CLOCKS_PER_SEC, 100.0*tlineLenMax/total);
Rprintf("%8.3fs (%3.0f%%) format\n", 1.0*tformat/CLOCKS_PER_SEC, 100.0*tformat/total);
Rprintf("%8.3fs (%3.0f%%) write (%d calls)\n", 1.0*twrite/CLOCKS_PER_SEC, 100.0*twrite/total, numWrite);
//Rprintf(" %8.3fs (%3.0f%%) STR\n", 1.0*tSTR/CLOCKS_PER_SEC, 100.0*tSTR/tformat);
//Rprintf(" %8.3fs (%3.0f%%) NUM\n", 1.0*tNUM/CLOCKS_PER_SEC, 100.0*tNUM/tformat);
}
Expand Down

0 comments on commit 029c141

Please sign in to comment.