Distributed Computing: Distributed standard library#

What

  • single Julia process -> multiple Julia processes that coordinate to perform certain computations

Why

  • Scaling things up: run computations on multiple CPU cores, potentially even on different machines, e.g. nodes of a supercomputer or a local cluster of desktop machines.

  • Effectively increase your memory: process a large dataset, which wouldn’t fit into local memory, in parallel across multiple machines with separate dedicated RAM.

Julia provides two fundamental implementations and paradigms for distributed computing: the Distributed standard library and MPI (via MPI.jl).

The focus of this notebook is on the Distributed standard library.

Distributed Standard Library#

Julia’s Distributed follows a controller-worker paradigm for its native distributed parallelism: One controller process coordinates all the worker processes, which perform the actual computations.

using Distributed
nprocs()
1
nworkers() # the controller is considered a worker as long as there are no real workers
1

To increase the number of workers, i.e. Julia processes, from within a Julia session we can dynamically call addprocs.

Alternatively, when starting Julia from the command line, one can use the -p option. For example,

julia -p 4

will start Julia with 5 processes, 1 controller and 4 workers.

addprocs(4)
4-element Vector{Int64}:
 2
 3
 4
 5

Every process has a Julia-internal pid (process id). The controller is always 1. You can get the workers’ pids from workers().

workers()
4-element Vector{Int64}:
 2
 3
 4
 5

Note that the 4 workers’ pids aren’t necessarily 2, 3, 4, and 5, and one shouldn’t rely on these literal values. Let’s remove the processes and add them once more.

rmprocs(workers())
Task (done) @0x00007efb81639cd0
nworkers() # only the controller is left
1
addprocs(4)
4-element Vector{Int64}:
 6
 7
 8
 9
workers()
4-element Vector{Int64}:
 6
 7
 8
 9

One Controller to Rule Them All - @spawn, @spawnat, @fetch, @fetchfrom, @everywhere#

To execute commands and start computations on workers we can use the following macros:

  • @spawn: run a command or a code block on any worker and return a Future for its result. It is essentially a version of @async for remote processes.

  • @spawnat: same as @spawn but one can choose a specific worker by providing its pid.

Example: Let’s run a simple computation, and later generate a random matrix, on one of the workers.

@spawn 3 + 3 # similar to @async
Future(6, 1, 10, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (0, 139625743647040, 139625685568400)), nothing)
result = @spawn 3 + 3
Future(7, 1, 11, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (3, 139625092352000, 0)), nothing)
fetch(result)
6

Because the combination of spawning and fetching is so common, there is @fetch, which combines them.

@fetch 3 + 3
6
@fetch rand(3, 3)
3×3 Matrix{Float64}:
 0.64994   0.973973  0.534955
 0.6816    0.977513  0.79925
 0.253518  0.28954   0.998655

Which worker did the work?

@fetch begin
    println("Hello, I am $(myid())")
    3 + 3
end
      From worker 6:	Hello, I am 6
6

Using @spawnat and @fetchfrom we can delegate the work to a specific worker.
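
For instance, here is a minimal sketch with @spawnat (assuming, as above, that a worker with pid 7 exists):

f = @spawnat 7 rand(3, 3) # run this specifically on worker 7; returns a Future
fetch(f)                  # transfer the result back to the controller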

@fetchfrom 7 begin
    println(myid())
    3 + 3
end
      From worker 7:	7
6

We can use @sync to block until all tasks spawned inside the block have completed.

@sync begin
    pids = workers()
    @spawn (sleep(2); println("Today is reverse day!"))
    @spawn (sleep(1); println(" class!"))
    @spawn println("Hello")
    println(pids)
end;
println("Done!")
[6, 7, 8, 9]
      From worker 9:	Hello
      From worker 8:	 class!
      From worker 7:	Today is reverse day!
Done!

OK, now that we understand all that, let’s delegate a complicated calculation.

using Random

function complicated_calculation()
    sleep(1) # so complex that it takes a long time :)
    randexp(5)
end

@fetch complicated_calculation()
On worker 6:
UndefVarError: #complicated_calculation not defined
Stacktrace:
  [1] deserialize_datatype
    @ /usr/local/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:1364
  [2] handle_deserialize
    @ /usr/local/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:866
  [3] deserialize
    @ /usr/local/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:813
  [4] handle_deserialize
    @ /usr/local/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:873
  [5] deserialize
    @ /usr/local/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:813 [inlined]
  [6] deserialize_global_from_main
    @ /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/clusterserialize.jl:160
  [7] #5
    @ /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/clusterserialize.jl:72 [inlined]
  [8] foreach
    @ ./abstractarray.jl:2774
  [9] deserialize
    @ /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/clusterserialize.jl:72
 [10] handle_deserialize
    @ /usr/local/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:959
 [11] deserialize
    @ /usr/local/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:813
 [12] handle_deserialize
    @ /usr/local/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:870
 [13] deserialize
    @ /usr/local/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:813
 [14] handle_deserialize
    @ /usr/local/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:873
 [15] deserialize
    @ /usr/local/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:813 [inlined]
 [16] deserialize_msg
    @ /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/messages.jl:87
 [17] #invokelatest#2
    @ ./essentials.jl:729 [inlined]
 [18] invokelatest
    @ ./essentials.jl:726 [inlined]
 [19] message_handler_loop
    @ /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/process_messages.jl:176
 [20] process_tcp_streams
    @ /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/process_messages.jl:133
 [21] #103
    @ ./task.jl:484

Stacktrace:
 [1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
   @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:465
 [2] remotecall_fetch(::Function, ::Distributed.Worker)
   @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:454
 [3] remotecall_fetch(::Function, ::Int64; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
   @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:492
 [4] remotecall_fetch(::Function, ::Int64)
   @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:492
 [5] top-level scope
   @ In[18]:8

What happened?

Every worker is a separate Julia process. (Think of having multiple Julia REPLs open at once.)

We only defined complicated_calculation() on the controller process. The function doesn’t exist on any of the workers yet.

The macro @everywhere allows us to run code on all processes (controller and workers). This is particularly useful for loading packages and defining functions, etc.

@everywhere begin # execute this block on all workers
    using Random

    function complicated_calculation()
        sleep(1)
        randexp(5) # lives in Random
    end
end
@fetch complicated_calculation()
5-element Vector{Float64}:
 3.4455167530151436
 3.193485970251446
 0.6088365240800014
 0.596821469362344
 0.5352406734201677

Data movement#

There is a crucial difference between the following two pieces of code. Can you guess what it is? (without reading on 😉)

function method1()
    A = rand(100, 100)
    B = rand(100, 100)
    C = @fetch A^2 * B^2
end
method1 (generic function with 1 method)
function method2()
    C = @fetch rand(100, 100)^2 * rand(100, 100)^2
end
method2 (generic function with 1 method)

Let’s benchmark them.

using BenchmarkTools
@btime method1();
@btime method2();
  450.916 μs (90 allocations: 237.80 KiB)
  357.601 μs (72 allocations: 81.03 KiB)

Method 1 is slower, because A and B are created on the controller process, transferred to a worker, and squared and multiplied on the worker process before the result is finally transferred back to the controller.

Method 2, on the other hand, creates, squares, and multiplies the random matrices entirely on the worker process and only sends the result back to the controller.

Hence, method1 transfers 3x as much data between the controller and the worker (two input matrices there plus one result back, instead of just the result)!

Efficient data movement is crucial for efficient parallel computing!

In this toy example, it’s rather easy to identify the faster method.

In a real program, however, understanding data movement does require more thought and likely some measurement.

For example, if the first process needs matrix A in a follow-up computation, then the first method might actually be preferable. Or, if computing A is expensive and only the current process has it, then moving it to another process might be unavoidable.
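
To illustrate the first point, here is a hedged sketch (a hypothetical method3, not part of the benchmark above): the controller keeps A for a follow-up computation, so shipping A once is the price of reusing it locally, while the second matrix never leaves the worker.

function method3()
    A = rand(100, 100)                # created and kept on the controller
    C = @fetch A^2 * rand(100, 100)^2 # A is shipped to the worker; the second matrix is generated there
    return sum(A) + sum(C)            # follow-up computation on the controller reuses A
end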

Computer latency at a human scale#

To understand why thinking about data movement is important, it’s instructive to look at the time scales involved in data access.

(taken from https://www.prowesscorp.com/computer-latency-at-a-human-scale/)

Avoid Globals (once more)#

myglobal = 4
4
function whohas(s::String)
    @everywhere begin
        var = Symbol($s)
        if isdefined(Main, var)
            println("$var exists.")
        else
            println("Doesn't exist.")
        end
    end
    nothing
end
whohas (generic function with 1 method)
whohas("myglobal")
myglobal exists.
      From worker 6:	Doesn't exist.
      From worker 7:	Doesn't exist.
      From worker 9:	Doesn't exist.
      From worker 8:	Doesn't exist.
@fetchfrom 6 myglobal + 2
6
whohas("myglobal")
myglobal exists.
      From worker 8:	Doesn't exist.
      From worker 6:	myglobal exists.
      From worker 7:	Doesn't exist.
      From worker 9:	Doesn't exist.

Globals get copied to workers and continue to exist as globals even after the call.

This could lead to memory accumulation if many globals are used (just as it would in a single Julia session).

It’s better to avoid them.
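
One way to avoid installing globals on the workers is to pass data through function arguments instead. A minimal sketch (the helper add2 and the hard-coded pid 6 are only for illustration): locals captured by the remotely executed closure are shipped with it, but they are not defined as globals in the worker’s Main.

@everywhere add2(y) = y + 2

function run_remote(x)
    @fetchfrom 6 add2(x) # x is a local: it travels with the closure, but no global is created on the worker
end

run_remote(4)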

Explicit data movement: Channel and RemoteChannel#

Channels implement communication between tasks. The key functions are put!, take!, fetch, isready, and wait.

ch = Channel{Int}(5) # a channel that can hold up to 5 integers
Channel{Int64}(5) (empty)
isready(ch) # something in the channel?
false
put!(ch, 3)
3
isready(ch)
true
3 + 3
6
fetch(ch)
3
take!(ch)
3
isready(ch)
false
put!(ch, 4)
4
fetch(ch)
4
take!(ch)
4

Be careful: take! blocks if the channel is empty, and put! blocks if it is full!

isready(ch)
false
# take!(ch) # executing this while isready(ch) == false would block the current Julia session indefinitely
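
Here is a minimal sketch of how to use the blocking behavior safely (the name ch2 and the producer task are made up for illustration): pair the consuming take!s with a producer task and close the channel when done, so that nothing blocks forever.

ch2 = Channel{Int}(2)
@async begin           # producer task fills the channel asynchronously
    for i in 1:3
        put!(ch2, i)   # blocks only while the channel is full
    end
    close(ch2)         # signals the consumer that no more values will come
end
for x in ch2           # iterating a channel take!s values until it is closed and drained
    @show x
end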

RemoteChannel#

  • A Channel is local to a process. Worker 2 cannot directly refer to a Channel on worker 3 and vice-versa.

  • A RemoteChannel, however, can put and take values across workers. A RemoteChannel can be thought of as a handle to a Channel.

  • Any process with a reference to a RemoteChannel can put and take items from the channel. Data is automatically sent to (or retrieved from) the process a RemoteChannel is associated with.

  • The process id (pid) associated with a RemoteChannel identifies the process where the backing store, i.e. the backing Channel, lives.

nworkers()
4
addprocs(4)
4-element Vector{Int64}:
 10
 11
 12
 13
function do_something()
    rc = RemoteChannel(() -> Channel{Int}(10)) # lives on the controller
    @sync for p in workers()
        @spawnat p put!(rc, myid())
    end
    rc
end

r = do_something()
RemoteChannel{Channel{Int64}}(1, 1, 35884)
isready(r)
true
while isready(r)
    @show take!(r)
end
take!(r) = 9
take!(r) = 6
take!(r) = 7
take!(r) = 8
take!(r) = 13
take!(r) = 12
take!(r) = 10
take!(r) = 11

The ecosystem also contains a couple of tools that make data transfer even simpler. See, for example, ParallelDataTransfer.jl.

High-level tools: @distributed and pmap#

So far we have seen some of the fundamental building blocks for distributed computing in Julia. In practice, however, one wants to think as little as possible about how to distribute the work and how to spawn the individual tasks explicitly.

Fortunately, many useful parallel computations do not require (much) data movement at all. A common example is a direct Monte Carlo simulation, where multiple processes can handle independent simulation trials simultaneously. (We’ll get to that later in the exercises!)

Julia provides high-level convenience tools to

  • parallelize loops (@distributed) and

  • apply a function to all elements of a collection (pmap)

Distributed loops (@distributed)#

using Distributed, BenchmarkTools;
rmprocs(workers());
addprocs(4);
nworkers();

Example: Reduction#

Task: Counting heads in a series of coin tosses.

function count_heads_loop(n)
    c = 0
    for i = 1:n
        c += rand(Bool)
    end
    return c
end

N = 200_000_000
@btime count_heads_loop($N);
  279.902 ms (0 allocations: 0 bytes)

Note that these kinds of computations are called reductions (with + being the reducer function).

count_heads_reduce(n) = mapreduce(i -> rand(Bool), +, 1:n)
@btime count_heads_reduce($N)
  283.665 ms (0 allocations: 0 bytes)
100005166
function count_heads_distributed_loop(n)
    c = @distributed (+) for i in 1:n
        Int(rand(Bool))
    end
    return c
end
count_heads_distributed_loop (generic function with 1 method)
@btime count_heads_distributed_loop($N);
  70.134 ms (305 allocations: 12.77 KiB)

The distributed version is about 4x faster, which is the best we could hope for with 4 workers.

With @distributed the work is evenly distributed between the workers.

function count_heads_distributed_verbose(n)
    c = @distributed (+) for i in 1:n
        x = Int(rand(Bool))
        println(x)
        x
    end
    c
end

count_heads_distributed_verbose(8);
      From worker 14:	0
      From worker 14:	0
      From worker 17:	0
      From worker 17:	1
      From worker 15:	1
      From worker 15:	0
      From worker 16:	1
      From worker 16:	0

However, by using @distributed we let Julia decide how to split up the work and can’t control it ourselves.
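
If we do want to control the splitting ourselves, we can fall back to the lower-level tools from above. A hedged sketch (the helper count_heads_manual is made up for illustration): partition the range into one chunk per worker and spawn each chunk explicitly.

function count_heads_manual(n)
    ps = workers()
    chunks = Iterators.partition(1:n, cld(n, length(ps)))  # one chunk per worker
    futures = [(@spawnat p sum(_ -> rand(Bool), chunk)) for (p, chunk) in zip(ps, chunks)]
    return sum(fetch, futures)                              # fetch and combine the partial counts
end

count_heads_manual(1_000_000)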

A common mistake#

function g(n)
    a = 0
    @distributed (+) for i in 1:n
        a += 1
    end
    a
end

a = g(10);

What do you expect the value of a to be?

a
0

Each worker operates on its own copy of a, so incrementing it on the workers never changes a on the controller. The value of the (+) reduction is computed but discarded, because it isn’t assigned to anything.
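
One way to get the intended result (a hypothetical g_fixed, not from the original) is to use the value returned by the (+) reduction instead of mutating a variable:

function g_fixed(n)
    a = @distributed (+) for i in 1:n
        1   # each iteration contributes 1 to the (+) reduction
    end
    return a
end

g_fixed(10) # == 10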

Example: SharedArrays#

Apart from @distributed (reducer) ... there is also a @distributed for ... form. The latter is non-blocking and returns a Task. (You can think of it as a distributed version of @spawn for all the iterations.)

However, since the loop body will be executed on different processes, one must be careful to operate on data structures that are available on all processes (similar to the mistake highlighted above).

function square_broken()
    A = collect(1:10)
    @sync @distributed for i in eachindex(A)
        A[i] = A[i]^2
    end
    return A
end
square_broken (generic function with 1 method)
square_broken()
10-element Vector{Int64}:
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10

To actually make all processes operate on the same array, one can use a SharedArray. For this to work, the processes need to live on the same host.

@everywhere using SharedArrays # must be loaded everywhere
A = rand(3, 2)
3×2 Matrix{Float64}:
 0.737665  0.491284
 0.134149  0.946209
 0.926734  0.982605
S = SharedArray(A)
3×2 SharedMatrix{Float64}:
 0.737665  0.491284
 0.134149  0.946209
 0.926734  0.982605
function square!(X)
    for i in eachindex(X)
        sleep(0.001) # mimicking some computational cost
        X[i] = X[i]^2
    end
end

function square_distributed!(X)
    @sync @distributed for i in eachindex(X)
        sleep(0.001) # mimicking some computational cost
        X[i] = X[i]^2
    end
end

A = rand(10, 10)
S = SharedArray(A)

@btime square!(A);
@btime square_distributed!($S);
  206.727 ms (508 allocations: 15.84 KiB)
  52.020 ms (553 allocations: 25.50 KiB)

Parallel map: pmap#

The square! functions above are typical map operations where a function f is applied to all elements of a collection.

map(x -> x^2, 1:10)
10-element Vector{Int64}:
   1
   4
   9
  16
  25
  36
  49
  64
  81
 100

Such a pattern can be parallelized in Julia via the high-level function pmap (“parallel map”).

Example: Singular values of multiple matrices#

using Distributed, BenchmarkTools;
rmprocs(workers());
addprocs(4);
nworkers();
@everywhere using LinearAlgebra

M = Matrix{Float64}[rand(200, 200) for i = 1:10];
svdvals(rand(2, 2))
2-element Vector{Float64}:
 1.02391800261649
 0.03714165784371758
map(svdvals, M)
10-element Vector{Vector{Float64}}:
 [100.58063886374491, 8.136166495895292, 7.995413373636503, 7.780024514144074, 7.685616254043206, 7.573836572528889, 7.448946292366369, 7.365543614835816, 7.284964062043466, 7.256086481471429  …  0.2863305494099173, 0.26573767307474183, 0.23788200640227994, 0.21191485925409254, 0.18482290701858384, 0.1335530247907863, 0.08451102578038008, 0.07359813948447222, 0.026936447863156673, 0.008016322855232348]
 [99.894565287868, 8.05162171507401, 7.948044017586529, 7.775587758030984, 7.693356118019662, 7.679581878768912, 7.482936204539673, 7.34812967648654, 7.27025926993207, 7.25002769348528  …  0.34341398237077314, 0.31161685997311456, 0.255426746846119, 0.21349935075291618, 0.17703154097038734, 0.16301304545046005, 0.10663480245723514, 0.08587297651958112, 0.025065028335975712, 0.009168948491550567]
 [100.18510136394426, 8.045687055975568, 7.811815617169202, 7.674630580710361, 7.622433395154352, 7.539222979854232, 7.443336095785276, 7.351517911455987, 7.248584299150602, 7.192963640726805  …  0.3563371172747457, 0.2879532303381498, 0.25703424752302423, 0.23911037423234777, 0.16624337257504987, 0.13218548798460272, 0.09052396206383845, 0.07139876181385849, 0.02503226396255906, 0.015742646962333087]
 [100.11756318070367, 8.035085755941477, 7.7793791332107345, 7.71378004721732, 7.68600253958479, 7.631351343415243, 7.47797418576448, 7.432106057662394, 7.36855332246843, 7.342321313157375  …  0.2984785912138643, 0.2886530881978146, 0.23702770301351636, 0.2247993371478077, 0.17757561063766708, 0.16042478395372467, 0.12242860960962013, 0.0790957917398363, 0.06489802806089982, 0.022029796892624003]
 [100.054981312951, 7.984684497706104, 7.947692595322665, 7.824528934670557, 7.676185898289979, 7.5785740983521075, 7.424339652243243, 7.350229859790586, 7.2703161921493615, 7.193096901391755  …  0.28272462262706205, 0.2661244767675947, 0.24691054845557517, 0.19544618953665452, 0.15638627146714087, 0.13054262829841537, 0.09080904812355002, 0.083514476700953, 0.0524179205979268, 0.004947367525021889]
 [99.80967953066025, 8.127796139406058, 7.900729819774497, 7.839140883893985, 7.683394256204016, 7.551444335705111, 7.528063976907573, 7.43129916912963, 7.3339937034335065, 7.29255201330665  …  0.31729863146542886, 0.2611147705590394, 0.25005861656292205, 0.1949183875335909, 0.1864396711338906, 0.16029181833359205, 0.0865513524847781, 0.0760293391667345, 0.060112870520106776, 0.008088442822801974]
 [100.12244532514443, 8.071602284573311, 7.922907887448889, 7.662619916811377, 7.557478972687038, 7.465324207408259, 7.414792363397744, 7.386229716675376, 7.259790017428638, 7.218670107290582  …  0.33848759853887783, 0.2629603644599559, 0.25457872162607953, 0.22129785773259641, 0.19154467786858556, 0.15806108146813252, 0.1251408495213005, 0.11585381524957356, 0.04891130967388932, 0.01944875157731924]
 [100.20615178879844, 7.943990370265941, 7.843677476265805, 7.661909802010956, 7.612142764301393, 7.486325298477974, 7.390388111925092, 7.318650262893692, 7.3031443791167225, 7.219693445776926  …  0.3143688050174253, 0.25241206457238435, 0.21945672400291966, 0.19982378437242745, 0.1806700078150273, 0.15280328743560978, 0.08815581729454881, 0.04943326556242211, 0.03528678308500143, 0.011746591840399351]
 [99.69759750408376, 7.981389817488871, 7.930109494247845, 7.715173551142769, 7.566150600574292, 7.537544070589164, 7.455308958007965, 7.402302359288515, 7.383434013242156, 7.2416308449951545  …  0.3106093307442624, 0.28022094367736744, 0.26306853416086307, 0.2292497579154462, 0.13810198964943485, 0.10220670918540857, 0.09481024790116342, 0.07414777039920746, 0.02895568535699494, 0.002612146819441136]
 [100.23878257624223, 7.831770043635619, 7.743885712094974, 7.636926243193107, 7.510023510524957, 7.486922928419766, 7.359762486350772, 7.321659513237829, 7.245022381023872, 7.2036255132778795  …  0.329884529515596, 0.27867185515740445, 0.2560904534343351, 0.19669658510522928, 0.1668594031639506, 0.1573393036401637, 0.12658090284290313, 0.09156756343478628, 0.07042690530502779, 0.03967525818854297]
pmap(svdvals, M)
10-element Vector{Vector{Float64}}:
 [100.58063886374491, 8.1361664958953, 7.995413373636511, 7.780024514144086, 7.685616254043201, 7.573836572528892, 7.448946292366373, 7.365543614835822, 7.284964062043458, 7.25608648147143  …  0.28633054940991737, 0.26573767307474166, 0.23788200640227966, 0.21191485925409287, 0.18482290701858403, 0.13355302479078615, 0.08451102578038008, 0.07359813948447266, 0.02693644786315614, 0.008016322855232648]
 [99.894565287868, 8.051621715074017, 7.948044017586526, 7.775587758030986, 7.693356118019665, 7.679581878768913, 7.482936204539674, 7.348129676486534, 7.270259269932078, 7.250027693485275  …  0.343413982370773, 0.3116168599731141, 0.255426746846119, 0.21349935075291657, 0.17703154097038748, 0.16301304545046064, 0.10663480245723554, 0.08587297651958081, 0.025065028335974696, 0.009168948491551115]
 [100.18510136394426, 8.045687055975561, 7.811815617169204, 7.674630580710352, 7.622433395154352, 7.5392229798542285, 7.443336095785272, 7.351517911455987, 7.248584299150601, 7.1929636407267985  …  0.3563371172747455, 0.28795323033814907, 0.2570342475230253, 0.23911037423234768, 0.1662433725750494, 0.13218548798460283, 0.09052396206383914, 0.07139876181385882, 0.025032263962559125, 0.015742646962332966]
 [100.11756318070367, 8.035085755941491, 7.779379133210737, 7.713780047217329, 7.68600253958479, 7.631351343415251, 7.477974185764484, 7.4321060576624, 7.368553322468436, 7.342321313157384  …  0.29847859121386455, 0.28865308819781527, 0.23702770301351578, 0.22479933714780767, 0.1775756106376669, 0.16042478395372473, 0.12242860960962032, 0.07909579173983701, 0.06489802806089943, 0.022029796892624687]
 [100.054981312951, 7.9846844977061044, 7.947692595322665, 7.824528934670558, 7.676185898289979, 7.5785740983521, 7.424339652243242, 7.3502298597905895, 7.270316192149358, 7.1930969013917485  …  0.28272462262706294, 0.2661244767675937, 0.24691054845557506, 0.19544618953665394, 0.15638627146714026, 0.13054262829841526, 0.09080904812355045, 0.08351447670095309, 0.05241792059792698, 0.004947367525022601]
 [99.80967953066025, 8.12779613940606, 7.900729819774502, 7.839140883893992, 7.683394256204018, 7.551444335705111, 7.528063976907572, 7.431299169129632, 7.333993703433521, 7.2925520133066435  …  0.31729863146542847, 0.2611147705590396, 0.2500586165629226, 0.19491838753359067, 0.18643967113389137, 0.16029181833359263, 0.08655135248477885, 0.07602933916673463, 0.06011287052010671, 0.008088442822802775]
 [100.12244532514443, 8.07160228457331, 7.922907887448891, 7.662619916811388, 7.557478972687038, 7.465324207408266, 7.414792363397744, 7.386229716675377, 7.259790017428641, 7.218670107290587  …  0.33848759853887744, 0.26296036445995635, 0.2545787216260798, 0.2212978577325962, 0.191544677868585, 0.1580610814681328, 0.12514084952130078, 0.11585381524957339, 0.048911309673889514, 0.019448751577319564]
 [100.20615178879844, 7.943990370265937, 7.84367747626583, 7.661909802010965, 7.612142764301399, 7.486325298477964, 7.390388111925095, 7.318650262893688, 7.30314437911672, 7.219693445776935  …  0.3143688050174251, 0.25241206457238474, 0.21945672400292043, 0.19982378437242704, 0.18067000781502734, 0.152803287435609, 0.08815581729454999, 0.04943326556242179, 0.035286783085001695, 0.01174659184039824]
 [99.69759750408376, 7.981389817488863, 7.930109494247857, 7.715173551142768, 7.566150600574289, 7.537544070589168, 7.4553089580079535, 7.402302359288513, 7.383434013242156, 7.241630844995156  …  0.3106093307442629, 0.2802209436773669, 0.2630685341608628, 0.22924975791544647, 0.13810198964943604, 0.10220670918540933, 0.09481024790116212, 0.07414777039920772, 0.028955685356995404, 0.002612146819441217]
 [100.23878257624223, 7.831770043635623, 7.743885712094971, 7.6369262431931135, 7.510023510524964, 7.486922928419772, 7.359762486350767, 7.321659513237832, 7.245022381023874, 7.2036255132778795  …  0.3298845295155952, 0.2786718551574044, 0.25609045343433495, 0.19669658510523047, 0.16685940316395131, 0.15733930364016338, 0.12658090284290308, 0.09156756343478664, 0.07042690530502763, 0.03967525818854299]

Let’s check that this indeed utilized multiple workers.

pmap(M) do m
    println(myid())
    svdvals(m)
end;
      From worker 20:	20
      From worker 18:	18
      From worker 21:	21
      From worker 19:	19
      From worker 21:	21
      From worker 19:	19
      From worker 20:	20
      From worker 18:	18
      From worker 21:	21
      From worker 19:	19
@btime map($svdvals, $M);
@btime pmap($svdvals, $M);
  61.852 ms (81 allocations: 4.22 MiB)
  15.230 ms (509 allocations: 36.91 KiB)

When to choose which? (@distributed vs pmap)#

Julia’s pmap is designed for the case where

  • one wants to apply a function to a collection,

  • each function call does a larger amount of work, and/or

  • the workload is non-uniform (load-balancing).

On the other hand, @distributed is good for

  • reductions, like sums, where

  • each iteration may be tiny, i.e. perhaps only summing two numbers, and/or

  • each iteration takes about the same time (uniform); a toy comparison follows below
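
A toy comparison under these guidelines (the helper nonuniform_work is made up for illustration): pmap load-balances a handful of unequal tasks, while @distributed (+) shines for very many tiny, uniform iterations.

@everywhere nonuniform_work(i) = (sleep(0.1 * (i % 3)); i^2) # per-call cost varies

pmap(nonuniform_work, 1:12)            # few, unequal tasks -> pmap

@distributed (+) for i in 1:1_000_000  # many tiny, uniform iterations -> @distributed
    Int(rand(Bool))
end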

High-level array abstractions: DistributedArrays.jl#

In a DArray, each process has local access to just a chunk of the data, and no two processes share the same chunk. Processes can be on different hosts.

Distributed arrays are for example useful if

  • Expensive calculations should be performed in parallel on parts of the array on different hosts.

  • The data doesn’t fit into the local machine’s memory (e.g. loading big files in parallel).

using Distributed, BenchmarkTools;
rmprocs(workers());
# make sure that all workers use the same Julia environment
addprocs(4; exeflags="--project")
4-element Vector{Int64}:
 22
 23
 24
 25
# check
@everywhere @show Base.active_project()
Base.active_project() = "/__w/julia-hpc-workshop/julia-hpc-workshop/Project.toml"
      From worker 22:	Base.active_project() = "/__w/julia-hpc-workshop/julia-hpc-workshop/Project.toml"
      From worker 25:	Base.active_project() = "/__w/julia-hpc-workshop/julia-hpc-workshop/Project.toml"
      From worker 24:	Base.active_project() = "/__w/julia-hpc-workshop/julia-hpc-workshop/Project.toml"
      From worker 23:	Base.active_project() = "/__w/julia-hpc-workshop/julia-hpc-workshop/Project.toml"
@everywhere using DistributedArrays, LinearAlgebra
M = Matrix{Float64}[rand(200, 200) for i = 1:10];
D = distribute(M)
10-element DArray{Matrix{Float64}, 1, Vector{Matrix{Float64}}}:
 [0.5358184817220137 0.6308800782548332 … 0.9252908664668628 0.4827880551732152; 0.05670760842470279 0.6431122831877423 … 0.41524791727052324 0.32991820776025926; … ; 0.9897572819129957 0.15104515950776098 … 0.7040955376954158 0.6073047505571347; 0.8798271959917336 0.2012610890605191 … 0.6964474239738506 0.2666524632616397]
 [0.17315070883226846 0.9835987661224947 … 0.5898964046883802 0.5152133045706787; 0.3080531453618156 0.8932236133754617 … 0.9981523704010187 0.3125759003484019; … ; 0.14918295413834526 0.2481283888005974 … 0.34470470261818 0.8067132330027367; 0.06667855667508782 0.3322603573524576 … 0.3904682806090911 0.8059689900593414]
 [0.16298916152918952 0.8985582508378845 … 0.9143088783440292 0.23666868311348888; 0.96839652320337 0.732966422726237 … 0.6575132137360044 0.03358888775325952; … ; 0.5251830425989608 0.14446176798166055 … 0.3551339950179452 0.9893014650580257; 0.5797106918421961 0.061421312669996064 … 0.7713960496563887 0.6359562274252615]
 [0.6780289065953333 0.8990314820885226 … 0.20952856082279858 0.5006678855306966; 0.06240125367106386 0.003668792920308639 … 0.7600984295191247 0.516734062233284; … ; 0.989018360330947 0.4634711332988788 … 0.18898686392382147 0.3056562123859631; 0.2721806832307513 0.5695385686076029 … 0.12200332360380028 0.12952881805820227]
 [0.13583751756641904 0.8125091448421593 … 0.6641023595930902 0.5302274342635769; 0.27709793738056154 0.2197193178416128 … 0.27477483150962845 0.9554843881494521; … ; 0.45360528969835046 0.5578646765206187 … 0.4005015613580811 0.100756738723496; 0.2355795668406092 0.43876483337287153 … 0.23411739284124988 0.9348842386664826]
 [0.33613985489222464 0.5700772984540208 … 0.3870318373012738 0.3211322279427695; 0.8913635395740868 0.13144901830213673 … 0.3286810265227986 0.23162432141640288; … ; 0.3587882627426434 0.7735004982275907 … 0.2835193561824815 0.3926701798354556; 0.6768690719121736 0.38502046287537695 … 0.3486689099292648 0.19718434121477546]
 [0.9691655653272538 0.1711667212427077 … 0.5474659353157643 0.40413080646381916; 0.32568191974866423 0.8878839210353213 … 0.08187698845932823 0.78340623656411; … ; 0.5957388950289157 0.9362363987735245 … 0.5390646076084713 0.46242891985388235; 0.8836326196262394 0.8643490991402253 … 0.03907947160311509 0.19579747710888684]
 [0.784867073473593 0.75194670393169 … 0.8768238698890338 0.624451330426573; 0.8010886265569757 0.007197868434890453 … 0.4253921123560882 0.6132655942638155; … ; 0.3030407349515605 0.4874756764067626 … 0.335435402351428 0.38424914443651337; 0.4593443298292559 0.06134662461542961 … 0.5766361071951315 0.5446042750344182]
 [0.4252135281868207 0.33442008010370405 … 0.2632549855719526 0.7578590540752697; 0.10103655586008753 0.05760016162606507 … 0.42976713685945933 0.5087440504203947; … ; 0.3448969310909351 0.7284996719674377 … 0.0826893515973407 0.3460986996371387; 0.6442593165105464 0.47188743523234666 … 0.1340951040153766 0.7312477301589697]
 [0.47306329800156477 0.6353789562528454 … 0.2999665008090243 0.750923473668928; 0.13944855572636883 0.9753642834046911 … 0.9781723144631492 0.15153887437293767; … ; 0.4488229070631512 0.6810875963941394 … 0.8435581680994308 0.2998400336106859; 0.04666925279696055 0.18835619480192178 … 0.547585845801606 0.14376405699392036]

Which workers hold parts of D?

procs(D)
4-element Vector{Int64}:
 22
 23
 24
 25
workers()
4-element Vector{Int64}:
 22
 23
 24
 25

Which parts do they hold?

localpart(D) # the controller doesn't hold anything
Matrix{Float64}[]
# Which parts do they hold?
for p in workers()
    println(@fetchfrom p DistributedArrays.localindices(D))
end
(1:3,)
(4:6,)
(7:8,)
(9:10,)
@btime map($svdvals, $M);
@btime map($svdvals, $D);
  61.579 ms (81 allocations: 4.22 MiB)
  14.810 ms (554 allocations: 25.70 KiB)
@btime pmap($svdvals, $M);
  15.243 ms (510 allocations: 36.94 KiB)

Actual distributed computing: Comments#

Creating workers on other machines#

So far we have worked with multiple processes on the same machine, because we simply used addprocs(::Integer). To put worker processes on other machines, e.g. nodes of a cluster, we need to modify the initial addprocs call appropriately.

In Julia, starting worker processes is handled by ClusterManagers.

  • The default one is LocalManager. It is automatically used when running addprocs(i::Integer) and we have implicitly used it already.

  • Another important one is SSHManager. It is automatically used when running addprocs(hostnames::Array), e.g. addprocs(["node123", "node456"]). The only requirement is passwordless ssh access to all specified hosts.

  • Cluster managers for SLURM, PBS, and others are provided in ClusterManagers.jl. For SLURM, this will make addprocs use srun under the hood (a short sketch follows below).
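
A hedged sketch of the SLURM case (assuming ClusterManagers.jl is installed and the code runs inside an existing SLURM allocation; keyword handling may differ between versions):

using Distributed, ClusterManagers
addprocs_slurm(8; exeflags="--project") # launches 8 workers via srun
@everywhere println(gethostname())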

Demonstrate in terminal from thp node

using Distributed

addprocs(["l93", "l94"])

@everywhere println(gethostname())

One can also start multiple processes on different machines:

addprocs([("node123", 2), ("node456", 3)]) # starts 2 workers on node123 and 3 workers on node456

# Use a count of :auto, e.g. ("node123", :auto), to start as many workers as the host has CPU threads

Be aware of different paths:

  • By default, addprocs expects to find the julia executable on the remote machines under the same path as on the host (controller).

  • It will also try to cd to the same folder (set the working directory).

As you can see from ?addprocs, addprocs takes a bunch of keyword arguments, two of which are of particular importance in this regard (a short sketch follows the list):

  • dir: working directory for the worker processes

  • exename: path to julia executable (potentially augmented with pre-commands) for the worker processes
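
A hedged sketch with hypothetical paths (the host name, executable path, and directory below are placeholders):

addprocs(["node123"];
         exename = "/opt/julia/bin/julia", # julia executable on the remote host (assumed path)
         dir     = "/home/user/myproject", # working directory for the workers (assumed path)
         exeflags = "--project")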

# cleanup
rmprocs(workers())
Task (done) @0x00007efd262c3de0