Flexible parallel processing using the R future and Python Dask packages
1. This Tutorial
This tutorial covers the use of R’s future and Python’s Dask packages, well-established tools for parallelizing computations on a single machine or across multiple machines. There is also a bit of material on Python’s Ray package, which is a more recent development (though it has now been around for a while).
You should be able to replicate much of what is covered here provided you have R and Python on your computer, but some of the parallelization approaches may not work on Windows.
This tutorial assumes you have a working knowledge of either R or Python, but not necessarily knowledge of parallelization in R or Python.
2. Some useful terminology
- cores: We’ll use this term to mean the different processing units available on a single node.
- nodes: We’ll use this term to mean the different computers, each with their own distinct memory, that make up a cluster or supercomputer.
- processes: instances of a program executing on a machine; multiple processes may be executing at once. A given executable (e.g., Python or R) may start up multiple processes at once. Ideally we have no more user processes than cores on a node.
- threads: multiple paths of execution within a single process; the OS sees the threads as a single process, but one can think of them as ‘lightweight’ processes. Ideally the total number of threads across all processes would not exceed the number of cores on a node.
- forking: child processes are spawned that are identical to the parent, but with different process IDs and their own memory.
- sockets: some of R’s parallel functionality involves creating new R processes (e.g., starting processes via Rscript) and communicating with them via a communication technology called sockets.
- tasks: This term gets used in various ways (including in place of ‘processes’), but we’ll use it to refer to the individual computational items you want to complete - e.g., one task per cross-validation fold or one task per simulation replicate/iteration.
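As a concrete point of reference, here is a minimal Python sketch (the toy task and the choice of four workers are just for illustration) showing how cores, processes, and tasks relate:

```python
# Minimal sketch: query the number of cores and spread tasks across worker processes.
import os
from multiprocessing import Pool

print(os.cpu_count())  # number of cores the operating system reports on this node

def one_task(i):
    # stand-in for one computational task (e.g., one simulation replicate)
    return i ** 2

if __name__ == "__main__":
    # 4 worker processes; ideally no more processes than cores on the node
    with Pool(processes=4) as pool:
        print(pool.map(one_task, range(8)))  # 8 tasks spread over the 4 processes
```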
3. Types of parallel processing
There are two basic flavors of parallel processing (leaving aside GPUs): distributed memory and shared memory. With shared memory, multiple processors (which I’ll call cores) share the same memory. With distributed memory, you have multiple nodes, each with their own memory. You can think of each node as a separate computer connected by a fast network.
3.2. Distributed memory
Parallel programming for distributed memory parallelism requires passing messages between the different nodes.
The standard protocol for passing messages is MPI, of which there are various implementations, including Open MPI.
Tools such as Dask, Ray and future all manage the work of moving information between nodes for you (and don’t generally use MPI).
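For example, with Dask one typically starts a scheduler and workers on the various nodes (e.g., with the dask-scheduler and dask-worker command-line programs) and then connects a client to the scheduler from a Python session; Dask handles the communication between nodes. A minimal sketch, where the scheduler address is a placeholder:

```python
from dask.distributed import Client

# Connect to a Dask scheduler already running on another node;
# the address below is a placeholder, not a real machine.
client = Client("tcp://scheduler-node:8786")

def slow_square(x):
    return x ** 2

# Dask ships the tasks (and any data they need) to the workers over the network.
futures = client.map(slow_square, range(100))
print(client.gather(futures))
```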
3.3. Other types of parallel processing
We won’t cover either of these in this material.
GPUs
GPUs (Graphics Processing Units) are processing units originally designed for rendering graphics on a computer quickly. This is done by having a large number of simple processing units for massively parallel calculation. The idea of general purpose GPU (GPGPU) computing is to exploit this capability for general computation.
Most researchers don’t program for a GPU directly but rather use software (often machine learning software such as PyTorch, JAX, or TensorFlow) that has been programmed to take advantage of a GPU if one is available.
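For instance, in PyTorch one generally just places tensors on the GPU if one is available, without writing any GPU code directly; a minimal sketch:

```python
import torch

# Use a GPU if PyTorch can see one; otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

x = torch.randn(1000, 1000, device=device)
y = x @ x  # the matrix multiplication runs on the GPU when one is available
print(y.device)
```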
Spark and Hadoop
Spark and Hadoop are systems for implementing computations in a distributed memory environment, using the MapReduce approach.
Note that Dask provides a lot of the same functionality as Spark, allowing one to create distributed datasets where pieces of the dataset live on different machines but can be treated as a single dataset from the perspective of the user.
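As an illustration, a Dask DataFrame can be built from many files whose pieces (partitions) may live on different workers but are manipulated as a single dataset; a minimal sketch, where the file pattern and column names are hypothetical:

```python
import dask.dataframe as dd

# Read many CSV files as one logical DataFrame; each file becomes a partition
# that can be processed by (and live on) a different worker.
df = dd.read_csv("measurements-*.csv")

# Operations look like pandas but are executed in parallel across the partitions.
result = df.groupby("site").value.mean().compute()
print(result)
```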
4. Package comparison
The outline below gives an overview comparison of future (R), Dask (Python), and Ray (Python); a short code sketch illustrating a couple of the shared-memory points follows the list.
- Use cases
  - future
    - parallelizing tasks
    - one or more machines (nodes)
  - Dask
    - parallelizing tasks
    - distributed datasets
    - one or more machines (nodes)
  - Ray
    - parallelizing tasks
    - building distributed applications (interacting tasks)
    - one or more machines (nodes)
- Task allocation
  - future
    - static by default
    - dynamic optional
  - Dask
    - dynamic
  - Ray
    - dynamic (I think)
- Shared memory
  - future
    - shared across workers only if using forked processes (the `multicore` plan on Linux/MacOS); if data are modified, copies need to be made
    - data shared across static tasks assigned to a worker
  - Dask
    - shared across workers only if using the `threads` scheduler; data shared across tasks on a worker if data are delayed
  - Ray
    - shared across all workers on a node via the object store (nice!)
- Copies made
  - future
    - generally one copy per worker, but one per task with dynamic allocation
  - Dask
    - generally one copy per worker if done correctly (data should be delayed if using the distributed scheduler)
  - Ray
    - one copy per node if done correctly (use `ray.put` to use the object store)
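Here is a minimal sketch (with a made-up workload) contrasting two of the shared-memory points above: Ray’s object store, which holds one copy of the data per node, and Dask’s threads scheduler, in which the workers are threads sharing the same memory:

```python
import numpy as np

# --- Ray: put a large object in the node-local object store once ---
import ray
ray.init()

x = np.random.rand(10_000_000)
x_ref = ray.put(x)  # one copy in the object store, shared by all workers on this node

@ray.remote
def ray_mean(arr):
    return arr.mean()

print(ray.get([ray_mean.remote(x_ref) for _ in range(4)]))

# --- Dask: the threads scheduler shares memory across workers ---
import dask

@dask.delayed
def delayed_mean(arr):
    return arr.mean()

results = [delayed_mean(x) for _ in range(4)]
print(dask.compute(*results, scheduler="threads"))  # threads share x without copying it
```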