snow
packagesnow
(Simple Network of Workstations) package implements a simple mechanism
for using a collection of workstations or a Beowulf cluster for
``embarrassingly parallel'' computations in R. The interface, which
is based in part on the Python CoW
(Cluster of Workstations) package, is intended to be quite simple,
and is designed so that it can be implemented on top of several
different lower level communication mechanisms. Three low level
interfaces have been implemented, one using PVM via the
rpvm
package by Li and Rossini, one using MPI via the
Rmpi
package
by Hao Yu, and one using raw sockets that may be useful if PVM and MPI
are not available. This note describes how to start up a cluster, the
basic functions for computing with a cluster, and an example of using
the cluster for parallel bootstrapping.
Two notes of caution:
^C
is likely to cause
problems so avoid this when running snow
.
stopCluster
for your
cluster. This helps in cleanly shutting down your framework.
Starting a workstation cluster is the only step in using a cluster that is dependent on the underlying communication mechanism.
pvm
:
<starting PVM using the pvm console>= [luke@node00 ~]$ pvm pvm> conf conf 1 host, 1 data format HOST DTID ARCH SPEED DSIG node00.beowulf.stat.uiowa.edu 40000 LINUX64 1000 0x00408c41 pvm> add node01 node02 add node01 node02 2 successful HOST DTID node01 80000 node02 c0000 pvm> quit quit Console: exit handler called pvmd still running. [luke@node00 ~]$
You can also use a list of nodes in a file, say pvmhosts
, and
start the pvm console with pvm pvmhosts
.
If you prefer, you can use xpvm
to start PVM. xpvm
also
provides some nice monitoring tools. If you use xpvm
you may want
to also place a .xpvm_hosts
file in your home directory on the
master. My .xpvm_hosts
file looks like
<.xpvm_hosts>= node00 &node01 &node02 ... &node21
Once PVM is running, to use a cluster of two worker nodes you would
start R on the master, load the snow package, and call makeCluster
with argument 2, the number of worker nodes, to start the worker
processes:
<starting a PVM cluster>= library(snow) cl <- makeCluster(2)
The returned value is a list of references to these two processes.
You can also specify type="PVM"
in the makeCluster
call, but
PVM
is the default if PVM is available.
Once you are done using PVM remember to shut it down with the halt
command in the console:
<stopping PVM using the pvm console>= [luke@node00 ~]$ pvm pvmd already running. pvm> halt halt Terminated [luke@node00 ~]$
snow
on our
cluster. Support for MPICH or MPICH2 will be added eventually.
Avoid using ^C
as I believe it will kill all your R processes once
R is connected to MPI.
You can start LAM-MPI with
lamboot <hostfile>A simple host file might look like
<simple lam host file>= node00 node01 node02 node03
There are two approaches to running snow
under LAM-MPI. One uses
process spawning and is similar to the PVM approach. The other uses
mpirun
to set up the processes. The mpirun
approach may work
better with batch scheduling. To use process spawning, start R and run
<starting an MPI cluster with process spawning>= library(snow) cl<-makeCluster(type="MPI",3)
To use mpirun
you use the shell script RMPISNOW
to start R. This
script should allow the same command line arguments as R
; only the
master process will be given these arguments. The number of process
you specify to mpirun
includes the master process, so -np 4
gives you a master and three workers:
<starting an MPI cluster with mpirun
>=
mpirun -np 4 RMPISNOW
This starts up a master and three worker processes and loads snow into each.
It uses the R_PROFILE
variable to do this. In the master R process use
<getting the MPI cluster>= cl <- getMPIcluster()
to get a reference to the running cluster of worker processes.
Once you are done with LAM-MPI remember to shut it down with
lamhalt
. If this has difficulty lamwipe
may be useful.
The man pages give more details.
makeCluster
and specify
type="SOCK"
and a list of machines to use. For example, to create
a socket cluster with two nodes you would start R on the master, load
the snow
package with
<starting a socket cluster>= [D->] library(snow)
<starting a socket cluster>+= [<-D] cl <- makeCluster(type="SOCK", c("node01", "node02"))
to create a socket cluster consisting of R processes running on the
machines node01
and node02
. The returned value is a list of
references to these two processes.
<stopping a cluster>= stopCluster(cl)
Socket clusters should stop automatically when the process that
created them terminates, but calling stopCluster
is still a good
idea. If you are using PVM you should also make sure that PVM is shut
down as well by issuing the halt
command in the text console or
using the halt
item in the xpvm
File
menu. Similarly, for
LAM-MPI make sure to use lamhalt
to shut down LAM-MPI.
clusterCall
and
clusterApply
. clusterCall
calls a specified function with
identical arguments on each member of the cluster and returns a list
of the results. The calls are executed in parallel. For example,
we can ask the nodes for their names and machine types:
<R session>= [D->] > clusterCall(cl, function() Sys.info()[c("nodename","machine")]) [[1]] nodename machine "node01.beowulf.stat.uiowa.edu" "x86_64" [[2]] nodename machine "node02.beowulf.stat.uiowa.edu" "x86_64" [[3]] nodename machine "node03.beowulf.stat.uiowa.edu" "x86_64"
A useful variation of clusterCall
is clusterEvalQ
, defined as
<definition of clusterEvalQ
>=
clusterEvalQ<-function(cl, expr)
clusterCall(cl, eval, substitute(expr), env=.GlobalEnv)
This can be used, for example, to load a package on each cluster node with
<loading a library on all nodes>= clusterEvalQ(cl, library(boot))
clusterApply
is a version of lapply
that evaluates each call
on a different member of the cluster. It requires that the number of
nodes in the cluster be at least as large as the number of elements in
the list argument. A simple example:
<R session>+= [<-D->] > clusterApply(cl, 1:3, get("+"), 3) [[1]] [1] 4 [[2]] [1] 5 [[3]] [1] 6
<R session>+= [<-D->] > clusterCall(cl, runif, 3) [[1]] [1] 0.7176320 0.1036995 0.8112116 [[2]] [1] 0.7176320 0.1036995 0.8112116 [[3]] [1] 0.7176320 0.1036995 0.8112116
A quick and (very) dirty way of addressing this is random seeding, using something like
<random seeding of cluster generators>= clusterApply(cl, runif(length(cl),max=10000000), set.seed)
A better approach is to use a parallel random number genarator
package. Several are available. By default snow
uses
therlecuyer
package, which provides an interface to a parallel
random number generator package of
L'Ecuyer, Simard, Chen, and Kelton. The function
clusterSetupRNG
handles the initialization. The default call with
no additional arguments uses a random seed; named arguments described
in the help page can be used for more control:
<R session>+= [<-D->] > clusterSetupRNG(cl) > clusterCall(cl, runif, 3) [[1]] [1] 0.1270111 0.3185276 0.3091860 [[2]] [1] 0.7595819 0.9783106 0.6851358 [[3]] [1] 0.7285098 0.9655873 0.9961841
boot
package includes an example using the data nuclear
.
The setup code, given in the boot
help page, is
<bootstrap setup>= library(boot) # In this example we show the use of boot in a prediction from # regression based on the nuclear data. This example is taken # from Example 6.8 of Davison and Hinkley (1997). Notice also # that two extra arguments to statistic are passed through boot. data(nuclear) nuke <- nuclear[,c(1,2,5,7,8,10,11)] nuke.lm <- glm(log(cost)~date+log(cap)+ne+ ct+log(cum.n)+pt, data=nuke) nuke.diag <- glm.diag(nuke.lm) nuke.res <- nuke.diag$res*nuke.diag$sd nuke.res <- nuke.res-mean(nuke.res) # We set up a new dataframe with the data, the standardized # residuals and the fitted values for use in the bootstrap. nuke.data <- data.frame(nuke,resid=nuke.res,fit=fitted(nuke.lm)) # Now we want a prediction of plant number 32 but at date 73.00 new.data <- data.frame(cost=1, date=73.00, cap=886, ne=0, ct=0, cum.n=11, pt=1) new.fit <- predict(nuke.lm, new.data) nuke.fun <- function(dat, inds, i.pred, fit.pred, x.pred) { assign(".inds", inds, envir=.GlobalEnv) lm.b <- glm(fit+resid[.inds] ~date+log(cap)+ne+ct+ log(cum.n)+pt, data=dat) pred.b <- predict(lm.b,x.pred) remove(".inds", envir=.GlobalEnv) c(coef(lm.b), pred.b-(fit.pred+dat$resid[i.pred])) }
Running just on the master the actual bootstrap example takes approximately 12 seconds:
<R session>+= [<-D->] > system.time(nuke.boot <- + boot(nuke.data, nuke.fun, R=999, m=1, + fit.pred=new.fit, x.pred=new.data)) user system elapsed 12.345 0.012 12.369
Using a three-worker cluster is approximately three times as fast:
<R session>+= [<-D->] > clusterSetupRNG(cl) > clusterEvalQ(cl,library(boot)) > system.time(cl.nuke.boot <- + clusterCall(cl,boot,nuke.data, nuke.fun, R=500, m=1, + fit.pred=new.fit, x.pred=new.data)) user system elapsed 0.010 0.001 3.793
The value to use comparing speeds is the third one, the elapsed time.
<higher level functions>= [D->] parLapply <- function(cl, x, fun, ...) docall(c, clusterApply(cl, splitList(x, length(cl)), lapply, fun, ...))
defines a parallel version of Lapply; it uses splitList
to split
the list arguments approximately evenly across the cluster. An
example, using a cluster of size 5:
<R session>+= [<-D->] > x<-1:100/101 > system.time(qtukey(x, 2, df=2)) user system elapsed 3.656 0.001 3.663 > system.time(unlist(parLapply(cl, x, qtukey, 2, df=2))) user system elapsed 0.018 0.000 1.061
The functions parCapply
and parRapply
apply a function to the
columns or rows of a matrix and return the results:
<R session>+= [<-D->] > A<-matrix(c(1,2,3,4,5,6),nrow=2) > A [,1] [,2] [,3] [1,] 1 3 5 [2,] 2 4 6 > parCapply(cl, A, sum) [1] 3 7 11 > parRapply(cl, A, sum) [1] 9 12
These functions are defined as
<higher level functions>+= [<-D->] parRapply <- function(cl, x, fun, ...) docall(c, clusterApply(cl, splitRows(x,length(cl)), apply, 1, fun, ...)) parCapply <- function(cl, x, fun, ...) docall(c, clusterApply(cl, splitCols(x,length(cl)), apply, 2, fun, ...))
A very simple version of a parallel matrix multiplier is defined as
<higher level functions>+= [<-D] parMM <- function(cl, A, B) docall(rbind,clusterApply(cl, splitRows(A, length(cl)), get("%*%"), B))
For small matrices the parallel version is slower due to the communication overhead, but for large ones the parallel version can do better:
<R session>+= [<-D] > A<-matrix(rnorm(10000),100) > system.time(A %*% A) user system elapsed 0.003 0.000 0.003 > system.time(parMM(cl,A , A)) user system elapsed 0.021 0.006 0.039 > A<-matrix(rnorm(1000000),1000) > system.time(A %*% A) user system elapsed 4.415 0.013 4.432 > system.time(parMM(cl,A , A)) user system elapsed 2.122 0.390 3.495
lamhalt
and lamwipe
you can
kill all R processes by halting the PVM or LAM-MPI universe.
Some open issues:
clusterEvalQ
>: D1
mpirun
>: D1