The random ramblings of a techie

Message processing and thread safe containers

Using boost and STL

Often you want an application that is driven by external events, such as messages sent from another machine. A common way to do this is to have one thread that receives the messages and stores them in a container while the main thread takes them off and processes them.

The benefit of this approach is that you don't risk losing messages if the processing takes too long or coincides with a message being received. However, you still have to be careful not to allow simultaneous access to the container.

The STL containers are not thread safe, so you have to roll your own. The key point is to keep it simple – you want to have a way to add and remove from the container that ensures that only one thread at a time is accessing the container.

Of course if you are writing a thread safe container, you want to make it generic so you can re-use it:

#include <queue>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>

template <class DataType>
class BlockingQueue
{
   private:
      //Lock on the mutex to ensure only
      //one operation is performed at a time
      boost::mutex threadSafety;
      std::queue<DataType> messageQueue;
   public:
      void add(const DataType& newMessage)
      {
         //Prevent other calls to add or remove from executing
         //for the duration of this function
         boost::lock_guard<boost::mutex> lock(threadSafety);
         messageQueue.push(newMessage);
      }

      DataType remove()
      {
         //Prevent other calls to add or remove from executing
         //for the duration of this function
         boost::lock_guard<boost::mutex> lock(threadSafety);
         //std::queue::pop() returns void, so copy the front element
         //before popping it. The queue must not be empty here.
         DataType message = messageQueue.front();
         messageQueue.pop();
         return message;
      }

      bool isEmpty()
      {
         boost::lock_guard<boost::mutex> lock(threadSafety);
         return messageQueue.empty();
      }

      void clear()
      {
         boost::lock_guard<boost::mutex> lock(threadSafety);
         while (!messageQueue.empty())
         {
            messageQueue.pop();
         }
      }
};

Notice that each method first locks the mutex to ensure that only that function is accessing the queue. The lock guard locks the mutex when it is created and releases it when it goes out of scope. If another thread holds the lock, the calling thread waits until the mutex becomes available.
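To see the scoping behaviour of the lock guard in isolation, here is a minimal sketch. It uses std::mutex and std::lock_guard from the C++11 standard library rather than Boost, purely so it builds without extra dependencies. The names counterMutex, sharedCounter, incrementMany and runCounterDemo are made up for this illustration.

```cpp
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical shared state, protected by a mutex.
std::mutex counterMutex;
int sharedCounter = 0;

void incrementMany(int times)
{
   for (int i = 0; i < times; ++i)
   {
      // The mutex is locked here, when the guard is constructed...
      std::lock_guard<std::mutex> lock(counterMutex);
      ++sharedCounter;
      // ...and released here, when the guard goes out of scope.
   }
}

// Runs several threads that all increment the same counter
// and returns the final value once they have finished.
int runCounterDemo(int numThreads, int incrementsPerThread)
{
   sharedCounter = 0;
   std::vector<std::thread> workers;
   for (int i = 0; i < numThreads; ++i)
   {
      workers.emplace_back(incrementMany, incrementsPerThread);
   }
   for (std::thread& worker : workers)
   {
      worker.join();
   }
   return sharedCounter;
}
```

Because every increment happens while the guard holds the mutex, no updates are lost, so the result should equal numThreads times incrementsPerThread.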

Now let’s figure out the threading for the message handling. There will be one thread to read the messages and put them in the queue and another to process them.

#include <boost/thread/thread.hpp>
#include <boost/chrono.hpp>

//Some class that holds the messages. Its full definition, along with
//SHUT_DOWN, MAX_NUM_RETRIES and processMessage(), is assumed to be
//provided elsewhere and must be visible before main()
class Message;

//Forward declaration, details will follow later
void readMessageFunction(BlockingQueue<Message>& blockingQueue);

using namespace boost;
int main()
{
   //Create the queue
   BlockingQueue<Message> msgQueue;
   //Start the receiver thread
   thread readMsgsThread(readMessageFunction, ref(msgQueue));

   unsigned int numberRetries = 0;
   //Fetch a message from the queue and process it
   while (true)
   {
      if(msgQueue.isEmpty())
      {
         numberRetries++;
         this_thread::sleep_for(boost::chrono::milliseconds(50));
      }
      else
      {
         //A message arrived, so restart the idle count
         numberRetries = 0;
         Message incomingMsg = msgQueue.remove();
         if (incomingMsg.getMsgCode() == SHUT_DOWN)
         {
            break;
         }
         //Do whatever you want to do with the messages
         processMessage(incomingMsg);
      }

      //Timeout - exit the main loop and terminate
      if (numberRetries >= MAX_NUM_RETRIES)
      {
         break;
      }
   }// end while
   return 0;
}

The readMessageFunction takes a reference to the queue and is started in a new thread. It monitors the source of the messages (a socket or whatever) and inserts them into the queue as they arrive.
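The receiver function itself is never spelled out, so here is one rough sketch of what it could look like. Everything in it is an assumption made for illustration: the Message struct, the SHUT_DOWN value, the cut-down MessageQueue stand-in (built on std::mutex instead of Boost), and above all the second parameter, which replaces the real socket with a vector of canned messages so the example can run on its own.

```cpp
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <vector>

// Hypothetical message type; the article leaves Message undefined.
struct Message
{
   int code;
   std::string body;
};

// Assumed value for the shut-down message code.
const int SHUT_DOWN = 0;

// Minimal stand-in for BlockingQueue<Message>.
class MessageQueue
{
   private:
      std::mutex threadSafety;
      std::queue<Message> messageQueue;
   public:
      void add(const Message& newMessage)
      {
         std::lock_guard<std::mutex> lock(threadSafety);
         messageQueue.push(newMessage);
      }

      std::size_t size()
      {
         std::lock_guard<std::mutex> lock(threadSafety);
         return messageQueue.size();
      }
};

// Takes messages from the source and queues them in order.
// Stops after queueing SHUT_DOWN so the thread can finish.
void readMessageFunction(MessageQueue& blockingQueue,
                         const std::vector<Message>& source)
{
   for (const Message& incoming : source)
   {
      blockingQueue.add(incoming);
      if (incoming.code == SHUT_DOWN)
      {
         break;
      }
   }
}
```

With a real socket the loop would block on a receive call instead of walking a vector, but the shape stays the same: receive, add to the queue, stop on shut down. To run it in its own thread with the standard library you would write something like std::thread reader(readMessageFunction, std::ref(queue), std::cref(source)).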

At the same time the main thread checks the queue every so often and processes any messages it finds in it.

The way I implement the locking in the example above has a potential problem: although I check whether the queue is empty before I remove from it, the lock is released between the end of the check and the actual remove. If multiple threads were reading from the queue this would be a problem, since another thread could remove a message right at that moment, causing an attempt to remove from an empty queue.

In this case it doesn’t make much sense to have multiple readers removing from the queue, and a single reader is often all you need. The exception is something like a shared resource pool, but having to write one is rare.

So there you have it – a message handling implementation complete with a generic thread safe container that you can easily modify if you want something a bit more fancy – a priority queue perhaps.
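If you did need multiple readers, one way around the gap between the check and the remove is to do both under a single lock. The sketch below is not part of the original class. SafeQueue and tryRemove are hypothetical names, and it uses std::mutex instead of Boost only so that it compiles without extra dependencies.

```cpp
#include <mutex>
#include <queue>

template <class DataType>
class SafeQueue
{
   private:
      std::mutex threadSafety;
      std::queue<DataType> messageQueue;
   public:
      void add(const DataType& newMessage)
      {
         std::lock_guard<std::mutex> lock(threadSafety);
         messageQueue.push(newMessage);
      }

      // Checks for emptiness and removes while holding the same lock,
      // so no other thread can empty the queue in between.
      // Returns false and leaves result untouched if the queue is empty.
      bool tryRemove(DataType& result)
      {
         std::lock_guard<std::mutex> lock(threadSafety);
         if (messageQueue.empty())
         {
            return false;
         }
         result = messageQueue.front();
         messageQueue.pop();
         return true;
      }
};
```

A reader then calls tryRemove in its loop and sleeps when it returns false, instead of calling isEmpty followed by remove.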
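As a rough idea of what the priority queue variant might look like, here is a sketch that swaps std::queue for std::priority_queue. All the names (PrioritizedMessage, LowerPriority, PriorityMessageQueue, tryRemove) are invented for this example, the message is reduced to two ints, and std::mutex stands in for the Boost mutex so the code builds on its own.

```cpp
#include <mutex>
#include <queue>
#include <vector>

// Hypothetical message carrying a priority; higher means more urgent.
struct PrioritizedMessage
{
   int priority;
   int payload;
};

// Orders messages so that the highest priority ends up on top.
struct LowerPriority
{
   bool operator()(const PrioritizedMessage& a,
                   const PrioritizedMessage& b) const
   {
      return a.priority < b.priority;
   }
};

class PriorityMessageQueue
{
   private:
      std::mutex threadSafety;
      std::priority_queue<PrioritizedMessage,
                          std::vector<PrioritizedMessage>,
                          LowerPriority> messages;
   public:
      void add(const PrioritizedMessage& newMessage)
      {
         std::lock_guard<std::mutex> lock(threadSafety);
         messages.push(newMessage);
      }

      // Removes the most urgent message, if there is one.
      bool tryRemove(PrioritizedMessage& result)
      {
         std::lock_guard<std::mutex> lock(threadSafety);
         if (messages.empty())
         {
            return false;
         }
         result = messages.top();
         messages.pop();
         return true;
      }
};
```

The locking is unchanged. Only the underlying container and the order in which messages come out are different.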
