Recently I started playing with C++ Boost libraries. Originally I played with Wt (webtoolkit.eu), but then I also started exploring the rest of Boost. I really like the functional capabilities of Boost; they are IMHO about as good as can be achieved in a C-like language. Having some (32) operators to override helps readability a lot.
When I was exploring the Boost thread library, it puzzled me that there were no higher-level synchronization primitives. There were only platform-independent threads, mutexes, condition variables, thread groups and locks as objects, and that was about it. I was really missing an MVar.
As haskellists know, an MVar in Haskell is a thread-safe variable, which can be either full or empty. We can put a value into an empty MVar to make it full, and take a value from a full MVar to make it empty. The principle has a lot in common with std::auto_ptr, but that is not thread safe. By using an MVar we can solve a wide range of threading problems without using mutexes or condition variables directly.
Most threading problems can be generalized as a producer-consumer problem. We have a group of producer threads, which own some pieces of data, and we want to transfer ownership to a group of consumer threads. With MVars, each producer thread just puts into the MVar and each consumer thread just takes from the MVar. The MVar handles all the concurrency issues. And I thought: “MVars should be fairly easy to implement in C++!”. And so I implemented them. I didn’t use Boost here, just pthread. I will give my reasons at the end.
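To make the auto_ptr analogy concrete, here is a minimal single-threaded sketch of the "full box becomes empty when you take from it" idea. It uses std::unique_ptr, the C++11 successor of the now-deprecated std::auto_ptr, which is an assumption on my side and not what the rest of this article uses. The helper name take_from is hypothetical.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical helper: moves the value out of `box` and leaves `box` empty.
// This mirrors what "take" does to a full MVar, minus any thread safety.
std::unique_ptr<int> take_from(std::unique_ptr<int>& box) {
    return std::move(box);  // ownership moves to the caller, box becomes null
}

// Small demonstration: a full box is emptied by taking from it.
bool demo_take_empties_box() {
    std::unique_ptr<int> box(new int(42));    // "full"
    std::unique_ptr<int> got = take_from(box);
    return box.get() == nullptr && *got == 42;  // box is now "empty"
}
```

Nothing here blocks or synchronizes; the MVar adds exactly that part on top of the ownership transfer.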
Since the MVar is templated, declarations and definitions are both in the header file.
// File: MVar.H
#include <memory>
#include <pthread.h>
using namespace std;
According to Haskell MVar documentation (google:// haskell mvar):
An MVar (pronounced “em-var”) is a synchronising variable, used for communication between concurrent threads. It can be thought of as a box, which may be empty or full.
template <typename T>
class MVar {
We will use std::auto_ptr for the putting and taking, because the functionality of MVar is very similar to that of auto_ptr.
public:
auto_ptr<T> take();
void put(auto_ptr<T>);
MVar will be able to hold one value.
private:
auto_ptr<T> value;
When MVar is full, the threads that want to put must wait and we must be able to wake them up.
pthread_cond_t put_cond;
And the same applies when the MVar is empty and some threads want to take a value.
pthread_cond_t take_cond;
All threads (both putting and taking) will need to check the current status of value, so it needs to be protected by a separate mutex.
pthread_mutex_t value_mutex;
The mutex and the condition variables will need to be initialized and then later released.
public:
MVar();
~MVar();
};
The constructor definition is very straightforward.
template <typename T>
MVar<T>::MVar() {
pthread_mutex_init(&value_mutex,NULL);
pthread_cond_init(&take_cond,NULL);
pthread_cond_init(&put_cond,NULL);
}
And so is the destructor definition.
template <typename T>
MVar<T>::~MVar() {
pthread_mutex_destroy(&value_mutex);
pthread_cond_destroy(&take_cond);
pthread_cond_destroy(&put_cond);
}
Now let’s implement the take method.
template <typename T>
auto_ptr<T> MVar<T>::take() {
To transfer auto_ptr ownership in controlled conditions, we have to own some variable. Local variables are owned by threads.
auto_ptr<T> taken_value;
Lock the mutex.
pthread_mutex_lock(&value_mutex);
We use condition variables, but we do not rely on them completely. The first thing a taking thread does is check whether there is a value to take.
while (value.get() == NULL) {
If there is none, it waits for a signal that a value is (or recently was) ready for taking. The signal doesn’t guarantee that we will get the value. Another freshly started taking thread can be quicker, and then we will have to wait again.
pthread_cond_wait(&take_cond, &value_mutex);
}
The loop condition failed, so the mutex is locked and value is full. So let’s take it. Assigning from an auto_ptr transfers ownership, which leaves value empty.
taken_value = value;
We have to signal the putting threads that we have emptied the value. The signal may be lost (no putting thread waiting) or it may come too late. But that’s part of the plan.
pthread_cond_signal(&put_cond);
pthread_mutex_unlock(&value_mutex);
And return the taken value of course. It is std::auto_ptr so our local variable will be emptied by the return value assignment in the caller function.
return taken_value;
}
The put method is just a mirror image of the take method.
template <typename T>
void MVar<T>::put(auto_ptr<T> value_to_put) {
pthread_mutex_lock(&value_mutex);
while (value.get() != NULL) {
pthread_cond_wait(&put_cond, &value_mutex);
}
value = value_to_put;
pthread_cond_signal(&take_cond);
pthread_mutex_unlock(&value_mutex);
}
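For readers on C++11 or later, the same logic can be sketched with the standard library instead of raw pthread calls. This is only an illustration under my own assumptions: it swaps auto_ptr for std::unique_ptr and pthread for std::mutex and std::condition_variable, and the names MVar11 and sum_through_mvar are hypothetical, not part of the downloadable sources.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Sketch only: the article's MVar logic expressed with C++11 primitives.
template <typename T>
class MVar11 {
public:
    // Blocks while the box is empty, then moves the value out.
    std::unique_ptr<T> take() {
        std::unique_lock<std::mutex> lock(value_mutex);
        // wait() re-checks the predicate after every wakeup, like the while loop.
        take_cond.wait(lock, [this] { return value != nullptr; });
        std::unique_ptr<T> taken = std::move(value);  // value is now empty
        put_cond.notify_one();  // signal while the mutex is still held
        return taken;
    }

    // Blocks while the box is full, then moves the value in.
    // Note: putting a null pointer would leave the box looking empty.
    void put(std::unique_ptr<T> value_to_put) {
        std::unique_lock<std::mutex> lock(value_mutex);
        put_cond.wait(lock, [this] { return value == nullptr; });
        value = std::move(value_to_put);
        take_cond.notify_one();  // signal while the mutex is still held
    }

private:
    std::unique_ptr<T> value;
    std::mutex value_mutex;
    std::condition_variable put_cond;
    std::condition_variable take_cond;
};

// Hypothetical demo: one producer puts 0..n-1, one consumer sums what it takes.
int sum_through_mvar(int n) {
    MVar11<int> gate;
    int sum = 0;
    std::thread consumer([&gate, &sum, n] {
        for (int i = 0; i < n; ++i) sum += *gate.take();
    });
    for (int i = 0; i < n; ++i) gate.put(std::unique_ptr<int>(new int(i)));
    consumer.join();  // after join, reading sum from this thread is safe
    return sum;
}
```

The locks are released by the unique_lock destructors, so there is no unlock call to forget on any return path.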
And that’s all. Now let’s test it.
// File: testMVar.C
#include "MVar.H"
#include <memory>
#include <iostream>
using namespace std;
We will have 1 producer (the parent thread) and 3 consumers. We will produce some int numbers and print them in the consumers. We don’t have any infrastructure to determine whether the production of values has ended, so we will produce exactly 12 values and each consumer will consume exactly 4 values.
#define PRODUCT int
Producers and consumers will communicate using just an MVar. Let’s call it gate.
MVar<PRODUCT> gate;
Consumers will print the numbers to stdout, so it must be protected by a mutex.
#define IO_MSG(msg) pthread_mutex_lock( &io_mutex ); cout << msg; pthread_mutex_unlock( &io_mutex );
pthread_mutex_t io_mutex = PTHREAD_MUTEX_INITIALIZER;
With MVar, writing the producer and the consumer is easy.
void producer() {
auto_ptr<PRODUCT> product;
for(int i=0; i < 12; i++) {
product.reset( new int(i) );
IO_MSG( "Producer " << pthread_self() << " produced " << *(product.get()) << endl )
gate.put( product );
}
}
void* consumer(void*) {
auto_ptr<PRODUCT> product;
for(int i=0; i < 4; i++) {
product = gate.take();
IO_MSG( "Consumer " << pthread_self() << " consumed " << *(product.get()) << endl )
product.reset();
}
return NULL;
}
Put it all together. Start 3 consumers, run 1 producer and then wait for the consumers to finish.
int main() {
pthread_t consumers[3];
for(int i=0; i < 3; i ++) {
pthread_create( &consumers[i], NULL, consumer, NULL );
}
producer();
for(int i=0; i < 3; i ++) {
pthread_join( consumers[i], NULL );
}
}
Now we can run the test by:
g++ -o testMVar testMVar.C -lpthread
./testMVar
And the result can look like:
Producer 3075177264 produced 0
Producer 3075177264 produced 1
Producer 3075177264 produced 2
Consumer 3066776432 consumed 0
...
Producer 3075177264 produced 11
Consumer 3058383728 consumed 9
Consumer 3058383728 consumed 10
Consumer 3058383728 consumed 11
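The test hard-codes 12 values and 4 per consumer because there is no way to tell the consumers that production has ended. One possible workaround, sketched here purely as an assumption of mine, is a sentinel value: the producer puts one stop marker per consumer after the real data. The sketch uses C++11 primitives rather than the article's pthread code, and the names Box, STOP and run_with_sentinel are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Compact stand-in for the article's MVar (sketch, C++11 primitives).
template <typename T>
class Box {
public:
    std::unique_ptr<T> take() {
        std::unique_lock<std::mutex> lock(m);
        not_empty.wait(lock, [this] { return value != nullptr; });
        std::unique_ptr<T> out = std::move(value);
        not_full.notify_one();
        return out;
    }
    void put(std::unique_ptr<T> v) {
        std::unique_lock<std::mutex> lock(m);
        not_full.wait(lock, [this] { return value == nullptr; });
        value = std::move(v);
        not_empty.notify_one();
    }
private:
    std::unique_ptr<T> value;
    std::mutex m;
    std::condition_variable not_empty, not_full;
};

const int STOP = -1;  // hypothetical sentinel, must never be a real product

// Runs one producer (the calling thread) and `consumers` consumer threads.
// Returns how many real values were consumed in total.
int run_with_sentinel(int values, int consumers) {
    Box<int> gate;
    std::vector<int> counts(consumers, 0);  // one slot per consumer, no sharing
    std::vector<std::thread> threads;
    for (int c = 0; c < consumers; ++c) {
        threads.emplace_back([&gate, &counts, c] {
            for (;;) {
                std::unique_ptr<int> p = gate.take();
                if (*p == STOP) break;  // each consumer eats exactly one STOP
                ++counts[c];
            }
        });
    }
    for (int i = 0; i < values; ++i)
        gate.put(std::unique_ptr<int>(new int(i)));
    for (int c = 0; c < consumers; ++c)
        gate.put(std::unique_ptr<int>(new int(STOP)));
    for (std::thread& t : threads) t.join();
    int total = 0;
    for (int n : counts) total += n;
    return total;
}
```

With this, consumers no longer need to know in advance how many values they will receive; how the work is split between them is up to the scheduler.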
So what did we achieve here? Why is it so amazing?
- We made a powerful abstraction to synchronize threads. We don’t have to work with mutexes and condition variables … ever again!
- We only transfer ownership of pointers, there’s no memory copying.
- All the critical code is localized inside MVar (fewer errors).
Why didn’t I write this using Boost? To tell you the truth, I did. My first try was with Boost. But when I ran it, it sometimes deadlocked. Since C++ and Boost are not my usual tools, I considered it my error and tried valgrind --tool=helgrind on it. It pointed out some possible race conditions in the Boost libraries, but I didn’t understand them much. So I decided to rewrite MVar and the test using pthreads directly. Just a 1 to 1 rewrite, no change in logic or ordering. It worked on the first try and even valgrind didn’t find any possible race conditions. I give you both implementations, click here to download the sources.
EDIT: I have found my race condition (the pthread version was also affected, it just didn’t show up). In my use case of condition variables, their mutex has to be locked when signaling. I changed the code accordingly (added a lock around the signal). Now both versions work and are correct.
EDIT2: neonsteven (on reddit) pointed out that one mutex would be sufficient and faster (as locking a mutex is a syscall). Now I share his opinion. I changed the sources, but left the article intact. So don’t be surprised.
EDIT3: After some more discussions on reddit, I realize that the article will be here for some time and the 3 mutex implementation would confuse new readers. I changed the article to use only 1 mutex.
Thank you for reading so far. If you liked this and decided to use MVars in your programs from now on, remember: All these ideas come from Haskell. I never would have achieved it without using Haskell (for over 2 years now). So try it, too (for example Real World Haskell). It will make you a happier programmer … forever after.
November 14, 2010 at 7:56 pm
Hello, very nice post. I agree with the beauty of MVars and of Haskell and I think that C++ is improved by this kind of approach.
Just a minor observation on your post:
the destructor is actually a copy-and-paste version of the constructor
I assume that it is simply a typo but I haven’t yet looked at the source.
November 14, 2010 at 10:12 pm
Yes, just a copy&paste error. The code was ok.
November 14, 2010 at 8:28 pm
I believe you have a dead lock in your pthread implementation also.
Only tested on Mac OS X 1.5.8, compiled using g++ 4.0.1
After compiling, simply run:
$ while true; do ./mvar-test; done
Or any other similar continuous execution of the binary.
Which will give you a deadlock almost instantaneously.
November 14, 2010 at 10:10 pm
Yes, I realized it, too. Fixed. Thanks.
November 14, 2010 at 11:44 pm
You need to set the value to NULL in the take method or calling put, then take, then put will deadlock. The value will never be set to NULL and so put’s condition variable condition will never be true. Insert the change after acquiring the mutex.
November 15, 2010 at 12:24 am
http://www.cplusplus.com/reference/std/memory/auto_ptr/operator=/
November 15, 2010 at 12:18 am
boost::shared_ptr
November 15, 2010 at 12:27 am
How can a shared_ptr help in this task? How is shared_ptr thread safe? When there is a value in shared_ptr, it can be rewritten by any other thread.
OTOH, put into a full MVar blocks until MVar is empty.
November 15, 2010 at 2:06 am
The `put` method has 10 (!) synchronization calls.
“All these ideas come from Haskell.”
That pretty much sums it up.
November 15, 2010 at 7:13 am
Thanks for a constructive comment. The code was already using only 1 mutex, now the article does, too. If you can still do better, please offer your solution.
If you don’t seek higher level abstractions while you program (in any language), there is nothing I can do for you.
November 15, 2010 at 2:18 am
ah, C++ fans not happy to rewrite just half of Common Lisp and a third of Perl in. More languages to come into the binary blob as well…
I don’t see the point of using a dog with tentacles strap-on when I can simply use a octopus…
November 22, 2012 at 3:58 pm
I can’t download your source’s can you post it on github or reupload them?