Samrat Man Singh

Hi, I’m Samrat. I’m a backend programmer, also interested in compilers and low-level systems.

I am currently looking for job opportunities.

Implementing a key-value store, part 2: Linear Hashing implementation in Rust

2017-11-09


In the last post, I introduced the idea of linear hashing. This post will describe a Rust implementation of the algorithm. I won’t go through every last line of code, but hopefully enough to give you a good understanding of how the whole thing works. I should also mention that even though this is a post about implementing linear hashing, a spends quite some time talking about how storing data to disk works. This is intentional– articles about hashtable implementations are aplenty; articles talking about how external storage datastructures work, in my opinion, are not. You can of course check out the full source code on Github.

The interface

Before diving into how the hashtable will work, let’s first discuss how a client using the hashtable will be expected to use it.

Our hashtable implementation will map byte arrays to byte arrays. That is, we’ll let the client program figure out how to serialize/deserialize keys and values.

This is how a linear hashing table will be created:

let mut h = LinHash::open("book_ratings", 32, 4);

The parameters to open are the filename, the length of the key and the length of the value. That means that our key-value pairs(or records) will be stored in the file “book_ratings”; keys will be 32 bytes long(in the example above, it’s probably a string) and the value will be 4 bytes long(probably an integer). Keeping key and value size fixed is somewhat restrictive(RocksDB, for example, allows you to just pass in bytearrays of any length) but this will simplify how records are stored in our file.

Inserting and looking up works like so(I’ve put an assert for get just to give an idea of what the return value looks like):

h.put(b"Spin", &i32_to_bytearray(9));
h.put(b"Axis", &i32_to_bytearray(6));
// Assumes little-endian architecture
assert_eq!(h.get(b"Spin"), Some(vec![9, 0, 0, 0]));

I know this feels clunky but not having to worry about serialization/deserialization will simplify things quite a bit for us. I should mention that i32_to_byte_array(and other similarly named functions in the util module) use std::mem::transmute under the hood and so are not portable across architectures(hence the comment above about endianness).

Implementation overview

Here’s a high-level description of what happens when you call put as in the example above(the rest of the post goes into it in depth, of course). First we hash the key, and take however many bits the hashtable is currently set to take(see last post). This tells use which bucket to place the record in. A bucket is a linked list of pages, which are chunks of bytes on disk. Pages in the linked list may be full, so we now need to figure out which page the record should go in. Once we figure out which page it should go in, this page is fetched from disk. We then make the necessary changes to the page in memory– eg. write record, increment number of records in page’s header)– then save it out to disk.

get is very similar and uses the same method(search_bucket described below) that we use to figure out which page in the bucket the record should be placed in.

Pages and buffer pool

A file is divided into pages of size 4KB. When we want to read or write a some chunk of bytes in a page, we have to read the page into memory, make whatever changes we want to make on the copy resident in memory, then save the updated page out to disk.

Now, we will usually have a little more than 4KB of memory at our disposal, so what we can buffer the pages we read and write. So, instead of flushing the page out to disk once we do one operation, we keep the page in memory as long as possible. In this post, we will use a rather simplistic(and inefficient!) way to decide which pages get to stay(or rather which page gets evicted)– a FIFO queue1. If we have a buffer pool of size 2, and we read pages in the order 1,1,2,3 this is what the reads/writes to disk will look like:

read 1 [fetch page 1]
read 1 [no fetch necessary, 1 is already in pool]
read 2 [fetch page 2]
read 3 [fetch page 3; place in slot occupied by 1]

a page resident in our buffer pool is represented in a Page:

pub struct Page {
    pub id: usize,
    pub storage: [u8; PAGE_SIZE],
    pub num_records: usize,
    // page_id of overflow bucket
    pub next: Option<usize>,
    pub dirty: bool,

    keysize: usize,
    valsize: usize,
}

Notice that besides the bytearray(storage) we have some other metadata about the page here too:

The metadata is stored in the page itself. To read and write this metadata we have the following methods:

// in a `impl Page` block
pub fn read_header(&mut self) {
    let num_records : usize = bytearray_to_usize(self.storage[0..8].to_vec());
    let next : usize = bytearray_to_usize(self.storage[8..16].to_vec());
    self.num_records = num_records;
    self.next = if next != 0 {
        Some(next)
    } else {
        None
    };
}

pub fn write_header(&mut self) {
    mem_move(&mut self.storage[0..8], &usize_to_bytearray(self.num_records));
    mem_move(&mut self.storage[8..16], &usize_to_bytearray(self.next.unwrap_or(0)));
}

mem_move is a function similar to the memcpy system call.

Of course, the main content of the page are the records it stores. These are read and written using the following methods

pub fn read_record(&mut self, row_num: usize) -> (&[u8], &[u8]) {
    let offsets = self.compute_offsets(row_num);
    let key = &self.storage[offsets.key_offset..offsets.val_offset];
    let val = &self.storage[offsets.val_offset..offsets.row_end];
    (key, val)
}

pub fn write_record(&mut self, row_num: usize, key: &[u8], val: &[u8]) {
    let offsets = self.compute_offsets(row_num);
    mem_move(&mut self.storage[offsets.key_offset..offsets.val_offset],
             key);
    mem_move(&mut self.storage[offsets.val_offset..offsets.row_end],
             val);
}

I think both of the above are fairly straightforward. Because records are of fixed-size and because we’re only dealing with bytearrays, it’s just a matter of moving stuff around.

Buckets

Being able to read and write records to pages is great, but we need to be able to write to buckets without having to know in advance where and in which page the record will go to:

pub struct SearchResult {
    pub page_id: Option<usize>,
    pub row_num: Option<usize>,
    pub val: Option<Vec<u8>>
}

// impl DbFile { ...
pub fn search_bucket(&mut self, bucket_id: usize, key: &[u8]) -> SearchResult {
    let mut page_id = self.bucket_to_page(bucket_id);
    let mut buffer_index;
    let mut first_free_row = SearchResult {
        page_id: None,
        row_num: None,
        val: None,
    };
    loop {
        buffer_index = self.fetch_page(page_id);
        let next_page = self.buffers[buffer_index].next;
        let page_records = self.all_records_in_page(page_id);

        let len = page_records.len();
        for (row_num, (k,v)) in page_records.into_iter().enumerate() {
            if slices_eq(&k, key) {
                return SearchResult{
                    page_id: Some(page_id),
                    row_num: Some(row_num),
                    val: Some(v)
                }
            }
        }

        let row_num = if len < self.records_per_page {
            Some(len)
        } else {
            None
        };

        match (first_free_row.page_id, first_free_row.row_num) {
            // this is the first free space for a row found, so
            // keep track of it.
            (Some(_), None) |
            (None, _) => {
                first_free_row = SearchResult {
                    page_id: Some(page_id),
                    row_num: row_num,
                    val: None,
                }
            },
            _ => (),
        }

        if let Some(p) = next_page {
            page_id = p;
        } else {
            break;
        }
    }

    first_free_row
}

This may look like a lot but what it’s doing is not too complicated. It keeps fetching the next page in the bucket until it finds the record with the key it’s looking for. What is perhaps interesting is what it returns when it doesn’t find the record(because the record hasn’t been inserted)– if there’s space, it indicates in SearchResult which page and row to insert the record in; if there isn’t any space, it returns what the last page it looked in was, which is meant to be used when creating an overflow page.

Methods in LinHash

We now know how to figure out where a record should be placed in a bucket(search_bucket) and how to place record in said location(write_record). An implementation of our hashtable’s put operation arises quite naturally:

//  impl LinHash { ...
pub fn put(&mut self, key: &[u8], val: &[u8]) {
    let bucket_index = self.bucket(&key);
    match self.buckets.search_bucket(bucket_index, key.clone()) {
        SearchResult { page_id, row_num, val: old_val } => {
            match (page_id, row_num, old_val) {
                // new insert
                (Some(page_id), Some(pos), None) => {
                    self.buckets.write_record_incr(page_id, pos, key, val);
                    self.nitems += 1;
                },
                // case for update
                (Some(_page_id), Some(pos), Some(_old_val)) => {
                    panic!("can't use put to reinsert old item: {:?}", (key, val));
                },
                // new insert, in overflow page
                (Some(last_page_id), None, None) => { // overflow
                    self.buckets.allocate_overflow(bucket_index, last_page_id);
                    self.put(key, val);
                },
                _ => panic!("impossible case"),
            }
        },
    }

    self.maybe_split();
    self.buckets.write_ctrlpage((self.nbits, self.nitems, self.nbuckets));
}

The self.bucket call at the very top hashes the key and computes which bucket the item should go in. Remember that which bucket should go in depends on which how many bits we’re looking at and how many buckets we have. We covered how this works in part 1, so we won’t go into that now.

This is how maybe_split is implemented:

fn maybe_split(&mut self) -> bool {
    if self.split_needed() {
        self.nbuckets += 1;

        self.buckets.allocate_new_bucket();
        if self.nbuckets > (1 << self.nbits) {
            self.nbits += 1;
        }

        // Take index of last item added and subtract the 1 at the
        // MSB position. eg: after bucket 11 is added, bucket 01
        // needs to be split
        let bucket_to_split =
            (self.nbuckets-1) ^ (1 << (self.nbits-1));
        // Replace the bucket to split with a fresh, empty
        // page. And get a list of all records stored in the bucket
        let old_bucket_records =
            self.buckets.clear_bucket(bucket_to_split);

        // Re-hash all records in old_bucket. Ideally, about half
        // of the records will go into the new bucket.
        for (k, v) in old_bucket_records.into_iter() {
            self.reinsert(&k, &v);
        }
        return true
    }

    false
}

Limitations

Let me close by stating some of the limitations of this implementation. I’ve already hinted at the limitations of the cache eviction policy we use. Most real systems use LRU. The error handling story here is also not great– we should not be panicking when the client uses the datastructure incorrectly.

More importantly, our hashtable cannot handle concurrent accesses. We’re not using locks anywhere so, if multiple threads are inserting records, we almost certainly will see the data go out of whack.

Thanks to Kevin Butler for sending in corrections and suggestions.


  1. A much better way of doing cache eviction is to kick out the least-recently used page. I chose FIFO queue just because it’s much simpler to implement.