In the first scheduling example, a suspended task is always added to the back of the task queue. We will now let a task decide whether or not to be rescheduled automatically. This way a task can wait to be rescheduled at a later time, when an event arrives.
We slightly modify scheduler::run():
...
void run () {
  while(!m_queue.empty()) {
    current()(std::nothrow);
    m_queue.pop();
  }
}
...
The line add(current()); has been removed, so a task that yields is no longer put back on the queue automatically.
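As a rough illustration of what dropping that line implies, the following sketch uses a toy scheduler in which tasks are plain callables rather than coroutines. The names toy_scheduler, slice_fn, requeue_current and demo_requeue are made up for this sketch and are not part of the library; each call of a task stands in for one run up to the next yield.

```cpp
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

class toy_scheduler;
// Hypothetical stand-in for a coroutine: one call runs one slice of the task.
using slice_fn = std::function<void(toy_scheduler&)>;

class toy_scheduler {
public:
    void add(slice_fn task) { m_queue.push(std::move(task)); }

    // Called by the running task: puts a copy of it at the back of the queue.
    void requeue_current() { add(m_queue.front()); }

    void run() {
        while (!m_queue.empty()) {
            slice_fn current = m_queue.front();  // work on a copy of the front task
            current(*this);
            m_queue.pop();                       // no automatic add(current) here
        }
    }

private:
    std::queue<slice_fn> m_queue;
};

// Runs two tasks and records the order in which their slices execute.
std::vector<std::string> demo_requeue() {
    std::vector<std::string> log;
    toy_scheduler s;
    int remaining = 3;
    // Task A asks to be queued again until its counter runs out.
    s.add([&](toy_scheduler& sch) {
        log.push_back("A" + std::to_string(remaining));
        if (--remaining > 0) sch.requeue_current();
    });
    // Task B never asks to be queued again, so it runs exactly once.
    s.add([&](toy_scheduler&) { log.push_back("B"); });
    s.run();
    return log;
}
```

Under these assumptions demo_requeue() records A3, B, A2, A1: task B is not rescheduled because it never asks for it.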
The reschedule() member function:
...
void reschedule(job_type::self& self) {
  add(current());
  self.yield();
}
...
is added to scheduler. A task uses it to reschedule itself. We will now define a message queue class:
class message_queue {
public:
  std::string pop(job_type::self& self) {
    while(m_queue.empty()) {
      m_waiters.push(m_scheduler.current());
      self.yield();
    }
    std::string res = m_queue.front();
    m_queue.pop();
    return res;
  }

  void push(const std::string& val) {
    m_queue.push(val);
    while(!m_waiters.empty()) {
      m_scheduler.add(m_waiters.front());
      m_waiters.pop();
    }
  }

  message_queue(scheduler& s) :
    m_scheduler(s) {}

private:
  std::queue<std::string> m_queue;
  std::queue<job_type> m_waiters;
  scheduler & m_scheduler;
};
A task can wait for a message to arrive by calling message_queue::pop(). This function returns the first element in the internal queue; if the queue is empty, it adds the current task to an internal wait queue and yields control to the scheduler. When message_queue::push() is called, every task in the wait queue is removed from it and rescheduled. Note that pop() uses a while loop instead of a simple if to check whether the message queue is empty. This is to correctly handle spurious wakeups. Consider this scenario:
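1. Consumer 1 calls pop(). The message queue is empty, so it sleeps waiting for data.
2. Consumer 2 calls pop(). The message queue is empty, so it sleeps waiting for data.
3. A producer pushes one message. Both waiting consumers are rescheduled.
4. Consumer 1 wakes up. The message queue is not empty, so it consumes the data pushed by the producer. It then calls pop() again without yielding control. This time the message queue is empty, so it goes to sleep.
5. Consumer 2 wakes up. The while loop tests the queue again, finds it empty, and puts Consumer 2 back to sleep. If an if were used instead, the test wouldn't be performed, and Consumer 2 would try to extract a non-existent element from the queue.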
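The scenario above can be replayed with a small model that involves no coroutines at all. This is only a sketch under simplifying assumptions: wake_result and simulate_wakeups are hypothetical names, the woken consumers simply run one after another, and an attempt to pop from an empty queue is counted instead of actually being performed.

```cpp
#include <queue>
#include <string>

struct wake_result {
    int delivered = 0;           // messages handed to a consumer
    int went_back_to_sleep = 0;  // wakeups that found the queue empty and waited again
    int invalid_pops = 0;        // wakeups that would have popped an empty queue
};

// Models a single push() that wakes `consumers` waiting tasks.
// recheck == true models the while loop, recheck == false a plain if.
wake_result simulate_wakeups(int consumers, bool recheck) {
    std::queue<std::string> messages;
    messages.push("message");  // one message arrives, all waiters are rescheduled
    wake_result r;
    for (int c = 0; c < consumers; ++c) {  // each woken consumer runs in turn
        if (recheck && messages.empty()) {
            ++r.went_back_to_sleep;  // while loop: the test fails, wait again
            continue;
        }
        if (messages.empty()) {
            ++r.invalid_pops;        // plain if: front() on an empty queue would be undefined
            continue;
        }
        messages.pop();
        ++r.delivered;
    }
    return r;
}
```

With two consumers, simulate_wakeups(2, true) delivers one message and sends one consumer back to sleep, while simulate_wakeups(2, false) reports one invalid pop.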
This also means that this implementation of the message queue can starve the second consumer if the first one can always extract an element from the queue. A possible solution would be to insert an explicit call to reschedule() in pop(), which would give another consumer a chance to run. This would require extra context switches, though. It is a matter of preferring fairness or performance.
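The trade-off can be made concrete with a toy model of consumers draining a queue that already holds some messages. The sketch below is not the library's scheduler; fairness_result and drain are hypothetical names, consumers are represented by plain integer ids, and a "switch" is counted every time a consumer is resumed.

```cpp
#include <deque>
#include <vector>

struct fairness_result {
    std::vector<int> received;  // messages consumed by each consumer
    int switches = 0;           // number of times a consumer was resumed
};

// Drains `messages` pending messages with `consumers` ready consumers.
// reschedule_after_pop == true models a pop() that reschedules after each message.
fairness_result drain(int messages, int consumers, bool reschedule_after_pop) {
    fairness_result r;
    r.received.assign(consumers, 0);
    std::deque<int> ready;
    for (int id = 0; id < consumers; ++id) ready.push_back(id);

    while (!ready.empty()) {
        int id = ready.front();
        ready.pop_front();
        ++r.switches;
        while (messages > 0) {
            --messages;
            ++r.received[id];
            if (reschedule_after_pop) {  // give the other consumers a turn
                ready.push_back(id);
                break;
            }
        }
        // A consumer that finds no message stops here; in the real queue it would wait.
    }
    return r;
}
```

In this model, draining 6 messages with 2 consumers gives one consumer all 6 messages in 2 switches without rescheduling, and 3 messages each in 8 switches with it.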
The "wait while the message queue is empty" and "signal that the message queue is not empty" pattern is reminiscent of condition variables used in threaded programming. In fact the idea is the same, except that we do not need to associate a lock with the condition variable, given the cooperative behavior of the scheduler.
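For comparison, here is a minimal sketch of the same pattern in threaded code, using the standard std::mutex and std::condition_variable. The class threaded_message_queue and the function threaded_demo are hypothetical names for this sketch; unlike the cooperative version, the lock is needed because producer and consumer really run concurrently.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

class threaded_message_queue {
public:
    std::string pop() {
        std::unique_lock<std::mutex> lock(m_mutex);
        // Same while loop as in the cooperative version: recheck after every wakeup.
        while (m_queue.empty()) m_not_empty.wait(lock);
        std::string res = m_queue.front();
        m_queue.pop();
        return res;
    }

    void push(const std::string& val) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(val);
        }
        m_not_empty.notify_all();  // wake every waiter, as push() does with m_waiters
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_not_empty;
    std::queue<std::string> m_queue;
};

// One consumer thread receives three messages pushed by the calling thread.
std::vector<std::string> threaded_demo() {
    threaded_message_queue q;
    std::vector<std::string> received;
    std::thread consumer([&] {
        for (int i = 0; i < 3; ++i) received.push_back(q.pop());
    });
    for (int i = 0; i < 3; ++i) q.push("message " + std::to_string(i));
    consumer.join();  // received is only read after the consumer has finished
    return received;
}
```

With a single producer and a single consumer the messages arrive in the order they were pushed. Depending on the toolchain, building this may require enabling thread support (for example -pthread with GCC).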
This is our message queue object, again a global for simplicity:
message_queue mqueue(global_scheduler);
Now we will create some jobs:
void producer(job_type::self& self, int id, int count) {
  while(--count) {
    std::cout << "In producer: "<<id<<", left: "<<count <<"\n";
    mqueue.push("message from " + boost::lexical_cast<std::string>(id));
    std::cout << "\tmessage sent\n";
    global_scheduler.reschedule(self);
  }
}

void consumer(job_type::self& self, int id) {
  while(true) {
    std::string result = mqueue.pop(self);
    std::cout <<"In consumer: "<<id<<"\n";
    std::cout <<"\tReceived: "<<result<<"\n";
  }
}
And add some instances of them to the scheduler:
global_scheduler.add(boost::bind(producer, _1, 0, 4));
global_scheduler.add(boost::bind(producer, _1, 1, 3));
global_scheduler.add(boost::bind(producer, _1, 2, 2));
global_scheduler.add(boost::bind(consumer, _1, 3));
global_scheduler.add(boost::bind(consumer, _1, 4));
Calling global_scheduler.run() generates the following output:
In producer: 0, left: 3
    message sent
In producer: 1, left: 2
    message sent
In producer: 2, left: 1
    message sent
In consumer: 3
    Received: message from 0
In consumer: 3
    Received: message from 1
In consumer: 3
    Received: message from 2
In producer: 0, left: 2
    message sent
In producer: 1, left: 1
    message sent
In consumer: 3
    Received: message from 0
In consumer: 3
    Received: message from 1
In producer: 0, left: 1
    message sent
In consumer: 3
    Received: message from 0
While this example is very simple and can't easily be extended to support system events (e.g. I/O, alarms and much more), it shows how a more complex event framework can be implemented. In the advanced section we will see how Boost.Asio can be used as a scheduler and how coroutines can be adapted as callbacks to asynchronous functions.
Copyright © 2006 Giovanni P. Deretta