Saturday, December 7, 2013

Secrets of the ANTs Data Server 05 --Preemptive Multitasking


Regular thread-based multitasking systems such as Pthreads or the multitasking in Java use preemptive multitasking. This means that except for critical sections and synchronization, the programmer largely ignores the fact that his task has to share resources with other tasks. It is up to the operating system to switch tasks frequently so that they all get enough time slices to make progress.

Preemptive task switching is a fairly heavy-weight process because the switch can happen anywhere in the code. This means that to resume the task, you have to record the entire program state including the stack, the program counter, and the other registers. Also, except for switching out a task when it is blocked, the operating system doesn’t have any idea when it is a good time to switch, so it routinely trashes a warmed-up CPU cache to switch to another task that won’t be accessing the same memory locations.

By contrast, ADS used cooperative multitasking in which the programmer is responsible for relinquishing control of his own tasks in order to make sure that other tasks can be started. The advantage of cooperative multitasking is that the programmer knows when it is a good time to relinquish control. This can be done when there is very little state to save and when the work area is about to change so the contents of the CPU cache will be replaced anyway.

Cooperative multitasking can be much more efficient, but there is a reason that it isn’t more widely used: cooperative multitasking requires a lot more care than preemptive multitasking and is much more bug prone. Cooperative multitasking is analogous to manual memory management while preemptive multitasking is analogous to garbage collection. As in the memory case, it is just a lot harder and more error prone to do it yourself, but you can potentially do it a lot more efficiently by hand than an automated system can. A significant number of performance bugs and apparent deadlocks were tracked down to a task failing to relinquish control and ultimately blocking all other tasks (for reasons I’ll explain in the section on phased execution).

Cooperative multitasking can be made a lot easier and less error prone with support from the programming language. I’ll write about that some time.

Here is the terminology that I will use in the following:

Task: a piece of work that is independently scheduled. A task has to do only a small amount of work and then relinquish control.

Message: a piece of data that contains everything needed to execute a task.

Message Queue: a queue that messages are placed on when they are ready to run.

Rescheduling a Task: If a task has to relinquish control before finishing, then it modifies the message that started it and puts it back on the appropriate message queue.

Job: a unified set of work that may spawn one or many tasks to do the work. A job is what executes a SQL statement.

Terminology used at ANTs was a bit different from what I am using here. In particular, the other engineers often used the word “message” to refer to both the task and the data structure that described the task. That shouldn’t be problematic because you can usually tell by context what someone means, but my brain could not adapt to using the word “message” to refer to a task. To me, a message can’t be anything other than a static piece of data, so I prefer to use the word “task”.

Task Management

When you swap out a task you need to save enough information to resume it in the future. With preemptive multitasking, all of the information you need is in the registers and stack. That sort of information lets you restart anywhere in the program. With cooperative multitasking (of the sort used in ADS), you don’t save the stack so you can’t resume in the middle of a function. In fact the only way to “resume” is to call a function so the “resume address” is a function pointer.

ADS saved the function pointer in a data structure called a message and the function would take a pointer to the message as its argument. In other words, we had a message type:

class Message;

and the task-starting function was of type TaskFunction, defined as:

typedef void (*TaskFunction)(Message*);

The message also included other information such as a session id and a set of parameters for the task.
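To make the "resume by calling a function" idea concrete, here is a minimal single-threaded sketch. It is not the ADS code: the field names sessionId and remaining, the countDown task, and the std::deque standing in for the message queue are all assumptions made for illustration.

```cpp
#include <cassert>
#include <deque>

struct Message;
typedef void (*TaskFunction)(Message*);

// Hypothetical message layout: the "resume address" plus task parameters.
struct Message {
    TaskFunction callBack;  // function to call when the task is (re)started
    int sessionId;          // illustrative session id
    int remaining;          // illustrative parameter that doubles as saved state
};

std::deque<Message*> messageQueue;  // messages that are ready to run

// A task that does one small unit of work, then reschedules itself by
// putting the message that started it back on the queue.
void countDown(Message* msg) {
    assert(msg->remaining > 0);
    if (--msg->remaining > 0)
        messageQueue.push_back(msg);  // relinquish control, resume later
}

// Runs queued tasks until none are left and returns the number of
// task invocations that were made.
int runUntilIdle() {
    int invocations = 0;
    while (!messageQueue.empty()) {
        Message* msg = messageQueue.front();
        messageQueue.pop_front();
        msg->callBack(msg);  // "resuming" is just a function call
        ++invocations;
    }
    return invocations;
}
```

Note that no stack is saved anywhere: everything the task needs to continue lives in the message itself.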

Part of the efficiency of the ADS task management is that it was only used when needed. Suppose that we need to search a disk page for a value. We would have a function to do the necessary work and the first thing the function would do is check to see if the page was in memory. If it was in memory, then the function would just go ahead and execute without ever relinquishing control. If the page was not in memory, only then would the function initialize a message to execute the work, start a page read, and cooperatively suspend until the page was loaded into memory.

This was the general approach to accessing data that required I/O: check to see if the data is already in memory. If so, then just execute the function. If not, then start a read and put a message on the read queue to execute the function.
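The check-then-run-or-defer pattern might be sketched as follows. Everything here is a stand-in chosen for illustration: pagesInMemory plays the role of the page cache, readQueue the read queue, searchPage the real work, and completeReads simulates the I/O finishing. None of these names come from ADS.

```cpp
#include <cassert>
#include <deque>
#include <set>

struct Message;
typedef void (*TaskFunction)(Message*);

struct Message {
    TaskFunction callBack;
    int pageID;
    bool found;  // hypothetical result slot
};

std::set<int> pagesInMemory;     // stand-in for the page cache
std::deque<Message*> readQueue;  // messages waiting for a page read

// The actual work, stubbed out: pretend the value was found.
void searchPage(Message* msg) { msg->found = true; }

// Returns true if the work ran immediately, false if it was deferred.
bool searchOrDefer(Message* msg) {
    msg->callBack = searchPage;
    if (pagesInMemory.count(msg->pageID)) {
        searchPage(msg);       // fast path: no message is scheduled
        return true;
    }
    readQueue.push_back(msg);  // slow path: the read itself is omitted here
    return false;
}

// Simulates pending reads completing and the deferred tasks running.
void completeReads() {
    while (!readQueue.empty()) {
        Message* msg = readQueue.front();
        readQueue.pop_front();
        pagesInMemory.insert(msg->pageID);
        msg->callBack(msg);
    }
}
```

The point of the design is that the common case, a page already in memory, pays nothing for scheduling.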

In addition to voluntarily suspending for I/O, a task would also have a counter that kept track, in vague units, of the amount of work done. The counter would be updated and checked periodically (typically inside an inner loop) and when it reached some threshold, the task would break out of what it was doing and suspend. In order to support this feature, the functions that did database work were written to be “restartable”.

Message Arguments

There was originally no distinction between external messages that would come from the network and internal messages spawned by tasks to do other tasks. This meant that we had to serialize the task arguments for the next TaskFunction before relinquishing control and that the TaskFunction would then have to deserialize.

Over time as we added more features and optimizations, more and more messages were strictly internal and this serialization/deserialization began to seem like a bigger and bigger problem. The developers responded by using memcpy() to copy literal machine values into the messages. This strategy was somewhat ad hoc and offered no type checking, but we didn’t have to go through and modify all of the existing messages to make it work.
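As a rough illustration of the memcpy() approach and why it offers no type checking, consider the sketch below. The RawMessage layout, the 64-byte payload size, and the two arguments are assumptions for the example, not the ADS message format.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical raw message: an untyped byte payload.
struct RawMessage {
    unsigned char payload[64];
};

static_assert(sizeof(std::int32_t) + sizeof(std::int64_t)
                  <= sizeof(RawMessage::payload),
              "arguments must fit in the payload");

// Sender: copy literal machine values straight into the payload.
void packArgs(RawMessage* msg, std::int32_t pageID, std::int64_t offset) {
    std::size_t pos = 0;
    std::memcpy(msg->payload + pos, &pageID, sizeof pageID);
    pos += sizeof pageID;
    std::memcpy(msg->payload + pos, &offset, sizeof offset);
}

// Receiver: must read the same types in the same order.
// Nothing in the language checks that it does.
void unpackArgs(const RawMessage* msg, std::int32_t* pageID,
                std::int64_t* offset) {
    std::size_t pos = 0;
    std::memcpy(pageID, msg->payload + pos, sizeof *pageID);
    pos += sizeof *pageID;
    std::memcpy(offset, msg->payload + pos, sizeof *offset);
}
```

If the receiver reads the arguments in a different order or with different types, the code still compiles and simply produces garbage, which is the weakness that typed messages address.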

Eventually we added a class hierarchy around messages that allowed current messages to continue working as-is and allowed new messages to be declared as subclasses where the arguments to the messages were (type-checked) class members. These new messages also had a convenient way to avoid scheduling a task when it wasn’t necessary.

Here is a pretty artificial example, but it gives the general idea: let’s say that we have an array of integers which is kept in a disk page. A function to sum the values in the array might look something like the following. Note that this function is called within a task and taskMsg is the original message that started the task. If we have to suspend, taskMsg will be reused in order to avoid extra allocation and initialization overhead. pageID is the ID of the page that contains the data, and result is the address of where the result should go. This result location must be able to outlive a task suspension.

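void sumArray(Message* taskMsg, Page pageID, int& result) {
 SumArrayMessage* msg = (SumArrayMessage*)taskMsg;
 msg->callBack = doRun;
 msg->pageID = pageID;
 msg->resultP = &result;
 msg->i = 0;
 if (inMemory(pageID))
   msg->run(); // page is already in memory: run without scheduling
 else
   loadPage(taskMsg, pageID); // start the read; the task runs when it completes
}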

A few things to note here. First, SumArrayMessage is a subclass of TypedMessage, which is a subclass of Message, so we are casting a superclass, Message, to a subclass, SumArrayMessage, and then setting subclass fields in the cast object. This is not legitimate C++ in most cases, but in ADS all objects of type Message were actually 4KB chunks of memory, much larger than needed for the Message fields. So a Message could always be converted to one of its subclasses as long as the subclass was less than 4KB in size.
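The fixed-size chunk idea can be sketched in standard-conforming C++ as below. This deliberately swaps in placement new rather than the raw cast that ADS used, and the Chunk type, the field names, and the asSumArray helper are hypothetical. What it shows is the size check that makes the scheme safe.

```cpp
#include <cassert>
#include <cstddef>
#include <new>

const std::size_t kChunkSize = 4096;  // 4KB, as described above

struct Message {
    int sessionId;  // illustrative base-class field
};

struct SumArrayMessage : Message {
    int pageID;
    int i;
};

// The subclass must fit in the chunk for the conversion to be safe.
static_assert(sizeof(SumArrayMessage) <= kChunkSize,
              "subclass does not fit in a message chunk");

// One message-sized chunk, aligned for any ordinary object.
struct alignas(std::max_align_t) Chunk {
    unsigned char bytes[kChunkSize];
};

// Constructs the subclass inside the chunk and returns a typed pointer.
// (Assumption: ADS cast an existing Message instead of constructing.)
SumArrayMessage* asSumArray(Chunk* chunk, int sessionId) {
    SumArrayMessage* msg = new (chunk->bytes) SumArrayMessage();
    msg->sessionId = sessionId;
    return msg;
}
```

Because every message occupies a full chunk, any message type that passes the static_assert can reuse the same storage without a new allocation.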

Second, typed messages all used the same callback function, doRun(), which was defined like this:

void doRun(Message* msg) {
 ((TypedMessage*)msg)->run(); // dispatch to the subclass's run()
}

The run() method was a pure virtual method in TypedMessage.

Third, loadPage() is a function that loads a page and then, once the page is in memory, executes the task function of msg, passing msg as the argument.

The sum-array class is defined like this:

struct SumArrayMessage : public TypedMessage {
 Page pageID;
 int* resultP;
 int i;
 void run() {
   Array arr = getArray(pageID); // assumed to be in memory
   int n = arr->size();
   int workCounter = i + 1000; // keep track of work done
   while (i < n && i < workCounter) {
     *resultP += arr[i++];
   }
   if (i < n) {
     // we exited because we have done enough work this round
     suspend();
   }
   // otherwise we are done so just exit
 }
};

Notice that since i is a member variable of the message, the run() function is restartable. We exit the loop either when we have finished adding or when we have added 1000 numbers, whichever comes first. If we have not finished adding (meaning that we have added 1000 numbers), then we suspend this task to let someone else do work. The suspend() function is defined as part of TypedMessage, which is a subclass of Message. What it does is put the current message on the work queue for the next phase.
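For readers who want to run the idea, here is a self-contained approximation of the restartable sum. It is a sketch under assumptions: a std::vector stands in for the array on the disk page, a std::deque stands in for the next-phase work queue, and the SumTask type, the runs counter, and drain() are invented for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

struct SumTask;
std::deque<SumTask*> workQueue;  // stand-in for the next-phase work queue

struct SumTask {
    const std::vector<int>* data;  // stand-in for the in-memory page
    long* resultP;                 // must outlive any suspension
    std::size_t i;                 // saved position: makes run() restartable
    int runs;                      // how many times run() was entered

    void suspend() { workQueue.push_back(this); }

    void run() {
        ++runs;
        const std::size_t n = data->size();
        const std::size_t limit = i + 1000;  // work budget for this round
        while (i < n && i < limit)
            *resultP += (*data)[i++];
        if (i < n)
            suspend();  // budget used up: let other tasks run
    }
};

// Drains the queue one message at a time, as a task thread would.
void drain() {
    while (!workQueue.empty()) {
        SumTask* t = workQueue.front();
        workQueue.pop_front();
        t->run();
    }
}
```

With 2500 elements, the task runs three times: two rounds of 1000 additions that each end in suspend(), and a final round of 500 that finishes the sum.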

Thread Infrastructure

ADS used cooperative multitasking for database work, but the infrastructure was based on traditional threads. There were several kinds of threads.
  • Task threads would go through the message queue taking off one message, executing the task until it relinquished control, and then going to the next message in the queue.
  • I/O threads would suspend, waiting for network I/O requests. Whenever an I/O request finished, the thread would wake up the task that was waiting on the I/O and the task would then be put back on the message queue to be executed on a task thread. My memory is that we also used I/O threads for disk I/O in some circumstances but I am told otherwise.
  • Work threads would be used to do long-running work that we didn’t want to use cooperative multitasking for. The SQL compiler ran on a work thread. It was possible for a work thread to turn into a task thread and vice versa. This feature was used in long-running threads to access database internals.
When there were fewer than four cores (we called them processors or CPUs in those days) we would spawn one task thread for each core and one I/O thread. For four to eight cores, we spawned one fewer task thread, leaving one core for the I/O thread and operating system. The threads were spawned this way in an attempt to minimize preemptive scheduling by the operating system, which would to some extent defeat the advantages of the ADS internal cooperative multitasking. The task threads were given high priority to help prevent them from being swapped out.
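The thread-count rule can be written down as a small function. This is only a restatement of the paragraph above under assumptions: the ThreadPlan type and planThreads name are hypothetical, and because the behavior for more than eight cores is not described, the sketch refuses that case rather than guessing.

```cpp
#include <cassert>

struct ThreadPlan {
    int taskThreads;
    int ioThreads;
};

// cores: number of cores on the machine. Only 1..8 is covered by the
// rule described in the text, so anything else is rejected.
ThreadPlan planThreads(int cores) {
    assert(cores >= 1 && cores <= 8);
    ThreadPlan plan;
    plan.ioThreads = 1;
    // Fewer than four cores: one task thread per core.
    // Four to eight cores: leave one core for I/O and the OS.
    plan.taskThreads = (cores < 4) ? cores : cores - 1;
    return plan;
}
```

For example, a four-core machine gets three task threads plus the I/O thread, so the total matches the core count and the operating system has little reason to preempt.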

I thought we used processor affinity as well to bind the task and I/O threads, but Achim Bode comments:

Thread affinity was added very late, and maybe only for Linux. The Improvements were small (couple of percent). I believe now that the memory locality is important to make thread affinity worth-it.

There were extra threads used for long-running processes that would only run when needed. These threads could enter the task-thread pool to do database work (typically to read data) and then back out for the long-running work.

When we had to do some out-of-band work such as compiling a query, one of the task threads would be temporarily converted into a work thread to execute the work, then be converted back into a task thread.

