C++11 Concurrency Tutorial - Part 3: Advanced locking and condition variables

In the previous article, we saw how to use mutexes to fix concurrency problems. In this post, we will continue to work on mutexes with more advanced techniques. We will also study another concurrency technique of the C++11 Concurrency Library: condition variables.

Recursive locking

Let's imagine that you have a simple class like this one:

struct Complex {
    std::mutex mutex;
    int i;

    Complex() : i(0) {}

    void mul(int x){
        std::lock_guard<std::mutex> lock(mutex);
        i *= x;
    }

    void div(int x){
        std::lock_guard<std::mutex> lock(mutex);
        i /= x;
    }
};

Now you want an operation that performs both calculations while holding the lock, so you add a new member function:

void both(int x, int y){
    std::lock_guard<std::mutex> lock(mutex);
    mul(x);
    div(y);
}

Now, it's time to test this function:

int main(){
    Complex complex;
    complex.both(32, 23);

    return 0;
}

If you run this program, you'll see that it never terminates. The problem is simple: in the both() function, the thread acquires the lock and then calls mul(). Inside mul(), the thread tries to acquire the same lock again, but the lock is already held, so the thread blocks forever. This is a deadlock. By default, a thread cannot acquire the same mutex twice.

There is a simple solution to this problem: std::recursive_mutex. This mutex can be acquired several times by the same thread. Here is the correct version of the Complex struct:

struct Complex {
    std::recursive_mutex mutex;
    int i;

    Complex() : i(0) {}

    void mul(int x){
        std::lock_guard<std::recursive_mutex> lock(mutex);
        i *= x;
    }

    void div(int x){
        std::lock_guard<std::recursive_mutex> lock(mutex);
        i /= x;
    }

    void both(int x, int y){
        std::lock_guard<std::recursive_mutex> lock(mutex);
        mul(x);
        div(y);
    }
};

This time, the application works correctly.

Timed locking

Sometimes you don't want a thread to wait indefinitely for a mutex, for example when the thread can do useful work while the mutex is unavailable. For this purpose, the standard library provides std::timed_mutex and std::recursive_timed_mutex (if you also need recursive locking). They offer the same functions as std::mutex, lock() and unlock(), plus two new ones: try_lock_for() and try_lock_until().

The first one is also the most useful. It lets you specify a timeout after which the function returns even if the lock was not acquired. The function returns true if the lock has been acquired, false otherwise. Let's try it with a simple example:

std::timed_mutex mutex;

void work(){
    std::chrono::milliseconds timeout(100);

    while(true){
        if(mutex.try_lock_for(timeout)){
            std::cout << std::this_thread::get_id() << ": do work with the mutex" << std::endl;

            std::chrono::milliseconds sleepDuration(250);
            std::this_thread::sleep_for(sleepDuration);

            mutex.unlock();

            std::this_thread::sleep_for(sleepDuration);
        } else {
            std::cout << std::this_thread::get_id() << ": do work without mutex" << std::endl;

            std::chrono::milliseconds sleepDuration(100);
            std::this_thread::sleep_for(sleepDuration);
        }
    }
}

int main(){
    std::thread t1(work);
    std::thread t2(work);

    t1.join();
    t2.join();

    return 0;
}

(The example is completely useless in practice)

The first interesting thing in this example is the declaration of a duration with std::chrono::milliseconds, another new feature of the C++11 standard. Several time units are available: nanoseconds, microseconds, milliseconds, seconds, minutes and hours. We use such a duration to set the timeout of try_lock_for(), and also to make a thread sleep with std::this_thread::sleep_for(duration). The rest of the example is just some prints so that the results can be observed. Note that the program never stops; you have to kill it.

Call once

Sometimes you want a function to be called only once, no matter how many threads use it. Imagine a function with two parts: the first part has to be executed only once, and the second has to be executed every time the function is called. We can use the std::call_once function to solve this problem very easily. Here is an example using this mechanism:

std::once_flag flag;

void do_something(){
    std::call_once(flag, [](){std::cout << "Called once" << std::endl;});

    std::cout << "Called each time" << std::endl;
}

int main(){
    std::thread t1(do_something);
    std::thread t2(do_something);
    std::thread t3(do_something);
    std::thread t4(do_something);

    t1.join();
    t2.join();
    t3.join();
    t4.join();

    return 0;
}

Each std::call_once is matched to a std::once_flag variable. Here I pass a closure to be executed only once, but a function pointer or a std::function would do the trick as well.

Condition variables

A condition variable manages a list of threads waiting until another thread notifies them. Each thread that wants to wait on the condition variable has to acquire a lock first. The lock is released when the thread starts waiting on the condition and reacquired when the thread is woken up.

A very good example is a concurrent Bounded Buffer: a cyclic buffer with a fixed capacity, with a front and a rear. Here is our implementation of a Bounded Buffer using condition variables:

struct BoundedBuffer {
    int* buffer;
    int capacity;

    int front;
    int rear;
    int count;

    std::mutex lock;

    std::condition_variable not_full;
    std::condition_variable not_empty;

    BoundedBuffer(int capacity) : capacity(capacity), front(0), rear(0), count(0) {
        buffer = new int[capacity];
    }

    ~BoundedBuffer(){
        delete[] buffer;
    }

    void deposit(int data){
        std::unique_lock<std::mutex> l(lock);

        not_full.wait(l, [this](){return count != capacity; });

        buffer[rear] = data;
        rear = (rear + 1) % capacity;
        ++count;

        l.unlock();
        not_empty.notify_one();
    }

    int fetch(){
        std::unique_lock<std::mutex> l(lock);

        not_empty.wait(l, [this](){return count != 0; });

        int result = buffer[front];
        front = (front + 1) % capacity;
        --count;

        l.unlock();
        not_full.notify_one();

        return result;
    }
};

The mutex is managed by a std::unique_lock, a wrapper that manages a lock. It is required here because a condition variable needs to unlock and relock the mutex while waiting, which a std::lock_guard cannot do. To wake up a thread waiting on a condition variable, the notify_one() function is used. The unlock before the notify_one() is not strictly necessary: if you omit it, the unlock is done automatically by the destructor of the unique_lock. However, the notified thread could then wake up only to block again immediately, because the notifier still holds the lock. Unlocking first lets the notified thread acquire the lock directly, so it is a slight optimization, though it won't make a big difference. The wait() function is a bit special: it takes the unique lock as its first argument and a predicate as its second. The predicate must return false as long as the waiting should continue (it is equivalent to while(!pred()){cv.wait(l);}). The rest of the example has nothing special.

We can use this structure to solve the multiple-producers/multiple-consumers problem, which is very common in concurrent programming: several threads (consumers) wait for data produced by several other threads (producers). Here is an example with several threads using the structure:

void consumer(int id, BoundedBuffer& buffer){
    for(int i = 0; i < 50; ++i){
        int value = buffer.fetch();
        std::cout << "Consumer " << id << " fetched " << value << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(250));
    }
}

void producer(int id, BoundedBuffer& buffer){
    for(int i = 0; i < 75; ++i){
        buffer.deposit(i);
        std::cout << "Producer " << id << " produced " << i << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

int main(){
    BoundedBuffer buffer(200);

    std::thread c1(consumer, 0, std::ref(buffer));
    std::thread c2(consumer, 1, std::ref(buffer));
    std::thread c3(consumer, 2, std::ref(buffer));
    std::thread p1(producer, 0, std::ref(buffer));
    std::thread p2(producer, 1, std::ref(buffer));

    c1.join();
    c2.join();
    c3.join();
    p1.join();
    p2.join();

    return 0;
}

Three consumer threads and two producer threads are created and query the structure constantly. An interesting detail in this example is the use of std::ref to pass the buffer by reference: without it, std::thread would try to copy the buffer, which is impossible since std::mutex is not copyable.

Wrap-Up

In this article, we saw several things. First, we saw how to use a std::recursive_mutex to let a thread acquire the same mutex more than once. Then, we saw how to acquire a mutex with a timeout. After that, we studied a method to call a function only once. Finally, condition variables were used to solve the multiple-producers/multiple-consumers problem.

The source code for this article can be found on Github.

Next

In the next post of this series, we will study another technique of the C++11 Concurrency Library: atomics.
