Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

port CJ to C #3596

Merged
merged 17 commits into from
Jun 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,25 @@

16. `as.data.table` now unpacks columns in a `data.frame` which are themselves a `data.frame`. This need arises when parsing JSON, a corollary in [#3369](https://github.com/Rdatatable/data.table/issues/3369#issuecomment-462662752). `data.table` does not allow columns to be objects which themselves have columns (such as `matrix` and `data.frame`), unlike `data.frame` which does. Bug fix 19 in v1.12.2 (see below) added a helpful error (rather than segfault) to detect such invalid `data.table`, and promised that `as.data.table()` would unpack these columns in the next release (i.e. this release) so that the invalid `data.table` is not created in the first place.

17. `CJ` has been ported to C and parallelized, thanks to a PR by Michael Chirico, [#3596](https://github.com/Rdatatable/data.table/pull/3596). All types benefit, and as in many `data.table` operations, factors benefit more than character.

```R
# default 4 threads on a laptop with 16GB RAM and 8 logical CPU

ids = as.vector(outer(LETTERS, LETTERS, paste0))
system.time(DT1 <- CJ(ids, 1:500000)) # 3.9GB; 340m rows
# user system elapsed
# 3.000 0.817 3.798 # was
# 1.800 0.832 2.190 # now

ids = as.factor(ids)
system.time(DT2 <- CJ(ids, 1:500000)) # 2.6GB; 340m rows
# user system elapsed
# 1.779 0.534 2.293 # was
# 0.357 0.763 0.292 # now
```


#### BUG FIXES

1. `first`, `last`, `head` and `tail` by group no longer error in some cases, [#2030](https://github.com/Rdatatable/data.table/issues/2030) [#3462](https://github.com/Rdatatable/data.table/issues/3462). Thanks to @franknarf1 for reporting.
Expand Down
61 changes: 23 additions & 38 deletions R/setkey.R
Original file line number Diff line number Diff line change
Expand Up @@ -370,53 +370,38 @@ CJ = function(..., sorted = TRUE, unique = FALSE)
if (isFALSE(getOption("datatable.CJ.names", TRUE))) { # default TRUE from v1.12.0, FALSE before. TODO: remove option in v1.13.0 as stated in news
if (is.null(vnames <- names(l))) vnames = paste0("V", seq_len(length(l)))
else if (any(tt <- vnames=="")) vnames[tt] = paste0("V", which(tt))
names(l) = vnames
} else {
names(l) = name_dots(...)
vnames = name_dots(...)
}
emptyList = FALSE ## fix for #2511
if(any(sapply(l, length) == 0L)){
## at least one column is empty The whole thing will be empty in the end
emptyList = TRUE
l = lapply(l, "[", 0L)
}
if (unique && !emptyList) l = lapply(l, unique)

dups = FALSE # fix for #1513
if (length(l)==1L && !emptyList && sorted && length(o <- forderv(l[[1L]])))
l[[1L]] = l[[1L]][o]
else if (length(l) > 1L && !emptyList) {
# using rep.int instead of rep speeds things up considerably (but attributes are dropped).
attribs = lapply(l, attributes) # remember attributes for resetting after rep.int
n = vapply(l, length, 0L) #lengths(l) will work from R 3.2.0
nrow = prod(n)
if (nrow > .Machine$integer.max) {
stop("Cross product of elements provided to CJ() would result in ",nrow," rows which exceeds .Machine$integer.max == ",.Machine$integer.max)
}
x = c(rev( head(cumprod(rev(n)),-1) ), 1L)
for (i in seq_along(x)) {
y = l[[i]]
# fix for #1513
if (sorted) {
if (length(o <- forderv(y, retGrp=TRUE))) y = y[o]
if (!dups) dups = attr(o, 'maxgrpn') > 1L
}
if (i == 1L)
l[[i]] = rep.int(y, times = rep.int(x[i], n[i])) # i.e. rep(y, each=x[i])
else if (i == length(n))
l[[i]] = rep.int(y, times = nrow/(x[i]*n[i]))
else
l[[i]] = rep.int(rep.int(y, times = rep.int(x[i], n[i])), times = nrow/(x[i]*n[i]))
if (!is.null(attribs[[i]])){
attributes(l[[i]]) = attribs[[i]] # reset all attributes that were destroyed by rep.int
nrow = prod( vapply_1i(l, length) ) # lengths(l) will work from R 3.2.0
if (nrow > .Machine$integer.max) stop("Cross product of elements provided to CJ() would result in ",nrow," rows which exceeds .Machine$integer.max == ",.Machine$integer.max)
if (nrow>0L) for (i in seq_along(l)) {
y = l[[i]]
if (sorted) {
if (!is.atomic(y)) stop("'sorted' is TRUE but element ", i, " is non-atomic, which can't be sorted; try setting sorted = FALSE")
o = forderv(y, retGrp=TRUE)
thisdups = attr(o,'maxgrpn')>1L
if (thisdups) {
dups = TRUE
if (length(o)) l[[i]] = if (unique) y[o[attr(o,"starts")]] else y[o]
else if (unique) l[[i]] = y[attr(o,"starts")] # test 1525.5
} else {
if (length(o)) l[[i]] = y[o]
}
} else {
if (unique) l[[i]] = unique(y)
}
}
l = as.data.table.list(l)
l = alloc.col(l) # a tiny bit wasteful to over-allocate a fixed join table (column slots only), doing it anyway for consistency, and it's possible a user may wish to use SJ directly outside a join and would expect consistent over-allocation.
l = .Call(Ccj, l)
setDT(l)
l = alloc.col(l) # a tiny bit wasteful to over-allocate a fixed join table (column slots only), doing it anyway for consistency since
# it's possible a user may wish to use SJ directly outside a join and would expect consistent over-allocation
setnames(l, vnames)
if (sorted) {
if (!dups) setattr(l, 'sorted', names(l))
else setkey(l) # fix #1513
}
l
}

25 changes: 21 additions & 4 deletions inst/tests/tests.Rraw
Original file line number Diff line number Diff line change
Expand Up @@ -2805,11 +2805,26 @@ test(995, DT[CJ(c(5,3), c(5,1), sorted=FALSE)], OUT)

# CJ with ordered factor
xx <- factor(letters[1:2], ordered=TRUE)
yy <- sample(2)
options(datatable.CJ.names=FALSE)
test(996.1, CJ(xx, yy), setkey(data.table(rep(xx, each=2), rep(base::sort.int(yy), 2))))
yy <- sample(2L)
yy_sort = base::sort.int(yy)
old = options(datatable.CJ.names=FALSE)
test(996.1, CJ(xx, yy), setkey(data.table(rep(xx, each=2L), rep(yy_sort, 2L))))
test(996.2, CJ(a = xx, yy), setkey(data.table(a = rep(xx, each=2L), V2 = rep(yy_sort, 2L))))
options(datatable.CJ.names=TRUE)
test(996.2, CJ(xx, yy), setkey(data.table(xx = rep(xx, each=2), yy = rep(base::sort.int(yy), 2))))
test(996.3, CJ(xx, yy), setkey(data.table(xx = rep(xx, each=2L), yy = rep(yy_sort, 2L))))
options(old)

# #3597 -- CJ properly informs sorted can't apply to list input
test(996.4, CJ(list(1:2, 3L)), error = "non-atomic, which can't be sorted")
test(996.5, CJ(list(1:2, 3), 4:6, sorted = FALSE),
data.table(V1 = list(1:2, 1:2, 1:2, 3, 3, 3), V2 = rep(4:6, 2L)))
test(996.6, CJ(4:6, list(1:2, 3), sorted = FALSE),
data.table(V1 = rep(4:6, each = 2L), V2 = rep(list(1:2, 3), 3L)))
test(996.7, CJ(1:2, list(1:2, 3), 4:5, sorted = FALSE),
data.table(V1 = rep(1:2, each = 4L), V2 = rep(rep(list(1:2, 3), each = 2L), 2L), V3 = rep(4:5, 4L)))

test(996.8, CJ(expression(1)), error = "element 1 is non-atomic")
test(996.9, CJ(expression(2), 3, sorted = FALSE), error = "Type 'expression' not supported")

# That CJ orders NA consistently with setkey and historically, now it doesn't use setkey.
# NA must always come first in data.table throughout, since binary search relies on that internally.
Expand Down Expand Up @@ -7100,12 +7115,14 @@ test(1524, ans1, ans2)
# 'unique =' argument for CJ, #1148
x = c(1, 2, 1)
y = c(5, 8, 8, 4)
w = c(10, 12, 12, 13) # already sorted but has dups; more efficient case to cover
options(datatable.CJ.names=FALSE)
test(1525.1, CJ(x, y, unique=TRUE), CJ(V1=c(1,2), V2=c(4,5,8)))
test(1525.2, CJ(x, z=y, unique=TRUE), ans<-data.table(V1=rep(c(1,2), each=3), z=c(4,5,8), key="V1,z")) # naming of one but not both, too
options(datatable.CJ.names=TRUE)
test(1525.3, CJ(x, y, unique=TRUE), CJ( x=c(1,2), y=c(4,5,8)))
test(1525.4, CJ(x, z=y, unique=TRUE), setnames(ans,c("x","z")))
test(1525.5, CJ(x, w, unique=TRUE), data.table(x=(rep(c(1,2), each=3)), w=c(10,12,13), key="x,w"))

# `key` argument fix for `setDT` when input is already a `data.table`, #1169
DT <- data.table(A = 1:4, B = 5:8)
Expand Down
82 changes: 82 additions & 0 deletions src/cj.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#include "data.table.h"

SEXP cj(SEXP base_list) {
int ncol = LENGTH(base_list);
SEXP out = PROTECT(allocVector(VECSXP, ncol));
int nrow = 1;
// already confirmed to be less than .Machine$integer.max at R level
for (int j=0; j<ncol; ++j) nrow *= length(VECTOR_ELT(base_list, j));
int eachrep = 1;
for (int j=ncol-1; j>=0; --j) {
SEXP source = VECTOR_ELT(base_list, j), target;
SET_VECTOR_ELT(out, j, target=allocVector(TYPEOF(source), nrow));
copyMostAttrib(source, target); // includes levels of factors, integer64, custom classes, etc
if (nrow==0) continue; // one or more columns are empty so the result will be empty, #2511
int thislen = LENGTH(source);
int blocklen = thislen*eachrep;
int ncopy = nrow/blocklen;
switch(TYPEOF(source)) {
case LGLSXP:
case INTSXP: {
const int *restrict sourceP = INTEGER(source);
int *restrict targetP = INTEGER(target);
#pragma omp parallel for num_threads(getDTthreads())
// default static schedule so two threads won't write to same cache line in last column
// if they did write to same cache line (and will when last column's thislen is small) there's no correctness issue
for (int i=0; i<thislen; ++i) {
const int item = sourceP[i];
const int end = (i+1)*eachrep;
for (int j=i*eachrep; j<end; ++j) targetP[j] = item; // no div, mod or read ops inside loop; just rep a const contiguous write
}
#pragma omp parallel for num_threads(getDTthreads())
for (int i=1; i<ncopy; ++i) {
memcpy(targetP + i*blocklen, targetP, blocklen*sizeof(int));
}
} break;
case REALSXP: {
const double *restrict sourceP = REAL(source);
double *restrict targetP = REAL(target);
#pragma omp parallel for num_threads(getDTthreads())
for (int i=0; i<thislen; ++i) {
const double item = sourceP[i];
const int end=(i+1)*eachrep;
for (int j=i*eachrep; j<end; ++j) targetP[j] = item;
}
#pragma omp parallel for num_threads(getDTthreads())
for (int i=1; i<ncopy; ++i) {
memcpy(targetP + i*blocklen, targetP, blocklen*sizeof(double));
}
} break;
case STRSXP: {
const SEXP *sourceP = STRING_PTR(source);
int start = 0;
for (int i=0; i<ncopy; ++i) {
for (int j=0; j<thislen; ++j) {
const SEXP item = sourceP[j];
const int end = start+eachrep;
for (int k=start; k<end; ++k) SET_STRING_ELT(target, k, item); // no div, mod, or read-API call to STRING_ELT
start = end;
}
}
} break;
case VECSXP: {
const SEXP *sourceP = VECTOR_PTR(source);
int start = 0;
for (int i=0; i<ncopy; ++i) {
for (int j=0; j<thislen; ++j) {
const SEXP item = sourceP[j];
const int end = start+eachrep;
for (int k=start; k<end; ++k) SET_VECTOR_ELT(target, k, item);
start = end;
}
}
} break;
default:
error("Type '%s' not supported by CJ.", type2char(TYPEOF(source)));
}
eachrep *= thislen;
}
UNPROTECT(1);
return out;
}

3 changes: 3 additions & 0 deletions src/data.table.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ bool GetVerbose();
double NA_INT64_D;
long long NA_INT64_LL;

// cj.c
SEXP cj(SEXP base_list);

// dogroups.c
SEXP keepattr(SEXP to, SEXP from);
SEXP growVector(SEXP x, R_len_t newlen);
Expand Down
2 changes: 2 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ SEXP dllVersion();
SEXP nafillR();
SEXP colnamesInt();
SEXP initLastUpdated();
SEXP cj();

// .Externals
SEXP fastmean();
Expand Down Expand Up @@ -167,6 +168,7 @@ R_CallMethodDef callMethods[] = {
{"CcolnamesInt", (DL_FUNC) &colnamesInt, -1},
{"CcoerceFillR", (DL_FUNC) &coerceFillR, -1},
{"CinitLastUpdated", (DL_FUNC) &initLastUpdated, -1},
{"Ccj", (DL_FUNC) &cj, -1},
{NULL, NULL, 0}
};

Expand Down