Rust Concurrency Tutorial

In this Rust tutorial we learn how to run code in parallel by using threads.

We learn how to spawn threads, create and join their handles, take ownership from inside a thread and how to send and receive data through channels.

We also learn how to send multiple values through multiple transmitters, and how to handle data races with mutex and arc.

What is concurrency

Concurrency is when sections of code in our application run in parallel with other sections of code.

Typically, a program would run its code sequentially, section after section. By using threads, we can run sections of our code at the same time as other sections.

This is why concurrency is also often referred to as multi-threading: we use multiple threads to run code in parallel.

As an example, let’s consider two loops. Typically, we have to wait for one loop to finish before the next one can start.

Example:
use std::thread;
use std::time::Duration;

fn main() {

    for i in 0..5 {

        println!("Loop 1 iteration: {}", i);
        // wait a bit before next iteration
        // for demonstration purposes
        thread::sleep(Duration::from_millis(500));
    }

    for i in 0..5 {

        println!("Loop 2 iteration: {}", i);
        thread::sleep(Duration::from_millis(500));
    }
}

In the output we can see that the first loop completes all its iterations before the second loop starts.

Output:
Loop 1 iteration: 0
Loop 1 iteration: 1
Loop 1 iteration: 2
Loop 1 iteration: 3
Loop 1 iteration: 4
Loop 2 iteration: 0
Loop 2 iteration: 1
Loop 2 iteration: 2
Loop 2 iteration: 3
Loop 2 iteration: 4

Placing one of the loops above in a separate thread will allow us to run both at the same time.

How to create a thread

When working with threads, we have to import the thread module. The examples in this tutorial also import the Duration struct from the time module, which we use to pause a thread with thread::sleep.

Example:
use std::thread;
use std::time::Duration;

To create a thread, we use the thread::spawn function.

The spawn function takes a closure as a parameter, which defines code that should be executed by the thread.

Syntax:
thread::spawn(|| { /* code to execute in the thread */ });

Example:
use std::thread;
use std::time::Duration;

fn main() {

    // create a thread
    thread::spawn(|| {

        // everything in here runs
        // in its own separate thread
        for i in 0..5 {

            println!("Loop 2 iteration: {}", i);
            thread::sleep(Duration::from_millis(500));
        }
    });

    // main thread
    for i in 0..5 {

        println!("Loop 1 iteration: {}", i);
        thread::sleep(Duration::from_millis(500));
    }
}

In the example above, we create a new thread and place one of our loops inside it.

In the output, the print statements from both loops are interleaved, because the two loops run at the same time. The exact order can differ from run to run.

Output:
Loop 1 iteration: 0
Loop 2 iteration: 0
Loop 2 iteration: 1
Loop 1 iteration: 1
Loop 2 iteration: 2
Loop 1 iteration: 2
Loop 1 iteration: 3
Loop 2 iteration: 3
Loop 2 iteration: 4
Loop 1 iteration: 4

Note that a spawned thread will be stopped when the main thread ends, whether or not it has finished its work.

Let’s increase the iterations in our extra thread to demonstrate.

Example:
use std::thread;
use std::time::Duration;

fn main() {

    // create a thread
    thread::spawn(|| {

        // everything in here runs
        // in its own separate thread
        for i in 0..10 {

            println!("Loop 2 iteration: {}", i);
            thread::sleep(Duration::from_millis(500));
        }
    });

    // main thread
    for i in 0..5 {

        println!("Loop 1 iteration: {}", i);
        thread::sleep(Duration::from_millis(500));
    }
}

The program exits as soon as the main thread completes its loop, so the new thread doesn’t get to finish its remaining iterations. The solution is join handles.

How to join handles

The thread::spawn function returns a JoinHandle. Calling .join() on the handle blocks the current thread until the spawned thread finishes, so the program can’t exit before the thread is done.

To get the handle, we just assign the return value of thread::spawn to a variable.

Syntax:
let variable_name = thread::spawn(|| {});

To wait for the thread we call the .join() method on the handle. It returns a Result that is an Err if the thread panicked, so we call .unwrap() to pass such a panic on to the calling thread.

Syntax:
let variable_name = thread::spawn(|| {});

variable_name.join().unwrap();

Example:
use std::thread;
use std::time::Duration;

fn main() {

    // create a thread
    let handle = thread::spawn(|| {

        // everything in here runs
        // in its own separate thread
        for i in 0..10 {

            println!("Loop 2 iteration: {}", i);
            thread::sleep(Duration::from_millis(500));
        }
    });

    // main thread
    for i in 0..5 {

        println!("Loop 1 iteration: {}", i);
        thread::sleep(Duration::from_millis(500));
    }

    handle.join().unwrap();
}

This time in the output, we can see that the loop inside the thread gets to finish all its iterations.

Output:
Loop 1 iteration: 0
Loop 2 iteration: 0
Loop 1 iteration: 1
Loop 2 iteration: 1
Loop 2 iteration: 2
Loop 1 iteration: 2
Loop 1 iteration: 3
Loop 2 iteration: 3
Loop 1 iteration: 4
Loop 2 iteration: 4
Loop 2 iteration: 5
Loop 2 iteration: 6
Loop 2 iteration: 7
Loop 2 iteration: 8
Loop 2 iteration: 9
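
A join handle does more than wait. The .join() method also hands back whatever value the thread’s closure returned. The sketch below illustrates this with a hypothetical helper function, sum_in_thread, that is not part of the examples above.

```rust
use std::thread;

// Hypothetical helper: adds up 1 to 10 in a separate
// thread and returns the total through the join handle.
fn sum_in_thread() -> u32 {
    let handle = thread::spawn(|| {
        let mut total: u32 = 0;
        for i in 1..=10 {
            total += i;
        }
        // the last expression is the thread's return value
        total
    });

    // join() blocks until the thread is done and
    // yields Ok(value) unless the thread panicked
    handle.join().unwrap()
}

fn main() {
    println!("Sum from thread: {}", sum_in_thread());
}
```

This can be a simple way to get a single result out of a thread without setting up a channel.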

How to take ownership from inside a thread

It’s all well and good to be able to run code in another thread, but what if we need to access data from outside of the thread?

Example:
use std::thread;

fn main() {

    let a:i32 = 5;

    let handle = thread::spawn(|| {

        // try to access the
        // variable outside
        // of this thread
        println!("a: {}", a);
    });

    handle.join().unwrap();
}

Trying to borrow the variable from inside the thread results in a compiler error.

Output:
error[E0373]: closure may outlive the current function, but it borrows `a`, which is owned by the current function

The spawned thread might outlive the function that owns the variable, so the closure isn’t allowed to borrow it. Instead, the closure needs to take ownership of the variable, which we do with the move keyword.

Syntax:
let outside_variable = value;

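let handle = thread::spawn(move || {});

The closure then takes ownership of every outside variable it uses. Types that implement Copy, like i32, are copied into the closure. Other types are moved, and can no longer be used by the code that spawned the thread.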

Example:
use std::thread;

fn main() {

    let a:i32 = 5;

    let handle = thread::spawn(move || {

        // the closure now owns
        // its own copy of 'a'
        println!("a: {}", a);
    });

    handle.join().unwrap();
}

Now that the closure owns the value, the thread can print it to the console.

Output:
 a: 5
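
An i32 is cheap to copy, so the effect of move is easier to see with a type that is actually moved, such as a Vec. The following is a minimal sketch, and the helper name sum_on_thread is made up for illustration.

```rust
use std::thread;

// Hypothetical helper: moves a vector into a
// thread and returns the sum of its elements.
fn sum_on_thread(values: Vec<i32>) -> i32 {
    // 'move' transfers ownership of 'values' into the closure
    let handle = thread::spawn(move || values.iter().sum::<i32>());

    // 'values' can't be used here anymore,
    // because the thread owns it now
    handle.join().unwrap()
}

fn main() {
    let numbers = vec![1, 2, 3];
    println!("sum: {}", sum_on_thread(numbers));
}
```

Uncommenting a line that uses values after the spawn call would produce a "use of moved value" compiler error, which is how Rust prevents two threads from owning the same data.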

How to send messages between threads

To send messages between threads we need to create a channel for them to move through.

When working with channels, we have to import the mpsc module.

Example:
use std::thread;
use std::sync::mpsc;

The name mpsc stands for multiple producer, single consumer: a channel can have many senders, but only one receiver.

Next, we need to create the transmitter and the receiver, which will be used to transfer information through the channel.

The channel() function returns a tuple containing the transmitter and the receiver, which we destructure into two variables.

Example:
use std::thread;
use std::sync::mpsc;

fn main() {

    // create send/receiver vars
    // to move data through channel
    let (tx, rx) = mpsc::channel();
}

The variable ‘tx’ will be our sender, and the variable ‘rx’ will be our receiver.

Next, let’s create an extra thread that will pass some data to the main thread through the channel.

To transmit data we use the .send() method, which takes the value to be sent as an argument.

The .send() method returns a Result that is an Err if the receiver has already been dropped. Again, we use .unwrap(), which panics in that case.

Example:
use std::thread;
use std::sync::mpsc;

fn main() {

    // create send/receiver vars
    // to move data through channel
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {

        // value to be sent
        let a:i32 = 5;

        // the send() function will send
        // a value to the receiver (rx)
        tx.send(a).unwrap();
    });
}

The example above will send the value of the variable ‘a’ out from the thread.

So, let’s catch the value in the main thread with the .recv() method, which blocks until a value arrives.

Example:
use std::thread;
use std::sync::mpsc;

fn main() {

    // create send/receiver vars
    // to move data through channel
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {

        // value to be sent
        let a:i32 = 5;

        // the send() function will send
        // a value to the receiver (rx)
        tx.send(a).unwrap();
    });

    // catch the value with the recv()
    // function and store it in 'b'
    let b:i32 = rx.recv().unwrap();

    println!("Value a from extra thread: {}", b);
}

When we run the program, the value is printed to the console, so it was successfully sent and received.
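
It can help to see what .recv() does once there is nothing left to receive. The sketch below, built around a hypothetical helper named receive_twice, receives one value and then calls .recv() again after the sender has been dropped.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: receives one value, then
// tries to receive again after the sender is gone.
fn receive_twice() -> (Option<i32>, bool) {
    let (tx, rx) = mpsc::channel();

    let handle = thread::spawn(move || {
        tx.send(5).unwrap();
        // 'tx' is dropped here, when the closure ends
    });
    handle.join().unwrap();

    // the value that was sent is still waiting in the channel
    let first = rx.recv().ok();
    // no senders are left, so recv() returns an Err
    // instead of blocking forever
    let second_failed = rx.recv().is_err();

    (first, second_failed)
}

fn main() {
    let (first, second_failed) = receive_twice();
    println!("first: {:?}, second failed: {}", first, second_failed);
}
```

Because .recv() reports a closed channel as an Err, calling .unwrap() on it is only safe when we know a value is still on its way.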

How to pass channels to functions

To pass the sending end of a channel to a function, we use the generic Sender<T> type as the parameter type, where T is the type of the values sent through the channel.

Let’s create a function with the parameter.

Example:
use std::thread;
use std::sync::mpsc;

// accept the sender of a channel
// that carries i32 values
fn logger(a: mpsc::Sender<i32>) {

    // send value
    a.send(5).unwrap();
}

fn main() {

    // create send/receiver vars
    // to move data through channel
    // (the type is annotated because
    // nothing is sent through it yet)
    let (tx, rx) = mpsc::channel::<i32>();

    let handle = thread::spawn(move || {


    });

    handle.join().unwrap();
}

Next, we call the function inside the extra thread with the sender as the argument.

Example:
use std::thread;
use std::sync::mpsc;

// accept the sender of a channel
// that carries i32 values
fn logger(a: mpsc::Sender<i32>) {

    // send value
    a.send(5).unwrap();
}

fn main() {

    // create send/receiver vars
    // to move data through channel
    let (tx, rx) = mpsc::channel();

    let handle = thread::spawn(move || {

        // call function with
        // sender as parameter
        logger(tx);
    });

    handle.join().unwrap();
}

Now all that’s left to do is to receive the value, and do something with it.

Example:
use std::thread;
use std::sync::mpsc;

// accept the sender of a channel
// that carries i32 values
fn logger(a: mpsc::Sender<i32>) {

    // send value
    a.send(5).unwrap();
}

fn main() {

    // create send/receiver vars
    // to move data through channel
    let (tx, rx) = mpsc::channel();

    let handle = thread::spawn(move || {

        // call function with
        // sender as parameter
        logger(tx);
    });

    handle.join().unwrap();

    // receive value
    let b = rx.recv().unwrap();
    println!("Value a from function in extra thread: {}", b);
}

When we run the example above, we see that the value was successfully sent by the function through the channel.
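
The receiving end can be passed to a function in the same way, using the Receiver<T> type from the same module. The following is a small sketch. The function names receive_doubled and doubled_from_thread are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: accepts the receiver of a channel
// that carries i32 values and doubles the value it gets.
fn receive_doubled(rx: mpsc::Receiver<i32>) -> i32 {
    rx.recv().unwrap() * 2
}

// Hypothetical helper: sends 5 from an extra thread
// and lets receive_doubled() pick it up.
fn doubled_from_thread() -> i32 {
    let (tx, rx) = mpsc::channel();

    let handle = thread::spawn(move || {
        tx.send(5).unwrap();
    });

    // the receiver is moved into the function
    let result = receive_doubled(rx);
    handle.join().unwrap();
    result
}

fn main() {
    println!("Doubled value: {}", doubled_from_thread());
}
```

Unlike a Sender, a Receiver can’t be cloned, so only one function or thread can own it at a time.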

How to send multiple values

We can send more than one value through a channel.

We can either send them through individual statements.

Example:
use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {

    // create send/receiver vars
    // to move data through channel
    let (tx, rx) = mpsc::channel();

    let handle = thread::spawn(move || {

        let num1 = 1;
        let num2 = 2;
        let num3 = 3;

        tx.send(num1).unwrap();
        tx.send(num2).unwrap();
        tx.send(num3).unwrap();
        // wait .5 sec before the thread
        // ends and the sender is dropped
        thread::sleep(Duration::from_millis(500));
    });

    handle.join().unwrap();

    // access rx directly
    // and use values
    for i in rx {

        println!("{}", i);
        thread::sleep(Duration::from_millis(500));
    }
}

Or we can send them through a loop.

Example:
use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {

    // create send/receiver vars
    // to move data through channel
    let (tx, rx) = mpsc::channel();

    let handle = thread::spawn(move || {

        // vector of values to send
        let num = vec![3,5,8];

        // send each value
        // from a loop
        for n in num {
            tx.send(n).unwrap();
            // wait .5 sec between sendings
            thread::sleep(Duration::from_millis(500));
        }
    });

    handle.join().unwrap();

    // access rx directly
    // and use values
    for i in rx {

        println!("{}", i);
        thread::sleep(Duration::from_millis(500));
    }
}

In both examples, we iterate over the receiver directly. The loop ends once the sender has been dropped and all the values have been received.
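
We don’t have to join the thread before we start receiving. As a rough sketch, the hypothetical helper squares_from_thread below iterates over the receiver while the extra thread is still sending.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical helper: receives values while
// the sending thread is still running.
fn squares_from_thread() -> Vec<u32> {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        for n in 1..=4u32 {
            tx.send(n * n).unwrap();
            // short pause between sends
            thread::sleep(Duration::from_millis(10));
        }
        // 'tx' is dropped here, which ends the loop below
    });

    let mut received = Vec::new();
    // each iteration blocks until the next value arrives
    for value in rx {
        received.push(value);
    }
    received
}

fn main() {
    println!("{:?}", squares_from_thread());
}
```

With a single sender, the values arrive in the order they were sent, and the loop itself keeps the main thread alive until the sender is done.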

How to create multiple producers

We can send values from multiple threads by cloning our sender with the Sender::clone() method.

The clone() method takes a reference to the transmitter and returns a new transmitter that sends to the same receiver. It can also be written as tx.clone().

Example:
use std::thread;
use std::sync::mpsc;

fn main() {

    let (tx, rx) = mpsc::channel();
    // clone the sender
    let tx1 = mpsc::Sender::clone(&tx);

    thread::spawn(move || {

        // send from the regular transmitter
        tx.send("Hello").unwrap();
    });

    thread::spawn(move || {

        // send from the cloned transmitter
        tx1.send("there").unwrap();
    });

    let a:&str = rx.recv().unwrap();
    let b:&str = rx.recv().unwrap();
    println!("{} {}", a, b);
}

The first thread sends from the regular transmitter, and the second thread sends from the cloned transmitter.

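Instead of looping over the receiver, we call recv() twice and assign the values to variables. Values are received in the order they reach the channel. Because the two threads run independently, that order isn’t guaranteed, so the output may occasionally be “there Hello”.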
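
Cloning also works in a loop when we need more than two producers. The sketch below assumes a hypothetical helper, collect_from_workers, where each worker thread sends a single number. The results are sorted because the arrival order depends on how the threads are scheduled.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: spawns several producers
// and gathers one value from each of them.
fn collect_from_workers(workers: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();

    for id in 0..workers {
        // every thread gets its own clone of the sender
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(id * 10).unwrap();
        });
    }

    // drop the original sender, otherwise the
    // channel never closes and iter() never ends
    drop(tx);

    let mut received: Vec<u32> = rx.iter().collect();
    // arrival order isn't guaranteed, so sort the values
    received.sort();
    received
}

fn main() {
    println!("{:?}", collect_from_workers(3));
}
```

Forgetting to drop the original sender is a common reason for a receiving loop that never finishes.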

How to handle data races with mutex and arc

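A data race occurs when two or more threads access the same data at the same time without synchronization, and at least one of them mutates it.

The result depends on which thread gets there first, so the program behaves unpredictably. Rust’s ownership rules make the compiler reject safe code that could cause a data race, and the standard library provides Mutex and Arc so that we can share and mutate data safely.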

Mutex

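A Mutex (short for mutual exclusion) is a container that guards the data it holds. A thread has to acquire the lock before it can access the data, and any other thread that tries to lock the mutex has to wait until the lock is released.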

It implements these locks in a data-driven way. We have to specify the type of data protected by the mutex, and Rust will ensure the data is only accessed through the mutex. In other words, “lock data, not code”.
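
A Mutex can be tried out on a single thread before any sharing is involved. The following sketch uses a hypothetical helper named add_three to show that the data is only reachable through the guard returned by lock().

```rust
use std::sync::Mutex;

// Hypothetical helper: mutates a value
// that lives inside a Mutex.
fn add_three() -> i32 {
    let m = Mutex::new(5);

    {
        // lock() returns a guard that gives access to the data
        let mut guard = m.lock().unwrap();
        *guard += 3;
    } // the guard goes out of scope here, which releases the lock

    // lock again to read the value
    let value = *m.lock().unwrap();
    value
}

fn main() {
    println!("value: {}", add_three());
}
```

There is no separate unlock call. Releasing the lock is tied to the lifetime of the guard, so it can’t be forgotten.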

Arc

Arc, short for atomically reference counted, provides thread-safe shared ownership of a value. Simply put, we use Arc to share memory between threads.

Typically, the last thread that finishes its execution is the one that should free the data in memory.

So, let’s consider that we have two threads and they both have access to the same variable. We need to decide which of them is responsible for freeing the variable.

Because we don’t know in which order the threads finish, we use Arc. It keeps count of the owners of the value and frees the value when the last owner is dropped.
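
The owner count can be observed with Arc::strong_count. As an illustration only, the hypothetical helper count_owners below checks the count while a thread holds a clone, and again after that thread has finished.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical helper: returns the number of owners
// while a thread holds a clone, and after it finished.
fn count_owners() -> (usize, usize) {
    let shared = Arc::new(vec![1, 2, 3]);

    // cloning an Arc creates a second owner of the
    // same vector, it doesn't copy the vector itself
    let for_thread = Arc::clone(&shared);
    let during = Arc::strong_count(&shared);

    let handle = thread::spawn(move || {
        // read-only access through the clone
        for_thread.len()
    });
    handle.join().unwrap();

    // the thread's clone was dropped when its closure finished
    let after = Arc::strong_count(&shared);

    (during, after)
}

fn main() {
    let (during, after) = count_owners();
    println!("owners during: {}, after: {}", during, after);
}
```

An Arc on its own only allows shared, read-only access, which is why it gets combined with a Mutex when the value has to change.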

When working with Mutex and Arc, we import both from the sync module.

Example:
use std::thread;
use std::sync::{Mutex, Arc};

First, let’s create a value that can be shared between threads with Arc::new().

Example:
use std::thread;
use std::sync::{Mutex, Arc};

fn main() {

    // thread safe value that
    // can be shared across threads
    let safe = Arc::new(5);
}

We also want to be able to lock the value while we access it, so we wrap the value in Mutex::new() inside the Arc.

Example:
use std::thread;
use std::sync::{Mutex, Arc};

fn main() {

    // thread-safe and lockable
    let safe = Arc::new(Mutex::new(5));
}

Next, let’s create multiple threads, but this time we’ll do it in a loop.

Example:
use std::thread;
use std::sync::{Mutex, Arc};

fn main() {

    // thread-safe and lockable
    let safe = Arc::new(Mutex::new(5));

    // more than one handle
    // store them in a vec
    // for convenience
    let mut handles = vec![];

    for _ in 0..2 {
        // clone the Arc so the thread
        // gets its own pointer to the value
        let safe = Arc::clone(&safe);
        // create the thread
        let handle = thread::spawn(move || {

        });

        // push the handle into the handles
        // vector so we can join them later
        handles.push(handle);
    }
}

We want to mutate the value, so let’s lock it, then change it.

Example:
use std::thread;
use std::sync::{Mutex, Arc};

fn main() {

    // thread-safe and lockable
    let safe = Arc::new(Mutex::new(5));

    // more than one handle
    // store them in a vec
    // for convenience
    let mut handles = vec![];

    for _ in 0..2 {
        // clone the Arc so the thread
        // gets its own pointer to the value
        let safe = Arc::clone(&safe);
        // create the thread
        let handle = thread::spawn(move || {

            // lock the value
            let mut a = safe.lock().unwrap();
            // mutate the value
            *a += 3;
        });

        // push the handle into the handles
        // vector so we can join them later
        handles.push(handle);
    }
}

Finally, we wrap up by joining the handles in the vector, and printing the value to the console.

Example:
use std::thread;
use std::sync::{Mutex, Arc};

fn main() {

    // thread-safe and lockable
    let safe = Arc::new(Mutex::new(5));

    // more than one handle
    // store them in a vec
    // for convenience
    let mut handles = vec![];

    for _ in 0..2 {
        // clone the Arc so the thread
        // gets its own pointer to the value
        let safe = Arc::clone(&safe);
        // create the thread
        let handle = thread::spawn(move || {

            // lock the value
            let mut a = safe.lock().unwrap();
            // mutate the value
            *a += 3;
        });

        // push the handle into the handles
        // vector so we can join them
        handles.push(handle);
    }

    // join the handles in the vector
    for handle in handles {
        handle.join().unwrap();
    }

    // lock the value when accessing it
    println!("a: {}", *safe.lock().unwrap());
}

Remember that lock() returns a MutexGuard, a smart pointer to the data inside the mutex. To mutate the value, we have to dereference the guard with the * operator. The lock is released automatically when the guard goes out of scope.
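
With only two threads and one addition each, it’s hard to see what the lock protects us from. The sketch below scales the same pattern up with a hypothetical helper, parallel_count, in which several threads increment a shared counter many times.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: several threads each add 1 to
// a shared counter 'per_thread' times.
fn parallel_count(threads: usize, per_thread: usize) -> usize {
    let counter = Arc::new(Mutex::new(0usize));
    let mut handles = Vec::new();

    for _ in 0..threads {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            for _ in 0..per_thread {
                // lock, increment, and release the lock
                // again at the end of the statement
                *counter.lock().unwrap() += 1;
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // read the final value through the lock
    let total = *counter.lock().unwrap();
    total
}

fn main() {
    println!("total: {}", parallel_count(4, 1000));
}
```

Every increment happens while the lock is held, so no update is lost and the total always equals the number of threads multiplied by the increments per thread.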

Summary: Points to remember

  • Concurrency is when some of our code runs in parallel with other code, in threads.
  • We spawn threads and place the code we want to run in them.
  • We assign our threads to handles and join them to allow all threads to finish.
  • To take ownership from inside a thread, we use the move closure.
  • We send messages between threads through a channel.
    • Channels work with a transmitter to send data and a receiver to catch data.
    • The sender of a channel can be passed to a function by specifying the Sender generic as the parameter type.
  • Multiple values can be sent through a channel, either one by one or in a loop.
    • When receiving multiple values, we can iterate over the receiver or call recv() once for each value.
  • To send values from multiple threads, we clone our transmitters.
  • Arc allows us to share memory between threads.
  • Mutex allows us to lock data to prevent data races.
    • Mutex and Arc are typically used together, but it’s not required.
    • The lock() method returns a guard, which has to be dereferenced to reach the value.
    • We should lock data with Mutex each time we access or mutate it.
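
As a footnote to the point that Mutex and Arc don’t have to be used together, the sketch below uses scoped threads (std::thread::scope, available since Rust 1.63), which are not covered in this tutorial. Scoped threads are allowed to borrow local variables, so the Mutex needs no Arc. The helper name scoped_total is hypothetical.

```rust
use std::sync::Mutex;
use std::thread;

// Hypothetical helper: two scoped threads mutate
// a Mutex that is borrowed, not shared through Arc.
fn scoped_total() -> i32 {
    let total = Mutex::new(5);

    thread::scope(|s| {
        for _ in 0..2 {
            // the closure borrows 'total', no move or clone needed
            s.spawn(|| {
                *total.lock().unwrap() += 3;
            });
        }
    }); // all scoped threads are joined automatically here

    // no other thread can hold the mutex anymore,
    // so we can take the value out of it
    total.into_inner().unwrap()
}

fn main() {
    println!("total: {}", scoped_total());
}
```

The scope guarantees that the threads finish before total goes out of scope, which is the guarantee that Arc otherwise provides at runtime.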