Recently I started playing with the C++ Boost libraries. I began with Wt (webtoolkit.eu), which uses Boost, and then I started exploring Boost itself. I really like the functional capabilities of Boost; they are IMHO about as good as can be achieved in a C-like language. Having so many overloadable operators helps readability a lot.

When I was exploring the Boost thread library, it puzzled me that there were no higher-level synchronization primitives. There were only platform-independent threads, mutexes, condition variables, thread groups and locks as objects, and that was about it. I really missed an MVar.

As Haskellers know, an MVar in Haskell is a thread-safe variable that is either full or empty. Putting a value into an empty MVar makes it full, and taking the value out of a full MVar makes it empty. Putting into a full MVar blocks until it is emptied, and taking from an empty one blocks until it is filled. The principle has a lot in common with std::auto_ptr, whose copy hands over ownership and leaves the source empty, but auto_ptr is not thread safe. By using MVars we can solve a wide range of threading problems without using mutexes or condition variables directly.
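To make the auto_ptr comparison concrete, here is a minimal single-threaded sketch of the “box” idea. It uses C++11 std::unique_ptr, where the transfer that auto_ptr performs on copy has to be written explicitly with std::move. The names Box, put_into and take_from are hypothetical. The code is deliberately not thread safe, and thread safety is exactly what an MVar adds.

```cpp
#include <memory>
#include <utility>

// Hypothetical single-threaded "box": empty when slot is null, full otherwise.
// Not thread safe; it only illustrates the ownership hand-over.
struct Box {
    std::unique_ptr<int> slot;
};

// Put succeeds only into an empty box. On success, ownership moves out of
// the caller's pointer, which is left null (auto_ptr did this on copy).
bool put_into(Box& box, std::unique_ptr<int>& v) {
    if (box.slot) {
        return false;  // already full: an MVar would block here instead
    }
    box.slot = std::move(v);
    return true;
}

// Take moves the value out and leaves the box empty again.
// On an empty box this returns null; an MVar would block instead.
std::unique_ptr<int> take_from(Box& box) {
    return std::move(box.slot);
}
```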

Many threading problems can be generalized as a producer-consumer problem. We have a group of producer threads that own some pieces of data, and we want to transfer ownership of that data to a group of consumer threads. With MVars, each producer thread just puts into the MVar and each consumer thread just takes from it. The MVar handles all the concurrency issues. And I thought: “MVars should be fairly easy to implement in C++!” So I implemented them. I didn’t use Boost here, just pthreads. I will give my reasons at the end.

Since the MVar is templated, declarations and definitions are both in the header file.

// File: MVar.H
#include <cstddef>
#include <memory>
#include <pthread.h>

// Import only what this header needs instead of all of namespace std.
using std::auto_ptr;

According to the Haskell MVar documentation (module Control.Concurrent.MVar):

An MVar (pronounced “em-var”) is a synchronising variable, used for communication between concurrent threads. It can be thought of as a box, which may be empty or full.

template <typename T>
class MVar {

We will use std::auto_ptr for putting and taking, because the functionality of an MVar is very similar to that of auto_ptr: both hand over ownership and leave the source empty.

  public:
    auto_ptr<T> take();
    void put(auto_ptr<T>);

An MVar holds at most one value. An empty (NULL) auto_ptr means the MVar is empty.

  private:
    auto_ptr<T> value;

When the MVar is full, threads that want to put must wait, and we must be able to wake them up.

    pthread_cond_t put_cond;

And the same applies when the MVar is empty and some threads want to take a value.

    pthread_cond_t take_cond;

All threads (both putting and taking) need to check the current state of value, so it must be protected by a mutex. This single mutex is shared by both condition variables.

    pthread_mutex_t value_mutex;

All the mutexes and condition variables will need to be initialized and then later released.

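  public:
    MVar();
    ~MVar();

  private:
    // pthread objects must not be copied, so MVar is non-copyable
    // (declared but never defined, the usual C++98 idiom).
    MVar(const MVar&);
    MVar& operator=(const MVar&);
};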

The constructor definition is very straightforward.

template <typename T>
MVar<T>::MVar() {
  pthread_mutex_init(&value_mutex,NULL);
  pthread_cond_init(&take_cond,NULL);
  pthread_cond_init(&put_cond,NULL);
}

And so is the destructor definition.

template <typename T>
MVar<T>::~MVar() {
  pthread_mutex_destroy(&value_mutex);
  pthread_cond_destroy(&take_cond);
  pthread_cond_destroy(&put_cond);
}

Now let’s implement the take method.

template <typename T>
auto_ptr<T> MVar<T>::take() {

To transfer auto_ptr ownership under controlled conditions, we need a variable of our own. Local variables belong to the calling thread.

  auto_ptr<T> taken_value;

Lock the mutex.

  pthread_mutex_lock(&value_mutex);

We use condition variables, but we do not rely on them completely. The first thing a taking thread does is check whether there is a value to take.

  while (value.get() == NULL) {

If there is none, it waits for a signal that a value is (or recently was) ready for taking. The signal doesn’t guarantee that we will get the value. Another, freshly started taking thread can be quicker, and pthread_cond_wait may also wake up spuriously. In either case the loop checks again and we wait again.

    pthread_cond_wait(&take_cond, &value_mutex);
  }

When the loop exits, we hold value_mutex and value is full. So let’s take it.

  taken_value = value;

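We have to signal the putting threads that we have emptied the value. The signal may be lost (if no putting thread is waiting), or it may come too late (after another thread has already refilled the value). But that’s part of the plan, because every waiting thread re-checks value in its loop.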

  pthread_cond_signal(&put_cond);
  pthread_mutex_unlock(&value_mutex);

And of course we return the taken value. Because it is a std::auto_ptr, returning it transfers ownership to the caller, and our local variable ends up empty.

  return taken_value;
}
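The take method above accepts that a signal may be lost. That claim can be checked in isolation. In the sketch below, all names are hypothetical and not from the MVar sources. It signals a condition variable before any thread waits on it. The signal itself has no effect. The waiter still never blocks, because it checks the predicate under the mutex before calling pthread_cond_wait.

```cpp
#include <cstddef>
#include <pthread.h>

// Hypothetical demo state: a flag protected by a mutex plus a condition variable.
static pthread_mutex_t demo_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t demo_cond = PTHREAD_COND_INITIALIZER;
static bool demo_ready = false;

// Waiter: checks the predicate first, exactly like MVar<T>::take(),
// so a signal sent before it started waiting is not needed.
static void* demo_waiter(void*) {
    pthread_mutex_lock(&demo_mutex);
    while (!demo_ready) {
        pthread_cond_wait(&demo_cond, &demo_mutex);
    }
    pthread_mutex_unlock(&demo_mutex);
    return NULL;
}

// Signal *before* any waiter exists: the signal is lost,
// but the state change it announces is not.
bool signal_then_wait() {
    pthread_mutex_lock(&demo_mutex);
    demo_ready = true;
    pthread_cond_signal(&demo_cond);  // nobody is waiting: no effect
    pthread_mutex_unlock(&demo_mutex);

    pthread_t t;
    if (pthread_create(&t, NULL, demo_waiter, NULL) != 0) {
        return false;
    }
    pthread_join(t, NULL);  // returns because the waiter saw demo_ready == true
    return true;
}
```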

The put method is just a mirror image of the take method.

template <typename T>
void MVar<T>::put(auto_ptr<T> value_to_put) {
  pthread_mutex_lock(&value_mutex);
  while (value.get() != NULL) {
    pthread_cond_wait(&put_cond, &value_mutex);
  }
  value = value_to_put;
  pthread_cond_signal(&take_cond);
  pthread_mutex_unlock(&value_mutex);
}
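std::auto_ptr was deprecated in C++11 and removed from the C++17 standard. On a newer compiler, the same one-mutex, two-condition-variable design might look like the sketch below, which uses std::unique_ptr, std::mutex and std::condition_variable. It is a port under that assumption, not the code from the downloadable sources. The predicate overload of wait() plays the role of the explicit while loop.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <utility>

// C++11 sketch of the MVar above: one mutex, two condition variables.
template <typename T>
class MVar {
  public:
    MVar() = default;
    MVar(const MVar&) = delete;             // mutexes cannot be copied
    MVar& operator=(const MVar&) = delete;

    // Block until the MVar is full, then move the value out (leaving it empty).
    std::unique_ptr<T> take() {
        std::unique_lock<std::mutex> lock(mutex_);
        // wait() with a predicate re-checks after every (possibly spurious) wakeup.
        take_cond_.wait(lock, [this] { return value_ != nullptr; });
        std::unique_ptr<T> taken = std::move(value_);
        put_cond_.notify_one();             // wake one waiting putter, if any
        return taken;
    }

    // Block until the MVar is empty, then move the caller's value in.
    // As in the pthread version, putting a null pointer leaves it empty.
    void put(std::unique_ptr<T> v) {
        std::unique_lock<std::mutex> lock(mutex_);
        put_cond_.wait(lock, [this] { return value_ == nullptr; });
        value_ = std::move(v);
        take_cond_.notify_one();            // wake one waiting taker, if any
    }

  private:
    std::unique_ptr<T> value_;              // null means empty
    std::mutex mutex_;
    std::condition_variable put_cond_;
    std::condition_variable take_cond_;
};
```

It is used like the pthread version: `gate.put(std::unique_ptr<int>(new int(1)));` on one side and `std::unique_ptr<int> p = gate.take();` on the other.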

And that’s all. Now let’s test it.

// File: testMVar.C
#include "MVar.H"
#include <memory>
#include <iostream>

using namespace std;

We will have 1 producer (the main thread) and 3 consumers. We will produce some int values and print them in the consumers. We don’t have any infrastructure to signal that production has ended, so we produce exactly 12 values and each consumer consumes exactly 4 of them.

typedef int PRODUCT;

Producers and consumers will communicate using just an MVar. Let’s call it gate.

MVar<PRODUCT> gate;

Both the producer and the consumers print to stdout, so access to cout must be protected by a mutex.

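// Serialize output: each IO_MSG writes one message while holding io_mutex.
// The braces make the expansion a single compound statement.
#define IO_MSG(msg) { pthread_mutex_lock( &io_mutex ); cout << msg; pthread_mutex_unlock( &io_mutex ); }
pthread_mutex_t io_mutex = PTHREAD_MUTEX_INITIALIZER;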
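Instead of the IO_MSG macro, a small RAII guard can tie unlocking to the end of a scope, so the mutex is released even if the guarded code throws. The sketch below is not part of testMVar.C. ScopedPthreadLock is a hypothetical name, and the class is written in C++98 style to match the rest of the test program. C++11 code would normally use std::lock_guard with std::mutex instead.

```cpp
#include <pthread.h>

// Hypothetical helper: locks a pthread mutex on construction and
// unlocks it when the guard goes out of scope.
class ScopedPthreadLock {
  public:
    explicit ScopedPthreadLock(pthread_mutex_t& m) : mutex_(m) {
        pthread_mutex_lock(&mutex_);
    }
    ~ScopedPthreadLock() {
        pthread_mutex_unlock(&mutex_);
    }

  private:
    pthread_mutex_t& mutex_;
    // Non-copyable (C++98 idiom): declared, never defined.
    ScopedPthreadLock(const ScopedPthreadLock&);
    ScopedPthreadLock& operator=(const ScopedPthreadLock&);
};
```

With it, a message could be printed as `{ ScopedPthreadLock guard(io_mutex); cout << "..." << endl; }`.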

With MVar, writing producer and consumer is easy.

void producer() {
  auto_ptr<PRODUCT> product;
  for(int i=0; i < 12; i++) {
    product.reset( new int(i) );
    IO_MSG( "Producer " << pthread_self() << " produced " << *product << endl )
    // Passing the auto_ptr by value hands ownership to gate; product is now empty.
    gate.put( product );
  }
}

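void* consumer(void*) {
  auto_ptr<PRODUCT> product;
  for(int i=0; i < 4; i++) {
    // take() blocks until the producer has filled the gate.
    product = gate.take();
    IO_MSG( "Consumer " << pthread_self() << " consumed " << *product << endl )
    product.reset();  // free the int this consumer now owns
  }
  return NULL;  // a pthread start routine must return a value
}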

Put it all together: start 3 consumers, run 1 producer and then wait for the consumers to finish.

int main() {
  pthread_t consumers[3];
  for(int i=0; i < 3; i ++) {
    pthread_create( &consumers[i], NULL, consumer, NULL );
  }
  producer();
  for(int i=0; i < 3; i ++) {
    pthread_join( consumers[i], NULL );
  }
}

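Now we can build and run the test. The -std=c++98 flag keeps this working on newer compilers, because std::auto_ptr was deprecated in C++11 and removed from the C++17 standard. The -pthread flag sets up both compiling and linking for pthreads.

g++ -std=c++98 -pthread -o testMVar testMVar.C
./testMVar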

And the output can look like this (the interleaving varies from run to run):

Producer 3075177264 produced 0
Producer 3075177264 produced 1
Producer 3075177264 produced 2
Consumer 3066776432 consumed 0
...
Producer 3075177264 produced 11
Consumer 3058383728 consumed 9
Consumer 3058383728 consumed 10
Consumer 3058383728 consumed 11
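The test hard-codes 12 values and 4 per consumer because nothing tells a consumer that production has ended. One common workaround is a “poison pill”: after the real data, the producer puts one end-of-stream message per consumer. The sketch below assumes a C++11 compiler. The Message type and run_with_poison_pill are hypothetical, and the compact MVar is a C++11 stand-in for the pthread class so that the example compiles on its own.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Compact C++11 stand-in for the pthread MVar: one mutex, two condition variables.
template <typename T>
class MVar {
  public:
    std::unique_ptr<T> take() {
        std::unique_lock<std::mutex> lock(m_);
        can_take_.wait(lock, [this] { return v_ != nullptr; });
        std::unique_ptr<T> out = std::move(v_);
        can_put_.notify_one();
        return out;
    }
    void put(std::unique_ptr<T> in) {
        std::unique_lock<std::mutex> lock(m_);
        can_put_.wait(lock, [this] { return v_ == nullptr; });
        v_ = std::move(in);
        can_take_.notify_one();
    }

  private:
    std::unique_ptr<T> v_;  // null means empty
    std::mutex m_;
    std::condition_variable can_put_, can_take_;
};

// Hypothetical message: either a product or an end-of-stream marker.
struct Message {
    bool done;
    int value;
};

// One producer (the calling thread) and `consumers` consumer threads.
// Each consumer stops at the first "done" message it takes, so no consumer
// needs to know in advance how many values will arrive.
// Returns the sum of all consumed values.
long run_with_poison_pill(int values, int consumers) {
    MVar<Message> gate;
    std::vector<long> partial(consumers, 0L);  // one slot per consumer, no sharing
    std::vector<std::thread> pool;

    for (int c = 0; c < consumers; ++c) {
        pool.emplace_back([&gate, &partial, c] {
            for (;;) {
                std::unique_ptr<Message> msg = gate.take();
                if (msg->done) {
                    break;  // this consumer's poison pill
                }
                partial[c] += msg->value;
            }
        });
    }

    for (int i = 0; i < values; ++i) {
        gate.put(std::unique_ptr<Message>(new Message{false, i}));
    }
    // A consumer exits after its pill, so each pill reaches a different consumer.
    for (int c = 0; c < consumers; ++c) {
        gate.put(std::unique_ptr<Message>(new Message{true, 0}));
    }

    for (std::thread& t : pool) {
        t.join();
    }
    long sum = 0;
    for (long p : partial) {
        sum += p;
    }
    return sum;
}
```

Because the MVar holds only one item and there is a single producer, every real value is taken before the first pill is put, so no data is lost.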

So what did we achieve here? Why is it so amazing?

  • We made a powerful abstraction to synchronize threads. We don’t have to work with
    mutexes and condition variables … ever again!
  • We only transfer ownership of pointers; the data itself is never copied.
  • All the critical code is localized inside MVar (fewer errors).
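To back the claim that MVars can replace explicit mutexes, here is a sketch of a shared counter kept inside an MVar. Taking the value empties the MVar, so every other thread blocks until the value is put back, and that gives mutual exclusion. Haskell’s Control.Concurrent.MVar offers this pattern as modifyMVar_. The code assumes C++11. The compact MVar class and count_with_mvar are hypothetical stand-ins so that the example compiles on its own. Unlike modifyMVar_, this sketch does not restore the value if the update throws.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Compact C++11 stand-in for the pthread MVar.
template <typename T>
class MVar {
  public:
    std::unique_ptr<T> take() {
        std::unique_lock<std::mutex> lock(m_);
        can_take_.wait(lock, [this] { return v_ != nullptr; });
        std::unique_ptr<T> out = std::move(v_);
        can_put_.notify_one();
        return out;
    }
    void put(std::unique_ptr<T> in) {
        std::unique_lock<std::mutex> lock(m_);
        can_put_.wait(lock, [this] { return v_ == nullptr; });
        v_ = std::move(in);
        can_take_.notify_one();
    }

  private:
    std::unique_ptr<T> v_;  // null means empty
    std::mutex m_;
    std::condition_variable can_put_, can_take_;
};

// Hypothetical example: `threads` threads each increment a shared counter
// `increments` times. The counter lives inside the MVar, and no thread
// touches a mutex directly.
long count_with_mvar(int threads, int increments) {
    MVar<long> counter;
    counter.put(std::unique_ptr<long>(new long(0)));  // start full

    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t) {
        pool.emplace_back([&counter, increments] {
            for (int i = 0; i < increments; ++i) {
                std::unique_ptr<long> n = counter.take();  // MVar now empty: others wait
                ++*n;
                counter.put(std::move(n));                 // full again: one waiter proceeds
            }
        });
    }
    for (std::thread& th : pool) {
        th.join();
    }
    return *counter.take();
}
```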

Why didn’t I write this using Boost? To tell you the truth, I did. My first try was with Boost. But when I ran it, it sometimes deadlocked. Since C++ and Boost are not my usual tools, I assumed the error was mine and ran valgrind --tool=helgrind on it. Helgrind pointed out some possible race conditions in the Boost libraries, but I didn’t really understand them. So I decided to rewrite MVar and the test using pthreads directly. It was a one-to-one rewrite, with no change in logic or ordering. It worked on the first try, and valgrind didn’t find any possible race conditions either. Both implementations are included in the downloadable sources.

EDIT: I have found my race condition (the pthread version was also affected; it just didn’t show it). In the way I use condition variables, their mutex has to be locked when signaling. I changed the code accordingly (added a lock around each signal). Now both versions work and are correct.

EDIT2: neonsteven (on reddit) pointed out that one mutex would be sufficient and faster (as locking a mutex can require a syscall). Now I share his opinion. I changed the sources but left the article intact, so don’t be surprised.

EDIT3: After some more discussion on reddit, I realized that the article will be here for some time and that the 3-mutex implementation would confuse new readers. I changed the article to use only 1 mutex.

Thank you for reading this far. If you liked this and decide to use MVars in your programs from now on, remember: all these ideas come from Haskell. I would never have come up with this without using Haskell (for over 2 years now). So try it, too (for example, with the book Real World Haskell). It will make you a happier programmer … forever after.
