Friday, December 27, 2013

Thoughts on Improving ADS 02 --Singe-chain History

Single-chain history

So far my thoughts on restructuring the history data has followed the ADS method where there is an individual history list for each cell. An alternative is to have a single history block per row for each update. Going back to the original example of table t, lets say that we start out with the values i=2, j=20, k=200 and execute the following updates in order:
update t set i=3, j=30
update t set j=40, k=400


After the update blocks are gone and just history remains, we are left with this:



Having a single history chain complicates things. The historic read algorithm is more complicated because the value that the reader wants may not be in the last history block older than the reader. In the above example, a reader older than either update would read k from the first history block but read i and j from the second. With the ADS strategy or history array, the reader can very quickly see if a given column has history because the history for each column is in a separate list. With the single history chain, if any column has history then for every column the reader accesses it has to potentially traverse a long history list even just to find out if that column has history.


On the other hand, having a single history chain could make multicolumn access faster when the columns do have history because the reader only has to traverse one list rather than one list for each column. Another advantage of this strategy is that it takes less space and it allows row updates and cell updates to coexist with very little added complexity. In other words, some updates could act as if the granularity of updates is whole rows (like many other RDBMSs do), preventing any other transaction from updating other cells in the row even if the first update didn’t update those particular cells. This would help to avoid rollbacks for certain workloads.

Reducing Memory Further


All of the alternative schemes I’ve discussed for representing history require a pointer in every row in the database in order to hold the pointer to the row history. Instead of putting a history pointer in each row, we could put one on each page, a single history pointer for the page that points to a list of row history objects. We could use a linked list of row history objects or an array. The linked list would generally take up less space if only a few rows are updated, but will always be slower.

Saturday, December 21, 2013

Secrets of the ANTs Data Server 07 --Memory Management

previous

There are several disadvantages to the standard C/C++ memory management libraries represented by malloc() and free() or new and delete. These libraries have to handle the most general case of managing memory chunks in size from as small as a byte to as large as megabytes and do so in the face of completely arbitrary allocations and deallocations. Solving this problem and avoiding fragmentation requires a lot of runtime work to coalesce adjacent deallocated objects. It also requires an overhead of a word or two of for each allocation. This overhead is significant if a lot of your chunks are small.


Also, these libraries have no inherent protection against memory leaks or other memory-management problems such as freeing the same object twice. A server like ADS is intended to run for months at a time without stopping, so memory leaks and other memory-management errors are not tolerable.


For these reasons, and because ADS had to do a lot of memory management of small to medium-sized objects, ADS implemented its own memory management libraries --about four of them. These four memory management facilities were not destructor-friendly. That is, they did not guarantee to run C++ destructors, so they were only used for object that did not have destructors.

Arenas

The arena allocator was used primarily in the SQL compiler. To use the Arena library, a task would begin by allocating an arena. There was only one operation on an arena other than deallocating the arena itself:
void* arenaAlloc(Arena* arena, size_t n)
This function acted like malloc(), it would return a new chunk of memory n bytes large. However, there was no way to free these individual objects. Instead, all chunks from the arena would remain live until the entire arena was deallocated, and then they would all vanish.


Arenas are convenient for allocating nodes in the parse tree and the other small tree and graph nodes used in a compiler. Because we were using an arena, we never had to worry about having multiple pointers to a parse tree node because it was never freed individually.


There were a few places where a node was used for a time and then no longer needed so it could have been freed, but the total memory used by such objects was never very large compared to the memory that could never be freed until the compilation was finished. It may be that even with these objects, the compiler used less memory overall because most compiler object are small and arena-allocated chunks did not need the 4 to 16 bytes per chunk of overhead that the standard allocation facility uses.


A new arena would be allocated with a single 64KB page and all allocations would take memory out of that page if there was enough space left. If there was not enough space, a new 64KB page would be allocated for the object. If the object was larger than 64KB then a page was allocated that was large enough for the object, but a multiple of 64KB to help out the higher-level memory management routines (I think we used malloc() for this this).


This would create a chain of allocation pages, each of which might contain some unallocated space at the end. If the unallocated space was large enough, then each allocation would first check to see if it could fit into one of these pages before going to the last page.

Memory Pools

A memory pool is an allocator that only lets you allocate fixed-size blocks. That is, every block allocated from a particular memory pool has the same size.Since all blocks are the same size, there is no danger of fragmentation and no need to coalesce freed blocks to avoid it. The memory for memory pools came out of 64KB pages and like arenas, memory pools could chain together multiple 64KB pages if the first page ran out of memory. However, memory pools only used 64KB pages allocated on 64K boundaries.


In ADS most memory pools were used for only a particular type of object. For example, there were memory pools that were only used to allocate update blocks. Typically there was one memory pool for each worker thread for each type. So that in some cases there was less contention for allocation.


Unlke arenas, memory pools supported freeing of objects. When an object was freed, it would be added to a linked list of freed objects for that memory pool. I believe that an allocation request would first be served from empty space at the end of the last 64KB page. This was the case where there was no contention over allocations. If there was no space left at the end of the page, but the free list was non-empty, then an item would be returned from the free list. In this case, there was potential contention since any thread might be freeing an object, so access to the free list had to be thread-safe. Finally if there was no space at the end of the last page and nothing in the free list, then a new page would be allocated.


Memory pools did not need to tag allocated blocks with information. The size was fixed and there was no need to record what page the block came from because all pages were allocated on 64K boundaries, so you could find the address of the page that a block was contained in by masking off the lower bits of the block address.


It always seemed to me to be overkill having a different allocator for each type, but this turned out to be useful in tracking down memory leaks. It was easy to instrument the allocator to tell you at least what kind of blocks were leaking, and this is the first step in tracking down a leak.

Variable Blocks

The variable block allocator was actually a collection of memory pools ranging in size by powers of two. I believe there were memory pools for 8, 16, 32, 64, 128, 256, 512, and 1024 bytes. When you requested a chunk of size n, it would give you a chunk from the smallest block allocator of at least size n.


Again, there is no need for the overhead of keeping the block size around because the deallocator can always find the page header by truncating the block address to a 64K boundary and from the page header it can find the size of the block.

Scratchpad

ADS reserved a big chunk of 64MB of memory for special limited-use applications. Blocks could be allocated from this memory much like an arena, but with the additional restriction that the memory would be automatically freed at the end of the current phase so a task had to give up all pointers into this memory before relinquishing control.


As far as I know, the only use of the scratch pad was to sort large sets of data for index rebalancing and for sorted merge joins.

Thursday, December 19, 2013

Thoughts on Improving ADS 01 --Reducing Space in Rows

The following post contains diagrams so I'll have to give a link to rather than putting it in-line. To understand the following link, you probably should first read about ADS MVCC then you are ready for:

Sunday, December 15, 2013

Secrets of the ANTs Data Server 06 --Phased Execution

previous

In the section on MVCC, I said that on a commit or rollback, ADS goes through a list of updates and modifies them. The question you might be asking is, “How do we avoid concurrency issues when doing these changes?”. Also, I only described how values in rows get changed. Indexes also have to change when values change and I didn’t describe how that is done.


There are MVCC ways to handle both of these issues, but ADS didn’t use them. For both tasks, ADS used traditional single-threaded algorithms. It was able to do that by a mechanism called phased execution. In brief, phased execution is a strategy where you break down the tasks into separate phases and require all task threads to be in the same phase at the same time. In other words, threads block at a phase change until all threads are ready to enter that phase.


ADS had three phases:


session phase where we did the MVCC processing described previously
commit phase where we updated indexes and did commits and rollbacks
cleanup phrase where we cleaned up rolled-back update blocks and expired history blocks


In session phase, we used MVCC and mostly non-locking algorithms to modify potentially shared data. In the other two phases, we allocated work to the tasks in such a way that there is no potentially shared data. For example if an index needs to be updated, we would assign the index to a single task to do all of the work. None of the data would be shared with other tasks.


In session phase, each task thread would cycle through a list of messages called the session list until a global flag was set that said to switch to commit phase. Then the thread would increment a counter and block until the counter was equal to the number of task threads. At that point, each task thread would start reading and executing work from another list of messages called the commit list until a flag was set telling to to change back to session phase. Again, it would increment a counter, and then block on the counter until it was equal to the number of task threads. When the counter reached the number of task threads, then all thread would go back to executing work from the session list.


About every hundred or so of these session phase/commit phase loops, the thread would all drop into cleanup phase to do various cleanup work.


Phased execution substitutes time waiting on locks for time waiting for phase changes. It lets you use serial algorithms on shared structures by ensuring that at certain times during execution, the structures are not shared. This is pretty much what a lock does as well. By getting an exclusive lock on a data structure, you know that as long as you hold the lock, the structure is not shared.

Saturday, December 7, 2013

Secrets of the ANTs Data Server 05 --Preemptive Multitasking

previous

Regular thread-based multitasking systems such as Pthreads or the multitasking in Java use preemptive multitasking. This means that except for critical sections and synchronization, the programmer largely ignores the fact that his task has to share resources with other tasks. It is up to the operating system to switch tasks frequently so that they all get enough time slices to make progress.


Preemptive task switching is a fairly heavy-weight process because the switch can happen anywhere in the code. This means that to resume the task, you have to record the entire program state including the stack, the program counter, and the other registers.  Also, except for switching out a task when it is blocked, the operating system doesn’t have any idea when it is a good time to switch so it routinely trashes a warmed-up CPU cache to switch to another task that won’t be accessing the same memory locations.


By contrast, ADS used cooperative multitasking in which the programmer is responsible for relinquishing control of his own tasks in order to make sure that other tasks can be started. The advantage of cooperative multitasking is that the programmer knows when it is a good time to relinquish control. This can be done when there is very little state to save and when the work area is about to change so the contents of the CPU cache will be replaced anyway.


Cooperative multitasking can be much more efficient, but there is a reason that it isn’t more widely used: cooperative multitasking requires a lot more care than preemptive multitasking and is much more bug prone. Cooperative multitasking is analogous to manual memory management while preemptive multitasking is analogous to garbage collection. As for the memory case, it is just a lot harder and more error prone to do it yourself, but you can potentially do it a lot more efficiently by hand than an automated system can do it. A significant number of performance bugs and apparent deadlocks were tracked down to someone failing to relinquish a task and ultimately blocking all other tasks (for reasons I’ll explain in the section on phased execution).


Cooperative multitasking can be made a lot easier and less error prone with support from the programming language. I’ll write about that some time.


Here is the terminology that I will use in the following:


Task: a piece of work that is independently scheduled. A task has to do only a small amount of work and then relinquish control.


Message: a piece of data that contains everything needed to execute a task.


Message Queue: a queue that messages are placed on when they are ready to run.


Rescheduling a Task: If a task has to relinquish control before finishing, then it modifies the message that started it and puts it back on the appropriate message queue.


Job: a unified set of work that may spawn one or many tasks to do the work. A job is what executes a SQL statement.


Terminology used at ANTs was a bit different from what I am using here. In particular, the other engineers often used the word “message” to refer to both the task and the data structure that described the task. That shouldn’t be problematic because you can usually tell by context what someone means, but my brain could not adapt to using the word “message” to refer to a task. To me, a message can’t be anything other than a static piece of data, so I prefer to use the word “task”.

Task Management

When you swap out a task you need to save enough information to resume it in the future. With preemptive multitasking, all of the information you need is in the registers and stack. That sort of information lets you restart anywhere in the program. With cooperative multitasking (of the sort used in ADS), you don’t save the stack so you can’t resume in the middle of a function. In fact the only way to “resume” is to call a function so the “resume address” is a function pointer.


ADS saved the function pointer in a data structure called a message and the function would take a pointer to the message as its argument. In other words, we had a message type:


class Message;


and the task-starting function was of type TaskFunction, defined as:


typedef void (*TaskFunction)(Message*);


The message also included other information such as a session id and a set of parameters for the task.


Part of the efficiency of the ADS task management is that it was only used when needed. Suppose that we need to search a disk page for a value. We would have a function to do the necessary work and the first thing the function would do is check to see if the page was in memory. If it was in memory, then the function would just go ahead and execute without ever relinquishing control. If the page was not in memory, only the would the function initialize a message to execute the work, then start a page read and cooperatively suspend until the page was loaded into memory.


This was the general approach to accessing data that required I/O: check to see if the data is already in memory. If so, then just execute the function. If not, then start a read and put a message on the read queue to execute the function.


In addition to voluntarily suspending for I/O, a task would also have a counter that kept track, in vague units, of the amount of work done. The counter would be updated and checked periodically (typically inside an inner loop) and when it reached some threshold, the task would break out of what it was doing and suspend. In order to support this feature, the functions that did database work were written to be “restartable”.

Message Arguments

There was originally no distinction between external messages that would come from the network and internal messages spawned by tasks to do other tasks. This meant that we had to serialize the task arguments for the next TaskFunction before relinquishing control and that the TaskFunction would then have to deserialize.


Over time as we added more features and optimizations, more and more messages were strictly internal and this serialization/deserialization began to seem like a bigger and bigger problem. The developers responded by using memcpy() to copy literal machine values into the messages. This strategy was somewhat ad hoc and offered no type checking, but we didn’t have to go through and modify all of the existing messages to make it work.


Eventually we added a class hierarchy around messages that allowed current messages to continue working as-is and allowed new messages to be declared as subclasses where the arguments to the messages were (type-checked) class members. These new messages also had a convenient way to avoid scheduling a task when it wasn’t necessary.


Here is a pretty artificial example, but it gives the general idea: lets say that we have an array of integers which is kept in a disk page. A function to sum the values in the array might look something like the following. Note that this function is called within a task and taskMsg is the original message that started the task. taskMsg will be reused if we have to suspend in order to avoid extra allocation and initialization overhead. pageID is the ID of the page that contains the data, and result is the address of where the result should go. This result location must be able to outlive a task suspension.


void sumArray(Message taskMsg, Page pageID, int& result) {
 SumArrayMessage* msg = (SumArrayMessage*)taskMsg;
 msg->callBack = doRun;
 msg->pageID = pageID;
 msg->resultP = &result;
 msg->i = 0;
 if (inMemory(pageID))
   msg->run();
 else
   loadPage(taskMsg, pageID);
}


A few things to note here: SumArrayMessage is a subclass of TypedMessage which is a subclass of Message, so we are casting a super class, Message, to a subclass SumArrayMessage and then setting subclass fields in the cast object. This is not legitimate C++ in most cases but in ADS, all objects of type Message were actually 4KB chunks of memory which was much larger than needed for the Message fields. So a Message could always be converted to one of its subclasses as long as the subclass was less than 4KB in size.


Second, typed messages all used the same callback function, doRun(), which was defined like this:


void doRun(Message* msg) {
 ((TypedMessage*)msg)->run();
}


The run() method was a pure virtual method in TypedMessage.


Third, loadPage() is a function that loads a page and then when it is in memory, executes the task funcion of msg and passes msg as the argument.


The sum-array class is defined like this:


struct SumArrayMessage : public TypedMessage {
 Page pageID;
 int* resultP;
 int i;
 void run() {
   Array arr = getArray(pageID); // assumed to be in memory
   int n = arr->size();
   int workCounter = i + 1000; // keep track of work done
   while (i < n && i < workCounter) {
     *resultP += arr[i++];
   }
   if (i < n) {
     // we exited because we have done enough work this round
     suspend();
   }
   // otherwise we are done so just exit
 }
}


Notice that since i is a class variable, the run() function is restartable. We exit the loop either when we have finished adding or when we have added 1000 number, whichever comes first. If we have not finished adding (meaning that we have added 1000 numbers) then we suspend this task to let someone else do work. The suspend() function is defined as part of TypedMessage which is a subclass of Messsage. What it does is put the current message on the work queue for the next phase.


Thread Infrastructure

ADS used cooperative multitasking for database work, but the infrastructure was based on traditional threads. There were several kinds of threads.
  • Task threads would go through the message queue taking off one message, executing the task until it relinquished control, and then going to the next message in the queue.
  • I/O threads would suspend, waiting for network I/O requests. Whenever an I/O request finished, the thread would wake up the task that was waiting on the I/O and the task would then be put back on the message queue to be executed on a task thread. My memory is that we also used I/O threads for disk I/O in some circumstances but I am told otherwise.
  • Work threads would be used to do long-running work that we didn’t want to use preemptive multitask for. The SQL compiler ran on a work thread. It was possible for a work thread to turn into a task thread and vice versa. This feature was used in long-running threads to access database internals.
When there were fewer than four cores (we called them processors or CPU’s in those days) we would spawn one task thread for each core and one I/O thread. For four to eight cores, we spawned one less task thread, leaving one core for the I/O thread and operating system. The threads were spawned this way in attempt to minimize the preemptive scheduling by the operating system, which would to some extent defeat the advantages of the ADS internal cooperative multitasking. The task threads were given high priority to help prevent them from being swapped out.


I thought we used processor affinity as well to bind the task and I/O threads, but Achim Bode comments:


Thread affinity was added very late, and maybe only for Linux. The Improvements were small (couple of percent). I believe now that the memory locality is important to make thread affinity worth-it.


There were extra threads used for long-running processes that would only run when needed. These threads could enter the task-thread pool to do database work (typically to read data) and then back out for the long-running work.


When we had to do some out-of-band work such as compiling a query, one of the task threads would be temporarily converted into a work thread to execute the work, then be converted back into a task thread.

continued...