---
output: html_document
editor_options:
chunk_output_type: console
---
# Parallel Computing {#parallel}
You would think that because you have an expensive multicore computer your computations will speed up.
Well, unless you actively make sure of that, this will not happen.
By default, the operating system will allocate each R session to a single core.
You may wonder: why can't I just write code, and let R (or any other language) figure out what can be parallelised.
Sadly, that's not how things work.
It is very hard to design software that can parallelise any algorithm while adapting to your hardware, operating system, and the other software running on your device.
A lot of parallelisation still has to be explicit, but stay tuned for technologies like [Ray](https://rise.cs.berkeley.edu/projects/ray/), [Apache Spark](https://spark.apache.org), [Apache Flink](https://flink.apache.org), [Chapel](https://chapel-lang.org), [PyTorch](https://pytorch.org), and others, which are making great advances in handling parallelism for you.
To parallelise computations with R, we will distinguish between two types of parallelism:
1. __Parallel R__: where the parallelism is managed with R. Discussed in Section \@ref(parallel-r).
1. __Parallel Extensions__: where R calls specialized libraries/routines/software that manage the parallelism themselves. Discussed in Section \@ref(parallel-extensions).
## When and How to Parallelise?
You notice your computations are too slow, and wonder "why is that?"
Should you store your data differently?
Should you use different software?
Should you buy more RAM?
Should you "go cloud"?
Unlike what some vendors would have you believe, there is no one-size-fits-all solution to speed problems.
Solving a RAM bottleneck may consume more CPU.
Solving a CPU bottleneck may consume more RAM.
Parallelisation means using multiple CPUs simultaneously.
It will thus aid with CPU bottlenecks, but may consume more RAM.
Parallelising is thus ill-advised when dealing with a RAM bottleneck.
Memory bottlenecks are relieved with efficient memory representations or out-of-memory algorithms (Chapters \@ref(sparse) and \@ref(memory)).
When deciding if, and how, to parallelise, it is crucial that you diagnose your bottleneck.
The good news is that the diagnosis is not too hard.
Here are a few pointers:
1. You never drive without looking at your dashboard; you should never program without looking at your system monitors. Windows users have their [Task Manager](https://en.wikipedia.org/wiki/Task_Manager_(Windows)); Linux users have [top](https://en.wikipedia.org/wiki/Top_(software)), or preferably, [htop](https://en.wikipedia.org/wiki/Htop); Mac users have the [Activity Monitor](https://www.howtogeek.com/227240/how-to-monitor-your-macs-health-with-activity-monitor/). The system monitor will inform you how your RAM and CPUs are being used.
1. If you forcefully terminate your computation, and R takes a long time to respond, you are probably dealing with a RAM bottleneck.
1. Profile your code to detect how much RAM and CPU are consumed by each line of code. See Hadley's [guide](http://adv-r.had.co.nz/Profiling.html); a minimal sketch follows this list.
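Here is a minimal profiling sketch using base R's `Rprof` (the `profvis` package offers a friendlier, interactive view of the same information):

```{r, eval=FALSE}
Rprof("profile.out", memory.profiling = TRUE)  # start profiling CPU and memory
x <- matrix(rnorm(1e6), ncol = 100)
pca <- prcomp(x)                               # the code you want to diagnose
Rprof(NULL)                                    # stop profiling
summaryRprof("profile.out", memory = "both")   # time (and memory) spent per function
```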
In the best possible scenario, the number of operations you can perform scales with the number of processors: $$\text{time} \times \text{processors} = \text{operations}.$$
This is called _perfect scaling_.
It is rarely observed in practice, since parallelising incurs some computational overhead: setting up environments, copying memory, ...
For this reason, the typical speedup is sub-linear.
Computer scientists call this [Amdahl's law](https://en.wikipedia.org/wiki/Amdahl%27s_law); remember it.
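For intuition, Amdahl's law can be stated as follows: if a fraction $p$ of your computation can be parallelised, and the remaining $1-p$ is inherently serial, then the best possible speedup on $s$ processors is
$$\text{speedup}(s) = \frac{1}{(1-p) + p/s},$$
which is bounded by $1/(1-p)$ no matter how many processors you add.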
## Terminology
Here are some terms we will be needing.
### Hardware:
- __Cluster:__ A collection of interconnected computers.
- __Node/Machine:__ A single physical machine in the cluster. Components of a single node do not communicate via the cluster's network, but rather, via the node's circuitry.
- __Processor/Socket/CPU/Core:__ The physical device in a computer that makes the computations. A modern laptop will have about 4-8 cores. A modern server may have hundreds of cores.
- __RAM:__ Random Access Memory. One of many types of memory in a computer. Possibly the most relevant type of memory when computing with data.
- __GPU:__ Graphical Processing Unit. A computing unit, separate from the CPU. Originally dedicated to graphics and gaming, thus its name. Currently, GPUs are extremely popular for fitting and servicing Deep Neural Networks.
- __TPU:__ Tensor Processing Unit. A computing unit, dedicated to servicing and fitting Deep Neural Networks.
### Software:
- __Process:__ A sequence of instructions in memory, with accompanying data. Different processes typically see different locations of memory. Interpreted languages like R and Python operate on processes.
- __Thread:__ A sub-sequence of instructions within a process. Different threads in a process may see the same memory. Compiled languages like C and C++ may operate on threads.
## Parallel R {#parallel-r}
R provides many frameworks to parallelise execution.
The operating system allocates each R session to a single process.
Any parallelisation framework will include the means for starting R processes, and the means for communicating between these processes.
Except for developers, a typical user will probably use some high-level R package which will abstract away these stages.
### Starting a New R Process
An R process may start a new R process in various ways.
The new process may be called a _child_ process, a _slave_ process, or various other names.
Here are some mechanisms to start new processes.
- __Fork__: Imagine the operating system making a copy of the currently running R process.
The _fork_ mechanism, unique to Unix and Linux, clones a process with its accompanying instructions and data. All forked processes see the same memory in read-only mode. Copies of the data are made when the process needs to change the data.
- __System calls__: Imagine R as a human user that starts a new R session. This is not a _forked_ process. The new process, called a _spawned_ process, cannot access the data and instructions of the parent process.
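A minimal sketch of the _fork_ mechanism using the __parallel__ package (this assumes a Linux/Unix machine; forking is unavailable on Windows):

```{r, eval=FALSE}
library(parallel)
x <- rnorm(1e6)            # lives in the parent process
job <- mcparallel(mean(x)) # the forked child sees `x` without copying it
mccollect(job)             # collect the child's result
```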
### Inter-process Communication
Now that you have various R processes running, how do they communicate?
- __Socket__: imagine each R process as a standalone computer in the network. Data can be sent via a network interface. Unlike PVM, MPI and other standards, the information sent does not need to be formatted in any particular way, provided that the receiver knows how it is formatted. This is not a problem when sending from R to R.
- __Parallel Virtual Machine__ (PVM): a communication protocol and software, developed at the University of Tennessee, Oak Ridge National Laboratory, and Emory University, and first released in 1989. It runs on Windows and Unix, thus allowing computation on clusters running either operating system. Nowadays, it has mostly been replaced by MPI. The group responsible for PVM would later deliver _pbdR_ (Section \@ref(pbdr)).
- __Message Passing Interface__ (MPI): A communication protocol that has become the de-facto standard for communication in large distributed clusters. Particularly, for heterogeneous computing clusters with varying operating systems and hardware. The protocol has various software implementations such as [OpenMPI](https://en.wikipedia.org/wiki/Open_MPI) and [MPICH](https://en.wikipedia.org/wiki/MPICH), [Deino](http://mpi.deino.net/), [LAM/MPI](https://en.wikipedia.org/wiki/LAM/MPI). Interestingly, large computing clusters use MPI, while modern BigData analysis platforms such as Spark, and Ray do not. Why is this? See Jonathan Dursi's excellent [blog post](https://www.dursi.ca/post/hpc-is-dying-and-mpi-is-killing-it.html).
- __NetWorkSpaces__ (NWS): A master-slave communication protocol where the master is not an R-session, but rather, an _NWS server_.
For more on inter-process communication, see [Wiki](https://en.wikipedia.org/wiki/Inter-process_communication).
### The parallel Package
The __parallel__ package, maintained by the R-core team, was introduced in 2011 to unify two popular parallelisation packages: __snow__ and __multicore__.
The __multicore__ package was designed to parallelise using the _fork_ mechanism, on Linux machines.
The __snow__ package was designed to parallelise via the Socket, PVM, MPI, and NWS mechanisms.
R processes started with __snow__ are not forked, so they will not see the parent's data.
Data will have to be copied to child processes.
The good news: __snow__ can start R processes on Windows machines, or on remote machines in a cluster.
TODO: add example.
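In the meantime, here is a minimal sketch of the two mechanisms the package unifies; the socket cluster works on all operating systems, while `mclapply` forks and is thus Linux/Unix only:

```{r, eval=FALSE}
library(parallel)

# snow-style socket cluster: new R processes that do not see the parent's data.
cl <- makeCluster(2)                  # start 2 worker processes
parLapply(cl, 1:4, function(i) i^2)   # apply in parallel; results return as a list
stopCluster(cl)

# multicore-style fork (Linux/Unix only): workers see the parent's data.
mclapply(1:4, function(i) i^2, mc.cores = 2)
```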
### The foreach Package
For reasons detailed in @kane2013scalable, we recommend the __foreach__ parallelisation package [@foreach].
It allows us to:
1. Decouple between the parallel algorithm and the parallelisation mechanism: we write parallelisable code once, and can later switch between parallelisation mechanisms.
Currently supported mechanisms include:
- _fork_: Called with the _doMC_ backend.
- _MPI_, _VPM_, _NWS_: Called with the _doSNOW_ or _doMPI_ backends.
- _futures_: Called with the _doFuture_ backend.
- _redis_: Called with the _doRedis_ backend. Similar to NWS, only that data is made available to the different processes using [Redis](https://en.wikipedia.org/wiki/Redis).
- Further mechanisms may be supported in the future.
1. Combine with the `big.matrix` object from Chapter \@ref(memory) for _shared memory parallelisation_: all the machines may see the same data, so that we don't need to export objects from machine to machine.
```{remark}
I personally prefer the __multicore__ mechanism, with the __doMC__ adapter for __foreach__.
I will not use this combo, however, because __multicore__ will not work on Windows machines, and will not work over a network.
I will thus use the more general __snow__ and __doParallel__ combo.
If you do happen to run on Linux, or Unix, you will want to replace all __doParallel__ functionality with __doMC__.
```
Let's start with a simple example, taken from ["Getting Started with doParallel and foreach"](http://debian.mc.vanderbilt.edu/R/CRAN/web/packages/doParallel/vignettes/gettingstartedParallel.pdf).
```{r}
library(doParallel)
cl <- makeCluster(2, type = 'SOCK')
registerDoParallel(cl)
result <- foreach(i=1:3) %dopar% sqrt(i)
class(result)
result
```
Things to note:
- `makeCluster` creates an object with the information on our cluster.
On a single machine it is very simple. On a cluster of machines, you will need to specify the [IP](https://en.wikipedia.org/wiki/IP_address) addresses, or other identifier, of the machines.
- `registerDoParallel` is used to inform the __foreach__ package of the presence of our cluster.
- The `foreach` function handles the looping. In particular note the `%dopar%` operator that ensures that looping is in parallel. `%dopar%` can be replaced by `%do%` if you want serial looping (like the `for` loop), for instance, for debugging.
- The output of the various machines is collected by `foreach` to a list object.
- In this simple example, no data is shared between machines so we are not putting the shared memory capabilities to the test.
- We can check how many workers were involved using the `getDoParWorkers()` function.
- We can check the parallelisation mechanism used with the `getDoParName()` function.
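For instance, with the cluster registered above:

```{r, eval=FALSE}
getDoParWorkers() # how many workers foreach will use
getDoParName()    # which parallelisation backend is registered
```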
Here is a more involved example.
We now try to perform Bootstrap inference on the coefficients of a logistic regression.
Bootstrapping means that in each iteration, we resample the data, and refit the model.
```{r bootstrap-dopar, eval=FALSE}
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- 1e4
r <- foreach(icount(trials), .combine=cbind) %dopar% {
  ind <- sample(100, 100, replace=TRUE)
  result1 <- glm(x[ind,2] ~ x[ind,1], family=binomial(logit))
  coefficients(result1)
}
```
Things to note:
- As usual, we use the `foreach` function with the `%dopar%` operator to loop in parallel.
- The `iterators::icount` function generates a counter that iterates over its argument.
- The object `x` is magically available to all child processes, even though we did not _fork_ R. This is thanks to `foreach`, which guesses what data to pass to the children.
- The `.combine=cbind` argument tells the `foreach` function how to combine the output of different machines, so that the returned object is not the default list.
- To run a serial version, say for debugging, you only need to replace `%dopar%` with `%do%`.
```{r bootstrap-do, eval=FALSE}
r <- foreach(icount(trials), .combine=cbind) %do% {
  ind <- sample(100, 100, replace=TRUE)
  result1 <- glm(x[ind,2] ~ x[ind,1], family=binomial(logit))
  coefficients(result1)
}
```
Let's see how we can combine the power of __bigmemory__ and __foreach__ by creating a file mapped `big.matrix` object, which is shared by all machines.
The following example is taken from @kane2013scalable, and uses the `big.matrix` object we created in Chapter \@ref(memory).
```{r}
library(bigmemory)
x <- attach.big.matrix("airline.desc")
library(foreach)
library(doSNOW)
cl <- makeSOCKcluster(names=rep("localhost", 4)) # make a cluster of 4 worker processes on this machine
registerDoSNOW(cl) # register machines for foreach()
xdesc <- describe(x)
G <- split(1:nrow(x), x[, "BENE_AGE_CAT_CD"]) # Split the data along `BENE_AGE_CAT_CD`.
GetDepQuantiles <- function(rows, data) {
  quantile(data[rows, "CAR_LINE_ICD9_DGNS_CD"],
           probs = c(0.5, 0.9, 0.99),
           na.rm = TRUE)
} # function to extract quantiles

qs <- foreach(g = G, .combine = rbind) %dopar% {
  library("bigmemory")
  x <- attach.big.matrix(xdesc)
  GetDepQuantiles(rows = g, data = x)
} # get quantiles, in parallel
qs
```
Things to note:
- `bigmemory::attach.big.matrix` creates an R _big.matrix_ object from a matrix already existing on disk. See Section \@ref(bigmemory) for details.
- `snow::makeSOCKcluster` creates a cluster of R processes communicating via sockets.
- `bigmemory::describe` recovers a descriptor of the _big.matrix_ object, which is used to attach it from the various child processes.
- Because the R processes were not _forked_, each child needs to load the __bigmemory__ package separately.
Can only __big.matrix__ objects be used to share data between child processes?
No.
There are many mechanisms to share data.
We use __big.matrix__ merely for demonstration.
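For illustration, here is a minimal sketch of the simplest alternative: letting __foreach__ copy an ordinary in-RAM object to every worker (`dat` below is a hypothetical small data.frame, not the airline data):

```{r, eval=FALSE}
dat <- data.frame(grp = rep(1:4, 25), val = rnorm(100))
qs2 <- foreach(g = 1:4, .combine = rbind) %dopar% {
  # `dat` is exported (copied) to each worker automatically by foreach
  quantile(dat$val[dat$grp == g], probs = c(0.5, 0.9, 0.99))
}
qs2
```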
#### Fork or Socket?
On Linux and Unix machines you can use both the _fork_ mechanism of the __multicore__ package, and the _socket_ mechanism of the __snow__ package.
Which is preferable? _Fork_, if available.
Here is a quick comparison.
```{r}
library(nycflights13)
flights$ind <- sample(1:10, size = nrow(flights), replace = TRUE) # split the data into 10 groups
timer <- function(i) max(flights[flights$ind==i,"distance"]) # an arbitrary function to time
library(doMC)
registerDoMC(cores = 10) # make a fork cluster
system.time(foreach (i=1:10, .combine = 'c') %dopar% timer(i)) # time the fork cluster
library(parallel)
library(doParallel)
cl <- makeCluster(10, type="SOCK") # make a socket cluster.
registerDoParallel(cl)
system.time(foreach (i=1:10, .combine = 'c') %dopar% timer(i)) # time the socket cluster
stopCluster(cl) # close the cluster
```
Things to note:
- `doMC::registerDoMC` was used to start and register the forked cluster.
- `parallel::makeCluster` was used to start the socket cluster. It was registered with `doParallel::registerDoParallel`.
- After registering the cluster, the __foreach__ code is exactly the same.
- The clear victor is _fork_: sessions start faster, and computations finish faster. Sadly, we recall that _forking_ is impossible on Windows machines, or in clusters that consist of several machines.
- We did not need to pass `flights` to the different workers. `foreach::foreach` took care of that for us.
For fun, let's try the same with `data.table`.
```{r}
library(data.table)
flights.DT <- as.data.table(flights)
system.time(flights.DT[,max(distance),ind])
```
No surprises there.
If you can store your data in RAM, `data.table` is still the fastest.
### Rdsm {#rdsm}
TODO
### pbdR {#pbdr}
TODO
## Parallel Extensions {#parallel-extensions}
As we have seen, R can be used to write explicit parallel algorithms.
Some algorithms, however, are so basic that others have already written and published their parallel versions.
We call these _parallel extensions_.
Linear algebra and various machine learning algorithms are examples we now discuss.
### Parallel Linear Algebra
R ships with its own linear algebra algorithms, known as Basic Linear Algebra Subprograms: [BLAS](http://www.netlib.org/blas/).
To learn the history of linear algebra in R, read @maechler20062nd.
For more details, see our Bibliographic notes.
BLAS will use a single core, even if your machine has many more.
There are many linear algebra libraries [out there](https://en.wikipedia.org/wiki/Comparison_of_linear_algebra_libraries), and you don't need to be a programmer to replace R's BLAS.
Cutting edge linear algebra libraries such as [OpenBLAS](https://github.com/xianyi/OpenBLAS), [Plasma](https://bitbucket.org/icl/plasma/src/default/), and Intel's [MKL](https://software.intel.com/en-us/mkl), will do your linear algebra while exploiting the many cores of your machine.
This is very useful, since all machines today have multiple cores, and linear algebra is at the heart of all statistics and machine learning.
Installing these libraries requires some knowledge of system administration.
It is fairly simple under Ubuntu and Debian Linux, and may be more complicated on other operating systems.
Installing these is outside the scope of this text.
We will thus content ourselves with the following pointers:
- Users can easily replace the BLAS libraries shipped with R with other libraries such as OpenBLAS and MKL. These will parallelise linear algebra for you; a quick check of your current setup is sketched right after this list.
- Installation is easier for Ubuntu and Debian Linux, but possible in all systems.
- For specific tasks, such as machine learning, you may not need an all-purpose parallel linear algebra library. If you want machine learning in parallel, there are more specialized libraries. In the following, we demonstrate Spark (\@ref(spark)), and H2O (\@ref(h2o)).
- Read our word of caution on nested parallelism (\@ref(nested-parallel)) if you use parallel linear algebra within child R processes.
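Here is a quick, minimal check of your current setup: which BLAS/LAPACK your session links against (reported by `sessionInfo()` in recent R versions), and whether a matrix product lights up more than one core in your system monitor:

```{r, eval=FALSE}
sessionInfo()                    # recent R versions report the BLAS/LAPACK in use
A <- matrix(rnorm(4e6), ncol = 2000)
system.time(B <- crossprod(A))   # watch your system monitor: an optimised BLAS uses several cores
```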
### Parallel Data Munging with data.table
We have discussed `data.table` in Chapter \@ref(datatable).
We now recall it to emphasize that various operations in `data.table` are done in parallel, using [OpenMP](https://en.wikipedia.org/wiki/OpenMP).
For instance, file imports can be done in parallel: each thread is responsible for importing a subset of the file.
First, we check how many threads `data.table` is set up to use.
```{r}
library(data.table)
getDTthreads(verbose=TRUE)
```
Things to note:
- `data.table::getDTthreads` reports information on my machine and the current `data.table` setup. Use the `verbose=TRUE` flag for extra details.
- _omp_get_max_threads_ informs me how many threads are available on my machine.
- My current `data.table` configuration is in the last line of the output.
We then import with `data.table::fread` and inspect CPU usage with the _top_ Linux command.
```{r}
air <- fread('data/2010_BSA_Carrier_PUF.csv')
```
![The CPU usage of fread() is 384.4\%. This is because `data.table` is set up to use 4 threads simultaneously.](art/Screenshot from 2019-10-08 16-00-34.png)
```{remark}
An amazing feature of `data.table` is that it will not parallelise when called from a forked process.
This behaviour avoids the nested parallelism we caution against in Section \@ref(nested-parallel).
```
After doing parallel imports, let's try parallel aggregation.
```{r}
n <- 5e6
N <- n
k <- 1e4
setDTthreads(threads = 0) # use all available cores
getDTthreads() # print available threads
DT <- data.table(x   = rep_len(runif(n), N),
                 y   = rep_len(runif(n), N),
                 grp = rep_len(sample(1:k, n, TRUE), N))
system.time(DT[, .(a = 1L), by = "grp"])
setDTthreads(threads = 1) # use a single thread
system.time(DT[, .(a = 1L), by = "grp"])
```
Things to note:
- Parallel aggregation is indeed much faster.
- The number of cores scaled by a factor of 8; the time scaled by less. The scaling is not perfect. Remember Amdahl's law.
- This example was cooked to emphasize the difference. You may not enjoy such speedups in all problems.
If the data does not fit in RAM, however, we cannot enjoy `data.table`.
If the data is so large that it does not fit into RAM^[Recall that you can buy servers with 1TB of RAM and more. So we are talking about A LOT of data!], nor into your local disk, you will need to store it, and compute with it, in a distributed cluster.
In the next section, we present a very popular system for storing, munging, and learning, with massive datasets.
### Spark
Spark is the brainchild of Matei Zaharia, who created it in 2009 as part of his PhD studies at the University of California, Berkeley's AMPLab.
To understand _Spark_ we need some background.
The software that manages files on your disk is the [file system](https://en.wikipedia.org/wiki/File_system).
On personal computers, you may have seen names like FAT32, NTFS, EXT3, or others.
Those are file systems for disks.
If your data is too big to be stored on a single disk, you may distribute it on several machines.
When doing so, you will need a file system that is designed for distributed clusters.
A good [cluster file system](https://en.wikipedia.org/wiki/Clustered_file_system) is crucial for the performance of your cluster.
Part of Google's strength is in its powerful file system, the [Google File System](https://en.wikipedia.org/wiki/Google_File_System).
If you are not at Google, you will not have access to this file system.
Luckily, there are many other alternatives.
The Hadoop File System, [HDFS](https://en.wikipedia.org/wiki/Apache_Hadoop), which started at Yahoo and was later donated to the Apache Foundation, is a popular alternative.
With the HDFS you can store files in a cluster.
For doing statistics, you need software that is compatible with the file system.
This is true for all file systems, and in particular, HDFS.
A popular software suite that was designed to work with HDFS is _Hadoop_.
Alas, Hadoop was not designed for machine learning: for reasons of fault tolerance, Hadoop stores its data on disk.
Machine learning consists of many iterative algorithms that require fast and repeated data reads.
This is very slow if done from the disk.
This is where _Spark_ comes in.
Spark is a data oriented computing environment over distributed file systems.
Let's parse that:
- "data oriented": designed for statistics and machine learning, which require a lot of data, that is mostly read and not written.
- "computing environment": it less general than a full blown programming language, but it allows you to extend it.
- "over distributed file systems": it ingests data that is stored in distributed clusters, managed by HDFS or other distributed file system.
Let's start a Spark server on our local machine to get a feeling of it.
We will not run from a cluster, so that you may experiment with it yourself.
```{r}
library(sparklyr)
spark_install(version = "2.4.0") # will download Spark on first run.
sc <- spark_connect(master = "local")
```
Things to note:
- `spark_install` will download and install _Spark_ on your first run. Make sure to update the version number, since my text may be outdated by the time you read it.
- I used the _sparklyr_ package from RStudio. There is an alternative package from Apache: _SparkR_.
- `spark_connect` opens a connection to the (local) Spark server. When working in a cluster, with many machines, the `master=` argument informs R which machine is the master, a.k.a. the "driver node". Consult your cluster's documentation for connection details.
- After running `spark_connect`, the connection to the Spark server will appear in RStudio's [Connection pane](https://support.rstudio.com/hc/en-us/articles/115010915687-Using-RStudio-Connections).
Let's load and aggregate some data:
```{r}
library(nycflights13)
flights_tbl<- copy_to(dest=sc, df=flights, name='flights', overwrite = TRUE)
class(flights_tbl)
library(dplyr)
system.time(
  delay <- flights_tbl %>%
    group_by(tailnum) %>%
    summarise(
      count = n(),
      dist  = mean(distance, na.rm = TRUE),
      delay = mean(arr_delay, na.rm = TRUE)) %>%
    filter(count > 20, dist < 2000, !is.na(delay)) %>%
    collect()
)
delay
```
Things to note:
- `copy_to` exports from R to Spark. Typically, my data will already be waiting in Spark, since the whole motivation is that it does not fit on my disk.
- Notice the `collect` command at the end. As the name suggests, this will collect results from the various worker/slave machines.
- I have used the _dplyr_ syntax and not my favorite _data.table_ syntax. This is because _sparklyr_ currently supports the _dplyr_ syntax, or plain SQL with the _DBI_ package.
To make the most of it, you will probably be running Spark on some cluster.
You should thus consult your cluster's documentation in order to connect to it.
In our particular case, the data is not very big so it fits into RAM.
We can thus compare performance to `data.table`, only to re-discover that if the data fits in RAM, there is no beating `data.table`.
```{r}
library(data.table)
flight.DT <- data.table(flights)
system.time(flight.DT[,.(distance=mean(distance),delay=mean(arr_delay),count=.N),by=tailnum][count>20 & distance<2000 & !is.na(delay)])
```
Let's disconnect from the Spark server.
```{r}
spark_disconnect(sc)
```
Spark comes with a set of learning algorithms called _MLLib_.
Consult the [online documentation](http://spark.apache.org/docs/latest/ml-classification-regression.html) to see which are currently available.
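For a flavour, here is a minimal MLlib sketch via __sparklyr__; it assumes an active connection `sc` and the `flights_tbl` from above (i.e., before calling `spark_disconnect`):

```{r, eval=FALSE}
fit <- flights_tbl %>%
  filter(!is.na(arr_delay), !is.na(dep_delay)) %>%
  ml_linear_regression(arr_delay ~ dep_delay + distance) # fitted by Spark's MLlib, not by R
summary(fit)
```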
If your data is happily stored in a distributed Spark cluster, and the algorithm you want to run is not available, you have two options:
(1) use extensions or (2) write your own.
Writing your own algorithm and dispatching it to Spark can be done a-la `apply` style with `sparklyr::spark_apply`. This, however, would typically be extremely inefficient. You are better off finding a Spark extension that does what you need.
See the _sparklyr_ [CRAN page](https://CRAN.R-project.org/package=sparklyr), and in particular the Reverse Depends section, to see which extensions are available.
One particular extension is __rsparkling__, which allows you to apply H2O's massive library of learning algorithms, on data stored in Spark.
We start by presenting H2O.
### H2O {#h2o}
H2O can be thought of as a library of efficient, distributed learning algorithms that run in-memory, where memory considerations and parallelisation have been taken care of for you.
Another way to think of it is as a "machine learning service".
For a (massive) list of learning algorithms implemented in H2O, see [their documentation](http://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science.html#).
H2O can run as a standalone server, or on top of Spark, so that it may use the _Spark data frames_.
We start by working with H2O using H2O's own data structures, via the `h2o` package.
We later discuss how to use H2O using Spark's data structures (\@ref(sparkling)).
```{r h2o, message=TRUE, warning=FALSE}
#install.packages("h2o")
library(h2o)
h2o.init(nthreads=2)
```
Things to note:
- We did not install the H2O server; `install.packages("h2o")` did it for us.
- `h2o.init` fires the H2O server. Use `nthreads` to manually control the number of threads, or use the defaults. "H2O cluster total cores" informs you of the number of potential cores. "H2O cluster allowed cores" was set by `nthreads`, and informs of the number of actual cores that will be used.
- Read `?h2o.init` for the (massive) list of configuration parameters available.
```{r}
h2o.no_progress() # to suppress progress bars.
data("spam", package = 'ElemStatLearn')
spam.h2o <- as.h2o(spam, destination_frame = "spam.hex") # load to the H2O server
h2o.ls() # check available data in the server
h2o.describe(spam.h2o) %>% head # the H2O version of summary()
h2o.table(spam.h2o$spam)
# Split to train and test
splits <- h2o.splitFrame(data = spam.h2o, ratios = c(0.8))
train <- splits[[1]]
test <- splits[[2]]
# Fit a random forest
rf <- h2o.randomForest(
  x = names(spam.h2o)[-58],
  y = "spam",
  training_frame = train,
  model_id = "our.rf")
# Predict on test set
predictions <- h2o.predict(rf, test)
head(predictions)
```
Things to note:
- H2O objects behave a lot like data.frame/tables.
- To compute on H2O objects, you need dedicated functions. They typically start with "h2o", such as `h2o.table` and `h2o.randomForest`.
- `h2o.randomForest`, and other H2O functions, have their own syntax with many, many options. Make sure to read `?h2o.randomForest`.
#### Sparkling-Water {#sparkling}
The _h2o_ package (\@ref(h2o)) works with `H2OFrame` class objects.
If your data is stored in Spark, it may be more natural to work with _Spark DataFrames_ instead of `H2OFrame`s.
This is exactly the purpose of the [Sparkling-Water](https://www.h2o.ai/products/h2o-sparkling-water/) system.
R users can connect to it using the [RSparkling](http://docs.h2o.ai/sparkling-water/2.2/latest-stable/doc/rsparkling.html) package, written and maintained by H2O.
## Caution: Nested Parallelism {#nested-parallel}
A common problem when parallelising is that the processes you invoke explicitly may themselves invoke other processes.
Consider a user _forking_ multiple processes, each process calling `data.table`, which itself will invoke multiple threads.
This is called _nested parallelism_, and may cause you to lose control of the number of processes being invoked.
The operating system will spend most of its time with housekeeping, instead of doing your computations.
Luckily, `data.table` was designed to avoid this.
If you are parallelising your linear algebra with OpenBLAS, you may control nested parallelism with the package [RhpcBLASctl](https://cran.r-project.org/package=RhpcBLASctl).
In other cases, you should be aware of this, and may need to consult an expert.
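A minimal sketch with __RhpcBLASctl__ (assuming the package is installed), forcing the BLAS and OpenMP layers to a single thread so that only your explicit R-level parallelism remains:

```{r, eval=FALSE}
library(RhpcBLASctl)
blas_get_num_procs()    # how many threads the BLAS currently uses
blas_set_num_threads(1) # serial BLAS inside each worker
omp_set_num_threads(1)  # likewise for OpenMP-based code
```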
## Bibliographic Notes
To understand how computers work in general, see @bryant2015computer.
For a brief and excellent explanation on parallel computing in R see @schmidberger2009state.
For a full review see @chapple2016mastering.
For a blog-level introduction see [ParallelR](http://www.parallelr.com/r-with-parallel-computing/).
For an article-level introduction, see the excellent @eddelbuettel2019parallel.
For an up-to-date list of packages supporting parallel programming see the High Performance Computing [R task view](https://cran.r-project.org/web/views/HighPerformanceComputing.html).
For some theory of distributed machine learning, see @rosenblatt2016optimality.
An excellent video explaining `data.table` and H2O, by the author of `data.table`, is [this](https://www.youtube.com/watch?v=5X7h1rZGVs0).
More benchmarks [here](https://h2oai.github.io/db-benchmark/).
More on Spark with R in [Mastering Apache Spark with R](https://therinspark.com).
For a blog level introduction to linear algebra in R see [Joseph Rickert's entry](https://blog.revolutionanalytics.com/2013/08/r-and-linear-algebra.html).
For a detailed discussion see @oancea2015accelerating.
## Practice Yourself
TODO
Try DataCamp's [Parallel Programming in R](https://www.datacamp.com/courses/parallel-programming-in-r).