A message-oriented pipe

I am working on a side project that involves sending and receiving messages on a netlink(7) socket. It has become a fairly hairy endeavor, so I am taking a mini-break to improve one aspect of the project. The project's purpose is to update the IP addresses assigned to interfaces based on intercepted traffic. It's a way to allow IPVlan-based interfaces to use DHCP and SLAAC to configure IP addresses in the local network automatically. It's structured as a long-running process split into multiple threads:

[Diagram: the process's threads, labeled A, B, and C]

The system is time-sensitive: the messages sent to thread B are verdicts telling netfilter whether or not to drop a packet, so delays there cause packet loss. Delays in messages to thread A affect interfaces' ability to receive packets on newly configured IP addresses.

Currently, when a thread wants to send a chain of messages to another thread, it relinquishes that chunk of memory and allocates a new one. The garbage collector then frees it at some later time, once it's no longer in use. Because this is a long-running process, I have to balance multiple priorities in how it runs:

While meeting these constraints, the code should be easy to read.

The requirement to use a predictable (preferably constant) amount of memory means the process has to refuse new packets when it is out of memory, so there needs to be some kind of flow control. One way to accomplish this is to allocate no new memory after startup and re-use a fixed buffer instead. This would also limit garbage collector activity, which could otherwise undermine the second requirement of taking a predictable amount of time.

While there are a lot of ways to accomplish what I want, the best way I can think of is a ring buffer; a fixed-size array that uses a "wrap-around" addressing scheme so that the space freed by dequeueing items from the front can be re-used for new items that are enqueued at the back, without moving them around.

Ring buffers are a very old, very well-known technique, and they are ubiquitous; they're used to implement queues for network interfaces, pipes for inter-process communication, and I/O submission and completion queues for block storage devices. They are a really efficient way for two (or more) processes to share memory in a streaming fashion.

While ring buffer implementations abound, I'm writing my own. Why? The requirements for this implementation are as follows:

Performance, by contrast, is a non-goal. I'm reaching for a ring buffer because it's a simple way to store sequences in a fixed amount of memory; simplicity matters more than speed.

The requirements are unique enough to warrant taking a break from my project for something a little more tractable and fun. While writing comments, I found myself wanting to write more and include diagrams to explain the workings of the code, so the rest of this post is something like extended comments, or a literate program. More than anything, it is an aid for me to understand what I wrote and reassure myself that it's correct.

Core data structure

Given a fixed-size, byte-addressable region of memory:

We can treat this region as a ring buffer by tracking the beginning (head) and end (tail) of the occupied region:

[Diagram: the memory region with the head (H) and tail (T) marked]

There are various ways of tracking these boundaries, but they must be tracked. The occupied region is the area between the head and tail:

[Diagram: the occupied region between H and T]

When dequeuing an item, a consumer increments the position of the head:

[Diagram: after a dequeue, the head advances from H to H'; T is unchanged]

When enqueuing a new item, a producer increments the position of the tail. If the tail is incremented beyond the end of the buffer, it "wraps around" to the beginning. This is how the "ring" or "circular" buffer gets its name.

[Diagram: after an enqueue, the tail moves from T past the end of the buffer and wraps around to T']

When the tail "catches up" to the head, the buffer is full. When the head "catches up" to the tail, the buffer is empty. It is an error for the head to overtake the tail, or the tail to overtake the head, as it would imply reading data that doesn't exist yet, or overwriting valid data, respectively.
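To make these rules concrete, here is a minimal byte-at-a-time sketch in OCaml. It is not the message ring developed below: it assumes plain `Bytes` storage, counters that only ever grow, and `mod` indexing, and the names `byte_ring`, `enqueue`, and `dequeue` are hypothetical. Full and empty are both detected by comparing the counters, and each operation refuses to let one overtake the other.

```ocaml
(* Minimal byte ring sketch. [head] and [tail] count bytes ever dequeued and
   enqueued; the array index is the counter modulo the capacity, which is
   what makes the addressing "wrap around". *)
type byte_ring = {
  buf : Bytes.t;
  mutable head : int; (* next byte to dequeue *)
  mutable tail : int; (* next free position to enqueue into *)
}

let create capacity = { buf = Bytes.create capacity; head = 0; tail = 0 }
let capacity r = Bytes.length r.buf

(* The head has caught up to the tail: nothing left to read. *)
let is_empty r = r.head = r.tail

(* The tail is a full lap ahead of the head: no free space. *)
let is_full r = r.tail - r.head = capacity r

let enqueue r c =
  (* Refusing here prevents the tail from overtaking the head. *)
  if is_full r then failwith "enqueue: ring is full";
  Bytes.set r.buf (r.tail mod capacity r) c;
  r.tail <- r.tail + 1

let dequeue r =
  (* Refusing here prevents the head from overtaking the tail. *)
  if is_empty r then failwith "dequeue: ring is empty";
  let c = Bytes.get r.buf (r.head mod capacity r) in
  r.head <- r.head + 1;
  c
```

Because the counters never shrink, `tail - head` is always the number of stored bytes; the numbering scheme described later relies on the same observation.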

Preserving (variable-length) message boundaries

I want to use this buffer to share messages, rather than a splittable byte stream. It's important that readers only get whole messages, not fragments of them. If a reader dequeued the first half of a message, the next reader would be unable to parse the remainder. Similarly, if a writer only enqueued part of a message, it would not be possible for a reader to fully decode it.

I could rely on the message to delimit itself by including a length header like nlmsg_len in the Netlink message header, and trust that a reader would dequeue the correct number of bytes, and the writers would only write whole messages at a time. But I want a little more certainty that a message ends where I think it ends.

This would be easy to support if all messages were the same size; just make that size the minimum addressable unit of the buffer. But that's not the case here; the netlink messages I want to store consist of a fixed-size header, an optional body, and zero or more optional key/value pairs called "attributes", which vary in size.

We make the assumption that each write to the buffer stores an integral (non-fractional) number of messages, and those are the boundaries that should be respected when handling reads. We can preserve these boundaries by inserting a length header in front of each message:

[Diagram: the occupied region between H and T, with each message preceded by a length header L]

The length includes the length of the header itself. Including the header length is common in network protocols, because it allows elements in the data flow which don't need to inspect an item to skip it without knowing the full format. That's not too relevant for my use case, since this is all within the same process, but it's just what I'm used to.
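A small sketch of this framing on a flat `Bytes` buffer, ignoring wrap-around for now. It assumes a 2-byte little-endian length field, and `encode` and `decode` are hypothetical helpers; the point is that `decode` hops from one message to the next using only the stored length.

```ocaml
(* Sketch: length-prefixed framing on a flat buffer. Assumes a 2-byte
   little-endian length that includes the header itself. *)
let header_length = 2

(* Hypothetical helper: lay out each message as [length][payload]. *)
let encode (msgs : string list) : Bytes.t =
  let size m = header_length + String.length m in
  let buf = Bytes.create (List.fold_left (fun acc m -> acc + size m) 0 msgs) in
  let write_one pos m =
    Bytes.set_uint16_le buf pos (size m); (* length counts the header too *)
    Bytes.blit_string m 0 buf (pos + header_length) (String.length m);
    pos + size m
  in
  ignore (List.fold_left write_one 0 msgs);
  buf

(* Hypothetical helper: a reader can skip from message to message using only
   the length field, without parsing the payloads. *)
let decode (buf : Bytes.t) : string list =
  let rec go pos acc =
    if pos >= Bytes.length buf then List.rev acc
    else
      let len = Bytes.get_uint16_le buf pos in
      let payload = Bytes.sub_string buf (pos + header_length) (len - header_length) in
      go (pos + len) (payload :: acc)
  in
  go 0 []
```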

Writing to the buffer is a two-step process:

And reading becomes the following process:

A downside to this approach is that if a header were somehow corrupted, the ring would become unusable. Another approach that I see often is for the ring to contain fixed-size pointers to buffers with the actual data. This is what virtio virtqueues do, for example. I don't want to do that, as I would then have to come up with a scheme to allocate and re-use those buffers.

Keeping messages contiguous

I want messages to remain contiguous in memory, and not split across the edge of the buffer. This makes it easier to use APIs that expect the message to reside in a single chunk of memory, like read(2), write(2), send(2) and recv(2). While there may be scatter/gather versions of those APIs, such as readv(2), many modules in the OCaml standard library and the broader ecosystem work on contiguous chunks of memory, so I don't want to split messages up.

When there is free space on both ends of the buffer, but a writer wants to write a message that is larger than the space after the tail:

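[Diagram: free space on both sides of the occupied region between H and T; the new message X does not fit after T]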

We can insert a "hole" at the end of the buffer that pushes the tail into the larger space at the beginning:

[Diagram: before and after inserting a hole at the end of the buffer, which moves T to the beginning]

There are a couple of ways we can do this. One is to maintain an extra variable, G, that holds the width or position of the "gap". Whenever H or T lands on the gap after a read or write, it can be adjusted as appropriate to skip it.

[Diagram: the head H, the tail T, and the gap G at the end of the buffer]

Another approach is to insert a "filler" entry into the queue that takes up the required amount of space, but is skipped on reads.

[Diagram: a filler entry, with its own length header L, occupying the space at the end of the buffer]

We have to dedicate a value in the header to represent filler entries, so that the read functions know to skip over them rather than passing them along to the caller.
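As a sketch of the filler approach, the code below reserves the top bit of a 2-byte header as a filler marker. The bit choice and the helpers `put_message`, `put_filler`, and `messages` are hypothetical, not part of the final design, and a flat buffer stands in for the ring for brevity. The reader skips fillers instead of returning them.

```ocaml
(* Hypothetical encoding: bit 15 of the 2-byte header marks a filler; the
   low 15 bits hold the entry length, including the header. *)
let header_length = 2
let filler_flag = 0x8000
let length_mask = filler_flag - 1

(* Append a real message at [pos]; returns the position after it. *)
let put_message buf pos body =
  let len = header_length + String.length body in
  Bytes.set_uint16_le buf pos len;
  Bytes.blit_string body 0 buf (pos + header_length) (String.length body);
  pos + len

(* Append a filler spanning [len] bytes (at least [header_length]). *)
let put_filler buf pos len =
  assert (len >= header_length && len <= length_mask);
  Bytes.set_uint16_le buf pos (filler_flag lor len);
  pos + len

(* Return the real messages in order, silently skipping fillers. *)
let messages buf =
  let rec go pos acc =
    if pos >= Bytes.length buf then List.rev acc
    else
      let h = Bytes.get_uint16_le buf pos in
      let len = h land length_mask in
      if h land filler_flag <> 0 then go (pos + len) acc
      else
        let body = Bytes.sub_string buf (pos + header_length) (len - header_length) in
        go (pos + len) (body :: acc)
  in
  go 0 []
```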

Each approach has its pros and cons. The approach using the extra variable has the benefit that every item in the buffer is valid, and makes it simpler to detect whether the buffer is empty. The approach of using a filler value may simplify the read and write functions, and has one less variable to track and keep up to date. I am leaning towards using the filler value, but will probably implement both and choose the one that feels less error-prone.

OCaml implementation

Allocating a memory region

Most values in OCaml are tracked by its garbage collector, and OCaml may move a value in memory as part of compaction to improve its memory utilization. But I intend to pass memory ranges from this ring buffer into C functions that have no knowledge of the garbage collector's activity and cannot tolerate a buffer being relocated while they're executing. I/O functions that release the runtime lock while waiting for I/O may find that OCaml values are no longer valid when they wake up. The standard library provides the Bigarray module, which allocates memory whose contents are not scanned by the garbage collector and which is never relocated. It's an ideal data type for storing the ring buffer data.

A ring buffer, then, is a memory region, plus a head and tail representing the header of the first entry and the start of the empty space, respectively.

type ring = {
  data : (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t;
  mask : int; (* see Numbering scheme *)
  mutable head : int;
  mutable tail : int;
}

The type

(char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t

is a one-dimensional array whose elements are stored as 8-bit unsigned integers (int8_unsigned_elt), in C, or row-major, layout (c_layout), and where accessing an item in the array yields a value of type char.
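A constructor for this record could look like the following sketch. `create` is a hypothetical name; it assumes the size must be a power of 2, for the reasons given in the Numbering scheme section, and zero-fills the array because `Bigarray.Array1.create` does not initialize its contents.

```ocaml
type ring = {
  data : (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t;
  mask : int;
  mutable head : int;
  mutable tail : int;
}

(* Hypothetical constructor: allocate [size] bytes outside the OCaml heap.
   [size] must be a power of 2 so that [mask] selects the low bits. *)
let create size =
  if size <= 0 || size land (size - 1) <> 0 then
    invalid_arg "create: size must be a positive power of 2";
  let data = Bigarray.Array1.create Bigarray.char Bigarray.c_layout size in
  Bigarray.Array1.fill data '\000'; (* contents start out unspecified *)
  { data; mask = size - 1; head = 0; tail = 0 }
```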

Numbering scheme

To handle the wrap-around, we make use of a neat trick that I first saw in the Linux kernel's implementation of pipes, but which is quite pervasive (I most recently saw it in the virtio spec for split virtqueues). First, we always ensure that the buffer's length is a power of 2. If we do this, we can allow the head and tail counters to naturally wrap around the full range of an integer; as long as we always mask off the high bits before using the counters to access the array, we'll arrive at the correct value. Here's a demonstration:

# let len = 8 ;;
val len : int = 8

# let data = String.init len (fun i -> Char.(code 'a' + i |> chr)) ;;
val data : string = "abcdefgh"

# let mask = len - 1;;
val mask : int = 7

# let ring_pos = Int.max_int - Random.int len ;;
val ring_pos : int = 4611686018427387897

# let get_char i = data.[(ring_pos + i) land mask] ;;
val get_char : int -> char = <fun>

# String.init (len * 2) get_char ;;
- : string = "bcdefghabcdefgha"

The nice thing about letting the head and tail keep their natural values is that it simplifies the code in other places. For example, the number of bytes in use is always

r.tail - r.head

It does not matter if the tail has wrapped around to the start of the buffer. It does not matter if OCaml's int size is 31 or 63 bits, as long as it is large enough to address the full length of the memory buffer. It does not even matter if the tail and/or head have wrapped the integer size and are negative, since the two's complement numbering scheme preserves the distance between the wrapped integers:

# min_int - max_int ;;
- : int = 1

# (min_int + 10) - (max_int - 4) ;;
- : int = 15
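The same property carries over to the ring counters. In this sketch (hypothetical values, an 8-byte ring), the tail has overflowed past `max_int` and is numerically smaller than the head, yet `tail - head` still gives the number of occupied bytes.

```ocaml
(* Sketch: occupancy stays correct after [tail] overflows past [max_int]. *)
let mask = 7 (* ring of 8 bytes *)
let occupied ~head ~tail = tail - head

let head = max_int - 2 (* wrapped index 5 *)
let tail = head + 5    (* overflows to min_int + 2; wrapped index 2 *)
```

The five occupied bytes sit at wrapped indices 5, 6, 7, 0, and 1.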

For the rest of this article I will refer to "unwrapped" and "wrapped" numbering spaces, where "unwrapped" numbers are in the range [min_int, max_int] and "wrapped" numbers are specific to a ring of size L and are in the range [0, L). So far we have visualized the ring buffer in terms of the "wrapped" numbering space, but we can also visualize it in terms of the "unwrapped" numbering space.

In this visualization, the occupied part of the ring looks contiguous, and it's the memory border which moves:

unwrapped:  107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
wrapped:      3   4   5   6   7   0   1   2   3   4   5   6   7   0   1   2   (length = 8)

In general, it will be simpler and more robust to perform computations in the "unwrapped" numbering space than the "wrapped" space, as there are fewer special cases. For example, say I want to get the next n bytes from the head of the ring. I could do this:

let get_bytes n ring =
  Bytes.init n (fun i -> ring.data.{(ring.head land ring.mask) + i})

but this fetch

ring.data.{(ring.head land ring.mask) + i}

would fail if the head is closer than n bytes to the end of the array. We could wrap again:

let get_bytes n ring =
  let get i =
    let pos = ((ring.head land ring.mask) + i) land ring.mask in
    ring.data.{pos}
  in
  Bytes.init n get

but it's better to defer wrapping the number until after it's incremented:

let get_bytes (n : int) (r : ring) =
  Bytes.init n (fun i -> r.data.{(r.head + i) land r.mask})

We can calculate various important locations in the unwrapped space. Given a number x in the unwrapped space, we can get the next value that, when converted into the wrapped space, is equal to the end of the array, with an expression like this:

x lor ring.mask + 1

This works because the bitwise OR sets all the low bits of x and leaves the high bits untouched, giving the maximum value of x within the current cycle; adding 1 gives the start of the next cycle. Similarly, we can get the start of the current cycle by clearing the low bits, like this:

x land (lnot r.mask)

Here is a demonstration:

# let len = 8 ;;
val len : int = 8

# let mask = len - 1 ;;
val mask : int = 7

# let next_edge x = x lor mask + 1;;
val next_edge : int -> int = <fun>

# let prev_edge x = x land (lnot mask) ;;
val prev_edge : int -> int = <fun>

# List.init 5 (fun i -> i+47, next_edge (i+47)) ;;
- : (int * int) list = [(47, 48); (48, 56); (49, 56); (50, 56); (51, 56)]

# List.init 5 (fun i -> i+47, prev_edge (i+47)) ;;
- : (int * int) list = [(47, 40); (48, 48); (49, 48); (50, 48); (51, 48)]

We will use these tricks later on.

Reading primitives

Let's tackle reading first, starting with custom indexing operators to read from and write to the ring.

let ( .%{} ) (r : ring) (i : int) = r.data.{i land r.mask}
let ( .%{}<- ) (r : ring) (i : int) value = r.data.{i land r.mask} <- value

This allows us to index into the bigarray using numbers in the "unwrapped" numbering space; the operators wrap them to valid positions in the array for us. We can define a slicing operator as well, using Bigarray.Array1.sub, which extracts a sub-array that shares the same underlying data. The slice must not cross the end of the array, or sub raises Invalid_argument; keeping messages contiguous guarantees this.

let ( .%{;..} ) (r : ring) = function
  | [| pos; len |] -> Bigarray.Array1.sub r.data (pos land r.mask) len
  | _ -> assert false
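Here are the operators again in a self-contained sketch, with a hypothetical `create` helper, to show that any unwrapped position lands on the right array slot and that a slice shares storage with the ring. Multi-index operators like `.%{;..}` need OCaml 4.10 or later.

```ocaml
type ring = {
  data : (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t;
  mask : int;
  mutable head : int;
  mutable tail : int;
}

(* Hypothetical helper; [size] is assumed to be a power of 2. *)
let create size =
  { data = Bigarray.Array1.create Bigarray.char Bigarray.c_layout size;
    mask = size - 1; head = 0; tail = 0 }

(* Unwrapped position -> array slot. *)
let ( .%{} ) r i = r.data.{i land r.mask}
let ( .%{}<- ) r i v = r.data.{i land r.mask} <- v

(* r.%{pos; len} is a view (not a copy) of [len] bytes starting at [pos]. *)
let ( .%{;..} ) r = function
  | [| pos; len |] -> Bigarray.Array1.sub r.data (pos land r.mask) len
  | _ -> invalid_arg "ring slice: expected r.%{pos; len}"
```

On an 8-byte ring, positions 1, 9, and `max_int - 6` all refer to the same slot. A slice such as `r.%{7; 2}` would cross the end of the array and raise `Invalid_argument`.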

To get the next item, we decode a 16-bit header at head, then use that to compute the boundaries of the next message. 16 bits should be enough for network packets. On Linux, a netlink message will never be larger than 32KiB. In practice, they are typically much smaller.

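let header_length = 2
let get_uint16_le r pos = Char.code r.%{pos} lor (Char.code r.%{pos + 1} lsl 8)
(* the stored length includes the header, so the payload is header_length shorter *)
let next r = r.%{r.head + header_length; get_uint16_le r r.head - header_length}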

To remove the message from the queue, we increment head by the stored length, which already includes the header:

let advance r = r.head <- r.head + get_uint16_le r r.head

What about the "filler" I mentioned earlier, to keep messages contiguous? We can implement it with an extension to the header; an "offset" that is the offset of the first byte of the message relative to the header:

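[Diagram: the item header at H, with its length and offset fields; the offset skips the unused space at the end of the buffer before T]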

We change the original definition of the length field to be the length of the data bytes, so that (H + off) is the start of the entry, and (H + off + len) is the position of the next item (or T if there is only one item).

Essentially, the header will grow to consume any unused space at the end of the buffer. This is better than having a discrete "filler" entry because it preserves the property that the buffer is non-empty whenever the head and tail differ, but it does mean that computing the number of data bytes in the buffer is no longer simple. I already made that tradeoff when I decided to put headers into the ring, so this isn't any worse.

Here is how we parse the new header:

let header r =
  get_uint16_le r (r.head + 0),
  get_uint16_le r (r.head + 2)

And the new and improved next and advance functions:

let next r =
  let length, offset = header r in
  r.%{r.head + offset; length}

let advance r =
  let length, offset = header r in
  r.head <- r.head + length + offset
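To check the offset-based header, this sketch hand-builds one item in a 16-byte ring: the header sits at H = 6, six bytes of padding follow it, and the three data bytes wrap around to index 0. `next` and `advance` are the functions above; `create`, `set_uint16_le`, and the operators are included (with hypothetical helper names) so the block stands alone.

```ocaml
type ring = {
  data : (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t;
  mask : int;
  mutable head : int;
  mutable tail : int;
}

let create size =
  { data = Bigarray.Array1.create Bigarray.char Bigarray.c_layout size;
    mask = size - 1; head = 0; tail = 0 }

let ( .%{} ) r i = r.data.{i land r.mask}
let ( .%{}<- ) r i v = r.data.{i land r.mask} <- v
let ( .%{;..} ) r = function
  | [| pos; len |] -> Bigarray.Array1.sub r.data (pos land r.mask) len
  | _ -> invalid_arg "ring slice"

let get_uint16_le r pos = Char.code r.%{pos} lor (Char.code r.%{pos + 1} lsl 8)
let set_uint16_le r pos x =
  r.%{pos} <- Char.chr (x land 0xff);
  r.%{pos + 1} <- Char.chr ((x lsr 8) land 0xff)

(* Header at head: 2-byte data length, then 2-byte offset from head to data. *)
let header r = (get_uint16_le r (r.head + 0), get_uint16_le r (r.head + 2))

let next r =
  let length, offset = header r in
  r.%{r.head + offset; length}

let advance r =
  let length, offset = header r in
  r.head <- r.head + length + offset

(* Hand-built item: header at 6..9, padding at 10..15, data at 0..2. *)
let r = create 16
let () =
  r.head <- 6;
  set_uint16_le r 6 3;   (* length: 3 data bytes *)
  set_uint16_le r 8 10;  (* offset: 4 header bytes + 6 padding bytes *)
  String.iteri (fun i c -> r.%{16 + i} <- c) "abc";
  r.tail <- 6 + 10 + 3
```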

Writing primitives

Writing is a little more complicated. Since the available free space may be spread across the beginning and end of the underlying memory buffer, we need to be able to choose between contiguous ranges of memory according to the write size requirements.

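[Diagram: example layouts with item headers (length, offset) and free space, split into slots of 6 and 1 bytes in one case and a single 4-byte slot between T and H in the other]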

This computation can be done in the "unwrapped" numbering space. Consider the two possible cases where wrapped H > T and H < T:

[Diagram: a 16-byte ring, positions 0 to 15, shown once with H < T and once with H > T]

Imagine this buffer is a carousel that you can put your finger on and spin. The numbers will change but the pattern of occupied and unoccupied positions will repeat forever.

[Diagram: the same two rings relabeled with unwrapped positions (-4 to 11 and -5 to 10), with H and T subscripted by the cycle they belong to]

The subscript on H and T denotes the cycle the position is in. The numbers are arbitrary, but the rule (Tₙ - Hₙ ≥ 0) holds for any ring buffer.

As noted earlier, given a position xₙ and a ring size, we can easily calculate the base or most recent edge of the buffer, which we will call left(xₙ), and the next edge, which we will call right(xₙ). And we can always calculate xₙ₊₁ by adding the length of the underlying buffer.

let left r x = x land (lnot r.mask)
let right r x = x lor r.mask + 1
let succ r x = x + r.mask + 1
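A quick check of these three helpers on a 16-byte ring (mask 15), using a minimal stand-in record that only carries `mask` (an assumption for brevity). It includes a negative position and a position whose next edge lies beyond `max_int`.

```ocaml
(* Minimal stand-in for the ring record: only [mask] matters here. *)
type ring = { mask : int }

let left r x = x land (lnot r.mask) (* start of x's cycle *)
let right r x = x lor r.mask + 1    (* start of the next cycle *)
let succ r x = x + r.mask + 1       (* the same slot, one cycle later *)
```

For example, position 37 lies in the cycle [32, 48), and its successor is 53.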

The free space in the ring is always between Tₙ and Hₙ₊₁. The free space is split when Tₙ < right(Tₙ) < Hₙ₊₁. The size of the slot immediately following T is

right r r.tail - r.tail                                  (* when right(T) < succ(H) *)
succ r r.head - r.tail                                   (* when succ(H) <= right(T) *)
min (right r r.tail - r.tail) (succ r r.head - r.tail)  (* either case *)

and the size of the free space after the split is

succ r r.head - right r r.tail

This will be negative if succ(H) comes before the edge right(T), as in the second example above. These expressions work even when the integers overflow. For the sake of the diagram, consider an implementation using 8-bit signed integers, where the representable range of numbers is [-128, 127]:

[Diagram: positions 121 to 127 followed by -128 to -120, as 8-bit signed integers wrap around, marking T₀, the edge E₁, and H₁ in two cases]

In the second case, the expression

succ r r.head - right r r.tail

works because in two's complement signed integer arithmetic, (x - y) is always the signed distance, or number of steps to take to get from y to x, if you arrange all the integers in a (gasp!) ring. In the example, this would be 126 - (-128) = -2 in 8-bit arithmetic, meaning that to get from -128 to 126 you take 2 steps "to the left". This would not work with unsigned integers: in an equivalent scenario using unsigned 8-bit integers, where succ(H) is 254 and right(T) is 0, we would get 254 - 0 = 254, but the (nonexistent) space between right(T) and succ(H) cannot hold 254 bytes.
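The 8-bit arithmetic in this paragraph can be checked with ordinary OCaml ints by wrapping results into [-128, 127] by hand; `wrap8`, `sub8`, and `usub8` are hypothetical helpers for this sketch. Under this emulation, 126 - (-128) comes out as -2, while the unsigned difference 254 - 0 stays 254.

```ocaml
(* Emulate 8-bit two's complement arithmetic with OCaml's native ints. *)
let wrap8 x = ((x + 128) land 0xff) - 128 (* map onto [-128, 127] *)
let sub8 x y = wrap8 (x - y)              (* signed 8-bit x - y *)
let usub8 x y = (x - y) land 0xff         (* unsigned 8-bit x - y *)
```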

Putting these expressions together, slots r is the size of both "slots", or contiguous regions of free memory.

let slots r =
  min (right r r.tail - r.tail) (succ r r.head - r.tail),
  succ r r.head - right r r.tail
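A sketch that evaluates `slots` on a few 16-byte ring states, using a stripped-down record without the data array (an assumption for brevity). With head = 2 and tail = 10 the free space splits into slots of 6 and 2 bytes; with head = 10 and tail = 18 the second value is negative, because there is no free space past the edge; and the expressions still hold when the tail sits just below `max_int`.

```ocaml
(* Minimal ring state for this sketch; the data array is not needed. *)
type ring = { mask : int; mutable head : int; mutable tail : int }

let right r x = x lor r.mask + 1
let succ r x = x + r.mask + 1

(* Size of the free region right after the tail, and of the free region
   after the buffer's edge (negative if there is none). *)
let slots r =
  ( min (right r r.tail - r.tail) (succ r r.head - r.tail),
    succ r r.head - right r r.tail )
```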

reserve r hint returns the length and offset (from T) of a contiguous region in r that may be large enough to store hint bytes, and is suitable for encoding as a 4-byte header of a data item in said region. If there are two contiguous regions, the first is preferred if it is large enough, even if it's smaller than the second region, as that will better utilize the buffer memory. The caller must check that the returned length is large enough to hold its message.

1	let header_length = 4
2	let reserve r hint =
3	  let rec f s0 s1 n =
4	    if s0 < 0 then f 0 (s1 + s0) n
5	    else if s0 >= n then s0, header_length
6	    else s1, s0 + header_length
7	  in
8	  let s0, s1 = slots r in
9	  f (s0 - header_length) s1 hint

This may be the most cryptic function in the core data structure. I'll go through it line by line, in execution order.

9	  f (s0 - header_length) s1 hint

With my ring layout, the item header for the next item must start at T. That means the space for the header must come out of any free space starting at T. Because we parse the header byte-by-byte, it is OK if the header itself is split across the edge of the buffer.

3	  let rec f s0 s1 n =
4	    if s0 < 0 then f 0 (s1 + s0) n

Here we handle the case where the first slot is not even large enough to hold a header; we take the debt from the first slot and give it to the second slot.

5	    else if s0 >= n then s0, header_length

If the first slot is large enough to hold the item, we will prefer that. A different use case might prefer to choose the larger of the two slots. We've already accounted for space for the header on line 9.

6	    else s1, s0 + header_length

Here is the special case, where we have to pad the buffer. The region will have the length of the second slot (minus any debt for the header transferred on line 4), and will start after the first slot. Since we subtracted the header length from the first slot on line 9, we have to add it back here.
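Here is `reserve` exercised on concrete 16-byte ring states, with the same stripped-down record as a stand-in (an assumption for brevity). With head = 2 and tail = 10 (slots of 6 and 2), a 2-byte request fits after a 4-byte header in the first slot, giving (2, 4); a 3-byte request falls back to the 2-byte second slot with offset 6, which the caller must reject; and with head = 13 and tail = 14, the header itself straddles the edge and the region is (11, 4).

```ocaml
type ring = { mask : int; mutable head : int; mutable tail : int }

let right r x = x lor r.mask + 1
let succ r x = x + r.mask + 1
let slots r =
  ( min (right r r.tail - r.tail) (succ r r.head - r.tail),
    succ r r.head - right r r.tail )

let header_length = 4

let reserve r hint =
  let rec f s0 s1 n =
    if s0 < 0 then f 0 (s1 + s0) n           (* header spills past the edge *)
    else if s0 >= n then (s0, header_length) (* first slot is big enough *)
    else (s1, s0 + header_length)            (* pad; use the second slot *)
  in
  let s0, s1 = slots r in
  f (s0 - header_length) s1 hint
```

In the last case the header occupies indices 14, 15, 0, and 1, and the 11 data bytes start at index 2, ending exactly at succ(H) = 29.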

After writing some number of bytes into the reserved region, the caller must call commit, which writes the header and updates the tail position. The caller may have written fewer bytes than the region described by reserve.

let set_uint16_le r pos x =
  r.%{pos + 0} <- Char.chr (x land 0xff);
  r.%{pos + 1} <- Char.chr ((x lsr 8) land 0xff)

let commit r len off =
  set_uint16_le r (r.tail + 0) len;
  set_uint16_le r (r.tail + 2) off;
  r.tail <- r.tail + off + len

I'm happy with how clean this turned out. Originally I had a guard against 0-length entries:

let commit r len off =
  if len > 0 then (
    set_uint16_le r (r.tail + 0) len;
    set_uint16_le r (r.tail + 2) off;
    r.tail <- r.tail + off + len)

but I removed it, because 0-length entries could be useful. For instance, if the ring is used as a buffer for a file or network connection, a 0-length entry could indicate an end-of-file condition or a closed connection.
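Putting the primitives together, here is a self-contained sketch of a full round trip on a 16-byte ring. `create`, `push`, and `pop` are hypothetical helpers: `push` calls `reserve`, copies a string into the region, and calls `commit`; `pop` copies the next item out and calls `advance`. After "hello" is popped, pushing "world!" (6 bytes) does not fit in the 3 bytes left after its header at T = 9, so `reserve` pads it to start at index 0.

```ocaml
type ring = {
  data : (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t;
  mask : int;
  mutable head : int;
  mutable tail : int;
}

let create size =
  { data = Bigarray.Array1.create Bigarray.char Bigarray.c_layout size;
    mask = size - 1; head = 0; tail = 0 }

let ( .%{} ) r i = r.data.{i land r.mask}
let ( .%{}<- ) r i v = r.data.{i land r.mask} <- v
let get_uint16_le r pos = Char.code r.%{pos} lor (Char.code r.%{pos + 1} lsl 8)
let set_uint16_le r pos x =
  r.%{pos} <- Char.chr (x land 0xff);
  r.%{pos + 1} <- Char.chr ((x lsr 8) land 0xff)

let header_length = 4
let right r x = x lor r.mask + 1
let succ r x = x + r.mask + 1
let slots r =
  ( min (right r r.tail - r.tail) (succ r r.head - r.tail),
    succ r r.head - right r r.tail )
let reserve r hint =
  let rec f s0 s1 n =
    if s0 < 0 then f 0 (s1 + s0) n
    else if s0 >= n then (s0, header_length)
    else (s1, s0 + header_length)
  in
  let s0, s1 = slots r in
  f (s0 - header_length) s1 hint

let commit r len off =
  set_uint16_le r (r.tail + 0) len;
  set_uint16_le r (r.tail + 2) off;
  r.tail <- r.tail + off + len

let header r = (get_uint16_le r r.head, get_uint16_le r (r.head + 2))
let advance r =
  let length, offset = header r in
  r.head <- r.head + length + offset

(* Hypothetical helper: enqueue a string, or fail if no region fits. *)
let push r s =
  let len, off = reserve r (String.length s) in
  if len < String.length s then failwith "push: not enough contiguous space";
  String.iteri (fun i c -> r.%{r.tail + off + i} <- c) s;
  commit r (String.length s) off

(* Hypothetical helper: copy the next item out, then dequeue it. *)
let pop r =
  let length, offset = header r in
  let s = String.init length (fun i -> r.%{r.head + offset + i}) in
  advance r;
  s
```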

Confirming it works

While I am reasonably confident that my implementation is sound, it is still worth kicking the tires, and while I'm doing that I might as well have some fun. So I put together a test that does the following:

It turned out like this:

This shows a 4k ring buffer. Each (scaled) pixel is a memory location, mapped to a row and column with the expression

pos mod window_width, (* x or col *)
pos / window_width    (* y or row *)

The yellow regions are headers that start a new item, and the black regions at the bottom of the window show gaps inserted to keep the messages contiguous.

A note on alignment

On most modern architectures, even though memory is byte-addressable, the CPU fetches multiple bytes at a time, usually some multiple of its word size (4 or 8 bytes), and there is a sometimes severe performance penalty for accessing memory at an address that is not aligned, i.e. not a multiple of the access size. The penalty can be even worse if you are implementing a "lockless" data structure using atomic load and store operations.

Since high performance is not an objective for me, I won't pursue alignment in my initial implementation. However, the implementation described thus far could be made to align data items with relatively modest changes to how the offset field is calculated in the reserve function. Aligning the headers themselves can be done by updating the advance and commit functions to always round the new head and tail positions up to the next alignment boundary.
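The rounding itself is a one-liner in the unwrapped space. This sketch assumes power-of-2 alignments; `align_up` and `aligned_offset` are hypothetical names, and wiring them into `reserve`, `commit`, and `advance` is left out.

```ocaml
(* Round [x] up to the next multiple of [align]; [align] must be a power of 2.
   Works on unwrapped positions, including across integer overflow. *)
let align_up ~align x = (x + align - 1) land (lnot (align - 1))

(* Hypothetical: the offset from T that puts the data on an [align] boundary
   while leaving room for the header at T. *)
let aligned_offset ~align ~header_length tail =
  align_up ~align (tail + header_length) - tail
```

For example, with an 8-byte alignment and a 4-byte header at T = 9, the data would start at 16, so the offset field would be 7.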

High level API(s)

We've defined the core data structure. I'm pleased with how the core operations turned out. I have a nearly ideal (for my use cases) bounded queue in less than a page of code. But there are some sharp edges to these operations, and I want to build an abstraction on top of it that's more fool-proof. Since I plan to use these as message streams, it's useful to be able to wait for data or free space when reading or writing, respectively.

let wait_read r =
  while r.head = r.tail do Thread.yield () done

(* free space is the capacity (mask + 1) minus the occupied bytes *)
let wait_write r n =
  while (r.mask + 1) - (r.tail - r.head) < n do Thread.yield () done

There are two problems with this approach:

In a single-reader single-writer scenario, where the reader and writer are threads in the same domain, only one thread may run at any given time. The worst that could happen is that a reader sees less new data than was actually available, or a writer sees less free space.

If the reader and writer are in separate domains, they may execute in parallel, and I do not know whether reading an int while another domain updates it could produce a corrupted value rather than just a stale one. To protect against this scenario, and to enable efficient waiting, I'll add a mutex and condition variable to the ring:

type ring = {
  ...
  m : Mutex.t;
  c : Condition.t;
}

let wait_read r =
  Mutex.lock r.m;
  while r.head = r.tail do Condition.wait r.c r.m done;
  Mutex.unlock r.m

let wait_write r n =
  Mutex.lock r.m;
  while (r.mask + 1) - (r.tail - r.head) < n do Condition.wait r.c r.m done;
  Mutex.unlock r.m

The commit and advance functions need to be updated to wake up readers and writers:

let commit r len off =
  Mutex.lock r.m;
  ...
  if len > 0 then Condition.broadcast r.c;
  Mutex.unlock r.m

let advance r =
  Mutex.lock r.m;
  ...
  Condition.broadcast r.c;
  Mutex.unlock r.m

I use Condition.broadcast instead of Condition.signal to wake up all waiting readers or writers, to cover the case where a single reader does not consume all elements in the ring, or a single writer does not fill all free space. The mutex will still serialize their access to the ring.

The wait_read and wait_write functions, as they are, are useless, because they release the mutex after the ring is ready. The mutex must remain held until the reader or writer has accomplished its goal and removed or added data.

Since the reason I am using a ring buffer at all is to provide re-usable storage and reduce allocations and copying, I want to expose an API that allows readers to access message contents without copying, and writers to marshal data directly into the ring rather than copying it in from their own buffer.

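let read r cb =
  Mutex.lock r.m;
  Fun.protect ~finally:(fun () -> Mutex.unlock r.m) (fun () ->
      while r.head = r.tail do Condition.wait r.c r.m done;
      let len, off = header r in
      let v = cb r.data ((r.head + off) land r.mask) len in
      (* dequeue inline, still holding r.m (the locking advance would
         deadlock here); skipped if cb raises *)
      r.head <- r.head + off + len;
      Condition.broadcast r.c;
      v)

let write r atleast cb =
  let rec wait r n =
    let len, _ as slot = reserve r n in
    if len >= n then slot
    else (Condition.wait r.c r.m; wait r n)
  in
  Mutex.lock r.m;
  Fun.protect ~finally:(fun () -> Mutex.unlock r.m)
    (fun () ->
       let len, off = wait r atleast in
       let written = cb r.data ((r.tail + off) land r.mask) len in
       (* commit inline, without re-locking r.m; skipped if cb raises *)
       set_uint16_le r (r.tail + 0) written;
       set_uint16_le r (r.tail + 2) off;
       r.tail <- r.tail + off + written;
       Condition.broadcast r.c)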

The read and write functions take callbacks which are given views into the buffer. The callback signatures are chosen to look very much like the read(2) and write(2) system calls, so that you could do something like:

(* Fill the buffer with data from a file *)
Ring.write ring min_size (Unix.read_bigarray fd)

(* Write the next item in the ring to a file descriptor *)
let n = Ring.read ring (Unix.write_bigarray fd)

If the callbacks raise an exception, the ring is unchanged. This gives the caller a chance to retry without losing the message or committing garbage data. The Fun.protect function ensures that the ring is unlocked after the callback runs, even if an error is encountered.
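Putting the corrected pieces together, here is a single-threaded sketch of `read` and `write`. It assumes OCaml 5, where `Mutex` and `Condition` are part of the standard library, and a hypothetical `create`. It inlines the head and tail updates instead of calling the locking `advance` and `commit`, since `Stdlib.Mutex` is not reentrant, and it adds a hypothetical sanity check on the length the write callback reports.

```ocaml
(* Assumes OCaml 5: Mutex and Condition are in the standard library. *)
type ring = {
  data : (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t;
  mask : int;
  mutable head : int;
  mutable tail : int;
  m : Mutex.t;
  c : Condition.t;
}

(* Hypothetical constructor; [size] must be a power of 2. *)
let create size =
  assert (size > 0 && size land (size - 1) = 0);
  { data = Bigarray.Array1.create Bigarray.char Bigarray.c_layout size;
    mask = size - 1; head = 0; tail = 0;
    m = Mutex.create (); c = Condition.create () }

let ( .%{} ) r i = r.data.{i land r.mask}
let ( .%{}<- ) r i v = r.data.{i land r.mask} <- v
let get_uint16_le r pos = Char.code r.%{pos} lor (Char.code r.%{pos + 1} lsl 8)
let set_uint16_le r pos x =
  r.%{pos} <- Char.chr (x land 0xff);
  r.%{pos + 1} <- Char.chr ((x lsr 8) land 0xff)

let header_length = 4
let header r = (get_uint16_le r r.head, get_uint16_le r (r.head + 2))
let right r x = x lor r.mask + 1
let succ r x = x + r.mask + 1
let slots r =
  ( min (right r r.tail - r.tail) (succ r r.head - r.tail),
    succ r r.head - right r r.tail )

let reserve r hint =
  let rec f s0 s1 n =
    if s0 < 0 then f 0 (s1 + s0) n
    else if s0 >= n then (s0, header_length)
    else (s1, s0 + header_length)
  in
  let s0, s1 = slots r in
  f (s0 - header_length) s1 hint

(* Pass the next item to [cb]; dequeue it only if [cb] returns normally. *)
let read r cb =
  Mutex.lock r.m;
  Fun.protect ~finally:(fun () -> Mutex.unlock r.m) (fun () ->
      while r.head = r.tail do Condition.wait r.c r.m done;
      let len, off = header r in
      let v = cb r.data ((r.head + off) land r.mask) len in
      r.head <- r.head + off + len;
      Condition.broadcast r.c;
      v)

(* Wait for a contiguous region of at least [atleast] bytes, let [cb] fill
   it, then commit however many bytes [cb] reports writing. *)
let write r atleast cb =
  Mutex.lock r.m;
  Fun.protect ~finally:(fun () -> Mutex.unlock r.m) (fun () ->
      let rec wait () =
        let len, off = reserve r atleast in
        if len >= atleast then (len, off)
        else (Condition.wait r.c r.m; wait ())
      in
      let len, off = wait () in
      let written = cb r.data ((r.tail + off) land r.mask) len in
      if written < 0 || written > len then invalid_arg "write: bad length";
      set_uint16_le r r.tail written;
      set_uint16_le r (r.tail + 2) off;
      r.tail <- r.tail + off + written;
      Condition.broadcast r.c)
```

Both callbacks receive the whole bigarray plus a position and a length, so a callback that copies a string in, or copies the item out, needs only a few lines.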

I am still experimenting with the higher-level APIs. The underlying data structure is so simple that I will probably copy it directly into my projects, tweak it as needed, and custom-tailor an API specifically for that project. For example, I am working on a networked file system where it would be useful to "fuse" specific chains of protocol requests into something more efficient; putting them in a ring buffer and then exposing an API to iterate over the next N items without de-queueing them would be useful there.

While I am pretty happy with how this all turned out, the irony is that, while working on this module I've decided that I don't really need it in the original use case, and it will probably end up in the bin marked "Note for further use".