# Distributed Computing: `Distributed` standard library

**What**
* **single Julia process -> multiple Julia processes** that coordinate to perform certain computations

**Why**
* Scaling things up: run computations on multiple CPU cores, potentially even on different machines, e.g. nodes of a supercomputer or a local cluster of desktop machines.
* Effectively increase your memory: process a large dataset, which wouldn't fit into local memory, in parallel across multiple machines with separate dedicated RAM.

**Julia provides two fundamental implementations and paradigms**
* Julia's built-in [`Distributed` standard library](https://docs.julialang.org/en/v1/stdlib/Distributed/)
  * controller-worker model
* [Message Passing Interface (MPI)](https://www.mpi-forum.org/) through [MPI.jl](https://github.com/JuliaParallel/MPI.jl)
  * Single Program Multiple Data (SPMD)
  
The focus of this notebook is on the **`Distributed` standard library.**

## `Distributed` Standard Library

Julia's `Distributed` follows a controller-worker paradigm for its native distributed parallelism: **One controller process coordinates all the worker processes, which perform the actual computations.**

In [None]:
using Distributed

In [None]:
nprocs()

In [None]:
nworkers() # the controller is considered a worker as long as there are no real workers

To increase the number of workers, i.e. Julia processes, from within a Julia session we can dynamically call **`addprocs`**.

Alternatively, when starting Julia from the command line, one can use the `-p` option up front. Example,

```
julia -p 4
```

will start Julia with 5 processes, 1 controller and 4 workers.

In [None]:
addprocs(4)

Every process has a Julia internal `pid` (process id). The controller is always 1. You can get the workers pids from `workers()`.

In [None]:
workers()

Note that the 4 worker's pids aren't necessarily 2, 3, 4 and 5 and one shouldn't rely on those literal values. Let's remove the processes and add them once more.

In [None]:
rmprocs(workers())

In [None]:
nworkers() # only the controller is left

In [None]:
addprocs(4)

In [None]:
workers()

### One Controller to Rule Them All - `@spawn`, `@spawnat`, `@fetch`, `@fetchfrom`, `@everywhere`...

To execute commands and start computations on workers we can use the following macros

* `@spawn`: run a command or a code block on any worker and return a `Future` to it's result. It's basically a version of `@async` for remote processes.
* `@spawnat`: same as `@spawn` but one can choose a specific worker by providing its pid.

**Example:** Let's say we would like to generate a random matrix on one of the workers.

In [None]:
@spawn 3 + 3 # similar to @async

In [None]:
result = @spawn 3 + 3

In [None]:
fetch(result)

Because the combination of spawning at fetching is so common, there is `@fetch` which combines them.

In [None]:
@fetch 3 + 3

In [None]:
@fetch rand(3, 3)

Which worker did the work?

In [None]:
@fetch begin
    println("Hello, I am $(myid())")
    3 + 3
end

Using `@spawnat` and `@fetchfrom` we can delegate the work to a specific worker.

In [None]:
@fetchfrom 7 begin
    println(myid())
    3 + 3
end

We can use `@sync` as a blocker to wait for all workers to complete their tasks.

In [None]:
@sync begin
    pids = workers()
    @spawn (sleep(2); println("Today is reverse day!"))
    @spawn (sleep(1); println(" class!"))
    @spawn println("Hello")
    println(pids)
end;
println("Done!")

Ok, now that we understood all that, let's delegate a *complicated* calculation

In [None]:
using Random

function complicated_calculation()
    sleep(1) # so complex that it takes a long time :)
    randexp(5)
end

@fetch complicated_calculation()

What happened?

**Every worker is a separate Julia process.** (Think of having multiple Julia REPLs open at once.)

We only defined `complicated_calculation()` on the controller process. The function doesn't exist on any of the workers yet.

The macro `@everywhere` allows us to perform steps on all processes (controller and worker). This is particularly useful for loading packages and functions definitions etc.

In [None]:
@everywhere begin # execute this block on all workers
    using Random

    function complicated_calculation()
        sleep(1)
        randexp(5) # lives in Random
    end
end

In [None]:
@fetch complicated_calculation()

### Data movement

There is a crucial difference between the following two pieces of code. Can you guess what it is? (without reading on ðŸ˜‰)

In [None]:
function method1()
    A = rand(100, 100)
    B = rand(100, 100)
    C = @fetch A^2 * B^2
end

In [None]:
function method2()
    C = @fetch rand(100, 100)^2 * rand(100, 100)^2
end

Let's benchmark them.

In [None]:
using BenchmarkTools
@btime method1();
@btime method2();

Method 1 is slower, because `A` and `B` are created on the controller process, transferred to a worker, and squared and multiplied on the worker process before the result is finally transferred back to the controller.

Method 2, on the other hand, creates, squares, and multiplies the random matrix all on the work process and only submits the result to the controller.

Hence, `method1` is **transferring 3x as much data** between the controller and the worker!

**Efficient data movement is crucial for efficient parallel computing!**

In this toy example, it's rather easy to identify the faster method.

In a real program, however, understanding data movement does require more thought and likely some measurement.

For example, if the first process needs matrix `A` in a follow-up computation then the first method might be better in this case. Or, if computing `A` is expensive and only the current process has it, then moving it to another process might be unavoidable.

#### Computer latency at a human scale

To understand why thinking about data is important it's instructive to look at the time scales involved in data access.

<img src="../../static/latency_human_scales.png" width=900px>

(taken from https://www.prowesscorp.com/computer-latency-at-a-human-scale/)

### Avoid Globals (once more)

In [None]:
myglobal = 4

In [None]:
function whohas(s::String)
    @everywhere begin
        var = Symbol($s)
        if isdefined(Main, var)
            println("$var exists.")
        else
            println("Doesn't exist.")
        end
    end
    nothing
end

In [None]:
whohas("myglobal")

In [None]:
@fetchfrom 6 myglobal + 2

In [None]:
whohas("myglobal")

Globals get copied to workers and continue to exist as globals even after the call.

This could lead to **memory accumulation** if many globals are used (just as it would in a single Julia session).

It's better to avoid them.

### Explicit data movement: `Channel` and `RemoteChannel`

Implement communication between tasks. Functions: `put!`, `take!`, `fetch`, `isready` and `wait`.

In [None]:
ch = Channel{Int}(5) # a channel that can hold up to 5 integers

In [None]:
isready(ch) # something in the channel?

In [None]:
put!(ch, 3)

In [None]:
isready(ch)

In [None]:
3 + 3

In [None]:
fetch(ch)

In [None]:
take!(ch)

In [None]:
isready(ch)

In [None]:
put!(ch, 4)

In [None]:
fetch(ch)

In [None]:
take!(ch)

**Be careful**, `take!` and `put!` are blocking if the channel is empty or full!

In [None]:
isready(ch)

In [None]:
# take!(ch) if we execute this, while isready(ch) == false, the current Julia session will hang.

#### `RemoteChannel`

- A `Channel` is local to a process. Worker 2 cannot directly refer to a `Channel` on worker 3 and vice-versa.
- A `RemoteChannel`, however, can put and take values across workers. A `RemoteChannel` can be thought of as a handle to a `Channel`.
- Any process with a reference to a `RemoteChannel` can put and take items from the channel. Data is automatically sent to (or retrieved from) the process a `RemoteChannel` is associated with.
- The process id, pid, associated with a `RemoteChannel` identifies the process where the backing store, i.e., the backing Channel exists.

In [None]:
nworkers()

In [None]:
addprocs(4)

In [None]:
function do_something()
    rc = RemoteChannel(() -> Channel{Int}(10)) # lives on the controller
    @sync for p in workers()
        @spawnat p put!(rc, myid())
    end
    rc
end

r = do_something()

In [None]:
isready(r)

In [None]:
while isready(r)
    @show take!(r)
end

The ecosystem also contains a couple of tools, that make data transfer even simpler. See for example [ParallelDataTransfer.jl](https://github.com/ChrisRackauckas/ParallelDataTransfer.jl/).

### High-level tools: `@distributed` and `pmap`

So far we have seen some of the fundamental building blocks for distributed computing in Julia. However, in practice, one wants to think as little as possible about how to distribute the work and explicitly spawn tasks.

Fortunately, many useful parallel computations do not require (much) data movement at all. A common example is a direct Monte Carlo simulation, where multiple processes can handle independent simulation trials simultaneously. (We'll get to that later in the exercises!)

Julia provides **high-level convenience** tools to
 * parallelize loops ([**`@distributed`**](https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.@distributed)) and
 * apply a function to all elements of a collection ([**`pmap`**](https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.pmap))


#### Distributed loops (`@distributed`)

In [None]:
using Distributed, BenchmarkTools;
rmprocs(workers());
addprocs(4);
nworkers();

#### Example: Reduction

Task: Counting heads in a series of coin tosses.

In [None]:
function count_heads_loop(n)
    c = 0
    for i = 1:n
        c += rand(Bool)
    end
    return c
end

N = 200_000_000
@btime count_heads_loop($N);

Note that these kinds of computations are called **reductions** (with `+` being the **reducer function**).

In [None]:
count_heads_reduce(n) = mapreduce(i -> rand(Bool), +, 1:n)
@btime count_heads_reduce($N)

In [None]:
function count_heads_distributed_loop(n)
    c = @distributed (+) for i in 1:n
        Int(rand(Bool))
    end
    return c
end

In [None]:
@btime count_heads_distributed_loop($N);

The distributed version is about **4x faster**, which is all we could hope for.

With `@distributed` the work is **evenly distributed** between the workers.

In [None]:
function count_heads_distributed_verbose(n)
    c = @distributed (+) for i in 1:n
        x = Int(rand(Bool))
        println(x)
        x
    end
    c
end

count_heads_distributed_verbose(8);

However, by using `@distributed` we let Julia decide how to split up the work and can't control it ourselves.

#### A common mistake

In [None]:
function g(n)
    a = 0
    @distributed (+) for i in 1:n
        a += 1
    end
    a
end

a = g(10);

What do you expect the value of `a` to be?

In [None]:
a

#### Example: `SharedArray`s

Apart from `@distributed (reducer) ...` there also is a `@distributed for ...` form. The latter is **non-blocking** and returns a `Task`. (You can think of it as a distributed version of `@spawn` for all the iterations.)

However, since the loop body will be executed on different processes, one must be careful to operate on data structures that are available on all processes (similar to the mistake highlighted above).

In [None]:
function square_broken()
    A = collect(1:10)
    @sync @distributed for i in eachindex(A)
        A[i] = A[i]^2
    end
    return A
end

In [None]:
square_broken()

To actually make all processes operate on the same array, one can use a `SharedArray`. For this to work, the **processes need to live on the same host**.

In [None]:
@everywhere using SharedArrays # must be loaded everywhere

In [None]:
A = rand(3, 2)

In [None]:
S = SharedArray(A)

In [None]:
function square!(X)
    for i in eachindex(X)
        sleep(0.001) # mimicing some computational cost
        X[i] = X[i]^2
    end
end

function square_distributed!(X)
    @sync @distributed for i in eachindex(X)
        sleep(0.001) # mimicing some computational cost
        X[i] = X[i]^2
    end
end

A = rand(10, 10)
S = SharedArray(A)

@btime square!(A);
@btime square_distributed!($S);

### Parallel map: `pmap`

The `square!` functions above are typical `map` operations where a function `f` is applied to all elements of a collection.

In [None]:
map(x -> x^2, 1:10)

Such a pattern can be parallelized in Julia via the high-level function `pmap` ("parallel map").

#### Example: Singular values of multiple matrices

In [None]:
using Distributed, BenchmarkTools;
rmprocs(workers());
addprocs(4);
nworkers();

In [None]:
@everywhere using LinearAlgebra

M = Matrix{Float64}[rand(200, 200) for i = 1:10];

In [None]:
svdvals(rand(2, 2))

In [None]:
map(svdvals, M)

In [None]:
pmap(svdvals, M)

Let's check that this indeed utilized multiple workers.

In [None]:
pmap(M) do m
    println(myid())
    svdvals(m)
end;

In [None]:
@btime map($svdvals, $M);
@btime pmap($svdvals, $M);

### When to choose which? (`@distributed` vs `pmap`)

Julia's `pmap` is designed for the case where

* one wants to apply **a function to a collection**,
* each function call does a **larger amount of work**, and/or
* the **workload is non-uniform** (load-balancing).

On the other hand, `@distributed` is good for

* **reductions**, like sums, where
* **each iteration may be tiny**, i.e. perhaps only summing two numbers, and/or
* each iteration **takes about the same time** (uniform)

## High-level array abstractions: [DistributedArrays.jl](https://github.com/JuliaParallel/DistributedArrays.jl)

In a `DArray`, each process has local access to just a chunk of the data, and no two processes share the same chunk. Processes can be on different hosts.

Distributed arrays are for example useful if

* Expensive calculations should be performed in parallel on parts of the array on different hosts.
* The data doesn't fit into the local machines memory (i.e. loading big files in parallel).

In [None]:
using Distributed, BenchmarkTools;
rmprocs(workers());

In [None]:
# make sure that all workers use the same Julia environment
addprocs(4; exeflags="--project")

In [None]:
# check
@everywhere @show Base.active_project()

In [None]:
@everywhere using DistributedArrays, LinearAlgebra

In [None]:
M = Matrix{Float64}[rand(200, 200) for i = 1:10];

In [None]:
D = distribute(M)

Which workers hold parts of D?

In [None]:
procs(D)

In [None]:
workers()

Which parts do they hold?

In [None]:
localpart(D) # the controller doesn't hold anything

In [None]:
# Which parts do they hold?
for p in workers()
    println(@fetchfrom p DistributedArrays.localindices(D))
end

In [None]:
@btime map($svdvals, $M);
@btime map($svdvals, $D);

In [None]:
@btime pmap($svdvals, $M);

## *Actual* distributed computing: Comments

### Creating workers on other machines

So far we have worked with multiple process on the same system, because we simply used `addprocs(::Integer)`. To put worker processes on other machines, e.g. nodes of a cluster, we need to modify the initial `addprocs` call appropriately.

In Julia, starting worker processes is handled by [ClusterManagers](https://docs.julialang.org/en/v1/manual/distributed-computing/#ClusterManagers).

* The default one is `LocalManager`. It is automatically used when running `addprocs(i::Integer)` and we have implicitly used it already.
* Another important one is `SSHManager`. It is automatically used when running `addprocs(hostnames::Array)`, e.g. `addprocs(["node123", "node456"])`. The only requirement is a **passwordless ssh access** to all specified hosts.
* Cluster managers for SLURM, PBS, and others are provided in [ClusterManagers.jl](https://github.com/JuliaParallel/ClusterManagers.jl). For SLURM, this will make `addprocs` use `srun` under the hood.

*Demonstrate in terminal from thp node*

```julia
using Distributed

addprocs(["l93", "l94"])

@everywhere println(gethostname())
```

One can also start multiple processes on different machines:
```julia
addprocs([("node123", 2), ("node456", 3)]) # starts 2 workers on node123 and 3 workers on node456

# Use :auto to start as many processes as CPU threads are available
```

**Be aware of different paths:**
* By default, `addprocs` expects to find the julia executable on the remote machines under the same path as on the host (controller).
* It will also try to `cd` to the same folder (set the working directory).


As you can see from `?addprocs`, `addprocs` takes a bunch of keyword arguments, two of which are of particular importance in this regard:

* `dir`: working directory for the worker processes
* `exename`: path to julia executable (potentially augmented with pre-commands) for the worker processes

In [None]:
# cleanup
rmprocs(workers())