R offers a wide variety of packages dedicated to parallelisation. After trying several of them, I found snow the best one (for me) for classical parallelisable operations. Among the functions this package provides, I found the parApply family the most useful and the easiest to work with when taking advantage of parallelisation.
In order to use these functions, you first need a solid knowledge of the apply-like functions in traditional R, i.e., lapply, sapply, vapply and apply. What are they exactly? In essence, they apply a function over each element of a collection of objects (a vector, a list, or the rows or columns of a matrix).
The example below returns a numeric vector consisting of 3, 4, 5, …, 12 in the variable “result”:
my.vec <- 1:10
result <- vapply(my.vec, function(x) x + 2, FUN.VALUE = 0)

This code returns exactly the same output as the one below:
my.vec <- 1:10
result <- numeric(length(my.vec))
for (i in seq_along(my.vec)) {
  result[i] <- my.vec[i] + 2
}

Of course, in this example no significant speed difference can be found between the two pieces of code. However, even in single-node executions the first alternative was considerably faster in my tests, which becomes noticeable with larger amounts of data: for a vector of length 1,000,000 the difference was 0.94 vs 3.04 secs.
Plenty of posts and tutorials about lapply, sapply, vapply and apply can be found on the internet. However, it does no harm to explain again what each function does:
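A minimal sketch of how such a comparison could be timed is shown here. The vector length n is an assumption (smaller than the 1,000,000 used above so that it runs quickly), and the plain vectorised form my.vec + 2 is included only as a reference point. Timings depend heavily on the machine and the R version, so no particular numbers should be expected.

```r
n <- 1e5  # assumed size, smaller than 1,000,000 so the sketch runs quickly
my.vec <- seq_len(n)

# Alternative 1: vapply
t.vapply <- system.time(
  res.vapply <- vapply(my.vec, function(x) x + 2, FUN.VALUE = 0)
)

# Alternative 2: for loop over a pre-allocated result vector
t.loop <- system.time({
  res.loop <- numeric(length(my.vec))
  for (i in seq_along(my.vec)) {
    res.loop[i] <- my.vec[i] + 2
  }
})

# Reference: the vectorised form, with no explicit iteration at all
t.vec <- system.time(res.vec <- my.vec + 2)

# The results must agree before the timings are worth comparing
stopifnot(identical(res.vapply, res.loop), identical(res.vapply, res.vec))

# Elapsed seconds for each alternative
print(c(vapply = t.vapply[["elapsed"]],
        loop = t.loop[["elapsed"]],
        vectorised = t.vec[["elapsed"]]))
```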
a) lapply
Applies a function over a vector and returns a list with one element per input element. If lapply were used in the previous example, list-style indexing would be needed to obtain a single element, for instance the third one:
my.vec <- 1:10
result <- lapply(my.vec, function(x) x + 2)
# Get the third element of result
result[3][[1]]
# The more direct, equivalent form
result[[3]]
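To make the difference between the two kinds of indexing concrete, here is a small sketch based on the same vector. The variable names sub.list, third and flat are only illustrative.

```r
my.vec <- 1:10
result <- lapply(my.vec, function(x) x + 2)

# Single brackets keep the list wrapper: a list of length 1
sub.list <- result[3]

# Double brackets extract the element itself: the number 5
third <- result[[3]]

# unlist() flattens the whole list into an ordinary numeric vector
flat <- unlist(result)

str(sub.list)
str(third)
```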

b) sapply/vapply
Both are very similar to lapply but, instead of a list, they return a vector of some type (numeric, character, logical, etc.). sapply will “deduce” the class of the output elements, whereas in vapply the output type has to be specified. Although sapply is simpler to use, as there is no need to specify the output type, vapply was faster in my tests (0.94 secs vs 4.04) and lets the user control the output type.
my.vec <- 1:1000000
system.time(result <- vapply(my.vec, function(x) x + 2, FUN.VALUE = 0))
system.time(result <- sapply(my.vec, function(x) x + 2))

The FUN.VALUE argument is where the output type of vapply is specified, which is done by passing a “general form” (a template value of the expected type and length) that the output must fit. If the value returned by the function does not match the specified type, R will throw an error:
# These 2 alternatives are equivalent
vapply(letters[1:20], function(x) paste0(x, "_concat"), FUN.VALUE = "")
vapply(letters[1:20], function(x) paste0(x, "_concat"), FUN.VALUE = "aaa")
# This throws an error
vapply(letters[1:20], function(x) paste0(x, "_concat"), FUN.VALUE = 0)
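The sketch below illustrates, under small made-up inputs, why controlling the output type can matter: sapply changes its return type on empty input while vapply does not, FUN.VALUE also fixes the length of each result, and the type-mismatch error can be caught with tryCatch. All variable names are illustrative.

```r
# sapply() falls back to an empty list when the input is empty ...
empty.sapply <- sapply(integer(0), function(x) x + 2)
# ... while vapply() still returns the type given in FUN.VALUE
empty.vapply <- vapply(integer(0), function(x) x + 2, FUN.VALUE = 0)

# FUN.VALUE also fixes the length: two values per element give a 2-row matrix
squares <- vapply(1:3, function(x) c(x, x^2), FUN.VALUE = numeric(2))

# A type mismatch raises an error, which can be caught like any other
msg <- tryCatch(
  vapply(letters[1:3], function(x) paste0(x, "_concat"), FUN.VALUE = 0),
  error = function(e) "type mismatch"
)

str(empty.sapply)
str(empty.vapply)
print(squares)
print(msg)
```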

c) apply
apply() is used to apply a function over a matrix row-wise or column-wise, which is specified in its MARGIN argument with 1 for rows and 2 for columns. A data frame passed to apply() is first converted to a matrix.
# Some random data frame
ex.df <- data.frame(a = seq(1, 100, 1), b = seq(10, 1000, 10), c = runif(100))
# Apply row-wise: the first element of each row plus the second, multiplied by the third
aa <- apply(ex.df, 1, function(x) (x[1] + x[2]) * x[3])
# Apply column-wise: extract the mean of each column
bb <- apply(ex.df, 2, mean)

Tip: You may have noticed that you can write apply-like functions with or without a function(…) wrapper. Usually you use “function(…) + an expression” when the elements of the object being passed have to be treated differently (as in the row-wise apply example). If that is not the case, you can just pass the name of the function, as in the column-wise apply example. Other functions you could use similarly are median, max, min and quantile, among others.
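A related point, sketched here with a hypothetical data frame ex.na: when a function is passed by name, any further arguments given to apply() are handed on to that function, so a wrapper is not needed just to set an option such as na.rm.

```r
# Hypothetical data frame with a missing value in column b
ex.na <- data.frame(a = c(1, 2, 3, 4), b = c(10, NA, 30, 40))

# Arguments placed after the function name are passed on to it
col.means <- apply(ex.na, 2, mean, na.rm = TRUE)

# The same call written with an explicit wrapper
col.means.wrapped <- apply(ex.na, 2, function(x) mean(x, na.rm = TRUE))

print(col.means)
```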
Parallelisation
If the use of the apply functions is clear, then parallelisation with snow is just one small step further. The equivalent functions are:
| Base R | snow |
| --- | --- |
| lapply | parLapply |
| sapply | parSapply |
| vapply | – |
| apply (row-wise) | parRapply, parApply(, 1) |
| apply (column-wise) | parCapply, parApply(, 2) |
The only thing you really have to keep in mind is that, when parallelising, you have to explicitly export or declare on each node everything needed to perform the parallel operation. This is done mainly with the clusterExport() and clusterEvalQ() functions. It matters because code that runs fine in traditional single-node R can fail in parallel simply because these objects are missing on the nodes.
From apply() rowwise to parRapply()
Below you will find a small example, very similar to the row-wise apply example above, illustrating the small changes/additions mentioned above that are needed to run your code in parallel:
library(snow)
# Return whether the result of (x[1]+x[2])*x[3] is greater than 20 or not
# The same df as before
ex.df <- data.frame(a = seq(1, 100, 1), b = seq(10, 1000, 10), c = runif(100))
# Define the threshold
ths <- 20
# These 2 statements in Base R are equivalent
aa <- apply(ex.df, 1, function(x) (x[1] + x[2]) * x[3] > 20)
aa <- apply(ex.df, 1, function(x) (x[1] + x[2]) * x[3] > ths)
### Equivalent parallel execution ###
# Declare the cluster object. Here we use the default settings (SOCK)
# and the number of nodes is specified by the number given
clus <- makeCluster(3)
# The equivalent for the first alternative would be very easy
aa <- parRapply(clus, ex.df, function(x) (x[1] + x[2]) * x[3] > 20)
# However, if the variable "ths" needs to be used, a line has to be added
clusterExport(clus, "ths")
aa <- parRapply(clus, ex.df, function(x) (x[1] + x[2]) * x[3] > ths)
# Shut the nodes down when finished
stopCluster(clus)

The clusterExport() function exports an object to each node so that the nodes can use it while working in parallel. Its use, as can be seen, is extremely simple: you pass the variable name(s) in a character vector (or a single string, as in this case).
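As a minimal sketch of exporting more than one object at once, the example below passes two names in a character vector and also uses clusterEvalQ() to run an expression on every node. It assumes the parallel package that ships with R instead of snow (it provides functions with the same names, which avoids installing anything); the variables ths and scale.by are hypothetical.

```r
library(parallel)  # assumption: used here in place of snow, same function names

ths <- 20       # hypothetical threshold
scale.by <- 3   # hypothetical multiplier

clus <- makeCluster(2)

# Export several objects at once by passing their names as a character vector.
# envir = environment() looks the names up where this code is running.
clusterExport(clus, c("ths", "scale.by"), envir = environment())

# clusterEvalQ() evaluates an expression on every node, e.g. to load a package
clusterEvalQ(clus, library(stats))

# Both exported objects are now visible to the function on each node
above <- parSapply(clus, 1:10, function(x) x * scale.by > ths)

# Always release the nodes when finished
stopCluster(clus)

print(above)
```

Calling stopCluster() at the end matters because the worker processes otherwise stay alive after the computation has finished.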
To conclude, imagine you need to apply a custom function to each row of your data frame. Following the same example, in Base R it would be:
# Declare the function
custom.function <- function(a, b, c) {
  result <- (a + b) * c
  return(result)
}
ex.df <- data.frame(a = seq(1, 100, 1), b = seq(10, 1000, 10), c = runif(100))
# Apply the declared function
aa <- apply(ex.df, 1, function(x) custom.function(x[1], x[2], x[3]))

To perform the same action in parallel with snow, you can declare the function inside a clusterEvalQ() statement, or declare it in the base workspace and then export it. If you use clusterEvalQ(), you will not see the function in your workspace.
library(snow)
# Create cluster
clus <- makeCluster(3)
# Option 1. Declare the function on each node
clusterEvalQ(clus, custom.function <- function(a, b, c) {
  result <- (a + b) * c
  return(result)
})
# Option 2. Export it from the base workspace
custom.function <- function(a, b, c) {
  result <- (a + b) * c
  return(result)
}
clusterExport(clus, "custom.function")
ex.df <- data.frame(a = seq(1, 100, 1), b = seq(10, 1000, 10), c = runif(100))
# Apply the declared function
aa <- parRapply(clus, ex.df, function(x) custom.function(x[1], x[2], x[3]))
# Shut the nodes down when finished
stopCluster(clus)

Performance differences
Of course, the main goal of parallelisation is to reduce execution times. However, it does not make much sense to parallelise work over a small set (a small list, data frame, etc.), as parallelisation requires time for distributing the task among the nodes and collecting the results from them. This overhead is not significant when working with large volumes of data, but for small datasets the overall execution time needed to complete a task in parallel might be greater than doing it on just one core.
Although the execution time is reduced considerably, it should not be assumed that it will decrease proportionally to the number of nodes, i.e., if the execution time on one node is 30 seconds, it will not decrease to 10 seconds with 3 nodes. It will probably be higher.
These are the execution times for 1 and 3 nodes for the example above with a data frame of 1M and 3M rows.
| # of Rows | 1 Node | 3 Nodes |
| --- | --- | --- |
| 1M | 18.81 secs | 10.90 secs |
| 3M | 65.67 secs | 40.26 secs |
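The figures above can be turned into a speedup (one-node time divided by three-node time) and a parallel efficiency (speedup divided by the number of nodes). The short sketch below only does that arithmetic on the measured times; the column names are illustrative.

```r
# Measured times from the table above, in seconds
times <- data.frame(
  rows = c("1M", "3M"),
  one.node = c(18.81, 65.67),
  three.nodes = c(10.90, 40.26)
)
n.nodes <- 3

# Speedup: how many times faster the 3-node run was
times$speedup <- times$one.node / times$three.nodes

# Efficiency: fraction of the ideal 3x speedup actually achieved
times$efficiency <- times$speedup / n.nodes

print(times, digits = 3)
```

The speedup comes out at roughly 1.7 and 1.6, well below the ideal factor of 3, which is consistent with the distribution and collection overhead described earlier.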
Feel free to write any questions, suggestions, comments, etc.! I hope you liked it and found it useful.
ehm … how ’bout my.vec + 2
Yes… that could be done.. of course. But the aim of this post is to understand parallelisation
Sure, but don’t you think that advising people to use the apply functions should come with a warning that they’re much (in this case 100x on my laptop) slower than the vectorised equivalent? One might argue that snow is only required because of the inefficient use of R. And with a little matrix algebra many problems can be addressed in a vectorised way, not just simple arithmetic.
Nice post, thanks for doing it. I learned a couple of useful things about how to set up parallel code in R. It is true that for many simple looping tasks, vectorization is hands-down the way to go. However, assuming one has already optimized a function with proper vectorization, then the next step would be to look at ways to leverage those idle processor cores more efficiently.
Perhaps this is a suggestion for Part 2 of this post? What type of functions are most appropriate for parallel coding after extensive use of vectorization has been applied?
Nice, thanks. Indeed, for small datasets (10^3 rows) it is much slower, but with 10^6 rows and 3 nodes it is about twice as fast as standard apply. In base R, if you vectorise and just do the operations on the vectors themselves rather than looping over the data frame with apply, it is thousands of times faster. So the old maxim for R applies: vectorise when you can, and then use machine tricks like parallelisation. I am likely to use this in the future.
Thanks! I am glad you liked it
It is exactly as you say, it is always better to vectorise. Of course the examples provided in the post perform much better through vectorisation. However, the main goal was to clarify how parallelisation works and for that purpose I think that simple examples are better.
Thanks. I am using the apply functions to implement my simulations and found that they are very time consuming. After trying the snow package, I will comment on its performance.
I have a data.frame: list of emails called data
# Compare Strings Difference
clusterEvalQ(clus, compare.strings <- function(j,i) {
  library(stringdist)
  value = as.numeric(stringdist(data[j,],data[i,],method='lcs', nthread = 6))
  pair <- rbind(data[j,], data[i,],value)
  return(pair)
})
clus < makeCluster(2)
zz = data.frame(parRapply(clus, expand.grid(i,j),function(x,y) compare.strings(x[1],x[2])))
Results: Error in checkForRemoteErrors(val) :
2 nodes produced errors; first error: could not find function "x"
What Am I doing wrong?
zz = data.frame(parRapply(clus, expand.grid(i,j),function(x,y) compare.strings(x[1],x[2])))
got:
Error in checkForRemoteErrors(val) :
2 nodes produced errors; first error: object of type ‘closure’ is not subsettable
Firstly, I don’t know what expand.grid() is producing there because there is no indication of what i and j are. If that expand.grid() does not produce any result, the error might lie there.
Sorry… the problem lies in the fact that your function uses the data object but you are not exporting it to the cluster nodes.
Try adding clusterExport(clus, "data")
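One possible shape of that fix is sketched below, with several assumptions stated plainly: it uses the parallel package bundled with R in place of snow, a tiny made-up data frame in place of the real email list, and base R's adist() (edit distance) as a stand-in for stringdist(..., method = 'lcs'), so the distances are not the same metric. The cluster is created before anything is sent to it, both data and the function are exported, and the anonymous function takes a single argument because each row arrives as one vector.

```r
library(parallel)  # assumption: base 'parallel' used in place of snow

# Hypothetical stand-in for the one-column data frame of emails
data <- data.frame(
  email = c("ann@example.com", "anne@example.com", "bob@example.org"),
  stringsAsFactors = FALSE
)

# adist() is a stand-in for stringdist(..., method = "lcs"); different metric
compare.strings <- function(i, j) {
  as.numeric(utils::adist(data[i, 1], data[j, 1]))
}

# Create the cluster first, then export everything the function relies on
clus <- makeCluster(2)
clusterExport(clus, c("data", "compare.strings"), envir = environment())

# All index pairs; each row reaches the function as one vector x = c(i, j)
pairs <- expand.grid(i = seq_len(nrow(data)), j = seq_len(nrow(data)))
pairs$value <- unname(
  parRapply(clus, pairs, function(x) compare.strings(x[1], x[2]))
)

stopCluster(clus)

print(pairs)
```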