Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
38f7cee
dev
jangorecki Jun 4, 2020
febf07c
devdev
jangorecki Jun 4, 2020
3480d32
unsorted works as well
jangorecki Jun 5, 2020
99589c5
testing and fixing
jangorecki Jun 5, 2020
94a0faa
devdevdev
jangorecki Jun 6, 2020
6df1415
cleanup
jangorecki Jun 6, 2020
c13113a
prepare for batching
jangorecki Jun 6, 2020
ffb521a
finally parallel
jangorecki Jun 6, 2020
518b36c
parallel
jangorecki Jun 6, 2020
fd03e04
cleanup and speedup
jangorecki Jun 6, 2020
71c348b
bucketing now uses binary search
jangorecki Jun 7, 2020
86a4bcc
rename vars
jangorecki Jun 7, 2020
36b6762
getting closer
jangorecki Jun 7, 2020
b1f3463
switch magic option
jangorecki Jun 7, 2020
cba274a
more robust, batching into new function and struct
jangorecki Jun 8, 2020
9b0d195
static funs and comments
jangorecki Jun 8, 2020
4c5d1ea
better verbose
jangorecki Jun 8, 2020
19d1df6
simpler batching, more strict type defs
jangorecki Jun 9, 2020
7efaa6e
last batch not balanced anymore
jangorecki Jun 9, 2020
6423c0b
batching balanced again
jangorecki Jun 9, 2020
045e9e8
remove extra checks in bmerge to smerge opt
jangorecki Jun 9, 2020
739a35a
fix test function
jangorecki Jun 10, 2020
47cfd8a
improve verbose msg
jangorecki Jun 10, 2020
2806017
cleanup dev mostly
jangorecki Jun 10, 2020
4e5f56d
comment about thread utilization
jangorecki Jun 11, 2020
405f55d
mult support
jangorecki Jun 12, 2020
9ee0a7a
avoid lens allocation and unsort for mult=first|last
jangorecki Jun 12, 2020
5816549
move R allocs
jangorecki Jun 12, 2020
d7c2b4b
avoid one more unsort
jangorecki Jun 12, 2020
49ea9da
mult already supported
jangorecki Jun 13, 2020
9f1ea21
mult already supported2
jangorecki Jun 13, 2020
103b9c6
better algo description
jangorecki Jun 13, 2020
2acaf23
multiple columns support
jangorecki Jun 16, 2020
e1643c2
Revert "multiple columns support"
jangorecki Jun 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions R/bmerge.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@

bmerge = function(i, x, icols, xcols, roll, rollends, nomatch, mult, ops, verbose)
{
if (length(icols)==1L && length(xcols)==1L && is.integer(i[[icols]]) && is.integer(x[[xcols]]) ## single column integer
&& isTRUE(getOption("datatable.smerge")) ## enable option
&& identical(nomatch, NA_integer_) ## for now only outer join
&& identical(ops, 1L) ## equi join
&& identical(roll, 0) && identical(rollends, c(FALSE, TRUE)) ## non-rolling join
) {
getIdxGrp = function(x, cols) { ## get index only if retGrp=T
if (!isTRUE(getOption("datatable.use.index"))) return()
if (is.numeric(cols)) cols = names(x)[cols]
idx = attr(attr(x, "index", exact=TRUE), paste0("__", cols, collapse=""), exact=TRUE)
if (!is.null(attr(idx, "starts", exact=TRUE))) idx
}
if (verbose) {last.started.at=proc.time();cat("Starting smerge ...\n");flush.console()}
ans = smerge(x=i[[icols]], y=x[[xcols]], x.idx=getIdxGrp(i, icols), y.idx=getIdxGrp(x, xcols), mult=mult, out.bmerge=TRUE)
if (verbose) {cat("smerge done in",timetaken(last.started.at),"\n"); flush.console()}
return(ans)
}
callersi = i
i = shallow(i)
# Just before the call to bmerge() in [.data.table there is a shallow() copy of i to prevent coercions here
Expand Down Expand Up @@ -130,7 +147,7 @@ bmerge = function(i, x, icols, xcols, roll, rollends, nomatch, mult, ops, verbos
} else {
xo = NULL
if (isTRUE(getOption("datatable.use.index"))) {
xo = getindex(x, names(x)[xcols])
xo = c(getindex(x, names(x)[xcols])) ## c takes care of future #4386
if (verbose && !is.null(xo)) cat("on= matches existing index, using index\n")
}
if (is.null(xo)) {
Expand Down Expand Up @@ -180,9 +197,7 @@ bmerge = function(i, x, icols, xcols, roll, rollends, nomatch, mult, ops, verbos
if (verbose) {last.started.at=proc.time();cat("Starting bmerge ...\n");flush.console()}
ans = .Call(Cbmerge, i, x, as.integer(icols), as.integer(xcols), io, xo, roll, rollends, nomatch, mult, ops, nqgrp, nqmaxgrp)
if (verbose) {cat("bmerge done in",timetaken(last.started.at),"\n"); flush.console()}
# TO DO: xo could be moved inside Cbmerge

ans$xo = xo # for further use by [.data.table
return(ans)
}

1 change: 1 addition & 0 deletions R/data.table.R
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ replace_dot_alias = function(e) {
}
# TODO: when nomatch=NA, len__ need not be allocated / set at all for mult="first"/"last"?
# TODO: how about when nomatch=0L, can we avoid allocating then as well?
# if we take nomatch out from [b|s]merge then it should be easier to avoid allocation
}
if (length(xo) && length(irows)) {
irows = xo[irows] # TO DO: fsort here?
Expand Down
2 changes: 2 additions & 0 deletions R/wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ colnamesInt = function(x, cols, check_dups=FALSE) .Call(CcolnamesInt, x, cols, c
coerceFill = function(x) .Call(CcoerceFillR, x)

testMsg = function(status=0L, nx=2L, nk=2L) .Call(CtestMsgR, as.integer(status)[1L], as.integer(nx)[1L], as.integer(nk)[1L])

smerge = function(x, y, x.idx=NULL, y.idx=NULL, mult=c("all","first","last","error"), out.bmerge=FALSE) .Call(CsmergeR, x, y, x.idx, y.idx, match.arg(mult), out.bmerge)
237 changes: 237 additions & 0 deletions inst/tests/smerge.Rraw
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
require(methods)

if (exists("test.data.table", .GlobalEnv, inherits=FALSE)) {
if ((tt<-compiler::enableJIT(-1))>0)
cat("This is dev mode and JIT is enabled (level ", tt, ") so there will be a brief pause around the first test.\n", sep="")
} else {
require(data.table)
test = data.table:::test
smerge = data.table:::smerge
bmerge = data.table:::bmerge
forderv = data.table:::forderv
vecseq = data.table:::vecseq
}

bm = function(x, y, mult="all") {
stopifnot(is.integer(x), is.integer(y))
ans = bmerge(data.table(x=x), data.table(y=y), 1L, 1L, roll=0, rollends=c(FALSE, TRUE), nomatch=NA_integer_, mult=mult, ops=1L, verbose=FALSE)
## if undefining SMERGE_STATS then we have to ignore allLen1 as well
ans$nMatch = as.numeric(sum(!is.na(vecseq(ans$starts, ans$lens, NULL))))
ans
}
sm = function(x, y, mult="all") {
stopifnot(is.integer(x), is.integer(y))
ans = smerge(x, y, mult=mult, out.bmerge=TRUE)
## if undefining SMERGE_STATS then we have to ignore allLen1 as well
ans$nMatch = smerge(x, y, mult=mult, out.bmerge=FALSE)$nMatch
ans
}

#setDTthreads(2)
#options(datatable.verbose=TRUE)

# unique and sort
## x y sorted
x = c(1L,2L,3L,4L) # unq
y = c(2L,3L,5L) # unq
test(1.01, sm(x, y), bm(x, y))
x = c(1L,2L,3L,3L,4L)
y = c(2L,3L,5L) # unq
test(1.02, sm(x, y), bm(x, y))
x = c(1L,2L,3L,4L) # unq
y = c(2L,3L,3L,5L)
test(1.03, sm(x, y), bm(x, y))
x = c(1L,2L,3L,3L,4L)
y = c(2L,3L,3L,5L)
test(1.04, sm(x, y), bm(x, y))
## y unsorted
x = c(1L,2L,3L,4L) # unq
y = c(2L,5L,3L) # unq
test(2.01, sm(x, y), bm(x, y))
x = c(1L,2L,3L,3L,4L)
y = c(3L,2L,5L) # unq
test(2.02, sm(x, y), bm(x, y))
x = c(1L,2L,3L,4L) # unq
y = c(5L,3L,2L,3L)
test(2.03, sm(x, y), bm(x, y))
x = c(1L,2L,3L,3L,4L)
y = c(5L,3L,3L,2L)
test(2.04, sm(x, y), bm(x, y))
## x unsorted
x = c(2L,3L,1L,4L) # unq
y = c(2L,3L,5L) # unq
test(3.01, sm(x, y), bm(x, y))
x = c(1L,3L,2L,4L,3L)
y = c(2L,3L,5L) # unq
test(3.02, sm(x, y), bm(x, y))
x = c(4L,2L,3L,1L)
y = c(2L,3L,3L,5L) # unq
test(3.03, sm(x, y), bm(x, y))
x = c(1L,2L,4L,3L,3L)
y = c(2L,3L,3L,5L)
test(3.04, sm(x, y), bm(x, y))
## xy unsorted
x = c(4L,1L,3L,2L) # unq
y = c(2L,5L,3L)
test(4.01, sm(x, y), bm(x, y))
x = c(1L,3L,2L,4L,3L)
y = c(5L,3L,2L) # unq
test(4.02, sm(x, y), bm(x, y))
x = c(4L,2L,3L,1L)
y = c(3L,3L,2L,5L) # unq
test(4.03, sm(x, y), bm(x, y))
x = c(1L,2L,4L,3L,3L)
y = c(5L,2L,3L,3L)
test(4.04, sm(x, y), bm(x, y))

# ties
x = c(1L,2L,3L,4L,5L)
y = c(2L,4L) # within
test(5.01, sm(x, y), bm(x, y))
x = c(1L,2L,3L,4L,5L)
y = c(-1L,2L,4L) # left tie
test(5.02, sm(x, y), bm(x, y))
x = c(1L,2L,3L,4L,5L)
y = c(2L,4L,7L) # right tie
test(5.03, sm(x, y), bm(x, y))
x = c(1L,2L,3L,4L,5L)
y = c(-1L,2L,4L,6L) # both ties
test(5.04, sm(x, y), bm(x, y))

# nomatch
x = c(1L,3L,5L)
y = c(2L,4L) # within nomatch
test(6.01, sm(x, y), bm(x, y))
x = c(1L,2L,3L,4L,5L)
y = c(-1L,6L) # ties nomatch
test(6.02, sm(x, y), bm(x, y))
x = c(1L,2L,2L,2L,5L)
y = c(2L,4L) # x duplicates single match
test(6.03, sm(x, y), bm(x, y))
x = c(1L,2L,2L,2L,3L,3L,4L,5L)
y = c(2L,4L) # x duplicates multi, single match
test(6.04, sm(x, y), bm(x, y))
x = c(1L,2L,2L,2L,3L,4L,4L,5L)
y = c(2L,4L) # x duplicates multi, multi match
test(6.05, sm(x, y), bm(x, y))
x = c(1L,2L,2L,2L,5L)
y = c(-1L,6L) # x duplicates nomatch
test(6.06, sm(x, y), bm(x, y))

# skew
N = 2e3L
x = seq_len(N)
y = c(head(x), tail(x))
test(7.01, sm(x, y), bm(x, y))
y = c(1:6, 750L, 1250L, 1995:2000)
test(7.02, sm(x, y), bm(x, y))

# custom cases
x=c(39L, 41L, 41L, 37L, 86L, 93L, 20L, 34L, 38L, 21L, 79L, 84L,
2L, 80L, 51L, 58L, 66L, 33L, 32L, 22L, 24L, 4L, 67L, 59L, 89L,
1L, 44L, 62L, 34L, 18L, 93L, 67L, 22L, 42L, 8L, 72L, 45L, 87L,
41L, 85L, 30L, 61L, 5L, 45L, 48L, 41L, 57L, 63L, 68L, 96L, 72L,
62L, 14L, 84L, 57L, 43L, 6L, 49L, 33L, 68L, 2L, 18L, 69L, 41L,
2L, 52L, 69L, 94L, 56L, 72L, 13L, 50L, 86L, 81L, 8L, 28L, 96L,
28L, 87L, 28L, 1L, 27L, 60L, 61L, 99L, 19L, 39L, 99L, 67L, 70L,
53L, 86L, 64L, 49L, 99L, 91L, 36L, 7L, 57L, 63L)
y=c(44L, 50L, 47L, 26L, 44L, 11L, 18L, 60L, 9L, 96L, 25L, 59L,
53L, 82L, 4L, 41L, 65L, 30L, 29L, 34L, 29L, 23L, 12L, 40L, 76L,
40L, 30L, 29L, 98L, 2L, 57L, 13L, 44L, 68L, 72L, 82L, 19L, 88L,
19L, 95L, 22L, 46L, 43L, 36L, 67L, 96L, 34L, 6L, 16L, 20L, 86L,
65L, 89L, 78L, 36L, 95L, 19L, 67L, 65L, 99L, 59L, 77L, 16L, 50L,
99L, 98L, 72L, 26L, 35L, 46L, 52L, 55L, 56L, 1L, 91L, 21L, 52L,
69L, 7L, 87L, 97L, 97L, 71L, 48L, 6L, 35L, 62L, 26L, 44L, 36L,
50L, 75L, 100L, 63L, 39L, 3L, 94L, 85L, 99L, 61L)
test(8.01, sm(x, y), bm(x, y))

# scale up
ssa = function(unq_n, size, sort=FALSE) {
if (unq_n > size) return(sample.int(unq_n, size))
unq_sub = seq_len(unq_n)
ans = sample(c(unq_sub, sample(unq_sub, size=max(size-unq_n, 0), replace=TRUE)))
if (sort) sort(ans) else ans
}
set.seed(108)
N = 1e4L
## xy sorted
x = ssa(N, N, sort=TRUE) # unq
y = ssa(N, N, sort=TRUE) # unq
test(11.01, sm(x, y), bm(x, y))
x = ssa(N, N*1.1, sort=TRUE)
y = ssa(N, N, sort=TRUE) # unq
test(11.02, sm(x, y), bm(x, y))
x = ssa(N, N, sort=TRUE) # unq
y = ssa(N, N*1.1, sort=TRUE)
test(11.03, sm(x, y), bm(x, y))
x = ssa(N, N*1.1, sort=TRUE)
y = ssa(N, N*1.1, sort=TRUE)
test(11.04, sm(x, y), bm(x, y))
## y unsorted
x = ssa(N, N, sort=TRUE) # unq
y = ssa(N, N) # unq
test(12.01, sm(x, y), bm(x, y))
x = ssa(N, N*1.1, sort=TRUE)
y = ssa(N, N) # unq
test(12.02, sm(x, y), bm(x, y))
x = ssa(N, N, sort=TRUE) # unq
y = ssa(N, N*1.1)
test(12.03, sm(x, y), bm(x, y))
x = ssa(N, N*1.1, sort=TRUE)
y = ssa(N, N*1.1)
test(12.04, sm(x, y), bm(x, y))
## x unsorted
x = ssa(N, N) # unq
y = ssa(N, N, sort=TRUE) # unq
test(13.01, sm(x, y), bm(x, y))
x = ssa(N, N*1.1)
y = ssa(N, N, sort=TRUE) # unq
test(13.02, sm(x, y), bm(x, y))
x = ssa(N, N) # unq
y = ssa(N, N*1.1, sort=TRUE)
test(13.03, sm(x, y), bm(x, y))
x = ssa(N, N*1.1)
y = ssa(N, N*1.1, sort=TRUE)
test(13.04, sm(x, y), bm(x, y))
## xy unsorted
x = ssa(N, N) # unq
y = ssa(N, N) # unq
test(14.01, sm(x, y), bm(x, y))
x = ssa(N, N*1.1)
y = ssa(N, N) # unq
test(14.02, sm(x, y), bm(x, y))
x = ssa(N, N) # unq
y = ssa(N, N*1.1)
test(14.03, sm(x, y), bm(x, y))
x = ssa(N, N*1.1)
y = ssa(N, N*1.1)
test(14.04, sm(x, y), bm(x, y))

# sparse
x = sample.int(2e2L, 1e2L)
y = sample.int(2e2L, 1e2L)
test(21.01, sm(x, y), bm(x, y))
x = sample.int(2e2L, 1e2L, TRUE)
y = sample.int(2e2L, 1e2L, TRUE)
test(21.02, sm(x, y), bm(x, y))

# [.data.table join
d1 = data.table(x=sample.int(2e2L, 1e2L, TRUE), v1=seq_along(x))
d2 = data.table(y=sample.int(2e2L, 1e2L, TRUE), v2=seq_along(y))
options(datatable.smerge=FALSE, datatable.verbose=TRUE) ## verbose=2L after #4491
test(101.01, expected <- d1[d2, on="x==y"], output="bmerge", notOutput="smerge")
options(datatable.smerge=TRUE)
test(101.02, d1[d2, on="x==y"], expected, output="smerge") ## for now extra computation of bmerge is still done so no: #, notOutput="bmerge")
setindexv2 = function(x, cols) { ## pretend we are after #4386
stopifnot(is.data.table(x), is.character(cols))
if (is.null(attr(x, "index", TRUE))) setattr(x, "index", integer())
setattr(attr(x, "index", TRUE), paste0("__", cols, collapse="__"), forderv(x, cols, retGrp=TRUE))
invisible(x)
}
options(datatable.verbose=FALSE)
setindexv2(d1, "x"); setindexv2(d2, "y")
options(datatable.use.index=TRUE, datatable.verbose=TRUE)
test(101.03, d1[d2, on="x==y"], expected, output="smerge.*already indexed")
options(datatable.use.index=FALSE)
test(101.04, d1[d2, on="x==y"], expected, output="smerge", notOutput="already indexed")
options(datatable.use.index=TRUE)
6 changes: 4 additions & 2 deletions src/bmerge.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,18 +175,20 @@ SEXP bmerge(SEXP iArg, SEXP xArg, SEXP icolsArg, SEXP xcolsArg, SEXP isorted, SE
memcpy(INTEGER(retLengthArg), retLength, sizeof(int)*ctr);
memcpy(INTEGER(retIndexArg), retIndex, sizeof(int)*ctr);
}
SEXP ans = PROTECT(allocVector(VECSXP, 5)); protecti++;
SEXP ansnames = PROTECT(allocVector(STRSXP, 5)); protecti++;
SEXP ans = PROTECT(allocVector(VECSXP, 6)); protecti++;
SEXP ansnames = PROTECT(allocVector(STRSXP, 6)); protecti++;
SET_VECTOR_ELT(ans, 0, retFirstArg);
SET_VECTOR_ELT(ans, 1, retLengthArg);
SET_VECTOR_ELT(ans, 2, retIndexArg);
SET_VECTOR_ELT(ans, 3, allLen1Arg);
SET_VECTOR_ELT(ans, 4, allGrp1Arg);
SET_VECTOR_ELT(ans, 5, xoArg);
SET_STRING_ELT(ansnames, 0, char_starts); // changed from mkChar to char_ to pass the grep in CRAN_Release.cmd
SET_STRING_ELT(ansnames, 1, char_lens);
SET_STRING_ELT(ansnames, 2, char_indices);
SET_STRING_ELT(ansnames, 3, char_allLen1);
SET_STRING_ELT(ansnames, 4, char_allGrp1);
SET_STRING_ELT(ansnames, 5, char_xo);
setAttrib(ans, R_NamesSymbol, ansnames);
if (nqmaxgrp > 1 && mult == ALL) {
Free(retFirst);
Expand Down
8 changes: 8 additions & 0 deletions src/data.table.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ extern SEXP char_lens;
extern SEXP char_indices;
extern SEXP char_allLen1;
extern SEXP char_allGrp1;
extern SEXP char_xo;
extern SEXP char_io;
extern SEXP char_lhsLen1;
extern SEXP char_xyLen1;
extern SEXP char_nMatch;
extern SEXP char_factor;
extern SEXP char_ordered;
extern SEXP char_datatable;
Expand Down Expand Up @@ -243,3 +248,6 @@ SEXP testMsgR(SEXP status, SEXP x, SEXP k);
//fifelse.c
SEXP fifelseR(SEXP l, SEXP a, SEXP b, SEXP na);
SEXP fcaseR(SEXP na, SEXP rho, SEXP args);

// smjoin.c
SEXP smergeR(SEXP x, SEXP y, SEXP x_idx, SEXP y_idx, SEXP multArg, SEXP out_bmerge);
12 changes: 12 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ SEXP char_lens;
SEXP char_indices;
SEXP char_allLen1;
SEXP char_allGrp1;
SEXP char_xo;
SEXP char_io;
SEXP char_lhsLen1;
SEXP char_xyLen1;
SEXP char_nMatch;
SEXP char_factor;
SEXP char_ordered;
SEXP char_datatable;
Expand Down Expand Up @@ -119,6 +124,7 @@ SEXP lock();
SEXP unlock();
SEXP islockedR();
SEXP allNAR();
SEXP smergeR();

// .Externals
SEXP fastmean();
Expand Down Expand Up @@ -211,6 +217,7 @@ R_CallMethodDef callMethods[] = {
{"CfrollapplyR", (DL_FUNC) &frollapplyR, -1},
{"CtestMsgR", (DL_FUNC) &testMsgR, -1},
{"C_allNAR", (DL_FUNC) &allNAR, -1},
{"CsmergeR", (DL_FUNC) &smergeR, -1},
{NULL, NULL, 0}
};

Expand Down Expand Up @@ -317,6 +324,11 @@ void attribute_visible R_init_datatable(DllInfo *info)
char_indices = PRINTNAME(install("indices"));
char_allLen1 = PRINTNAME(install("allLen1"));
char_allGrp1 = PRINTNAME(install("allGrp1"));
char_xo = PRINTNAME(install("xo"));
char_io = PRINTNAME(install("io"));
char_lhsLen1 = PRINTNAME(install("lhsLen1"));
char_xyLen1 = PRINTNAME(install("xyLen1"));
char_nMatch = PRINTNAME(install("nMatch"));
char_factor = PRINTNAME(install("factor"));
char_ordered = PRINTNAME(install("ordered"));
char_datatable = PRINTNAME(install("data.table"));
Expand Down
Loading