parallel processing #611

Closed
jeffreyhanson opened this issue Jan 10, 2018 · 52 comments

@jeffreyhanson
Contributor

I've been working with some large-ish data sets and I was thinking it would be great if the sf R package could use parallel processing. I noticed that there was an openmp branch, but I can't tell if much progress has been made on it, and I was wondering if anyone was working on this? If not, I have a little experience with Rcpp and OpenMP and I'm keen to take a shot at implementing this.

From what I can tell, for the functions that utilise GEOS, the trick seems to be creating a separate copy of the context handle (GEOSContextHandle_t) for each thread. For instance, implementing something like this:

#include <omp.h> // for omp_get_thread_num(); belongs at the top of src/geos.cpp

// [[Rcpp::export]]
Rcpp::List CPL_geos_op(std::string op, Rcpp::List sfc, 
    Rcpp::NumericVector bufferDist, int nQuadSegs = 30,
    double dTolerance = 0.0, bool preserveTopology = false,
    int bOnlyEdges = 1, double dfMaxLength = 0.0, std::size_t threads = 1) {

  int dim = 2;
  std::vector<GEOSContextHandle_t> hGEOSCtxt(threads);
  for (std::size_t i = 0; i < threads; ++i)
    hGEOSCtxt[i] = CPL_geos_init();

  std::vector<GEOSGeom> g = geometries_from_sfc(hGEOSCtxt[0], sfc, &dim);
  std::vector<GEOSGeom> out(sfc.length());

  if (op == "buffer") {
    if (bufferDist.size() != (int) g.size())
      Rcpp::stop("invalid dist argument"); // #nocov
    #pragma omp parallel for num_threads(threads)
    for (size_t i = 0; i < g.size(); i++)
      out[i] = chkNULL(GEOSBuffer_r(hGEOSCtxt[omp_get_thread_num()], g[i], bufferDist[i], nQuadSegs));
  } else if (op == "boundary") {
    #pragma omp parallel for num_threads(threads)
    for (size_t i = 0; i < g.size(); i++)
      out[i] = chkNULL(GEOSBoundary_r(hGEOSCtxt[omp_get_thread_num()], g[i]));
  } else if (op == "convex_hull") {
    #pragma omp parallel for num_threads(threads)
    for (size_t i = 0; i < g.size(); i++)
      out[i] = chkNULL(GEOSConvexHull_r(hGEOSCtxt[omp_get_thread_num()], g[i]));
//  } else if (op == "unary_union") { // -> done by CPL_geos_union()
//    for (size_t i = 0; i < g.size(); i++)
//      out[i] = chkNULL(GEOSUnaryUnion_r(hGEOSCtxt[omp_get_thread_num()], g[i]));
  } else if (op == "simplify") {
    if (preserveTopology) {
      #pragma omp parallel for num_threads(threads)
      for (size_t i = 0; i < g.size(); i++)
        out[i] = GEOSTopologyPreserveSimplify_r(hGEOSCtxt[omp_get_thread_num()], g[i], dTolerance);
    } else {
      #pragma omp parallel for num_threads(threads)
      for (size_t i = 0; i < g.size(); i++)
        out[i] = chkNULL(GEOSSimplify_r(hGEOSCtxt[omp_get_thread_num()], g[i], dTolerance));
    }
  } else if (op == "linemerge") {
    #pragma omp parallel for num_threads(threads)
    for (size_t i = 0; i < g.size(); i++)
      out[i] = chkNULL(GEOSLineMerge_r(hGEOSCtxt[omp_get_thread_num()], g[i]));
  } else if (op == "polygonize") {
    #pragma omp parallel for num_threads(threads)
    for (size_t i = 0; i < g.size(); i++)
      out[i] = chkNULL(GEOSPolygonize_r(hGEOSCtxt[omp_get_thread_num()], &(g[i]), 1));
  } else if (op == "centroid") {
    #pragma omp parallel for num_threads(threads)
    for (size_t i = 0; i < g.size(); i++) {
      out[i] = chkNULL(GEOSGetCentroid_r(hGEOSCtxt[omp_get_thread_num()], g[i]));
    }
  } else if (op == "node") {
    #pragma omp parallel for num_threads(threads)
    for (size_t i = 0; i < g.size(); i++)
      out[i] = chkNULL(GEOSNode_r(hGEOSCtxt[omp_get_thread_num()], g[i]));
  } else if (op == "point_on_surface") {
    #pragma omp parallel for num_threads(threads)
    for (size_t i = 0; i < g.size(); i++)
      out[i] = chkNULL(GEOSPointOnSurface_r(hGEOSCtxt[omp_get_thread_num()], g[i]));
  } else
#if GEOS_VERSION_MAJOR >= 3 && GEOS_VERSION_MINOR >= 4
  if (op == "triangulate") {
    #pragma omp parallel for num_threads(threads)
    for (size_t i = 0; i < g.size(); i++)
      out[i] = chkNULL(GEOSDelaunayTriangulation_r(hGEOSCtxt[omp_get_thread_num()], g[i], dTolerance, bOnlyEdges));
  } else
#endif
    Rcpp::stop("invalid operation"); // would leak g and out // #nocov

  for (size_t i = 0; i < g.size(); i++)
    GEOSGeom_destroy_r(hGEOSCtxt[0], g[i]); // discard    

  Rcpp::List ret(sfc_from_geometry(hGEOSCtxt[0], out, dim)); // destroys out

  for (std::size_t i = 0; i < threads; ++i)
    CPL_geos_finish(hGEOSCtxt[i]);

  ret.attr("precision") = sfc.attr("precision");
  ret.attr("crs") = sfc.attr("crs");
  return ret;
}

If you think this would be a useful contribution, would you prefer me to submit a series of small pull requests (e.g. one pull request per function?) or would you prefer me to submit one big pull request with everything?

@edzer
Member

edzer commented Jan 10, 2018

Brilliant! One big pull request should be fine, after all it's happening pretty much in one single file.

@edzer
Member

edzer commented Jan 10, 2018

One worry (as always) is support for this by all CRAN platforms; see here -- we might want to look at how other packages do this, so that a single code base compiles on all CRAN platforms.

@rsbivand
Member

OpenMP is not a good solution. OSX does not seem to support this in the standard build chain. Nor do Windows builds.

As I see it, GEOS unary predicates and operations may benefit directly (top-down split of the geometry column and lapply of the function) if a parallel cluster is available. There is a start-up cost for clusters, be they PSOCK (Windows) or FORK (unix). FORK will be embarrassingly parallelisable. On M$ Open R, all of these may run foul of MKL (but since GEOS doesn't link to BLAS, this won't bite in most circumstances).

I'll see if I can write a use case - experience with spdep uses package-specific options in an environment in the namespace to hold the number of cores and a parallel cluster, and simply splits the input data between cores and catenates the results.
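
For what it's worth, a minimal sketch of that kind of wrapper (the helper name and the sf.cores option are made up for illustration; this is not sf or spdep API):

library(sf)
library(parallel)

st_parallel_apply <- function(x, fun, ncores = getOption("sf.cores", 1L), ...) {
  if (ncores < 2L) return(fun(x, ...))
  ## top-down split of the geometry column into one chunk per core
  chunks <- split(x, cut(seq_along(x), ncores, labels = FALSE))
  ## FORK-based mclapply() on unix; a PSOCK cluster with parLapply() is the Windows route
  res <- mclapply(chunks, fun, ..., mc.cores = ncores)
  ## catenate the per-core results back into a single sfc
  do.call(c, res)
}

## e.g., with nc <- st_read(system.file("shape/nc.shp", package = "sf")):
## st_parallel_apply(st_geometry(nc), st_buffer, ncores = 2, dist = 0.1)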

@edzer
Member

edzer commented Jan 11, 2018

Solution to what?

SO and WRE suggest that Windows CRAN builds should now support OpenMP, and that OSX should be possible but isn't done by CRAN right now. From the perspective of code & maintenance, I prefer the OpenMP way rather than doing things at the R level. We shouldn't forget that people with serious computational problems may as well prefer using serious operating systems.

Is the threads = n argument to all functions a good user interface, possibly with a package-wide default?

@tim-salabim
Member

OpenMP is way out of my league, but there is a current issue thread related to OpenMP in the fst package that may be of interest: fstpackage/fst#109

@jeffreyhanson
Contributor Author

I'll defer to others with more experience in writing and maintaining R packages about whether OpenMP is a viable solution.

With regards to the user interface for setting thread usage, perhaps sf could supply a function like set_number_of_threads(5) to set the default number of threads for processing, and the st_* functions could have their defaults set using a function that accesses this. For instance, something like st_is_simple(x, threads = get_number_of_threads())?
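
A minimal sketch of what that could look like, using a package-level option (the function and option names here are just for illustration):

set_number_of_threads <- function(n) {
  options(sf.threads = as.integer(n))
  invisible(n)
}
get_number_of_threads <- function() {
  getOption("sf.threads", 1L)
}

## st_* functions could then pick up the package-wide default, e.g.
## st_is_simple <- function(x, threads = get_number_of_threads()) { ... }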

@rsbivand
Member

rsbivand commented Jan 11, 2018

The underlying problem is that, as WRE details (painfully), there are no general, portable solutions at the C/C++ level, and they are not coming at all, ever, because systems change over time. Feel free to follow OMP, but for embarrassingly parallelisable problems, the parallel package works cross-platform, and will only need a wrapper around calls out to split the input list into nthreads parts and mclapply() on OSX and Linux. This was why the parallel package was written. I'll look later at this. If OMP works even in some cases, that's helpful for some, but can also be handled in a wrapper. Still need to be able to set nthreads, though. And see this R-devel thread.

@edzer
Member

edzer commented Jan 11, 2018

A compromise might be to have OMP off by default, and on only when installed with --with-openmp; this would allow more people to collect experience.
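
To make that concrete, a user opting in at install time could do something along these lines (the --with-openmp switch is the proposal above, not an existing sf configure option):

install.packages("sf", type = "source",
                 configure.args = "--with-openmp")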

@kendonB
Contributor

kendonB commented Jan 11, 2018

Would RcppParallel work for this case?

@jeffreyhanson
Contributor Author

Awesome idea @kendonB!

It looks like RcppParallel ships with its own copies of parallelization libraries (TinyThread and Intel TBB), so this could potentially solve the problems with OpenMP not being available for specific compilers and the fact that different compilers need different flags to enable OpenMP.

From what I can tell though, it looks like the convenience functions in RcppParallel (e.g. parallelFor) abstract away thread control. This could be an issue because we need to create and utilize separate GEOSContextHandle_t handles for each thread when using GEOS (AFAIK).

RcppParallel also exposes functions from the Intel TBB library (see http://rcppcore.github.io/RcppParallel/tbb.html). So one strategy could be to add RcppParallel to the package's DESCRIPTION (http://rcppcore.github.io/RcppParallel/index.html#r_packages) to standardise the installation/availability of the Intel TBB library, and we could use the Intel TBB functions in sf's C++ source files to handle the parallel processing. It's worth noting though that Solaris doesn't support Intel TBB, so we would still need to ensure that the package can run without Intel TBB functions.

I'll see if I can modify the OpenMP PR (GH-613) to use Intel TBB and report back.

@jeffreyhanson
Contributor Author

Ok, I've had a shot at implementing Intel TBB (GH-615). It seems to be easy enough to implement and appears to work on Windows, Linux, Mac OSX, and Solaris x86, so it might address most of the limitations associated with OpenMP?

@MarcusKlik

Just my 2 cents from working with OpenMP in the fst package:

  • OpenMP is great for adapting existing serial code in a step-wise fashion, because it only requires adding some compiler pragmas to your code (which are ignored by compilers that don't support them). Therefore, it's easier to adapt existing code with OpenMP than with alternatives.
  • CRAN has OpenMP-enabled compilers, also for the OSX binary build. So the average user won't have any problems on Windows, Linux and Mac if you use OpenMP.
  • You can't use the latest OpenMP features, but the major pragmas are there (CRAN doesn't use the most recent OpenMP version).
  • You can learn a lot from the OpenMP issues in data.table; they use OpenMP in fwrite for example and also have solutions to problems like having to switch back to a single thread on forking...
  • Mac users that install from source (your dev version) will have to make sure they have an OpenMP-enabled Clang compiler installed. The instructions for installing that are on the data.table repository.
  • Enabling OpenMP for your package is relatively easy: you just add SHLIB_OPENMP_CXXFLAGS to your Makevars file.

hope that helps, best and good luck with your efforts!

@jeffreyhanson
Contributor Author

I'm still keen to start implementing parallelized versions of some of the more computationally demanding functions, so I just thought I'd ask if the package development team has decided on using the parallel R package, OpenMP, or TBB (or another package/library) for parallel processing?

@edzer
Member

edzer commented Jan 14, 2018

After my initial experiments under #615 I put it aside for a moment. I planned to compare to parallel::parLapply, because I think this experiment should have shown a speed-up. I'd like to sort that out first.

I have no strong opinions on OMP vs TBB; the first seems to be more widely used altogether, the second more portable in R / CRAN. I'd be happy to see positive results, first, from something.

@jeffreyhanson
Contributor Author

Ok, yeah, I'll try implementing st_union since I expect that to show a speed up, and I'll put together a quick benchmark.

@rsbivand
Member

rsbivand commented Jan 15, 2018

Several practical points, given that I have commit rights to master:

I've created a local git branch, but do not want to push it now. So I need help with the Byzantine roxygenize() or devtools::document() hell to be able to build DESCRIPTION and NAMESPACE so that sf can be installed. Should I use roxygenize() (I got the roxygen_devtest bug) or devtools::document(), and which versions of these?

@rsbivand
Member

rsbivand commented Jan 15, 2018

Probably a missing #' @include crs.R in R/bbox.R, feel free to add in master ...

@edzer
Member

edzer commented Jan 15, 2018

I use Rcpp:::compileAttributes() and devtools::document() before install, build or check.

@rsbivand
Member

OK, a missing include declaration in R/bbox.R then. The NA bbox can't find NA_crs. Don't need Rcpp, only interpreted code.

What might happen if I push a branch?

@edzer
Member

edzer commented Jan 15, 2018

It will tell you that the branch doesn't have an upstream (= on GH), and tell you how to create it. Then you follow that instruction, push again, and then it's in a branch on GH.

@rsbivand
Member

Should I do that?

@edzer
Member

edzer commented Jan 15, 2018

How else do you want to share a branch?

@rsbivand
Member

That is, what if I overwrite everything?

@edzer
Member

edzer commented Jan 15, 2018

Then you press Ctrl-Z.

@edzer
Member

edzer commented Jan 15, 2018

Unlike svn, for git, every local copy has the full history.

@rsbivand
Member

rsbivand commented Jan 15, 2018

I'll wait until later; I'm getting errors on the nodes now, in the implementation, and have to find out how parallel plays with Rcpp, and whether I need to load anything on the nodes.

@rsbivand
Member

rsbivand commented Jan 15, 2018

OK. Story so far is OK, working with the parallel_cl branch:

library(sf)
library(parallel)
library(maptools)
data(wrld_simpl)
sf_wrld <- st_as_sfc(wrld_simpl)
plot(st_geometry(sf_wrld))
system.time(xxx <- st_union(sf_wrld))
#   user  system elapsed 
#  5.862   0.011   5.895 
plot(st_geometry(xxx))
set_cores_option(detectCores(logical=FALSE))
set_quiet_option(FALSE)
system.time(xxx1 <- st_union(sf_wrld))
# parallel: 2 cores
# parallel: mclapply
#   user  system elapsed 
#  1.388   0.016   2.180 
plot(st_geometry(xxx1))
set_mc_option(FALSE)
cl <- makeCluster(get_cores_option())
set_cluster_option(cl)
system.time(xxx2 <- st_union(sf_wrld))
# parallel: 2 cores
# parallel: parLapply
#   user  system elapsed 
#  0.021   0.003   2.504 
plot(st_geometry(xxx2))
stopCluster(cl)

But as the plots (might) show, st_union() was a bad example, as there are boundaries between countries in the split data set (two cores for me) which touch, but which don't get unioned. So the problems that are actually EP (embarrassingly parallel) are mostly trivial: problems that affect only one sfg at a time, like simple, valid, etc. Is there a non-trivial GEOS predicate or operation which would actually benefit from parallelisation?
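
To illustrate that pitfall concretely (a rough sketch, not code from the branch): unioning each chunk on its own core leaves the borders between chunks intact, unless the partial results are unioned once more in a final, serial pass:

chunks  <- split(sf_wrld, cut(seq_along(sf_wrld), 2, labels = FALSE))
partial <- lapply(chunks, st_union)  # what each core would compute
merged  <- do.call(c, partial)       # two geometries; borders between the chunks remain
full    <- st_union(merged)          # an extra (serial) union pass dissolves them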

Problem pushing branch:

$ git push --set-upstream origin parallel-cl
Username for 'https://github.com': rsbivand
Password for 'https://rsbivand@github.com': 
remote: Permission to r-spatial/sf.git denied to rsbivand.
fatal: unable to access 'https://github.com/r-spatial/sf.git/': The requested URL returned error: 403

I can push to r-spatial/spdep.git - is there a permissions issue?

@edzer
Member

edzer commented Jan 15, 2018

Ah, you're not an owner of the sf repo. I can make you an owner; an alternative is to fork sf, push to your own fork and then make a PR. PRs are, from git's (and my) perspective, nothing else but branches. Less risky if you feel uncomfortable with branches on the main repo: you can only mess up your own work.

@edzer
Member

edzer commented Jan 15, 2018

st_union(x, by_feature = TRUE) is EP. All binary ops (logical binary predicates, intersection/difference) are not EP, but could be sped up, I believe, by parallel. Need to be careful with the index: build it for all and copy, or build it on the subset of the thread? YMMV.

@rsbivand
Member

I have a very old fork from sfr, but made the local branch in the r-spatial/sf master, not my forked master. I've no idea how I might copy that across - recursive copying would probably copy the .git files too, wouldn't it? My old git sf repo has edzer/sfr.git as upstream.

@edzer
Member

edzer commented Jan 15, 2018

Throw old repositories away. Fork on GH; git clone to local; go to your working dir, remove the upstream and set the new upstream to your fork.

@rsbivand
Member

rsbivand commented Jan 15, 2018

How to throw away old repos? On GitHub? GH says I have a fork already, and will not let go.

@edzer
Member

edzer commented Jan 15, 2018

Everywhere.

@rsbivand
Member

I can remove local copies, but not on GH.

@edzer
Member

edzer commented Jan 15, 2018

repo -> settings -> danger zone.

@rsbivand
Member

Done on GH, and forked again. But locally I have an sf/ repo with the parallel_cl branch, and need to 1. clone rsbivand/sf, 2. copy the branch from the existing clone of r-spatial/sf to the clone of my fork. Can I simply change the name of the directory containing the existing clone from r-spatial/sf? Sorry for the baby-steps, but git is strongly counter-intuitive.

@edzer
Member

edzer commented Jan 15, 2018

No, that won't work. Maybe easiest to diff -r and copy over the changed files to rsbivand/sf.

@rsbivand
Member

rsbivand commented Jan 15, 2018

PR #616 is from changing the origin of my sf clone from r-spatial/sf to rsbivand/sf and pushing the parallel-cl branch there. I think it's not going anywhere unless we can see how to actually use parallel - maybe binary predicates and operators are a better bet than unary, where all of the features are needed anyway. I may continue trying with binary st_union, because x doesn't interact with x, only with y.

@rsbivand
Member

rsbivand commented Jan 15, 2018

For the record, I redirected the remote from r-spatial/sf.git to rsbivand/sf.git:

 1075  git remote remove origin
 1077  git remote add origin https://github.com/rsbivand/sf.git
 1080  git checkout master
 1082  git branch --set-upstream-to=origin/master master
 1083  git pull
 1085  git checkout parallel-cl
 1088  git push --set-upstream origin parallel-cl

rsbivand mentioned this issue Jan 16, 2018
@rsbivand
Member

rsbivand commented Jan 16, 2018

Report on status at the last push. Output on medium-sized objects for binary union (probably idiocies present, as two moderately sized objects give enormous output also for standard binary union):

library(sf)
# Linking to GEOS 3.6.2, GDAL 2.2.2, proj.4 4.9.3
library(parallel)
library(maptools)
# Loading required package: sp
# Checking rgeos availability: TRUE
data(wrld_simpl)
sf_wrld <- st_as_sfc(wrld_simpl)
sp_hex <- HexPoints2SpatialPolygons(spsample(wrld_simpl, n=400, type="hexagonal"))
sf_hex <- st_as_sfc(sp_hex)
system.time(xxx <- st_union(sf_hex, sf_wrld))
# although coordinates are longitude/latitude, st_union assumes that they are planar
#   user  system elapsed 
# 12.736   0.131  12.892 
object.size(xxx)
# 518383144 bytes
length(xxx)
# [1] 80442
set_cores_option(detectCores(logical=FALSE))
get_cores_option()
# [1] 4
set_quiet_option(FALSE)
system.time(xxx1 <- st_union(sf_hex, sf_wrld))
# parallel: 4 cores
# although coordinates are longitude/latitude, st_union assumes that they are planar
# although coordinates are longitude/latitude, st_union assumes that they are planar
# although coordinates are longitude/latitude, st_union assumes that they are planar
# although coordinates are longitude/latitude, st_union assumes that they are planar
# parallel: mclapply
#   user  system elapsed 
# 14.954   1.038  14.934 
attr(xxx1, "timings")
#            user.self elapsed
# index_split     0.002   0.002
# run_par         1.430   4.733
# catenate        9.950  10.198
object.size(xxx1)
# 517419744 bytes
length(xxx1)
# [1] 80442
set_mc_option(FALSE)
cl <- makeCluster(get_cores_option())
set_cluster_option(cl)
system.time(xxx2 <- st_union(sf_hex, sf_wrld))
# parallel: 4 cores
# parallel: parLapply
#    user  system elapsed 
#  8.498   0.208  12.640 
attr(xxx2, "timings")
#             user.self elapsed
# index_split     0.004   0.004
# run_par         0.856   4.869
# catenate        7.636   7.765
object.size(xxx2)
# 517419744 bytes
length(xxx2)
# [1] 80442
stopCluster(cl)

So it will be hard to know which combinations of objects (here 4x100 hex, 100 on each core, and wrld_simpl (n=246, but lots of islands)) will benefit from parallelising the actual binary union, as the catenation of the output is much more costly if there are many parts. The catenation line, for res as a list of length equal to the number of cores used, is:

out <- st_sfc(do.call("c", res))

I guess that binary union isn't a great use case either.

I also saw swapping last night on an 8GB/2-core laptop (multiple 0.5GB objects), but haven't reproduced major issues on a 16GB/4-core (8 with hyperthreading, not used here) today. Could others try the parallel-cl branch with other data sets? I think that the islands are causing trouble. I haven't added cluster startup or shutdown times - the overhead is usually observable.

@rsbivand
Member

By the way - if this is worth doing, it could be put in geos_op2_geom for generality.

@edzer
Member

edzer commented Jan 18, 2018

@jeffreyhanson some other considerations related to TBB/RcppParallel here.

@edzer
Member

edzer commented Mar 3, 2018

Since this discussion has stalled here, I'm closing the issue. As I see it now: we expect speed improvements from parallel computation of geometry operators. @jeffreyhanson has shown how this can be implemented at the C++ level using OpenMP and TBB; @rsbivand at the R level with package parallel. Neither has shown clear speed-ups, and for now we put the experiment to rest. To be continued, I hope!

edzer closed this as completed Mar 3, 2018
@jeffreyhanson
Contributor Author

Sorry I haven't had the time to experiment more with this; I've just been really busy with other stuff. I'll play around more with this when I can find the time.

@edzer
Member

edzer commented Mar 4, 2018

Looking forward to it!

@jeffreyhanson
Contributor Author

Hi,

Apologies for my lack of activity on this thread. I'm posting now because I think I've found an OpenMP implementation with a demonstrable performance improvement. Although the benchmark I've included below is encouraging, it would be great if anyone has any ideas for further improving the implementation.

Specifically, I've implemented a parallelized version of sf:::aggregate.sf because, as @monsieurxu pointed out (#661), this function can take a long time to execute for large data sets and could therefore, potentially, provide a compelling use-case for parallel functionality. Since one of the main bottlenecks in sf:::aggregate.sf is where it calls sf::st_union (with by_feature = TRUE; line 67 in R/aggregate.R), this implementation largely affects sf::st_union and its corresponding C++ function (CPL_geos_union defined in src/geos.cpp).

I've included a quick benchmark below comparing the parallel implementation to a recent implementation of sf:::aggregate.sf (as of 5e7fa09). To enable fair comparisons between the two versions of sf:::aggregate.sf, I've created a new function in src/geos.cpp called CPL_geos_union2 that contains the parallel implementation of CPL_geos_union. This parallelized implementation is only called when the argument to threads is greater than one. The original CPL_geos_union has not been altered and is only called when the argument to threads is equal to one.

# Initialization
## install openmp5 branch if needed
# devtools::install_github("jeffreyhanson/sf@openmp5")

## load packages
library(sf)
## Linking to GEOS 3.5.1, GDAL 2.2.2, proj.4 4.9.2
library(microbenchmark)
library(rnaturalearth)
library(lwgeom)
## Linking to liblwgeom 2.5.0dev r16016, GEOS 3.5.1, proj.4 4.9.2
library(testthat)

## fetch example data and clean it
data <- ne_countries(scale = 50, returnclass ="sf")
data <- data[, "continent"]
data <- st_make_valid(data)

## plot raw data
plot(data)


# Main processing
## verify that parallel implementation has comparable output to original
## implementation. For further tests, see tests/testthat/geom.R
a1 <- aggregate(data, list(data$continent), FUN = dplyr::first)
a2 <- aggregate(data, list(data$continent), FUN = dplyr::first, threads = 4)
test_that("same outputs", expect_true(all(a1 == a2)))

## run benchmark
benchmark_data <- microbenchmark(
  standard = aggregate(data, list(data$continent), FUN = dplyr::first),
  parallel = aggregate(data, list(data$continent), FUN = dplyr::first,
                       threads = 4),
  times = 20L, unit = "s")

# Exports
## plot aggregated data for visual comparison
plot(a1[, "continent"], main = "standard")


plot(a2[, "continent"], main = "parallel")


## print benchmark results
print(benchmark_data)
## Unit: seconds
##      expr      min       lq     mean   median       uq      max neval cld
##  standard 4.160109 4.165188 4.179984 4.175173 4.176906 4.291120    20   b
##  parallel 2.473088 2.657600 2.639482 2.664336 2.686799 2.743518    20  a
## plot benchmark results
boxplot(benchmark_data, unit = "s", log = FALSE, main = "Benchmark")


For those interested, I've listed below the ways in which this specific implementation differs from my previous attempts. I've also included an explanation for why I think these design choices might contribute to greater performance compared to previous implementations:

  • In previous implementations, I created a std::vector of GEOS handles and then shared this vector amongst all the threads. But in this implementation, each thread creates and uses its own local GEOS handle (i.e. the hGEOSCtxt_tp variable). I suspect that this design choice helps with performance because (1) shared variables can reduce performance and (2) it eliminates the need to look up which GEOS handle should be used to process a given geometry in each iteration of the loop (i.e. the loop at line 568 in src/geos.cpp).
  • In previous implementations, I used the default static schedule for parallel processing. This schedule can be efficient when each geometry (i.e. loop iteration) takes an equal amount of time to process. But if different geometries require different amounts of time to process (e.g. because some geometries contain more vertices than other geometries), then this is not very efficient at all. So, this implementation uses the dynamic schedule (which uses a similar strategy to the load-balancing functions in the parallel R package).
  • The variable used to represent the total number of geometries (i.e. sfc.size() in CPL_geos_union at line 497 in src/geos.cpp) is pre-calculated and stored in a new variable (n in CPL_geos_union2 at line 521 in src/geos.cpp). This new variable n is declared as a thread-private variable (line 538 in src/geos.cpp) instead of being treated as a default shared variable, which can reduce performance.

I'd love to hear what everyone thinks, especially if anyone has any suggestions for improving this parallel implementation of sf:::aggregate.sf.

@edzer
Member

edzer commented May 2, 2018

For this particular problem, adding cores (I tried 32 on a 64-core machine) didn't change the results:

> benchmark_data <- microbenchmark(
+   standard = aggregate(data, list(data$continent), FUN = dplyr::first),
+   parallel = aggregate(data, list(data$continent), FUN = dplyr::first,
+                        threads = 32),
+   times = 10L, unit = "s")
> benchmark_data
Unit: seconds
     expr       min        lq      mean    median        uq       max neval
 standard 10.191005 10.199942 10.216554 10.208882 10.240288 10.249179    10
 parallel  5.865312  5.879789  5.909609  5.894446  5.933589  5.980556    10

@jeffreyhanson
Contributor Author

Hmm, that's disappointing, and also an inherent limitation of this implementation. This implementation involves splitting up the geometries for each continent into a separate "task" (for lack of a better word), and then farming out these tasks to the pool of available cores. So, because there are only eight continents and thus eight "tasks", we wouldn't expect to see any increase in performance between 9 cores and 100 cores. Additionally, if one of the continents takes a much longer period of time to process than the other continents, we might not see any increase in performance at all between 1 core and 100 cores.

Here's an example which should, hopefully, show a noticeable increase in performance between 4 and 32 cores. In this example, we are aggregating state-level data to country-level data for more than 100 countries.

# Initialization
## install openmp5 branch if needed
## devtools::install_github("jeffreyhanson/sf@openmp5")

## load packages
library(sf)
## Linking to GEOS 3.5.1, GDAL 2.2.2, proj.4 4.9.2
library(microbenchmark)
library(rnaturalearth)
library(lwgeom)
## Linking to liblwgeom 2.5.0dev r16016, GEOS 3.5.1, proj.4 4.9.2
library(testthat)

## fetch example data and clean it
data <- ne_states(returnclass ="sf")
data <- data[, "admin"]
data <- st_make_valid(data)

## plot raw data
plot(data)


## print number of countries
print(length(unique(data$admin)))
## [1] 257
# Main processing
## verify that parallel implementation has comparable output to original
## implementation. For further tests, see tests/testthat/geom.R
a1 <- aggregate(data, list(data$admin), FUN = dplyr::first)
a2 <- aggregate(data, list(data$admin), FUN = dplyr::first, threads = 4)
test_that("same outputs", expect_true(all(a1 == a2)))

## run benchmark
benchmark_data <- microbenchmark(
  standard = aggregate(data, list(data$admin), FUN = dplyr::first),
  parallel = aggregate(data, list(data$admin), FUN = dplyr::first,
                       threads = 4),
  times = 20L, unit = "s")

# Exports
## plot aggregated data for visual comparison
plot(a1[, "admin"], main = "standard")


plot(a2[, "admin"], main = "parallel")


## print benchmark results
print(benchmark_data)
## Unit: seconds
##      expr      min       lq      mean    median        uq       max neval
##  standard 9.504502 9.809825 10.137676 10.050441 10.444493 11.581725    20
##  parallel 5.288976 5.507777  5.783361  5.772791  5.996941  6.451062    20
##  cld
##    b
##   a
## plot benchmark results
boxplot(benchmark_data, unit = "s", log = FALSE, main = "Benchmark")


@jeffreyhanson
Contributor Author

I just thought I'd ask if you were able to reproduce this benchmark on your system? I completely understand if you're really busy at the moment and don't have time for this.

@rsbivand
Member

This thread gives insight into the OpenMP race condition affecting data.table, the R garbage collector and the R byte compiler. The internals are quite complicated, but in that setting OMP_NUM_THREADS=1 is a work-around. Upper-level parallelization remains less problematic than lower-level.
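
For reference, that work-around can also be applied from R, provided it is set before any OpenMP-using code spins up threads (setting it in the shell before starting R works as well):

Sys.setenv(OMP_NUM_THREADS = "1")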

@Robinlovelace
Contributor

Not well-qualified to comment on this, but has the future package been explored on the upper-level side of things?
