# Disk-based algorithms: External merge-sort

### 2017-11-13

I have lately been studying how databases work under the hood. There are many problems that a database has to solve. At the moment, I’ve been having fun exploring how data is stored on disk. Previously, I wrote about how a hashtable can be stored on disk. This post is somewhat similar: it covers how data stored on disk can be sorted.

Databases frequently have to sort massive amounts of data, big enough that it doesn’t fit in memory all at once. Sorting algorithms that can sort data residing on disk are called external sorting algorithms. External merge-sort is one such algorithm.

## External merge-sort

Just like merge-sort, external merge-sort is a divide-and-conquer algorithm: it recursively sorts smaller subarrays and then merges them. Because disk I/O happens in blocks, however, the smallest subarray is a single disk page. For a file with $n$ pages, the algorithm proceeds as follows.

First, each page is sorted internally. To do this, we bring in as many pages as fit in the program’s buffers (remember, the entire data set doesn’t fit in memory), sort each page, and write the pages back. This step is usually referred to as pass 0.

Next, in pass 1, we merge runs of length 1. A run is a sequence of pages whose combined contents are sorted, so this means we take pairs of pages and merge them to get $n/2$ runs of length 2. Now the contents of pages 0 and 1 are sorted, as are those of pages 2 and 3, and so on. In pass 2, we do the same thing with runs of length 2, producing runs of length 4. This continues until we have two runs of $n/2$ pages each; merging those gives us the final sorted result. We start with $n$ runs and halve their number at each pass, so after pass 0 there will be $\lceil \log_2 n \rceil$ merge passes.
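To make the pass structure concrete, here is a small in-memory simulation. It is a sketch, not the author’s implementation: pages are modeled as Vec<i32>, the “file” as a Vec of pages, each run is kept as one flattened Vec for brevity, and pass 0 ignores the buffer limit. The names merge_two and external_merge_sort are made up for this example. It returns the sorted pages along with the number of passes, pass 0 included.

```rust
/// Standard two-pointer merge of two sorted slices.
fn merge_two(a: &[i32], b: &[i32]) -> Vec<i32> {
    let mut out = Vec::with_capacity(a.len() + b.len());
    let (mut i, mut j) = (0, 0);
    while i < a.len() && j < b.len() {
        if a[i] <= b[j] {
            out.push(a[i]);
            i += 1;
        } else {
            out.push(b[j]);
            j += 1;
        }
    }
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out
}

/// Simulated two-way external merge sort.
/// Returns the sorted pages and the total number of passes (pass 0 included).
/// Assumes page_size > 0.
fn external_merge_sort(mut pages: Vec<Vec<i32>>, page_size: usize) -> (Vec<Vec<i32>>, usize) {
    // Pass 0: sort every page internally.
    for page in pages.iter_mut() {
        page.sort();
    }
    let mut passes = 1;

    // Every page is now a run of length 1. For brevity each run is kept as a
    // single flattened Vec; an on-disk version would keep it as a list of pages.
    let mut runs = pages;
    while runs.len() > 1 {
        let mut next = Vec::with_capacity((runs.len() + 1) / 2);
        for pair in runs.chunks(2) {
            match pair {
                [a, b] => next.push(merge_two(a, b)),
                [only] => next.push(only.clone()), // odd run out is carried over
                _ => unreachable!(),
            }
        }
        runs = next;
        passes += 1;
    }

    // Cut the single remaining run back into pages.
    let sorted = runs.pop().unwrap_or_default();
    let out: Vec<Vec<i32>> = sorted.chunks(page_size).map(|c| c.to_vec()).collect();
    (out, passes)
}
```

For four pages this reports three passes: pass 0 plus $\log_2 4 = 2$ merge passes.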

All this may seem very familiar if you’re already acquainted with merge-sort. The difference is that there is some extra work to do when merging. Take, for example, the last pass, where we do the final merge of two runs of $n/2$ pages each. Our original constraint was that $n$ pages don’t fit in memory, so two runs of $n/2$ pages certainly won’t either. To solve this, the pages of the two input runs are streamed in.

First, we bring in the first page of each run. As with “regular” merge-sort, we iterate through both, picking off whichever has the smaller item. Eventually, one of the buffered pages will be exhausted; at that point, we bring the next page of that run into the buffer and continue. Our output is also a file, so we do the same on the output side: once the output buffer is full, we write it to the output file and start again with an empty buffer.
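The buffering described above can be sketched as a simulation. The assumptions are my own: runs are slices of in-memory “pages”, a hypothetical Disk struct merely counts page reads and writes, no run contains an empty page, and RunReader and merge_runs_paged are illustrative names. At any moment the merge holds only one page from each input run plus one output page.

```rust
/// Counts simulated page I/O.
struct Disk {
    reads: usize,
    writes: usize,
}

/// Streams one run a page at a time; only the current page counts as "in memory".
struct RunReader<'a> {
    pages: &'a [Vec<i32>],
    page: usize, // index of the page currently in the buffer
    pos: usize,  // next item to consume within that page
}

impl<'a> RunReader<'a> {
    fn new(pages: &'a [Vec<i32>], disk: &mut Disk) -> Self {
        if !pages.is_empty() {
            disk.reads += 1; // bring in the run's first page
        }
        RunReader { pages, page: 0, pos: 0 }
    }

    fn peek(&self) -> Option<i32> {
        self.pages.get(self.page).and_then(|p| p.get(self.pos)).copied()
    }

    fn advance(&mut self, disk: &mut Disk) {
        self.pos += 1;
        if self.pos == self.pages[self.page].len() {
            // Buffered page exhausted: bring in the next page of this run, if any.
            self.page += 1;
            self.pos = 0;
            if self.page < self.pages.len() {
                disk.reads += 1;
            }
        }
    }
}

/// Two-way merge using one input buffer per run and one output buffer.
fn merge_runs_paged(disk: &mut Disk, left: &[Vec<i32>], right: &[Vec<i32>], page_size: usize) -> Vec<Vec<i32>> {
    let mut a = RunReader::new(left, disk);
    let mut b = RunReader::new(right, disk);
    let mut output = Vec::new();
    let mut out_buf = Vec::with_capacity(page_size);
    loop {
        // Pick the run whose current item is smaller (ties go to the left run).
        let reader = match (a.peek(), b.peek()) {
            (None, None) => break,
            (Some(x), Some(y)) if y < x => &mut b,
            (Some(_), _) => &mut a,
            (None, Some(_)) => &mut b,
        };
        out_buf.push(reader.peek().unwrap());
        reader.advance(disk);
        if out_buf.len() == page_size {
            // Output buffer full: "write" it out and start with an empty one.
            output.push(std::mem::take(&mut out_buf));
            disk.writes += 1;
        }
    }
    if !out_buf.is_empty() {
        output.push(out_buf); // flush the final, partially filled page
        disk.writes += 1;
    }
    output
}
```

Merging two 2-page runs with 2 items per page reads 4 pages and writes 4: each page is read once and written once in the pass.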

You can check out my Rust implementation of this algorithm on GitHub.

## Multi-way merging

What I’ve described above is called two-way merging, since at each step we merge two runs. However, it can be advantageous to merge $k$ runs at once, which reduces the number of merge passes to $\lceil \log_k n \rceil$. Since every pass reads and writes the entire file, cutting the number of passes can be a significant efficiency gain.
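One common way to merge $k$ runs is to keep the current head of each run in a min-heap and repeatedly pop the smallest. The sketch below works on in-memory runs (plain Vec<i32>) and uses made-up names; since Rust’s BinaryHeap is a max-heap, entries are wrapped in std::cmp::Reverse. The helper merge_passes counts merge passes by repeatedly dividing the number of runs by $k$, rounding up.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge k sorted runs with a min-heap keyed on each run's current head.
fn k_way_merge(runs: &[Vec<i32>]) -> Vec<i32> {
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            // (value, run index, position in run); ties broken by run index.
            heap.push(Reverse((first, run_idx, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(|r| r.len()).sum());
    while let Some(Reverse((val, run_idx, pos))) = heap.pop() {
        out.push(val);
        // Refill from the run the smallest item came from.
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    out
}

/// Merge passes needed to turn `n` single-page runs into one run when
/// merging `k` runs at a time, i.e. ceil(log_k n); pass 0 not counted.
fn merge_passes(n: usize, k: usize) -> usize {
    assert!(k >= 2, "need to merge at least two runs at a time");
    let mut runs = n;
    let mut passes = 0;
    while runs > 1 {
        runs = (runs + k - 1) / k; // ceiling division
        passes += 1;
    }
    passes
}
```

For a 1000-page file, two-way merging needs 10 merge passes, while 10-way merging needs only 3.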

You can read more about the ways to do k-way merging on Wikipedia.

# Implementing a key-value store, part 2: Linear Hashing implementation in Rust

### 2017-11-09

In the last post, I introduced the idea of linear hashing. This post will describe a Rust implementation of the algorithm. I won’t go through every last line of code, but hopefully enough to give you a good understanding of how the whole thing works. I should also mention that even though this is a post about implementing linear hashing, it spends quite some time talking about how storing data to disk works. This is intentional: articles about hashtable implementations are plentiful; articles about how external storage data structures work, in my opinion, are not.

You can check out the full source code on GitHub.

## The interface

Before diving into how the hashtable will work, let’s first discuss how a client using the hashtable will be expected to use it.

Our hashtable implementation will map byte arrays to byte arrays. That is, we’ll let the client program figure out how to serialize/deserialize keys and values.

This is how a linear hashing table will be created:

```rust
let mut h = LinHash::open("book_ratings", 32, 4);
```

The parameters to open are the filename, the length of the key, and the length of the value. That means our key-value pairs (or records) will be stored in the file “book_ratings”; keys will be 32 bytes long (in the example above, probably a string) and values will be 4 bytes long (probably an integer). Keeping key and value sizes fixed is somewhat restrictive (RocksDB, for example, lets you pass in byte arrays of any length), but it simplifies how records are stored in our file.

Inserting and looking up work like so (I’ve added an assert for get just to give an idea of what the return value looks like):

```rust
h.put(b"Spin", &i32_to_bytearray(9));
h.put(b"Axis", &i32_to_bytearray(6));
// Assumes little-endian architecture
assert_eq!(h.get(b"Spin"), Some(vec![9, 0, 0, 0]));
```

I know this feels clunky, but not having to worry about serialization/deserialization simplifies things quite a bit for us. I should mention that i32_to_bytearray (and other similarly named functions in the util module) use std::mem::transmute under the hood, so they are not portable across architectures (hence the comment above about endianness).
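If portability matters, one option is to pin the byte order explicitly with the standard library’s to_le_bytes and from_le_bytes instead of transmute. This is a sketch with hypothetical helper names, not the util module’s code; because it always produces little-endian bytes, the assert above would hold on any architecture.

```rust
/// Always little-endian, whatever the host architecture.
fn i32_to_le_bytes(v: i32) -> [u8; 4] {
    v.to_le_bytes()
}

/// Inverse of i32_to_le_bytes; panics if `bytes` is not exactly 4 bytes long.
fn le_bytes_to_i32(bytes: &[u8]) -> i32 {
    assert_eq!(bytes.len(), 4, "expected a 4-byte value");
    i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
}
```

Assuming put takes a byte slice as shown later in the post, h.put(b"Spin", &i32_to_le_bytes(9)) would work unchanged, since &[u8; 4] coerces to &[u8].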

## Implementation overview

Here’s a high-level description of what happens when you call put as in the example above (the rest of the post goes into it in depth, of course). First, we hash the key and take however many bits the hashtable is currently set to use (see the last post). This tells us which bucket to place the record in. A bucket is a linked list of pages, which are chunks of bytes on disk. Pages in the linked list may be full, so we next need to figure out which page the record should go in. Once we know that, the page is fetched from disk, we make the necessary changes to it in memory (e.g. write the record and increment the number of records in the page’s header), and then we save it out to disk.

get is very similar and uses the same method (search_bucket, described below) that we use to figure out which page in the bucket a record should be placed in.

## Pages and buffer pool

A file is divided into pages of 4KB each. When we want to read or write some chunk of bytes in a page, we have to read the page into memory, make our changes to the in-memory copy, and then save the updated page out to disk.

Now, we will usually have a little more than 4KB of memory at our disposal, so we can buffer the pages we read and write. Instead of flushing a page out to disk after a single operation, we keep it in memory as long as possible. In this post, we will use a rather simplistic (and inefficient!) way to decide which pages get to stay, or rather which page gets evicted: a FIFO queue¹. If we have a buffer pool of size 2 and we read pages in the order 1, 1, 2, 3, this is what the reads/writes to disk will look like:

```
read 1 [fetch page 1]
read 1 [page 1 already in buffer pool; no disk access]
read 2 [fetch page 2]
read 3 [fetch page 3; place in slot occupied by 1]
```


A page resident in our buffer pool is represented by a Page:

```rust
pub struct Page {
    pub id: usize,
    pub storage: [u8; PAGE_SIZE],
    pub num_records: usize,
    // page_id of overflow bucket
    pub next: Option<usize>,
    pub dirty: bool,

    keysize: usize,
    valsize: usize,
}
```

Notice that besides the byte array (storage), we have some other metadata about the page here too:

• id specifies which page in the file this page is
• storage is all the bytes in the page copied out to a byte array
• num_records specifies how many records this page has
• next is what strings the overflow pages we talked about in the last post. It is also used to keep track of pages that used to be overflow pages but are not in use.
• dirty specifies whether the page here is out of sync with its corresponding page on file.
• the next two fields keysize and valsize specify what length the key and value bytearrays are in the records the page stores.

num_records and next are also stored in the page itself, in its first 16 bytes. To read and write them, we have the following methods:

```rust
// in an impl Page block
// (method names assumed; the signatures were missing from this listing)
pub fn read_header(&mut self) {
    let num_records: usize = bytearray_to_usize(self.storage[0..8].to_vec());
    let next: usize = bytearray_to_usize(self.storage[8..16].to_vec());
    self.num_records = num_records;
    // 0 is used to mean "no overflow page"
    self.next = if next != 0 {
        Some(next)
    } else {
        None
    };
}

pub fn write_header(&mut self) {
    mem_move(&mut self.storage[0..8], &usize_to_bytearray(self.num_records));
    mem_move(&mut self.storage[8..16], &usize_to_bytearray(self.next.unwrap_or(0)));
}
```

mem_move is a helper similar to C’s memcpy library function.
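Because transmute ties the header to the host’s byte order and usize width, here is a sketch of the same 16-byte header layout (num_records in bytes 0..8, next in bytes 8..16, with 0 meaning “no next page”) encoded as explicit little-endian u64 values. encode_header and decode_header are hypothetical free functions operating on a raw page buffer, not methods from the post.

```rust
const PAGE_SIZE: usize = 4096;

/// Write num_records and next into the first 16 bytes of a page buffer,
/// as little-endian u64 values (0 encodes "no next page").
fn encode_header(storage: &mut [u8; PAGE_SIZE], num_records: usize, next: Option<usize>) {
    storage[0..8].copy_from_slice(&(num_records as u64).to_le_bytes());
    storage[8..16].copy_from_slice(&(next.unwrap_or(0) as u64).to_le_bytes());
}

/// Read the header back; the inverse of encode_header.
fn decode_header(storage: &[u8; PAGE_SIZE]) -> (usize, Option<usize>) {
    let mut word = [0u8; 8];
    word.copy_from_slice(&storage[0..8]);
    let num_records = u64::from_le_bytes(word) as usize;
    word.copy_from_slice(&storage[8..16]);
    let next = u64::from_le_bytes(word) as usize;
    (num_records, if next != 0 { Some(next) } else { None })
}
```

Using 0 as the “no next page” marker assumes page 0 is never an overflow page.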

Of course, the main content of a page is the records it stores. These are read and written using the following methods:

```rust
pub fn read_record(&mut self, row_num: usize) -> (&[u8], &[u8]) {
    let offsets = self.compute_offsets(row_num);
    let key = &self.storage[offsets.key_offset..offsets.val_offset];
    let val = &self.storage[offsets.val_offset..offsets.row_end];
    (key, val)
}

pub fn write_record(&mut self, row_num: usize, key: &[u8], val: &[u8]) {
    let offsets = self.compute_offsets(row_num);
    mem_move(&mut self.storage[offsets.key_offset..offsets.val_offset],
             key);
    mem_move(&mut self.storage[offsets.val_offset..offsets.row_end],
             val);
}
```

I think both of the above are fairly straightforward. Because records have a fixed size and we’re only dealing with byte arrays, it’s just a matter of moving bytes around.
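compute_offsets itself isn’t shown in the post. One plausible layout, assumed here rather than taken from the code, packs fixed-size rows immediately after the 16-byte header, so row $i$ starts at byte $16 + i \cdot (\text{keysize} + \text{valsize})$. The free functions below are hypothetical stand-ins for the Page methods.

```rust
const PAGE_SIZE: usize = 4096;
/// Assumed header: num_records (8 bytes) followed by next (8 bytes).
const HEADER_SIZE: usize = 16;

struct RowOffsets {
    key_offset: usize,
    val_offset: usize,
    row_end: usize,
}

/// Hypothetical layout: fixed-size rows packed right after the header.
fn compute_offsets(row_num: usize, keysize: usize, valsize: usize) -> RowOffsets {
    let key_offset = HEADER_SIZE + row_num * (keysize + valsize);
    let val_offset = key_offset + keysize;
    let row_end = val_offset + valsize;
    assert!(row_end <= PAGE_SIZE, "row {} does not fit in a page", row_num);
    RowOffsets { key_offset, val_offset, row_end }
}

/// How many fixed-size records fit in one page under this layout.
fn records_per_page(keysize: usize, valsize: usize) -> usize {
    (PAGE_SIZE - HEADER_SIZE) / (keysize + valsize)
}
```

With the book_ratings sizes (32-byte keys, 4-byte values), a 4KB page would hold 113 records under this layout.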

## Buckets

Being able to read and write records in pages is great, but we also need to be able to write to a bucket without knowing in advance which page and row the record will go in:

```rust
pub struct SearchResult {
    pub page_id: Option<usize>,
    pub row_num: Option<usize>,
    pub val: Option<Vec<u8>>,
}

// impl DbFile { ...
pub fn search_bucket(&mut self, bucket_id: usize, key: &[u8]) -> SearchResult {
    let mut page_id = self.bucket_to_page(bucket_id);
    let mut buffer_index;
    let mut first_free_row = SearchResult {
        page_id: None,
        row_num: None,
        val: None,
    };
    loop {
        buffer_index = self.fetch_page(page_id);
        let next_page = self.buffers[buffer_index].next;
        let page_records = self.all_records_in_page(page_id);

        let len = page_records.len();
        for (row_num, (k, v)) in page_records.into_iter().enumerate() {
            if slices_eq(&k, key) {
                return SearchResult {
                    page_id: Some(page_id),
                    row_num: Some(row_num),
                    val: Some(v),
                };
            }
        }

        let row_num = if len < self.records_per_page {
            Some(len)
        } else {
            None
        };

        match (first_free_row.page_id, first_free_row.row_num) {
            // No page with free space found yet, so record this page
            // (and its free row, if it has one).
            (Some(_), None) |
            (None, _) => {
                first_free_row = SearchResult {
                    page_id: Some(page_id),
                    row_num,
                    val: None,
                }
            },
            _ => (),
        }

        if let Some(p) = next_page {
            page_id = p;
        } else {
            break;
        }
    }

    first_free_row
}
```

This may look like a lot, but what it’s doing is not too complicated. It keeps fetching the next page in the bucket until it either finds the record with the key it’s looking for or reaches the end of the bucket. What is perhaps interesting is what it returns when it doesn’t find the record (because the record hasn’t been inserted yet): if there’s space, SearchResult indicates which page and row to insert the record in; if there isn’t any space, it returns the id of the last page it looked at, with row_num set to None, which is meant to be used when creating an overflow page.
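The return-value conventions are easier to see in a stripped-down, in-memory model. Here a bucket is just a list of (page_id, records) pairs in chain order, with no buffer pool or disk access. The function mirrors the logic of search_bucket, but it is a sketch, not the post’s code.

```rust
#[derive(Debug, PartialEq)]
struct SearchResult {
    page_id: Option<usize>,
    row_num: Option<usize>,
    val: Option<Vec<u8>>,
}

type Record = (Vec<u8>, Vec<u8>);

/// `bucket` is the chain of (page_id, records) pairs, in overflow order.
fn search_bucket(bucket: &[(usize, Vec<Record>)], key: &[u8], records_per_page: usize) -> SearchResult {
    let mut first_free = SearchResult { page_id: None, row_num: None, val: None };
    for (page_id, records) in bucket {
        // Key found: report where it lives and its current value.
        if let Some(row) = records.iter().position(|(k, _)| k.as_slice() == key) {
            return SearchResult { page_id: Some(*page_id), row_num: Some(row), val: Some(records[row].1.clone()) };
        }
        // Until a page with room is found, keep recording the latest page,
        // so a completely full bucket reports its last page with no row.
        if first_free.row_num.is_none() {
            let row = if records.len() < records_per_page { Some(records.len()) } else { None };
            first_free = SearchResult { page_id: Some(*page_id), row_num: row, val: None };
        }
    }
    first_free
}
```

The three outcomes map onto the arms of the match in put: an existing key, a free row to insert into, or the last page with no row when an overflow page is needed.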

## Methods in LinHash

We now know how to figure out where a record should be placed in a bucket (search_bucket) and how to place a record in that location (write_record). An implementation of our hashtable’s put operation follows quite naturally:

```rust
//  impl LinHash { ...
pub fn put(&mut self, key: &[u8], val: &[u8]) {
    let bucket_index = self.bucket(&key);
    let SearchResult { page_id, row_num, val: old_val } =
        self.buckets.search_bucket(bucket_index, key);
    match (page_id, row_num, old_val) {
        // new insert
        (Some(page_id), Some(pos), None) => {
            self.buckets.write_record_incr(page_id, pos, key, val);
            self.nitems += 1;
        },
        // key already present: put refuses to overwrite it
        (Some(_page_id), Some(_pos), Some(_old_val)) => {
            panic!("can't use put to reinsert old item: {:?}", (key, val));
        },
        // new insert, in overflow page
        (Some(last_page_id), None, None) => {
            self.buckets.allocate_overflow(bucket_index, last_page_id);
            self.put(key, val);
            // the recursive call already ran maybe_split and write_ctrlpage
            return;
        },
        _ => panic!("impossible case"),
    }

    self.maybe_split();
    self.buckets.write_ctrlpage((self.nbits, self.nitems, self.nbuckets));
}
```

The self.bucket call at the very top hashes the key and computes which bucket the item should go in. Remember that the bucket an item goes into depends on how many bits we’re looking at and how many buckets we have. We covered how this works in part 1, so we won’t go into it again here.

This is how maybe_split is implemented:

```rust
fn maybe_split(&mut self) -> bool {
    if self.split_needed() {
        self.nbuckets += 1;

        self.buckets.allocate_new_bucket();
        if self.nbuckets > (1 << self.nbits) {
            self.nbits += 1;
        }

        // Take index of last item added and subtract the 1 at the
        // MSB position. eg: after bucket 11 is added, bucket 01
        // needs to be split
        let bucket_to_split =
            (self.nbuckets - 1) ^ (1 << (self.nbits - 1));
        // Replace the bucket to split with a fresh, empty
        // page. And get a list of all records stored in the bucket
        let old_bucket_records =
            self.buckets.clear_bucket(bucket_to_split);

        // Re-hash all records in old_bucket. Ideally, about half
        // of the records will go into the new bucket.
        for (k, v) in old_bucket_records.into_iter() {
            self.reinsert(&k, &v);
        }
        return true;
    }

    false
}
```
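To see which bucket maybe_split picks, its index arithmetic can be pulled out into a hypothetical pure function and run for a few splits, starting from two buckets and one bit. It mirrors the increment and the XOR in maybe_split but touches no storage.

```rust
/// One split step: returns (new nbuckets, new nbits, bucket to split).
fn next_split(nbuckets: usize, nbits: u32) -> (usize, u32, usize) {
    let nbuckets = nbuckets + 1;
    // Widen the address if the new bucket's index no longer fits in nbits.
    let nbits = if nbuckets > (1 << nbits) { nbits + 1 } else { nbits };
    // Clear the MSB of the new bucket's index to find the bucket to split.
    let bucket_to_split = (nbuckets - 1) ^ (1 << (nbits - 1));
    (nbuckets, nbits, bucket_to_split)
}
```

Starting from two buckets, the split targets come out as 0, 1, then 0, 1, 2, 3: each round of growth walks through the existing buckets in order, which is why every bucket is eventually split.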

## Limitations

Let me close by stating some of the limitations of this implementation. I’ve already hinted at the weakness of the cache eviction policy we use; most real systems use LRU or a variant of it. The error handling story is also not great: we should not be panicking when the client uses the data structure incorrectly.

More importantly, our hashtable cannot handle concurrent accesses. We’re not using locks anywhere so, if multiple threads are inserting records, we almost certainly will see the data go out of whack.

Thanks to Kevin Butler for sending in corrections and suggestions.

1. A much better way of doing cache eviction is to kick out the least-recently used page. I chose FIFO queue just because it’s much simpler to implement.

# Implementing a key-value store, part 1: Linear Hashing

### 2017-11-04

In this series of posts, I want to walk you through how to build a simple, persistent key-value store. We will be using an on-disk hashtable (also called an external hashtable) to store the key-value mappings. In this post, I will briefly explain hashtables and an algorithm called linear hashing. In the next post, we’ll look at an implementation of linear hashing (written in Rust).

How would you store a hashtable on disk? This post describes a technique called linear hashing which can be used for this purpose. But before that let’s look at hashtables in general.

## Hashtables primer

You might have seen in-memory implementations in an algorithms textbook or course that did something like the following. You create an array of fixed size $n$ (the bucket array); when inserting a key-value pair $(k, v)$, you take $\text{hash}(k) \bmod n$ to find which slot in the array the pair will go into.

Because of the birthday problem, you should expect collisions (multiple keys being assigned to the same slot) even when there are relatively few key-value pairs. To handle this, two schemes are commonly presented:
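To put a number on the birthday problem: if $m$ keys are hashed uniformly at random into $n$ slots, the probability of at least one collision is $1 - \prod_{i=0}^{m-1} \left(1 - \frac{i}{n}\right)$. The sketch below computes that product directly; the uniform-hashing assumption is an idealization of real hash functions.

```rust
/// Probability that at least two of `m` keys share a slot when each key is
/// hashed independently and uniformly into one of `n` slots.
fn collision_probability(m: u64, n: u64) -> f64 {
    let mut p_all_distinct = 1.0_f64;
    for i in 0..m {
        // The (i+1)-th key must avoid the i slots already taken.
        p_all_distinct *= n.saturating_sub(i) as f64 / n as f64;
    }
    1.0 - p_all_distinct
}
```

With 23 keys and 365 slots this gives about 0.507 (the classic birthday result), and with just 100 keys in 10,000 slots it is already about 0.39.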

1. The first idea is to have each entry in the array point to a linked list. When a key-value pair is assigned to a slot, it gets appended to the end of the linked list. A lookup would start at the head of the linked list and look at each node in the list until it finds the node with the key-value pair we’re looking for. This scheme is called separate chaining.

2. In the second scheme, slots in the array do not point to a linked list; they simply contain the key-value pair. When a key-value pair is assigned to a slot that is already occupied, we scan the array for a free slot. A lookup first checks the slot the key was assigned to; if the pair isn’t there, it keeps scanning the array until it finds the pair (or reaches an empty slot). This scheme is called open addressing.
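The open-addressing lookup described in point 2 can be sketched with linear probing, where the scan simply moves to the next slot. This is a toy with assumptions of my own: u64 keys used as their own hash, a fixed non-zero capacity, and no deletions (deletions would need tombstones so that lookups don’t stop early).

```rust
/// Minimal open-addressing table with linear probing.
struct OpenTable {
    slots: Vec<Option<(u64, u64)>>, // (key, value)
}

impl OpenTable {
    fn new(n: usize) -> Self {
        OpenTable { slots: vec![None; n] }
    }

    fn slot_for(&self, key: u64) -> usize {
        // Identity "hash" for illustration; a real table would hash the key.
        (key % self.slots.len() as u64) as usize
    }

    /// Insert or update; returns false if the table is full.
    fn insert(&mut self, key: u64, val: u64) -> bool {
        let n = self.slots.len();
        let start = self.slot_for(key);
        for i in 0..n {
            let idx = (start + i) % n;
            if let Some((k, _)) = self.slots[idx] {
                if k != key {
                    continue; // occupied by a different key: keep scanning
                }
            }
            // Empty slot, or the same key (update in place).
            self.slots[idx] = Some((key, val));
            return true;
        }
        false
    }

    fn get(&self, key: u64) -> Option<u64> {
        let n = self.slots.len();
        let start = self.slot_for(key);
        for i in 0..n {
            match self.slots[(start + i) % n] {
                Some((k, v)) if k == key => return Some(v),
                Some(_) => continue,  // someone else's pair: keep scanning
                None => return None,  // an empty slot ends the search
            }
        }
        None
    }
}
```

With four slots, key 5 is assigned slot 1, which key 1 already occupies, so it is placed in slot 2; a lookup for 9 scans slots 1 and 2 and stops at the empty slot 3.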

Obviously I haven’t covered all the nuances of hashtable implementations here: there are corner cases I haven’t talked about, and there are other schemes for handling collisions. But there are plenty of fine resources that cover those, so we’ll move on to linear hashing.

## Storing a hashtable to disk

Let’s now talk about how we would go about storing a hashtable in a file on disk. The technique I will describe, called linear hashing, is an adaptation of the first scheme (separate chaining) above. That means our key-value pairs will be stored in linked lists, with array slots pointing to their corresponding linked lists (also called buckets).

However, unlike the in-memory version above, the nodes in the linked list won’t each store a single pair. Files are organized in pages: each page is a block of bytes, usually 4KB in size (on Linux, you can run getconf PAGESIZE to see the system’s memory page size, which is typically 4KB as well). While you can store values that span two blocks, say an 8-byte integer starting at byte 4090 of the file and ending at byte 4097, the filesystem would then have to read or write two pages to fulfill your request. Therefore, we will align each node in the linked list to a page. Each node will hold multiple key-value pairs; once a page is full, we allocate an overflow page and link to it from the current page.
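The arithmetic behind the 8-byte example can be checked with a small helper (a hypothetical name, assuming 4096-byte pages): a value occupying bytes offset through offset + len - 1 touches every page from offset / 4096 through (offset + len - 1) / 4096.

```rust
use std::ops::RangeInclusive;

const PAGE_SIZE: u64 = 4096;

/// Pages touched by a value of `len` bytes starting at byte `offset` of a file.
fn pages_touched(offset: u64, len: u64) -> RangeInclusive<u64> {
    assert!(len > 0, "a value occupies at least one byte");
    let first = offset / PAGE_SIZE;
    let last = (offset + len - 1) / PAGE_SIZE;
    first..=last
}
```

For the example in the text, bytes 4090 through 4097 fall in pages 0 and 1, so both pages must be read or written; moving the value to offset 4088 or 4096 keeps it within a single page.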

Both of the collision-handling schemes described above are called static hashtable algorithms: the size of the bucket array stays the same¹. This can be a problem when we want to persist our hashtable to disk. In the separate chaining scheme, a long chain of pages means that a lookup may need one I/O operation per page, so we want our linked lists to be short.

## Linear hashing

This is where linear hashing comes in. Linear hashing is a dynamic hashtable algorithm: the bucket array can grow as new key-value pairs are inserted. The main goal of growing the bucket array is to keep the chains of pages pointed at by the array slots short. More specifically, it limits the average length of the buckets (the linked lists of pages).

In linear hashing, when doing an insert, we start by looking at just the least significant bit (LSB) of the key’s hash. If it’s 0, the pair goes into bucket 0; if it’s 1, it goes into bucket 1. We can keep inserting new items until the data structure is utilized up to, say, 80%, that is, until 80% of the storage allocated so far has been filled.

When the threshold is crossed, we increment both the number of bits and the number of buckets by 1. When we increment the number of bits, 0 and 1 become 00 and 01, respectively: the existing buckets are now labeled with two bits, but their values are the same as before.

We add one bucket, 10. Note that incrementing the number of bits by 1 doubles the number of addressable buckets, but we only increment the number of buckets by 1. Let’s now look at this newly added bucket. Before we incremented the number of bits, all items whose hash’s LSB was 0 went to bucket 0 (which is now bucket 00); some of those items have 00 as their two LSBs, but others may have 10. We want to move the latter to the 10 bucket. So, we iterate over the pairs in bucket 00 and re-insert each one. This time, two bits of the hash are examined, so each item goes into either 00 or 10. Once we are done iterating, the “growth” stage is complete.

This is what our example hashtable looks like so far:

We calculate the load factor as shown below:

```
num. of bits    = 1
num. of buckets = 2
num. of items   = 4

Load factor = num. of items / (num. of buckets * items per bucket)
            = 4 / (2 * 2)
            = 1.0
```


Earlier, we pointed out that the number of addressable buckets doubled when we incremented the number of bits, i.e. both 10 and 11 are representable with two bits. But we only added one bucket (10). Since we’re looking at the last two bits of the hash, what do we do if a hash’s two LSBs are 11? In that case, we simply ignore the more significant of the two bits (the 1) and insert the pair into 01.
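The addressing rule (take the low nbits bits of the hash, and clear the top one if that bucket doesn’t exist yet) fits in a few lines. bucket_for is a hypothetical helper operating on an already-computed hash; it assumes nbits is at least 1 and that the number of buckets lies between $2^{\text{nbits}-1}$ and $2^{\text{nbits}}$.

```rust
/// Bucket for `hash` when the table looks at `nbits` low bits and has
/// `nbuckets` buckets (nbits >= 1, 2^(nbits-1) < nbuckets <= 2^nbits).
fn bucket_for(hash: u64, nbits: u32, nbuckets: u64) -> u64 {
    let b = hash & ((1u64 << nbits) - 1); // keep the nbits least significant bits
    if b < nbuckets {
        b
    } else {
        b ^ (1u64 << (nbits - 1)) // bucket not allocated yet: clear its MSB
    }
}
```

With two bits and three buckets, a hash ending in 11 goes to bucket 01; once bucket 11 exists, it goes there instead.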

Let’s say that after inserting a few more items, the threshold is exceeded again, so we need to grow the hashtable again. This time, however, we don’t increment the number of bits, since there is already an address that is not yet in use (11). We just allocate space for the 11 bucket and re-insert the pairs in the 01 bucket.

After expanding the hashtable to allocate space for bucket 11, it will look like this:

## Hashtable growth in linear hashing

Growing the hashtable is what makes linear hashing a dynamic hashtable algorithm, so it’s important to understand how the hashtable grows. As we saw, first the 10 bucket is added, causing the 00 bucket to be split (i.e. bucket 00’s items were redistributed between 00 and 10).

Later on, when the hashtable’s load factor crossed the threshold again, bucket 11 was added and bucket 01 was split. As more items are inserted, the next buckets to be split will be 000 (when 100 is added), 001 (when 101 is added), and so on.

There are two important characteristics of how hashtables grow under the linear hashing algorithm. First, the hashtable grows linearly: an insert causes at most one bucket to split². This gives us some assurance that insert performance doesn’t degrade. Second, every bucket is eventually split, which ensures that the linked lists pointed to by the bucket array don’t get too long.

One last thing worth pointing out is that buckets are split in a deterministic, linear order. Usually, the bucket that gets split is not the one into which the key-value pair was just inserted.
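Putting the pieces of this post together, here is a minimal in-memory sketch of linear hashing. It makes simplifying assumptions that are not part of the algorithm as described: keys are u64 values used as their own hash, each bucket is a plain Vec rather than a chain of pages, and the load factor uses a fixed per_bucket capacity, as in the load factor calculation above. Like the scheme described, it splits at most one bucket per insert, in linear order.

```rust
/// Toy linear hashing table (u64 keys are their own hash; no pages).
struct LinearHash {
    buckets: Vec<Vec<(u64, u64)>>,
    nbits: u32,
    nitems: usize,
    per_bucket: usize,
    threshold: f64,
}

impl LinearHash {
    fn new(per_bucket: usize, threshold: f64) -> Self {
        // Start with buckets 0 and 1, addressed by one bit.
        LinearHash { buckets: vec![Vec::new(), Vec::new()], nbits: 1, nitems: 0, per_bucket, threshold }
    }

    fn bucket_for(&self, hash: u64) -> usize {
        let b = (hash & ((1u64 << self.nbits) - 1)) as usize;
        // If that bucket hasn't been created yet, drop the top bit.
        if b < self.buckets.len() { b } else { b ^ (1 << (self.nbits - 1)) }
    }

    fn load_factor(&self) -> f64 {
        self.nitems as f64 / (self.buckets.len() * self.per_bucket) as f64
    }

    fn insert(&mut self, key: u64, val: u64) {
        let b = self.bucket_for(key);
        if let Some(entry) = self.buckets[b].iter_mut().find(|(k, _)| *k == key) {
            entry.1 = val; // existing key: update in place
            return;
        }
        self.buckets[b].push((key, val));
        self.nitems += 1;
        if self.load_factor() > self.threshold {
            self.split();
        }
    }

    fn split(&mut self) {
        self.buckets.push(Vec::new());
        let n = self.buckets.len();
        if n > (1 << self.nbits) {
            self.nbits += 1; // the new bucket needs one more address bit
        }
        // The bucket to split is the new bucket's index with its top bit cleared.
        let to_split = (n - 1) ^ (1 << (self.nbits - 1));
        let old = std::mem::take(&mut self.buckets[to_split]);
        for (k, v) in old {
            let b = self.bucket_for(k); // now looks at the extra bit
            self.buckets[b].push((k, v));
        }
    }

    fn get(&self, key: u64) -> Option<u64> {
        self.buckets[self.bucket_for(key)]
            .iter()
            .find(|(k, _)| *k == key)
            .map(|&(_, v)| v)
    }
}
```

Inserting keys 0 through 7 with per_bucket = 2 and a threshold of 0.75 triggers four splits, leaving six buckets addressed with three bits, and every key remains retrievable.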

## Conclusion

That’s it for now. Hopefully, you have some intuition for why we use linear hashing (to avoid accessing multiple pages when looking up a key in the hashtable) and for why it works. In the next post, we’ll look at a Rust implementation of linear hashing. In case you don’t want to wait, you can check out the implementation on GitHub.

Update: There’s some interesting discussion in this post’s Reddit thread

1. Both of these schemes can support a re-hash operation, however that can be time-consuming. And the hashtable will be unusable during the rehash.

2. Proof: Let $r$ be the number of records, $n$ the number of buckets, $b$ the number of records per bucket, and $t$ the threshold, so the load factor is $\frac{r}{bn}$. Before the insert, the load is below the threshold: $\frac{r}{bn} < t$, i.e. $r < btn$. Suppose inserting one more record pushes the load over the threshold: $\frac{r+1}{bn} > t$. After splitting one bucket, we have $$\text{load factor} = \frac{r+1}{b(n+1)}$$ We want to show that this new load factor is below $t$. Since $r < btn$, we have $r + 1 < btn + 1$. Assuming $bt \geq 1$ (the threshold allows at least one record per bucket, e.g. $b = 2$ and $t \geq 0.5$), this gives $r + 1 < btn + bt = bt(n + 1)$. Hence $$\frac{r+1}{b(n+1)} < t$$ which is to say $$\text{load factor} < t$$ Thus, a single split brings the load factor back under the threshold, and we only ever have to split one bucket per insert.