# Multithreading

## What are threads?
Threads are execution units within a process that can run simultaneously.

<img src="../../static/processes_threads.png" width=400px>

While processes are entirely separate, threads run in a **shared memory** space.

## Starting Julia with multiple threads

By default, Julia starts with a single *user thread*. We must tell it explicitly to start multiple user threads. There are two ways to do this:

* Environment variable: `JULIA_NUM_THREADS=4`
* Command line argument: `julia -t 4` or, equivalently, `julia --threads 4`

**Jupyter lab:**

The simplest way is to globally set the environment variable `JULIA_NUM_THREADS` (e.g. in the `.bashrc`). But one can also create a specific Jupyter kernel for multithreaded Julia:

```julia
using IJulia
installkernel("Julia (4 threads)", env=Dict("JULIA_NUM_THREADS"=>"4"))
```

We can readily check how many threads we are running:

In [None]:
Threads.nthreads()

### User threads vs default threads

Technically, the Julia process is also spawning multiple threads already in "single-threaded" mode, like
* a thread for unix signal listening
* multiple OpenBLAS threads for BLAS/LAPACK operations

For this reason, we call the threads specified via `--threads` or the environment variable *user threads* or simply *Julia threads*.

## Task-based multithreading

Conceptually, Julia implements **task-based** multithreading. In this paradigm, a task - e.g. a computational piece of a code - is marked for parallel execution on **any** of the available Julia threads. Julias **dynamic scheduler** will automatically put the task on one of the threads and trigger the execution of the task on said thread.

Ideally, **a user should think about tasks and not threads**.

**Advantages:**
* high-level and convenient
* **composability / nestability** (Multithreaded code can call multithreaded code can call multithreaded code ....)

**Disadvantages:**
* **scheduling overhead**
* can get in the way when performance engineering
  * scheduler has limited information (e.g. about the system topology)
  * low-level profiling (e.g. with LIKWID) currently requires a known task -> thread -> cpu core mapping.

(Blog post: [Announcing composable multi-threaded parallelism in Julia](https://julialang.org/blog/2019/07/multithreading/))

### Spawning tasks on threads: `Threads.@spawn`
`Threads.@spawn` spawns a task on a Julia thread. Specifically, it creates (and immediately returns) a `Task` and schedules it for execution on an available Julia thread.

Note the conceptual similarity between `Threads.@spawn` (task -> thread) and `Distributed.@spawn` (task -> process) and also `@async`.

To avoid having to prefix `Threads.` to `@spawn` (and other threading-related functions) let's load everything from `Base.Threads` into global scope.

In [None]:
using Base.Threads

In [None]:
@spawn println("test")

While `Threads.@spawn` returns the task right away - it is **non-blocking** - the result might only be fetchable after some time.

In [None]:
t = @spawn begin
    sleep(3)
    "result"
end
@time fetch(t)

Note that we can use (some of) the control flow tools that we've already covered, like `@sync`.

In [None]:
@sync t = @spawn begin
    sleep(3)
    "result"
end
@time fetch(t)

In [None]:
for i in 1:2*nthreads()
    @spawn println("Hi, I'm ", threadid())
end

#### Example: Recursive Fibonacci series

$$ F(n) = F(n-1) + F(n-2), \qquad F(1) = F(2) = 1$$

We can nest `@spawn` calls freely!

In [None]:
function fib(n)
    n < 2 && return n
    t = @spawn fib(n - 2)
    return fib(n - 1) + fetch(t)
end

In [None]:
fib.(1:10)

(Note: Algorithmically, this is a highly inefficient implementation of the Fibonacci series, of course!)

#### Example: `tmap` (like `pmap`)

In [None]:
tmap(fn, itr) = map(fetch, map(i -> Threads.@spawn(fn(i)), itr))

In [None]:
using LinearAlgebra

In [None]:
M = [rand(200, 200) for i in 1:10];

In [None]:
tmap(svdvals, M)

In [None]:
tmap(i -> println(i, " ($(threadid()))"), 1:10);

Note, however, that this implementation creates temporary allocations and thus isn't particularly efficient.

In [None]:
using BenchmarkTools

@btime tmap($svdvals, $M);
@btime map($svdvals, $M);

#### Remarks on `@spawn`

* **Task migration**: Not only does the scheduler dynamically assign tasks to Julia threads, but it is also free to move tasks between threads. Hence, `threadid()` isn't necessarily constant over time and should be used with care!
* **Spawning tasks on specific threads**: Julia doesn't have a built-in tool for this (as of now). However, some packages like [ThreadPinning.jl](https://github.com/carstenbauer/ThreadPinning.jl) export `@tspawnat <threadid> ...` which allows to spawn *sticky* tasks.

In [None]:
using ThreadPinning

@tspawnat 3 println("running on thread ", threadid())

### Multithreading for-loops: `@threads`

Higher level interface to multithreading. (Compare `Distributed.@spawnat` vs `@distributed`)

In [None]:
@threads for i in 1:2*nthreads()
    println("Hi, I'm ", threadid())
end

In [None]:
using BenchmarkTools

function square!(x)
    for i in eachindex(x)
        x[i] = x[i]^2
    end
end

function square_threads!(x)
    @threads for i in eachindex(x)
        x[i] = x[i]^2
    end
end

x = rand(1_000_000)
@btime square!($x);
@btime square_threads!($x);

#### Scheduling options

Syntax: `@threads [schedule] for ...`

  * `:dynamic` (default)
    * creates O(`nthreads()`) many tasks each processing a contigious region of the iteration space
    * each task essentially spawned with `@spawn`
      * -> task migration
      * -> composability / nestability
    
  * `:static`
    * evenly splits up the iteration space and creates one task per block
    * **statically** maps tasks to threads, specifically: task 1 -> thread 1, task 2 -> thread 2, etc.
      * -> no task migration, i.e. **fixed task-thread mapping**
      * -> not composable / nestable
      * -> only little overhead

In [None]:
@threads :dynamic for i in 1:2*nthreads()
    println(i, " -> thread ", threadid())
end

In [None]:
@threads :static for i in 1:2*nthreads()
    println(i, " -> thread ", threadid())
end

For `@threads :static`, every thread handles precisely two iterations!

In [None]:
@threads :dynamic for i in 1:3
    @threads :dynamic for j in 1:3
        println("$i, $j")
    end
end

In [None]:
@threads :static for i in 1:3
    @threads :static for j in 1:3
        println("$i, $j")
    end
end

### Load-balancing

In [None]:
function compute_nonuniform_spawn!(a, niter=zeros(Int, nthreads()), load=zeros(Int, nthreads()))
    @sync for i in 1:length(a)
        Threads.@spawn begin
            a[i] = sum(abs2, rand() for j in 1:i)

            # only for bookkeeping
            niter[threadid()] += 1
            load[threadid()] += i
        end
    end
    return niter, load
end

In [None]:
a = zeros(nthreads() * 20)
niter, load = compute_nonuniform_spawn!(a)

In [None]:
using Plots

b1 = bar(niter, xlab="threadid", ylab="# iterations", title="Number of iterations", legend=false)
b2 = bar(load, xlab="threadid", ylab="workload", title="Workload", legend=false)

display(b1)
display(b2)

In [None]:
function compute_nonuniform_threads!(a, niter=zeros(Int, nthreads()), load=zeros(Int, nthreads()))
    @threads for i in 1:length(a)
        a[i] = sum(abs2, rand() for j in 1:i)

        # only for bookkeeping
        niter[threadid()] += 1
        load[threadid()] += i
    end
    return niter, load
end

In [None]:
a = zeros(nthreads() * 20)
niter, load = compute_nonuniform_threads!(a)

In [None]:
b1 = bar(niter, xlab="threadid", ylab="# iterations", title="Number of iterations", legend=false)
b2 = bar(load, xlab="threadid", ylab="workload", title="Workload", legend=false)

display(b1)
display(b2)

(There might be a scheduling option for `@threads` that implements load-balancing in the future.)

## Multithreading: Things to be aware of

### Race conditions and thread safety

In [None]:
function sum_serial(x)
    s = zero(eltype(x))
    for i in eachindex(x)
        @inbounds s += x[i]
    end
    return s
end

In [None]:
function sum_threads_naive(x)
    s = zero(eltype(x))
    @threads for i in eachindex(x)
        @inbounds s += x[i]
    end
    return s
end

In [None]:
numbers = rand(nthreads() * 10_000);

In [None]:
@show sum(numbers);
@show sum_serial(numbers);
@show sum_threads_naive(numbers);

**Wrong** result! Even worse, it's **non-deterministic** and different every time! It's also slow...

In [None]:
@btime sum_serial($numbers);
@btime sum_threads_naive($numbers);

Reason: There is a [race condition](https://en.wikipedia.org/wiki/Race_condition).

Note that race conditions aren't specific to reductions. More generally, they can appear when multiple threads are modifying a shared "global" state simultaneously.

Not all of Julia and its packages in the ecosystem are thread-safe! In general, it is safer to assume that they're not unless proven otherwise.

#### Fix 1: Divide the work

In [None]:
function sum_threads_subsums(x)
    blocksize = length(x) ÷ nthreads()
    @assert isinteger(blocksize)
    idcs = collect(Iterators.partition(1:length(x), blocksize))

    subsums = zeros(eltype(x), nthreads())
    @threads for tid in 1:nthreads()
        for i in idcs[tid]
            @inbounds subsums[tid] += x[i]
        end
    end
    return sum(subsums)
end

In [None]:
@show sum(numbers);
@show sum_serial(numbers);
@show sum_threads_subsums(numbers);

In [None]:
@btime sum_threads_subsums($numbers);

Speedup and correct result. But not ideal:

* cumbersome to do this manually
* can have more subtle performance issues like [false sharing](https://en.wikipedia.org/wiki/False_sharing#:~:text=In%20computer%20science%2C%20false%20sharing,managed%20by%20the%20caching%20mechanism.)

#### Fix 2: Atomics

See [Atomic Operations](https://docs.julialang.org/en/v1/manual/multi-threading/#Atomic-Operations) in the Julia doc for more information. But in generaly one shouldn't avoid using them as much as possible since they actually limit the parallelism.

### Garbage collection

[As of now](https://www.youtube.com/watch?v=Ks0p6PQyIPs), **Julia's GC is not parallel** and doesn't work nicely with multithreading.

If it gets triggered, it essentially "stops the world" (all threads) for clearing up memory.

Hence, when using multithreading, it is even more important to **avoid heap allocations!**

(If you can't avoid allocations, consider using multiprocessing instead.)

## High-level tools for parallel computing

### [ThreadsX.jl](https://github.com/tkf/ThreadsX.jl)

*Parallelized Base functions*

In [None]:
using ThreadsX

In [None]:
sum(numbers)

In [None]:
ThreadsX.sum(numbers)

In [None]:
@btime ThreadsX.sum($numbers);

### [FLoops.jl](https://github.com/JuliaFolds/FLoops.jl)

*Fast sequential, threaded, and distributed for-loops for Julia*

In [None]:
using FLoops

In [None]:
function sum_floops(x)
    @floop for xi in x
        @reduce(s = zero(eltype(x)) + xi)
    end
    return s
end

In [None]:
@btime sum_floops($numbers);

In [None]:
numbers = rand(nthreads() * 10_000);

sum_floops(numbers) ≈ sum(numbers)

In [None]:
@btime sum_serial($numbers);
@btime sum_floops($numbers);

`@floop` supports different *executors* that allow for easy switching between serial and threaded execution

In [None]:
function sum_floops(x, executor)
    @floop executor for xi in x
        @reduce(s += xi)
    end
    return s
end

In [None]:
@btime sum_floops($numbers, $(SequentialEx()));
@btime sum_floops($numbers, $(ThreadedEx()));

There are many more [executors](https://juliafolds.github.io/FLoops.jl/stable/tutorials/parallel/#tutorials-executor), like `DistributedEx` or `CUDAEx`. See, e.g., [FoldsThreads.jl](https://github.com/JuliaFolds/FoldsThreads.jl) and [FoldsCUDA.jl](https://github.com/JuliaFolds/FoldsCUDA.jl).

Under the hood, FLoops is built on top of [Transducers.jl](https://juliafolds.github.io/Transducers.jl/stable/tutorials/tutorial_parallel/) (i.e. it translates for-loop semantics into folds).

### [Tullio.jl](https://github.com/mcabbott/Tullio.jl)

*Tullio is a very flexible einsum macro* ([Einstein notation](https://en.wikipedia.org/wiki/Einstein_notation))

In [None]:
using Tullio

In [None]:
A = rand(10, 10)
B = rand(10, 10)

C = @tullio C[i, j] := A[i, k] * B[k, j] # matrix multiplication

C ≈ A * B

In [None]:
sum_tullio(xs) = @tullio S := xs[i]

In [None]:
@btime sum_tullio($numbers);

(Uses `fastmath` and other tricks to be faster here.)

### [LoopVectorization.jl](https://github.com/JuliaSIMD/LoopVectorization.jl)

*Macro(s) for vectorizing loops.*

In [None]:
using LoopVectorization

In [None]:
function sum_turbo(x)
    s = zero(eltype(x))
    @tturbo for i in eachindex(x)
        @inbounds s += x[i]
    end
    return s
end

In [None]:
@btime sum_turbo($numbers);

(Uses all kinds of SIMD tricks to be faster than the others.)

## System topology and thread affinity

### Hawk compute node

<img src="../../static/lstopo_hawk.svg" width=100%>

**Not pinning threads (or pinning them badly) can degrade performance massively!**

### Pinning Julia threads to CPU threads

What about external tools like `numactl`, `taskset`, etc.? Doesn't work reliably because it [can't distinguish](https://discourse.julialang.org/t/thread-affinitization-pinning-julia-threads-to-cores/58069/5) between Julia threads and other internal threads.

**Options:**

* Environment variable: `JULIA_EXCLUSIVE=1` (compact pinning)
* More control and convenient visualization: [ThreadPinning.jl](https://github.com/carstenbauer/ThreadPinning.jl)
  * `compact`: pin to cpu thread 0, 1, 2, 3, ... one after another
  * `spread`: alternate between sockets so, e.g., 0, 64, 1, 65, 2, 66, .... (if a socket has 64 cores)
  * `numa`: same as `spread` but alternate between NUMA domains so, e.g., 0, 16, 32, 48, 64, .... (if a NUMA domain has 16 cores)
  * **Caveat:** currently one works on Linux.

<img src="../../static/threadinfo.png" width=1000px>