Rmpi Tutorial
This is a basic tutorial on parallel programming in R using Rmpi, the MPI interface for R. This R package allow you to create R programs which run cooperatively in parallel across multiple machines, or multiple CPUs on one machine, to accomplish a goal more quickly than running a single program on one machine.
Who should read this tutorial:
Anyone who wishes to have compute-intensive R programs complete much sooner than is normally possible. This tutorial assumes a basic knowledge of R. For more information, and a tutorial on R, please see the R Project's documentation site.
This tutorial does not assume any knowledge of MPI, and briefly introduces the necessary concepts. It does not go in-depth into the full capabilities of MPI. For those interested, the author suggests continuing on to more complete material on MPI, such as the course hosted by NCSA, and Rmpi's documentation.
Rmpi Basics:
This section introduces the basic concepts of MPI, and how it allows programs to communicate and operate in parallel to achieve goals.
MPI: What is it?
MPI stands for Message Passing Interface. It defines an environment under which programs can run in parallel, and communicate with each other by sending messages to each other. There are two major separate parts to MPI:
- the environment your programs run in
- the library calls your programs use to communicate
This tutorial focuses on the latter, as normally the first tends to vary, and the system administrator usually takes care of many of those details for you.
MPI is currently the preferred mechanism for having parallel programs communicate with each other. It has undergone a standardization process, and is a very flexible system, allowing for efficient execution amongst processes running on almost any parallel architecture (e.g. large SMP, clusters)
MPI: How does it work?
MPI is successful largely because of its simplicity - programs communicate with each other using messages. The analogy is simple: every cooperating program has its own mailbox (message queue, or just queue), and any process can place a message in another's mailbox. When a program wishes, it can receive a message from its own mailbox, and do something with it.

That's as complicated as MPI gets conceptually. After that, it's all a matter of how to use this to accomplish the goals of your program.
MPI: Terminology
As is generally the tradition with computer folks, the terminology applied is not always very politically correct. Normally, the main controlling process is called the Master, and the processes it controls are called Slaves. When the Master needs more Slaves, new Slaves are Spawned. No, not the most politically correct terminology, but it's the lingo used.
It is quite possible, in MPI, to create peer-to-peer programs, where all are equal. Unfortunately, the manner of doing this is less intuitive than Master/Slave styles of programs, so the latter is preferred.
Some common terms are listed below:
- Master
-
The process in control. Normally, this is the first process started. All processes communicating as a group have a different number assigned to them. Normally, the master has the number 0 assigned to it.
- Slave
-
A process which does work designated by its Master. Normally, the slaves in a group have the numbers 1 and higher.
- Queue
-
Each process in the group has its own message queue, like a mailbox. Other processes, and even itself, can send messages into its queue. Only the process to which the queue is assigned can receive (take) messages out of the queue.
A queue is First-In, First-Out, meaning that messages are received from the queue in the exact order they are sent into the queue. It is possible in MPI to create a priority queue, where preference is given to certain messages, but R's mpi module doesn't yet support the functionality needed to do this.
- Message
-
Some arbitrary data. This data could represent some basic data, like an integer; it could represent a command for the receiver; it could be some arbitrary R data object, like a list; or it could even be an R function for the receiver to "learn".
Rmpi Sample Program:
It wouldn't be a computing tutorial without a Hello World program, would it?
# Load the R MPI package if it is not already loaded.
if (!is.loaded("mpi_initialize")) {
library("Rmpi")
}
# Spawn as many slaves as possible
mpi.spawn.Rslaves()
# In case R exits unexpectedly, have it automatically clean up
# resources taken up by Rmpi (slaves, memory, etc...)
.Last <- function(){
if (is.loaded("mpi_initialize")){
if (mpi.comm.size(1) > 0){
print("Please use mpi.close.Rslaves() to close slaves.")
mpi.close.Rslaves()
}
print("Please use mpi.quit() to quit R")
.Call("mpi_finalize")
}
}
# Tell all slaves to return a message identifying themselves
mpi.remote.exec(paste("I am",mpi.comm.rank(),"of",mpi.comm.size()))
# Tell all slaves to close down, and exit the program mpi.close.Rslaves() mpi.quit()
The Dissection:
The first segment of R code:
# Load the R MPI package if it is not already loaded.
if (!is.loaded("mpi_initialize")) {
library("Rmpi")
}
tests if Rmpi is already loaded. If it is not already loaded, it loads the Rmpi package. This is in good style. It is not usually necessary to do this check, but it does provide some safety from accidentally loading the package twice
The next snippet:
# Spawn as many slaves as possible
mpi.spawn.Rslaves()
asks the MPI environment to start up as many slave processes as are possible and report the result.
This next snippet, although not strictly necessary, is more of a house-cleaning necessity:
# In case R exits unexpectedly, have it automatically clean up
# resources taken up by Rmpi (slaves, memory, etc...)
.Last <- function(){
if (is.loaded("mpi_initialize")){
if (mpi.comm.size(1) > 0){
print("Please use mpi.close.Rslaves() to close slaves.")
mpi.close.Rslaves()
}
print("Please use mpi.quit() to quit R")
.Call("mpi_finalize")
}
}
This just provides some guarantee that if R is exited before closing the slaves, and without cleaning up the MPI environment, it will do this for you.
The next piece of code does the actual Hello World portion of the code:
# Tell all slaves to print out a message identifying themselves
mpi.remote.exec(paste("I am",mpi.comm.rank(),"of",mpi.comm.size()))
This tells all slaves to execute the R code: paste("I am",mpi.comm.rank(),"of",mpi.comm.size())
and return the result back to the master. In this code, mpi.comm.rank()returns the unique number assigned to the slave, and mpi.comm.size() returns the number of processes communicating. It may not look like there's any messages being passed in this example. However, mpi.remote.exec() actually is sending a message to every slave asking it to execute the given code, and each child is sending a message back to the master with the result. That function just hides the details from you.
And finally:
# Tell all slaves to close down, and exit the program
mpi.close.Rslaves()
mpi.quit()
does a proper shutdown for a program running Rmpi. It tells all slaves to close down, and calls a quit routine which cleans up the mpi environment before exiting R.
The Ouput:
In the sample case the author used to run this script, there were 18 slaves defined. Following is the output produced:
R : Copyright 2004, The R Foundation for Statistical Computing
Version 2.0.1 (2004-11-15), ISBN 3-900051-07-0
R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.
R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.
Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for a HTML browser interface to help.
Type 'q()' to quit R.
WARNING: UTF-8 locales are not currently supported
[Previously saved workspace restored]
> invisible(options(echo = TRUE))
> # Load the R MPI package if it is not already loaded.
> if (!is.loaded("mpi_initialize")) {
+ library("Rmpi")
+ }
>
> # Spawn as many slaves as possible
> mpi.spawn.Rslaves()
18 slaves are spawned successfully. 0 failed.
master (rank 0 , comm 1) of size 19 is running on: carl-eth0
slave1 (rank 1 , comm 1) of size 19 is running on: carl-eth0
slave2 (rank 2 , comm 1) of size 19 is running on: node1
slave3 (rank 3 , comm 1) of size 19 is running on: node2
slave4 (rank 4 , comm 1) of size 19 is running on: node3
slave5 (rank 5 , comm 1) of size 19 is running on: node4
slave6 (rank 6 , comm 1) of size 19 is running on: node5
slave7 (rank 7 , comm 1) of size 19 is running on: node6
slave8 (rank 8 , comm 1) of size 19 is running on: node7
slave9 (rank 9 , comm 1) of size 19 is running on: node8
slave10 (rank 10, comm 1) of size 19 is running on: node9
slave11 (rank 11, comm 1) of size 19 is running on: node10
slave12 (rank 12, comm 1) of size 19 is running on: node11
slave13 (rank 13, comm 1) of size 19 is running on: node12
slave14 (rank 14, comm 1) of size 19 is running on: node13
slave15 (rank 15, comm 1) of size 19 is running on: node14
slave16 (rank 16, comm 1) of size 19 is running on: node15
slave17 (rank 17, comm 1) of size 19 is running on: node16
slave18 (rank 18, comm 1) of size 19 is running on: node17
rsprng package is not installed. Cannot use SPRNG.
>
> # In case R exits unexpectedly, have it automatically clean up
> # resources taken up by Rmpi (slaves, memory, etc...)
> .Last <- function(){
+ if (is.loaded("mpi_initialize")){
+ if (mpi.comm.size(1) > 0){
+ print("Please use mpi.close.Rslaves() to close slaves.")
+ mpi.close.Rslaves()
+ }
+ print("Please use mpi.quit() to quit R")
+ .Call("mpi_finalize")
+ }
+ }
>
> # Tell all slaves to print out a message identifying themselves
> mpi.remote.exec(paste("I am",mpi.comm.rank(),"of",mpi.comm.size()))
$slave1
[1] "I am 1 of 19"
$slave2
[1] "I am 2 of 19"
$slave3
[1] "I am 3 of 19"
$slave4
[1] "I am 4 of 19"
$slave5
[1] "I am 5 of 19"
$slave6
[1] "I am 6 of 19"
$slave7
[1] "I am 7 of 19"
$slave8
[1] "I am 8 of 19"
$slave9
[1] "I am 9 of 19"
$slave10
[1] "I am 10 of 19"
$slave11
[1] "I am 11 of 19"
$slave12
[1] "I am 12 of 19"
$slave13
[1] "I am 13 of 19"
$slave14
[1] "I am 14 of 19"
$slave15
[1] "I am 15 of 19"
$slave16
[1] "I am 16 of 19"
$slave17
[1] "I am 17 of 19"
$slave18
[1] "I am 18 of 19"
>
> # Tell all slaves to close down, and exit the program
> mpi.close.Rslaves()
[1] 1
> mpi.quit()
Rmpi Common Functions:
Following is a list, by section, of some commonly used methods in R MPI programs, grouped in logical clusters. This is just a small set - there are many more. These methods provided here are geared for an introduction to parallel programming in R; more methods and complete documentation can best be found and understood by following the more complete MPI tutorials and Rmpi documentation indicated on the front page to this tutorial.
Startup and Shutdown:
- library("Rmpi")
-
This command loads the Rmpi package and makes all the MPI functions available for use. You need to run this before any of the other MPI functions.
- mpi.spawn.Rslaves([nslaves=#])
-
This function spawns a number of slave processes to perform work. Without any arguments, it spawns as many as the MPI environment knows about. With the nslaves= argument, you can specify a number of slaves to spawn.
- mpi.close.Rslaves()
-
This function closes all slaves. You need to do this before exiting your program.
- mpi.quit([saving=yes/no])
-
This function is a wrapper for R's normal quit() function. It cleans up any resources which Rmpi has allocated, and calls R's quit function. You must use this function to exit a program running with Rmpi.
Cluster Information:
- mpi.comm.size()
-
This returns the number of processes. Normally this is the number of slaves, plus 1 for the master.
- mpi.comm.rank()
-
This returns the number (or rank) assigned to the process which calls this function. Normally, when called in the master, this returns 0. When called in a slave it returns whichever number is assigned to the slave - normally between 1 and mpi.comm.size()-1
Sending/Receiving data/functions:
- mpi.bcast.Robj2slave(object)
-
This function broadcasts/pushes an object, or even a function, out to all the slaves. This is useful for sending slaves any initial data and functions which they require. Note: a matching receive is not needed for this in the slaves - it is handled automatically for you.
- mpi.send.Robj(object,destination,tag)
-
This function sends an object, which can be any R object like a number, string or a list, to a destination process's queue. A tag number can be supplied as extra information about the object. The destination is a number matching the rank of the process you're sending to. For example, mpi.send.Robj(object,0,tag) sends to the master process, whereas mpi.send.Robj(object,5,tag) sends to the slave process with the rank 5.
NOTE: This type of send must be processed by a call to mpi.recv.Robj in the receiving process.
- object <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
-
This function takes a message from the calling process' queue and returns it. If there are no messages currently in the queue, the process will wait until there is a message to receive.
For now, always use mpi.any.source() and mpi.any.tag() as arguments
- info <- mpi.get.sourcetag()
-
If this is called after mpi.recv.Robj, it will return a two-integer array containing the source (sender) of the message and tag of the message respectively.
Remote Execution:
- mpi.bcast.cmd("R code")
-
This causes all slave processes to run the R code given as an argument. It does not wait for the slaves to complete; as soon as the slaves are told to run the code, the function returns, and you can go about other useful tasks while the slaves do their job.
- results <- mpi.remote.exec("R code")
-
This causes all slave processes to run the R code, and return the result. This function waits for all slaves to complete the code, and compiles the returned results as a list, with the indexes being the rank of the slave, and the value being the returned value.
Rmpi Program Structure:
Following is a description of common program structures used when creating parallel programs in R using Rmpi. This page gives the basic outline of the code, whereas the next page provides actual examples using actual code.
Basic Program Structure:
Following is the basic frame of any parallel application with R:
- Load Rmpi, and spawn slaves
- Create any necessary functions
- Create a function representing the code the slaves will run
- Initialize data
- Send to the slaves all data and functions they require
- Tell the slaves to execute their function
- Communicate with the slaves to perform the computation
- Operate on the results
- Close the slaves and quit
Most of the steps above are very straight-forward, particularly once an example is seen. However, due to MPI's flexibility, the step of communicating with the slaves to perform the computation can be relatively complex, with almost an infinite number of ways of solving any problem. In order to simplify this, below are some common methods by which this is achieved.
Brute Force:
The first method described we will call the Brute Force method. This involves a very straight-forward decomposition of the problem into equal shares, and having each slave perform its share and return the results.
The first step in achieving this is to divide the task into n equal parts, where n is no larger than the number of slaves which will be available. Then, code the function for the children to determine based on mpi.comm.rank() which sub-problem they are to solve, solve the sub-problem, and return the result. Returning the result is just like returning a result from a normal function with the return() command.
Then, in the master: spawn n slaves; give them their data, functions, and the function created as described above; use mpi.remote.exec to have all the slaves call their function; and operate on the results returned by mpi.remote.exec.
This method is the simplest to code, and is very easy to debug.
Example code for this method is located on the next page
Task Push:
The Brute Force method described above suffers from a few flaws which make it suitable for only simple tasks. Primarily, it is not always possible to determine exactly how many sub-problems there will be until the data has been loaded, and it is most often natural to have more tasks than slaves. To that end, the methodology we will call Task Push relieves that problem by disassociating the relationship between the number of slaves and the number of tasks. This is done at the expense of adding a small amount of additional programming.
The main changes here is that the function coded for the slaves will be a loop which processes tasks provided by the master as messages, and sends the results back to the master as messages.
A frame for the slave function is as follows:
slavefunction <- function() {
# Get a task
task <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
task_info <- mpi.get.sourcetag()
tag <- task_info[2]
# While task is not a "done" message. Note the use of the tag to indicate
# the type of message
while (tag != 2) {
perform task, and create results to send to parent
# Send results to master
mpi.send.Robj(results,0,1)
# Get another task
task <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
task_info <- mpi.get.sourcetag()
tag <- task_info[2]
}
# Send a message to master indicating the the slave has finished doing tasks.
# Note the use of the tag as an indicator to tell the type of message
junk <- 0
mpi.send.Robj(junk,0,2)
}
A frame for how the master communicates with the slave is as follows:
#-------------------------------- # Task assignment # Build list of tasks. Assume these are stored in 'tasks' tasks <- ...
# Create a round-robin list of which slave to give each task n_slaves <- mpi.comm.size()-1 task_assignees <- rep(1:n_slaves, length=length(tasks))
# Send tasks to slaves for (i in 1:length(tasks)) { mpi.send.Robj(tasks[[i]], task_assignees[i], 1) }
#------------------------------- # Collect the results for (i in 1:length(tasks)) { task_result <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag()) # Incorporate task_result into some data structure or however # you're storing results }
# you can operate on and store/display results from any point on now # Note: you could keep creating and sending more tasks out and collecting # more results #------------------------------- # The shutdown "handshake" # Tell all slaves that there's no more tasks junk <- 0 for (i in 1:n_slaves) { mpi.send.Robj(junk,i,2) }
# Wait for slaves to finish before continuing. Wait until master # receives a done message from each slave (so, wait until receiving # n_slaves messages) for (i in 1:n_slaves) { mpi.recv.Robj(mpi.any.source(),2) }
#------------------------------- # Close down mpi.close.Rslaves() mpi.quit()
Task Pull:
The Task Push method exhibits some advantages over the Brute Force method, namely in that it allows the number of tasks and slaves to not match. However, it does suffer from a few flaws. First, if the number of tasks is MUCH larger than the number of slaves, the underlying MPI implementation could cause a deadlock condition where the master and slaves are all waiting, but unable to submit a new message into a queue. Second, it does not distribute load very intelligently - it assumes all slaves have equal processing rates. Very often, this is not true, and the Task Push methodology can result in waiting for the slowest slave to complete all its tasks.
The Task Pull remedies these two issues by having the slaves tell the master when they're ready for a new task. Only then is the task sent. It keeps a minimal number of messages in the queues at any given time, avoiding the potential for a deadlock condition, and causes load to be distributed fairly. A slave that only operates at half the speed of the other slaves will only receive about half the number of tasks.
There's only a little more programming work involved in the Task Pull method, but is also quite easily templated. The format for the slave function is as follows:
slavefunction <- function() {
# Note the use of the tag for sent messages:
# 1=ready_for_task, 2=done_task, 3=exiting
# Note the use of the tag for received messages:
# 1=task, 2=done_tasks
junk <- 0
done <- 0
while (done != 1) {
# Signal being ready to receive a new task
mpi.send.Robj(junk,0,1)
# Receive a task
task <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
task_info <- mpi.get.sourcetag()
tag <- task_info[2]
if (tag == 1) {
# perform the task, and create results
# Send the results back as a task_done message
mpi.send.Robj(results,0,2)
}
else if (tag == 2) {
# Master is saying all tasks are done. Exit
done <- 1
}
# Else ignore the message or report an error
}
# Tell master that this slave is exiting. Send master an exiting message
mpi.send.Robj(junk,0,3)
}
So, in the code for the slave, the task the slave actually performs is buried a little bit more, but all the code around it doesn't really change much. Now, the code for the Master changes more signficantly, where it operates in a loop receiving and answering requests for tasks until all tasks are done. This is shown below:
# Create list of tasks and store in 'tasks' junk <- 0 closed_slaves <- 0 n_slaves <- mpi.comm.size()-1
while (closed_slaves < n_slaves) { # Receive a message from a slave message <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag()) message_info <- mpi.get.sourcetag() slave_id <- message_info[1] tag <- message_info[2]
if (tag == 1) { # slave is ready for a task. Give it the next task, or tell it tasks # are done if there are none. if (length(tasks) > 0) { # Send a task, and then remove it from the task list mpi.send.Robj(tasks[[1]], slave_id, 1); tasks[[1]] <- NULL } else { mpi.send.Robj(junk, slave_id, 2) } } else if (tag == 2) { # The message contains results. Do something with the results. # You might store them in a data structure, or do some other processing # You might even create more tasks and add them to the end of the # tasks list if you like in order to have these sub-tasks done by the # slaves } else if (tag == 3) { # A slave has closed down. closed_slaves <- closed_slaves + 1 } }
# Operate on and/or display your results # close slaves and exit. mpi.close.Rslaves() mpi.quit()
Which to use?
Of the above, use the Task Pull methodology whenever possible. This takes care of distributing load evenly, avoids any queue overflows (keeps the message queues from hitting the end of their memory), and is not sensitive to the number of slaves.
Rmpi Examples:
Following is an example of a simple algorithm implemented in the three schemes described on the previous page. This is a simple 10-fold cross-validation example. It represents a common class of problems where a common set of instructions is run given slightly different data, and the results are collected and operated upon. These problems tend to be classified as "embarassingly parallel."
Original Code:
This is the original single-machine code. It creates a list of 1000 random samples with 30 predictor variables, with more predictive value being placed on the first 15. There's randomization inserted here to make sure the algorithm actually works. Then, 10-fold cross-validation is done over predicting linear models over the first i predictor variables, where i ranges from 1 to 30. The rss values resulting from the cross-validation are calculated and displayed graphically.
# first make some data
n <- 1000 # number of obs
p <- 30 # number of variables
x <- matrix(rnorm(n*p),n,p)
beta <- c(rnorm(p/2,0,5),rnorm(p/2,0,.25))
y <- x %*% beta + rnorm(n,0,20)
thedata <- data.frame(y=y,x=x)
summary(lm(y~x))
fold <- rep(1:10,length=n)
fold <- sample(fold)
rssresult <- matrix(0,p,10)
for (j in 1:10)
for (i in 1:p) {
templm <- lm(y~.,data=thedata[fold!=j,1:(i+1)])
yhat <- predict(templm,newdata=thedata[fold==j,1:(i+1)])
rssresult[i,j] <- sum((yhat-y[fold==j])^2)
}
# this plot shows cross-validated residual sum of squares versus the model
# number. As expected, the most important thing is including the first p/2
# of the predictors.
plot(apply(rssresult,1,mean))
Given the above code, the simplest way of parallelizing the code appears to be to parallelize across the 10-folds, where each fold is a separate task accomplished by the slaves. This demonstrates a common idiom when looking for how to parallelize code - look for your loops.
The Brute Force Method
R code for this solution can be downloaded here. It divides the problem into 10 sub-problems, each of which being a separate division of training and test data (a different fold). 10 slaves are spawned to handle this, and each are given a separate fold to perform. The slaves return their results to the master, and the master calculates and plots the results.
The Task Push Method
R code for this solution can be downloaded here. It divides the problem into tasks the same way as the Brute Force method, but it does not force 1 task per slave. It follows the method described previously.
Notice that messages being contructed are lists. In R, lists provide a convenient means of packing messages that may have multiple named data components, allowing you to create readable, flexible code for creating and using messages.
The Task Pull Method
R code for this solution can be downloaded here. It applies the problem into the Task Pull method describes previously.
Submitting Rmpi jobs:
Note: these instructions assume our system configuration at Acadia, although other sites using Sun Grid Engine will be similar. Even sites using PBS/OpenPBS will work similiarly.
We have Sun N1 Grid Engine configured to be able to run MPI jobs, including ones written in R. To submit this to run on the cluster, create a shell script using the following as a template:
#!/bin/sh
# Run using bash
#$ -S /bin/bash
#
# The name which shows up for this job in the grid engine reports
#$ -N task_pull.R
#
# The number of processors required - 1 for master, plus whatever
# number for slaves. If you want, this can be a range. For example,
# 10-16 would mean "I need 10, but give me as high as 16 if they're available"
# This example runs with a master, plus 6 slaves
#$ -pe mpi 7
#
# Run in the current directory
#$ -cwd
. /etc/profile
# Run the job. Replace with whatever R script should run
mpirun -np 1 R --slave CMD BATCH task_pull.R
Then, to submit the job, log in to carl (our head node), and run 'qsub script_filename'. This actually submits the job to be run by the cluster. You can watch it's current status using the command 'qstat'. When it's done (qstat stops showing it), you will see a .Rout file containing the scripts output, maybe an Rplots.ps file, and perhaps other files, depending on what your R program is intended to do.
