boost.png (6897 bytes) Home Libraries People FAQ More

PrevUpHomeNext

Waiting for events

In the first scheduling example, when a task is suspended, it is always added to the back task queue. We will now let a task decide whether be automatically rescheduled or not. This way a task can wait to be rescheduled at a latter time, when an event arrives.

We slightly modify scheduler::run():

...
void run () {
  while(!m_queue.empty()) {
    current()(std::nothrow);	
    m_queue.pop();
  }
}
...

The line add(current()): has been removed.
The reschedule() member function:

...
void reschedule(job_type::self& self) {
  add(current());
  self.yield();
}
...

is added to scheduler. It is used by a task to reschedule itself. We will define a message queue class now:

class message_queue {
public:
  std::string pop(job_type::self& self) {
    while(m_queue.empty()) {
      m_waiters.push(m_scheduler.current());
      self.yield();      
    }
    std::string res = m_queue.front();
    m_queue.pop();
    return res;
  }

  void push(const std::string& val) {
    m_queue.push(val);
    while(!m_waiters.empty()) {
      m_scheduler.add(m_waiters.front());
      m_waiters.pop();
    }
  }

  message_queue(scheduler& s) :
    m_scheduler(s) {}

private:
  std::queue<std::string> m_queue;
  std::queue<job_type> m_waiters;
  scheduler & m_scheduler;
};

A task can wait for a message to arrive by calling message_queue::pop(). This function returns the first element in the internal queue; if the queue is empty adds the current task to an internal wait queue and yields control to the scheduler. When message_queue::pop() is called, if the wait queue is not empty, its top element is removed and rescheduled. Note that we use a while loop instead of a simple if to check for the emptiness of the message queue. This is to correctly handle spurious wakeups. Consider this scenario:

This means that this implementation of the message queue could starve the second consumer if the first can always extract an element from the queue. A possible solution to the problem would be to to insert an explicit call to reschedule() in pop() that would give another consume a chance to run. This would require extra context switches though. This is a matter of preferring fairness or performance.

The "wait while message queue is empty" and "signal message queue not empty" pattern is reminiscent of condition variables used in threaded programming. In fact the idea is the same, except that we need not to associate a lock with the condition variable given the cooperative behavior of the scheduler.

This is our message queue object. Again a global for simplicity:

message_queue mqueue(global_scheduler);

Now we will create some jobs:

void producer(job_type::self& self, int id, int count) {
  while(--count) {
    std::cout << "In producer: "<<id<<", left: "<<count <<"\n";	
    mqueue.push("message from " + boost::lexical_cast<std::string>(id));
    std::cout << "\tmessage sent\n";
    global_scheduler.reschedule(self);
  } 
}

void consumer(job_type::self& self, int id) {
  while(true) {
    std::string result = mqueue.pop(self);
    std::cout <<"In consumer: "<<id<<"\n";
    std::cout <<"\tReceived: "<<result<<"\n";
  }
}

And add some instances of them to the scheduler:

global_scheduler.add(boost::bind(producer, _1, 0, 3));
global_scheduler.add(boost::bind(producer, _1, 1, 3));
global_scheduler.add(boost::bind(producer, _1, 2, 3));
global_scheduler.add(boost::bind(consumer, _1, 3));
global_scheduler.add(boost::bind(consumer, _1, 4));

calling global_scheduler.run() generates the following output:

In producer: 0, left: 3
        message sent
In producer: 1, left: 2
        message sent
In producer: 2, left: 1
        message sent
In consumer: 3
        Received: message from 0
In consumer: 3
        Received: message from 1
In consumer: 3
        Received: message from 2
In producer: 0, left: 2
        message sent
In producer: 1, left: 1
        message sent
In consumer: 3
        Received: message from 0
In consumer: 3
        Received: message from 1
In producer: 0, left: 1
        message sent
In consumer: 3
        Received: message from 0

Conclusions

While this example is very simple and can't be easily extended to support system events (i.e. I/O, alarms and much more), it shows how a more complex event framework can be implemented. In the advanced session we will see how Boost.Asio can be used as a scheduler and how coroutines can be adapted as callbacks to asynchronous functions.

Copyright © 2006 Giovanni P. Deretta

PrevUpHomeNext