Skip to content

Commit

Permalink
mean na.rm=TRUE uses GForce (#4851)
Browse files Browse the repository at this point in the history
  • Loading branch information
jangorecki authored Mar 15, 2021
1 parent 788c585 commit ec1259a
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 97 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

1. `nafill()` now applies `fill=` to the front/back of the vector when `type="locf|nocb"`, [#3594](https://github.com/Rdatatable/data.table/issues/3594). Thanks to @ben519 for the feature request. It also now returns a named object based on the input names. Note that if you are considering joining and then using `nafill(...,type='locf|nocb')` afterwards, please review `roll=`/`rollends=` which should achieve the same result in one step more efficiently. `nafill()` is for when filling-while-joining (i.e. `roll=`/`rollends=`/`nomatch=`) cannot be applied.

2. `mean(na.rm=TRUE)` by group is now GForce optimized, [#4849](https://github.com/Rdatatable/data.table/issues/4849). Thanks to the [h2oai/db-benchmark](https://github.com/h2oai/db-benchmark) project for spotting this issue. The 1 billion row example in the issue shows 48s reduced to 14s. The optimization also applies to type `integer64` resulting in a difference to the `bit64::mean.integer64` method: `data.table` returns a `double` result whereas `bit64` rounds the mean to the nearest integer.

## BUG FIXES

## NOTES
Expand Down
2 changes: 1 addition & 1 deletion R/data.table.R
Original file line number Diff line number Diff line change
Expand Up @@ -2856,7 +2856,7 @@ ghead = function(x, n) .Call(Cghead, x, as.integer(n)) # n is not used at the mo
gtail = function(x, n) .Call(Cgtail, x, as.integer(n)) # n is not used at the moment
gfirst = function(x) .Call(Cgfirst, x)
glast = function(x) .Call(Cglast, x)
gsum = function(x, na.rm=FALSE) .Call(Cgsum, x, na.rm, TRUE) # warnOverflow=TRUE, #986
gsum = function(x, na.rm=FALSE) .Call(Cgsum, x, na.rm)
gmean = function(x, na.rm=FALSE) .Call(Cgmean, x, na.rm)
gprod = function(x, na.rm=FALSE) .Call(Cgprod, x, na.rm)
gmedian = function(x, na.rm=FALSE) .Call(Cgmedian, x, na.rm)
Expand Down
10 changes: 10 additions & 0 deletions inst/tests/tests.Rraw
Original file line number Diff line number Diff line change
Expand Up @@ -17263,3 +17263,13 @@ if (identical(x, enc2native(x))) {

# fintersect now preserves order of first argument like intersect, #4716
test(2163, fintersect(data.table(x=c("b", "c", "a")), data.table(x=c("a","c")))$x, c("c", "a"))

# mean na.rm=TRUE GForce, #4849
d = data.table(a=1, b=list(1,2))
test(2164.1, d[, mean(b), by=a], error="not supported by GForce mean")
if (test_bit64) {
d = data.table(a=INT(1,1,2,2), b=as.integer64(c(2,3,4,NA)))
test(2164.2, d[, mean(b), by=a], data.table(a=INT(1,2), V1=c(2.5, NA)))
test(2164.3, d[, mean(b, na.rm=TRUE), by=a], data.table(a=INT(1,2), V1=c(2.5, 4)))
}

225 changes: 129 additions & 96 deletions src/gsumm.c
Original file line number Diff line number Diff line change
Expand Up @@ -337,11 +337,10 @@ void *gather(SEXP x, bool *anyNA)
return gx;
}

SEXP gsum(SEXP x, SEXP narmArg, SEXP warnOverflowArg)
SEXP gsum(SEXP x, SEXP narmArg)
{
if (!isLogical(narmArg) || LENGTH(narmArg)!=1 || LOGICAL(narmArg)[0]==NA_LOGICAL) error(_("na.rm must be TRUE or FALSE"));
const bool narm = LOGICAL(narmArg)[0];
const bool warnOverflow = LOGICAL(warnOverflowArg)[0];
if (inherits(x, "factor")) error(_("sum is not meaningful for factors."));
const int n = (irowslen == -1) ? length(x) : irowslen;
double started = wallclock();
Expand Down Expand Up @@ -401,7 +400,7 @@ SEXP gsum(SEXP x, SEXP narmArg, SEXP warnOverflowArg)
//Rprintf(_("gsum int took %.3f\n"), wallclock()-started);
if (overflow) {
UNPROTECT(1); // discard the result with overflow
if (warnOverflow) warning(_("The sum of an integer column for a group was more than type 'integer' can hold so the result has been coerced to 'numeric' automatically for convenience."));
warning(_("The sum of an integer column for a group was more than type 'integer' can hold so the result has been coerced to 'numeric' automatically for convenience."));
ans = PROTECT(allocVector(REALSXP, ngrp));
double *restrict ansp = REAL(ans);
memset(ansp, 0, ngrp*sizeof(double));
Expand Down Expand Up @@ -570,113 +569,147 @@ SEXP gsum(SEXP x, SEXP narmArg, SEXP warnOverflowArg)
return(ans);
}

SEXP gmean(SEXP x, SEXP narm)
SEXP gmean(SEXP x, SEXP narmArg)
{
SEXP ans=R_NilValue;
//clock_t start = clock();
if (!isLogical(narm) || LENGTH(narm)!=1 || LOGICAL(narm)[0]==NA_LOGICAL) error(_("na.rm must be TRUE or FALSE"));
if (!isVectorAtomic(x)) error(_("GForce mean can only be applied to columns, not .SD or similar. Likely you're looking for 'DT[,lapply(.SD,mean),by=,.SDcols=]'. See ?data.table."));
if (inherits(x, "factor")) error(_("mean is not meaningful for factors."));
if (!LOGICAL(narm)[0]) {
int protecti=0;
ans = PROTECT(gsum(x, narm, /*#986, warnOverflow=*/ScalarLogical(FALSE))); protecti++;
switch(TYPEOF(ans)) {
case LGLSXP: case INTSXP:
ans = PROTECT(coerceVector(ans, REALSXP)); protecti++;
case REALSXP: {
double *xd = REAL(ans);
for (int i=0; i<ngrp; i++) *xd++ /= grpsize[i]; // let NA propogate
} break;
case CPLXSXP: {
Rcomplex *xd = COMPLEX(ans);
for (int i=0; i<ngrp; i++) {
xd->i /= grpsize[i];
xd->r /= grpsize[i];
xd++;
}
} break;
default :
error(_("Internal error: gsum returned type '%s'. typeof(x) is '%s'"), type2char(TYPEOF(ans)), type2char(TYPEOF(x))); // # nocov
}
UNPROTECT(protecti);
return(ans);
}
// na.rm=TRUE. Similar to gsum, but we need to count the non-NA as well for the divisor
if (!isLogical(narmArg) || LENGTH(narmArg)!=1 || LOGICAL(narmArg)[0]==NA_LOGICAL) error(_("na.rm must be TRUE or FALSE"));
const bool narm = LOGICAL(narmArg)[0];
const int n = (irowslen == -1) ? length(x) : irowslen;
if (nrow != n) error(_("nrow [%d] != length(x) [%d] in %s"), nrow, n, "gsum");

long double *s = calloc(ngrp, sizeof(long double)), *si=NULL; // s = sum; si = sum imaginary just for complex
if (!s) error(_("Unable to allocate %d * %d bytes for sum in gmean na.rm=TRUE"), ngrp, sizeof(long double));

int *c = calloc(ngrp, sizeof(int));
if (!c) error(_("Unable to allocate %d * %d bytes for counts in gmean na.rm=TRUE"), ngrp, sizeof(int));

double started = wallclock();
const bool verbose=GetVerbose();
if (verbose) Rprintf(_("This gmean took (narm=%s) ... "), narm?"TRUE":"FALSE"); // narm=TRUE only at this point
if (nrow != n) error(_("nrow [%d] != length(x) [%d] in %s"), nrow, n, "gmean");
bool anyNA=false;
SEXP ans=R_NilValue;
int protecti=0;
switch(TYPEOF(x)) {
case LGLSXP: case INTSXP: {
const int *xd = INTEGER(x);
for (int i=0; i<n; i++) {
int thisgrp = grp[i];
int ix = (irowslen == -1) ? i : irows[i]-1;
if (xd[ix] == NA_INTEGER) continue;
s[thisgrp] += xd[ix]; // no under/overflow here, s is long double
c[thisgrp]++;
}
} break;
case LGLSXP: case INTSXP:
x = PROTECT(coerceVector(x, REALSXP)); protecti++;
case REALSXP: {
const double *xd = REAL(x);
for (int i=0; i<n; i++) {
int thisgrp = grp[i];
int ix = (irowslen == -1) ? i : irows[i]-1;
if (ISNAN(xd[ix])) continue;
s[thisgrp] += xd[ix];
c[thisgrp]++;
}
} break;
case CPLXSXP: {
const Rcomplex *xd = COMPLEX(x);
si = calloc(ngrp, sizeof(long double));
if (!si) error(_("Unable to allocate %d * %d bytes for si in gmean na.rm=TRUE"), ngrp, sizeof(long double));
for (int i=0; i<n; i++) {
int thisgrp = grp[i];
int ix = (irowslen == -1) ? i : irows[i]-1;
if (ISNAN(xd[ix].r) || ISNAN(xd[ix].i)) continue; // || otherwise we'll need two counts in two c's too?
s[thisgrp] += xd[ix].r;
si[thisgrp] += xd[ix].i;
c[thisgrp]++;
if (INHERITS(x, char_integer64)) {
x = PROTECT(coerceAs(x, /*as=*/ScalarReal(1), /*copyArg=*/ScalarLogical(TRUE))); protecti++;
}
} break;
default:
free(s); free(c); // # nocov because it already stops at gsum, remove nocov if gmean will support a type that gsum wont
error(_("Type '%s' not supported by GForce mean (gmean) na.rm=TRUE. Either add the prefix base::mean(.) or turn off GForce optimization using options(datatable.optimize=1)"), type2char(TYPEOF(x))); // # nocov
}
switch(TYPEOF(x)) {
case LGLSXP: case INTSXP: case REALSXP: {
ans = PROTECT(allocVector(REALSXP, ngrp));
double *ansd = REAL(ans);
for (int i=0; i<ngrp; i++) {
if (c[i]==0) { ansd[i] = R_NaN; continue; } // NaN to follow base::mean
s[i] /= c[i];
ansd[i] = s[i]>DBL_MAX ? R_PosInf : (s[i] < -DBL_MAX ? R_NegInf : (double)s[i]);
const double *restrict gx = gather(x, &anyNA);
ans = PROTECT(allocVector(REALSXP, ngrp)); protecti++;
double *restrict ansp = REAL(ans);
memset(ansp, 0, ngrp*sizeof(double));
if (!narm || !anyNA) {
#pragma omp parallel for num_threads(getDTthreads(highSize, false))
for (int h=0; h<highSize; h++) {
double *restrict _ans = ansp + (h<<shift);
for (int b=0; b<nBatch; b++) {
const int pos = counts[ b*highSize + h ];
const int howMany = ((h==highSize-1) ? (b==nBatch-1?lastBatchSize:batchSize) : counts[ b*highSize + h + 1 ]) - pos;
const double *my_gx = gx + b*batchSize + pos;
const uint16_t *my_low = low + b*batchSize + pos;
for (int i=0; i<howMany; i++) {
_ans[my_low[i]] += my_gx[i]; // let NA propagate when !narm
}
}
}
#pragma omp parallel for num_threads(getDTthreads(ngrp, true))
for (int i=0; i<ngrp; i++) ansp[i] /= grpsize[i];
} else {
// narm==true and anyNA==true
int *restrict nna_counts = calloc(ngrp, sizeof(int));
if (!nna_counts) error(_("Unable to allocate %d * %d bytes for non-NA counts in gmean na.rm=TRUE"), ngrp, sizeof(int));
#pragma omp parallel for num_threads(getDTthreads(highSize, false))
for (int h=0; h<highSize; h++) {
double *restrict _ans = ansp + (h<<shift);
int *restrict _nna = nna_counts + (h<<shift);
for (int b=0; b<nBatch; b++) {
const int pos = counts[ b*highSize + h ];
const int howMany = ((h==highSize-1) ? (b==nBatch-1?lastBatchSize:batchSize) : counts[ b*highSize + h + 1 ]) - pos;
const double *my_gx = gx + b*batchSize + pos;
const uint16_t *my_low = low + b*batchSize + pos;
for (int i=0; i<howMany; i++) {
const double elem = my_gx[i];
if (!ISNAN(elem)) {
_ans[my_low[i]] += elem;
_nna[my_low[i]]++;
}
}
}
}
#pragma omp parallel for num_threads(getDTthreads(ngrp, true))
for (int i=0; i<ngrp; i++) ansp[i] /= nna_counts[i];
free(nna_counts);
}
} break;
case CPLXSXP: {
ans = PROTECT(allocVector(CPLXSXP, ngrp));
Rcomplex *ansd = COMPLEX(ans);
for (int i=0; i<ngrp; i++) {
if (c[i]==0) { ansd[i].r = R_NaN; ansd[i].i = R_NaN; continue; }
s[i] /= c[i];
si[i] /= c[i];
ansd[i].r = s[i] >DBL_MAX ? R_PosInf : (s[i] < -DBL_MAX ? R_NegInf : (double)s[i]);
ansd[i].i = si[i]>DBL_MAX ? R_PosInf : (si[i]< -DBL_MAX ? R_NegInf : (double)si[i]);
const Rcomplex *restrict gx = gather(x, &anyNA);
ans = PROTECT(allocVector(CPLXSXP, ngrp)); protecti++;
Rcomplex *restrict ansp = COMPLEX(ans);
memset(ansp, 0, ngrp*sizeof(Rcomplex));
if (!narm || !anyNA) {
#pragma omp parallel for num_threads(getDTthreads(highSize, false))
for (int h=0; h<highSize; h++) {
Rcomplex *restrict _ans = ansp + (h<<shift);
for (int b=0; b<nBatch; b++) {
const int pos = counts[ b*highSize + h ];
const int howMany = ((h==highSize-1) ? (b==nBatch-1?lastBatchSize:batchSize) : counts[ b*highSize + h + 1 ]) - pos;
const Rcomplex *my_gx = gx + b*batchSize + pos;
const uint16_t *my_low = low + b*batchSize + pos;
for (int i=0; i<howMany; i++) {
_ans[my_low[i]].r += my_gx[i].r; // let NA propagate when !narm
_ans[my_low[i]].i += my_gx[i].i;
}
}
}
#pragma omp parallel for num_threads(getDTthreads(ngrp, true))
for (int i=0; i<ngrp; i++) {
ansp[i].i /= grpsize[i];
ansp[i].r /= grpsize[i];
}
} else {
// narm==true and anyNA==true
int *restrict nna_counts_r = calloc(ngrp, sizeof(int));
int *restrict nna_counts_i = calloc(ngrp, sizeof(int));
if (!nna_counts_r || !nna_counts_i) {
// # nocov start
free(nna_counts_r); // free(NULL) is allowed and does nothing. Avoids repeating the error() call here.
free(nna_counts_i);
error(_("Unable to allocate %d * %d bytes for non-NA counts in gmean na.rm=TRUE"), ngrp, sizeof(int));
// # nocov end
}
#pragma omp parallel for num_threads(getDTthreads(highSize, false))
for (int h=0; h<highSize; h++) {
Rcomplex *restrict _ans = ansp + (h<<shift);
int *restrict _nna_r = nna_counts_r + (h<<shift);
int *restrict _nna_i = nna_counts_i + (h<<shift);
for (int b=0; b<nBatch; b++) {
const int pos = counts[ b*highSize + h ];
const int howMany = ((h==highSize-1) ? (b==nBatch-1?lastBatchSize:batchSize) : counts[ b*highSize + h + 1 ]) - pos;
const Rcomplex *my_gx = gx + b*batchSize + pos;
const uint16_t *my_low = low + b*batchSize + pos;
for (int i=0; i<howMany; i++) {
const Rcomplex elem = my_gx[i];
if (!ISNAN(elem.r)) {
_ans[my_low[i]].r += elem.r;
_nna_r[my_low[i]]++;
}
if (!ISNAN(elem.i)) {
_ans[my_low[i]].i += elem.i;
_nna_i[my_low[i]]++;
}
}
}
}
#pragma omp parallel for num_threads(getDTthreads(ngrp, true))
for (int i=0; i<ngrp; i++) {
ansp[i].r /= nna_counts_r[i];
ansp[i].i /= nna_counts_i[i];
}
free(nna_counts_r);
free(nna_counts_i);
}
} break;
default:
error(_("Internal error: unsupported type at the end of gmean")); // # nocov
error(_("Type '%s' not supported by GForce mean (gmean). Either add the prefix base::mean(.) or turn off GForce optimization using options(datatable.optimize=1)"), type2char(TYPEOF(x)));
}
free(s); free(si); free(c);
copyMostAttrib(x, ans);
// Rprintf(_("this gmean na.rm=TRUE took %8.3f\n"), 1.0*(clock()-start)/CLOCKS_PER_SEC);
UNPROTECT(1);
if (verbose) { Rprintf(_("%.3fs\n"), wallclock()-started); }
UNPROTECT(protecti);
return(ans);
}

Expand Down

0 comments on commit ec1259a

Please sign in to comment.