A message-oriented pipe
I am working on a side project that involves sending and receiving messages on a netlink(7) socket. It has become a fairly hairy endeavor, so I am taking a mini-break to improve one aspect of the project. The project's purpose is to update the IP addresses assigned to interfaces based on intercepted traffic. It's a way to allow IPVlan-based interfaces to use DHCP and SLAAC to configure IP addresses in the local network automatically. It's structured as a long-running process split into multiple threads:
- Thread A reads and writes messages to and from an rtnetlink(7) socket.
- Thread B reads and writes messages to and from an nfnetlink queue.
- Thread C receives the messages from A and B via a channel, and generates new messages for A and B to send.
The system is time-sensitive, since the messages sent to thread B are verdicts to netfilter about whether or not a packet should be dropped; delay there will cause packet loss. Delays in messages to thread A affect interfaces' ability to receive packets on newly configured IP addresses.
Currently, when a thread wants to send a chain of messages to another thread, it relinquishes that chunk of memory and allocates a new one. The garbage collector then frees it at some later time, when it's no longer in use. Because this is a long-running process, I have to balance multiple priorities in how it runs:
- It should use a predictable amount of memory at all times. If it can start running, it should continue to run in perpetuity, even when the system is low on memory, regardless of how many packets the kernel may sling at it.
- It should take a predictable amount of time to process a message.
While meeting these constraints, the code should be easy to read.
The requirement to use a predictable (preferably constant) amount of memory means it has to refuse taking new packets if it is out of memory. There needs to be some kind of flow control. One way of accomplishing this is by not allocating any new memory after startup, and re-using a fixed buffer instead. This would also limit activity from the garbage collector, which could otherwise affect the second requirement of taking a predictable amount of time.
While there are a lot of ways to accomplish what I want, the best way I can think of is a ring buffer: a fixed-size array that uses a "wrap-around" addressing scheme so that the space freed by dequeueing items from the front can be re-used for new items enqueued at the back, without moving anything around.
Ring buffers are a very old, very well-known technique, and they are ubiquitous; they're used to implement queues for network interfaces, pipes for inter-process communication, and I/O submission and completion queues for block storage devices. They are a really efficient way for two (or more) processes to share memory in a streaming fashion.
While ring buffer implementations abound, I'm writing my own. Why? The requirements for this implementation are as follows:
- Single producer, single consumer.
- Messages have variable length.
- Writers must be able to efficiently wait for a minimum amount of space to become available.
- Readers must be able to efficiently wait for data to become available.
- It must preserve message boundaries. I'm sharing a sequence of messages, not a byte stream. Readers should never be exposed to a partial message.
- Messages should be stored in contiguous memory, and not split across the edge of the buffer.
- I will be passing these buffers to C functions, so the buffer needs to be at a fixed location in memory; the garbage collector should not move it around.
- It should integrate with the Event module, which I'm already using in my project. (This is a nice to have).
Also, note that performance is a non-goal. I'm reaching for a ring buffer because it's a simple way to store sequences using a fixed amount of memory. Simplicity is more important.
The requirements are unique enough to warrant taking a break from my project for something a little more tractable and fun. While writing comments, I found myself wanting to write more and include diagrams to explain the workings of the code, so the rest of this post is something like extended comments or a literate program. More than anything, they are an aid for me to understand what I wrote and reassure myself that it's correct.
Core data structure
Given a fixed-size, byte-addressable region of memory:
We can treat this region as a ring buffer, by tracking the beginning (head) and end (tail) of the occupied region:
There are various ways of tracking these boundaries, but they must be tracked. The occupied region is the area between the head and tail:
When dequeuing an item, a consumer increments the position of the head:
When enqueuing a new item, a producer increments the position of the tail. If the tail is incremented beyond the end of the buffer, it "wraps around" to the beginning. This is how the "ring" or "circular" buffer gets its name.
When the tail "catches up" to the head, the buffer is full. When the head "catches up" to the tail, the buffer is empty. It is an error for the head to overtake the tail, or the tail to overtake the head, as it would imply reading data that doesn't exist yet, or overwriting valid data, respectively.
Preserving (variable-length) message boundaries
I want to use this buffer to share messages, rather than a splittable byte stream. It's important that readers only get whole messages, not fragments of them. If a reader dequeued the first half of a message, the next reader would be unable to parse the remainder. Similarly, if a writer only enqueued part of a message, it would not be possible for a reader to fully decode it.
I could rely on the message to delimit itself by including a length header, like nlmsg_len in the Netlink message header, and trust that a reader would dequeue the correct number of bytes and that writers would only write whole messages at a time. But I want a little more certainty that a message ends where I think it ends.
This would be easy to support if all messages were the same size; just make that size the minimum addressable unit of the buffer. But that's not the case here; the netlink messages I want to store consist of a fixed-size header, an optional body, and zero or more optional key/value pairs called "attributes", which are of varying size.
We make the assumption that each write to the buffer stores an integral (non-fractional) number of messages, and those are the boundaries that should be respected when handling reads. We can preserve these boundaries by inserting a length header in front of each message:
The length includes the length of the header itself. Including the header length is common in network protocols, because it allows elements in the data flow which don't need to inspect an item to skip it without knowing the full format. That's not too relevant for my use case, since this is all within the same process, but it's just what I'm used to.
Writing to the buffer is a two-step process:
- The writer gets access to the free space in the buffer and writes into it, skipping over the space for the length header
- The writer "commits" its write to the buffer. This updates the length header and the tail position.
And reading becomes the following process:
- Decode the integer len at head
- Pass the bytes that follow the header (len minus the header size) to the reader
- Increment head by len
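Ignoring wrap-around for a moment, here is a sketch of that framing over a flat buffer (enqueue and dequeue are illustrative names, not part of the final code):

(* Sketch: length-prefixed framing over a flat Bytes buffer. The
   16-bit little-endian length includes the header itself. *)
let hdr = 2

let enqueue buf tail msg =
  let len = hdr + String.length msg in
  Bytes.set_uint16_le buf tail len;                     (* write header *)
  Bytes.blit_string msg 0 buf (tail + hdr) (String.length msg);
  tail + len                                            (* new tail *)

let dequeue buf head =
  let len = Bytes.get_uint16_le buf head in             (* decode len *)
  Bytes.sub_string buf (head + hdr) (len - hdr),        (* the message *)
  head + len                                            (* new head *)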
A downside to this approach is that if the header were somehow corrupted, the ring would become unusable. Another approach that I see often is for the ring to contain fixed-size pointers to buffers with the actual data. This is what virtio virtqueues do, for example. I don't want to do that, as I would then have to come up with a scheme to allocate and re-use those buffers.
Keeping messages contiguous
I want messages to remain contiguous in memory, and not split across the edge of the buffer. This makes it easier to use APIs that expect the message to reside in a single chunk of memory, like read(2), write(2), send(2) and recv(2). While there may be scatter/gather versions of those APIs, such as readv(2), many modules in the OCaml standard library and the broader ecosystem work on contiguous chunks of memory, so I don't want to split messages up.
When there is free space on both ends of the buffer, but a writer wants to write a message that is larger than the space after the tail:
We can insert a "hole" at the end of the buffer that pushes the tail into the larger space at the beginning:
There are a couple of ways we can do this. One way is to maintain an extra variable, G, that holds the width or position of the "gap". Whenever H or T lands on the gap after a read or write, it can be incremented or decremented as appropriate to skip it.
Another approach is to insert a "filler" entry into the queue that takes up the required amount of space, but is skipped on reads.
We have to dedicate a value in the header to represent filler entries, so that the read functions know to skip over them rather than passing them along to the caller.
Each approach has its pros and cons. The approach using the extra variable has the benefit that every item in the buffer is valid, and makes it simpler to detect whether the buffer is empty. The approach of using a filler value may simplify the read and write functions, and has one less variable to track and keep up to date. I am leaning towards using the filler value, but will probably implement both and choose the one that feels less error-prone.
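As a rough sketch of the filler-entry variant (names are illustrative; the implementation below ends up encoding the gap differently, as an offset in the header), the writer's placement decision looks like this:

(* Sketch: deciding where a message goes, using wrapped positions.
   If the message doesn't fit before the edge, emit a filler entry
   covering the remaining space and start the message at 0. *)
let place ~size ~tail ~msg_len =
  let room = size - tail in         (* contiguous space before the edge *)
  if msg_len <= room then `Write_at tail
  else `Filler_then_write_at_0 room (* filler consumes [tail, size) *)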
OCaml implementation
Allocating a memory region
Most values in OCaml are tracked by its garbage collector, and OCaml may choose to move a value in memory as part of compaction in an effort to improve its utilization of memory. But I intend to pass memory ranges from this ring buffer into C functions that have no knowledge of the garbage collector's activity and cannot tolerate a buffer being relocated while they're executing. I/O functions that release the runtime lock while waiting for I/O may find that OCaml values are no longer valid when they wake up. The standard library provides the Bigarray module, which allows you to allocate a chunk of memory whose contents are not scanned by the garbage collector and which is not relocated during the lifetime of the program. It's an ideal data type for storing the ring buffer data.
A ring buffer, then, is a memory region, plus a head and tail representing the header of the first entry and the start of the empty space, respectively.
type ring = {
  data : (char, int8_unsigned_elt, c_layout) Bigarray.Array1.t;
  mask : int; (* see Numbering scheme *)
  mutable head : int;
  mutable tail : int;
}
The type

(char, int8_unsigned_elt, c_layout) Bigarray.Array1.t

is a one-dimensional array whose elements are stored as 8-bit unsigned integers (int8_unsigned_elt), in row-major order (c_layout), and where accessing an item in the array yields a value of type char.
Numbering scheme
To handle the wrap-around, we make use of a neat trick that I first saw in the Linux kernel's implementation of pipes, but which is quite pervasive (I most recently saw it in the virtio spec for split virtqueues). First, we always ensure that the buffer's length is a power of 2. If we do this, we can allow the head and tail counters to naturally wrap around the full range of an integer; as long as we always mask off the high bits before using the counters to access the array, we'll arrive at the correct value. Here's a demonstration:
# let len = 8 ;;
val len : int = 8
# let data = String.init len (fun i -> Char.(code 'a' + i |> chr)) ;;
val data : string = "abcdefgh"
# let mask = len - 1;;
val mask : int = 7
# let ring_pos = Int.max_int - Random.int len ;;
val ring_pos : int = 4611686018427387897
# let get_char i = data.[(ring_pos + i) land mask] ;;
val get_char : int -> char = <fun>
# String.init (len * 2) get_char ;;
- : string = "bcdefghabcdefgha"
The nice thing about letting the head and tail keep their natural values is that it simplifies the code in other places. For example, the amount of data in the buffer is always

r.tail - r.head

It does not matter if the tail has wrapped around to the start of the buffer. It does not matter if OCaml's int size is 31 or 63 bits, as long as it is large enough to address the full length of the memory buffer. It does not even matter if the tail and/or head have wrapped the integer range and are negative, since the two's complement numbering scheme preserves the distance between the wrapped integers:
# min_int - max_int ;;
- : int = 1
# (min_int + 10) - (max_int - 4) ;;
- : int = 15
For the rest of this article I will refer to "unwrapped" and "wrapped" numbering spaces, where "unwrapped" numbers are in the range [min_int, max_int] and "wrapped" numbers are specific to a ring of size L and are in the range [0, L). So far we have visualized the ring buffer in terms of the "wrapped" numbering space, but we can also visualize it in terms of the "unwrapped" numbering space.
In this visualization, the occupied part of the ring looks contiguous, and it's the memory border which moves:
In general, it will be simpler and more robust to perform computations in the "unwrapped" numbering space than the "wrapped" space, as there are fewer special cases. For example, say I want to get the next n bytes from the head of the ring. I could do this:
let get_bytes n ring =
  Bytes.init n (fun i -> ring.data.{(ring.head land ring.mask) + i})
but this fetch
ring.data.{(ring.head land ring.mask) + i}
would fail if the head is closer than n bytes to the end of the array. We could wrap again:
let get_bytes n ring =
  let get i =
    let pos = ((ring.head land ring.mask) + i) land ring.mask in
    ring.data.{pos}
  in
  Bytes.init n get
but it's better to defer wrapping the number until after it's incremented:
let get_bytes (n : int) (r : ring) =
  Bytes.init n (fun i -> r.data.{(r.head + i) land r.mask})
We can calculate various important locations in the unwrapped space. Given a number x in the unwrapped space, we can get the next value that, when converted into the wrapped space, is equal to the end of the array, with an expression like this:
x lor ring.mask + 1
It should be obvious why this works; the bitwise OR maxes out the lower bits of x, leaving the higher bits untouched, giving us the maximum value of x within the current cycle. Adding 1 gives the start of the next cycle. Similarly, we can get the start of the current cycle by zeroing the low bits, like this:
x land (lnot r.mask)
Here is a demonstration:
# let len = 8 ;;
val len : int = 8
# let mask = len - 1 ;;
val mask : int = 7
# let next_edge x = x lor mask + 1;;
val next_edge : int -> int = <fun>
# let prev_edge x = x land (lnot mask) ;;
val prev_edge : int -> int = <fun>
# List.init 5 (fun i -> i+47, next_edge (i+47)) ;;
- : (int * int) list = [(47, 48); (48, 56); (49, 56); (50, 56); (51, 56)]
# List.init 5 (fun i -> i+47, prev_edge (i+47)) ;;
- : (int * int) list = [(47, 40); (48, 48); (49, 48); (50, 48); (51, 48)]
We will use these tricks later on.
Reading primitives
Let's tackle reading first, starting with custom indexing operators to read from and write to the ring.
let ( .%{} ) (r : ring) (i : int) = r.data.{i land r.mask}
let ( .%{}<- ) (r : ring) (i : int) value = r.data.{i land r.mask} <- value
This allows us to index into the bigarray using numbers in the "unwrapped" numbering space; the operators will wrap them to valid positions in the array for us. We can define a slicing operator as well. The sub function extracts a sub-array from a larger array that shares the same underlying data.
let ( .%{;..} ) (r : ring) = function
  | [| pos; len |] -> Bigarray.Array1.sub r.data (pos land r.mask) len
  | _ -> assert false
To get the next item, we decode a 16-bit header at head, then use that to compute the boundaries of the next message. 16 bits should be enough for network packets; on Linux, a netlink message will never be larger than 32KiB, and in practice they are typically much smaller.
let header_length = 2
let get_uint16_le r pos =
  Char.code r.%{pos} lor (Char.code r.%{pos + 1} lsl 8)
let header r = get_uint16_le r r.head
let next r = r.%{r.head + header_length; header r - header_length}
To remove the message from the queue, we increment head:
let advance r = r.head <- r.head + header r
What about the "filler" I mentioned earlier, to keep messages contiguous? We can implement it with an extension to the header; an "offset" that is the offset of the first byte of the message relative to the header:
We change the original definition of the length field to be the length of the data bytes, so that (H + off) is the start of the entry, and (H + off + len) is the position of the next item (or T if there is only one item).
Essentially, the header will grow to consume any unused space at the end of the buffer. This is better than having a discrete "filler" entry because it preserves the property that the buffer is non-empty whenever tail > head, but it does mean that computing the number of data bytes in the buffer is no longer simple. I already made that tradeoff when I decided to put headers into the ring, so this isn't any worse.
Here is how we parse the new header:
let header r =
  get_uint16_le r (r.head + 0),
  get_uint16_le r (r.head + 2)
And the new and improved next and advance functions:
let next r =
  let length, offset = header r in
  r.%{r.head + offset; length}

let advance r =
  let length, offset = header r in
  r.head <- r.head + length + offset
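With these two primitives, a reader can drain the queue one message at a time; a sketch (drain is my name, not part of the module):

(* Sketch: hand each queued message to f, then drop it. Any filler
   space is skipped automatically via the offset field. *)
let drain r f =
  while r.head <> r.tail do
    f (next r);
    advance r
  done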
Writing primitives
Writing is a little more complicated. Since the available free space may be spread across the beginning and end of the underlying memory buffer, we need to be able to choose between contiguous ranges of memory according to the write size requirements.
This computation can be done in the "unwrapped" numbering space. Consider the two possible cases where wrapped H > T and H < T:
Imagine this buffer is a carousel that you can put your finger on and spin. The numbers will change but the pattern of occupied and unoccupied positions will repeat forever.
The subscript on H and T denotes the cycle the position is in. The numbers are arbitrary, but the rule (Tₙ - Hₙ ≥ 0) holds for any ring buffer.
As noted earlier, given a position xₙ and a ring size, we can easily calculate the base or most recent edge of the buffer, which we will call left(xₙ), and the next edge, which we will call right(xₙ). And we can always calculate xₙ₊₁ by adding the length of the underlying buffer.
let left r x = x land (lnot r.mask)
let right r x = x lor r.mask + 1
let succ r x = x + r.mask + 1
The free space in the ring is always between Tₙ and Hₙ₊₁. The free space is split when Tₙ < right(Tₙ) < Hₙ₊₁. The size of the slot immediately following T is
right r.tail - r.tail (* when right Tₙ < Hₙ₊₁ *)
succ r.head - r.tail (* when Hₙ₊₁ <= right Tₙ *)
min (right r.tail - r.tail) (succ r.head - r.tail) (* either case *)
and the size of the free space after the split is
succ r.head - right r.tail
This will be negative if Hₙ₊₁ is before the split, as in the second example above. These expressions work even during integer overflow. For the sake of the diagram, consider an implementation using 8-bit signed integers, where the representable range of numbers is [-128, 127]:
In the second case, the expression

succ r.head - right r.tail

works because in two's complement signed integer arithmetic, (x - y) is always the (signed) distance, or number of steps to take to get from y to x, if you arrange all the integers in a (gasp!) ring. In the example, this would be 126 - (-128) = -2, meaning to get from -128 to 126, take 2 steps "to the left". This would not work using unsigned integers; in an equivalent scenario using unsigned 8-bit integers, if Hₙ₊₁ were 254, then 254 - 0 = 254, but the (nonexistent) space between right(Tₙ) and Hₙ₊₁ cannot hold 254 bytes.
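We can check this in the toplevel by emulating 8-bit two's-complement wraparound with a small sign-extension helper (to_int8 is mine, just for this demonstration):

# let to_int8 x = ((x land 0xff) lxor 0x80) - 0x80 ;;
val to_int8 : int -> int = <fun>
# to_int8 (126 - (-128)) ;;
- : int = -2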
Putting these expressions together, slots r is the size of both "slots", or contiguous regions of free memory.
let slots r =
  min (right r.tail - r.tail) (succ r.head - r.tail),
  succ r.head - right r.tail
reserve r hint is the length and offset (from T) of a contiguous region in r that may be large enough to store hint bytes, and is suitable for encoding as a 4-byte header of a data item in said region. If there are two contiguous regions, the first is preferred if it is large enough, even if it's smaller than the second region, as that will better utilize the buffer memory. The caller must check that the returned length is large enough to hold its message.
1 let header_length = 4
2 let reserve r hint =
3   let rec f s0 s1 n =
4     if s0 < 0 then f 0 (s1 + s0) n
5     else if s0 >= n then s0, header_length
6     else s1, s0 + header_length
7   in
8   let s0, s1 = slots r in
9   f (s0 - header_length) s1 hint
This may be the most cryptic function in the core data structure. I'll go through it line by line, in execution order.
9   f (s0 - header_length) s1 hint
With my ring layout, the item header for the next item must start at T. That means the space for the header must come out of any free space starting at T. Because we parse the header byte-by-byte, it is OK if the header itself is split across the edge of the buffer.
3   let rec f s0 s1 n =
4     if s0 < 0 then f 0 (s1 + s0) n

Here we handle the case where the first slot is not even large enough to hold a header; we take the debt from the first slot and give it to the second slot.
5     else if s0 >= n then s0, header_length
If the first slot is large enough to hold the item, we will prefer that. A different use case might prefer to choose the larger of the two slots. We've already accounted for space for the header on line 9.
6     else s1, s0 + header_length
Here is the special case, where we have to pad the buffer. The region will have the length of the second slot (minus any debt for the header transferred on line 4), and will start after the first slot. Since we decremented the header length from the first slot on line 9, we have to add it back here.
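To make the caller's obligation concrete, here is a sketch of how a writer might use reserve and honor the "check the returned length" rule (try_reserve is an illustrative name):

(* Sketch: reserve only reports what is available; the caller must
   verify the region is big enough before writing into it. *)
let try_reserve r msg_len =
  let len, off = reserve r msg_len in
  if len < msg_len then None (* not enough contiguous space *)
  else Some (len, off)       (* write at tail + off, then commit *)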
After the caller has written some number of bytes, it must call commit, which writes the header and updates the tail position. The caller may have written fewer bytes than the region described by reserve can hold.
let set_uint16_le r pos x =
  r.%{pos + 0} <- Char.chr (x land 0xff);
  r.%{pos + 1} <- Char.chr ((x lsr 8) land 0xff)

let commit r len off =
  set_uint16_le r (r.tail + 0) len;
  set_uint16_le r (r.tail + 2) off;
  r.tail <- r.tail + off + len
I'm happy with how clean this turned out. Originally I had a guard against 0-length entries:
let commit r len off =
  if len > 0 then (
    set_uint16_le r (r.tail + 0) len;
    set_uint16_le r (r.tail + 2) off;
    r.tail <- r.tail + off + len)
but I removed it, because 0-length entries could be potentially useful. For instance, if the ring is used as a buffer for a file or network connection, a 0-length entry could indicate an end-of-file condition or a closed connection.
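For example, signalling end-of-stream could be as small as this sketch (send_eof is an illustrative name, not part of the module):

(* Sketch: a 0-length entry as an end-of-file marker. reserve still
   accounts for the 4-byte header. *)
let send_eof r =
  let _len, off = reserve r 0 in
  commit r 0 off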
Confirming it works
While I am reasonably confident that my implementation is sound, it is still worth kicking the tires, and while I'm doing that I might as well have some fun. So I put together a test that does the following:
- Creates a ring
- Writes an arbitrary amount of data to the ring
- Reads it and confirms that the data read matches the data written
- Appends the state of the ring to a file

I then wrote a program that renders a visualization of the ring buffer to an SDL window using the Tsdl library.
It turned out like this:
This shows a 4k ring buffer. Each (scaled) pixel is a memory location, mapped to a row and column with the expression
pos mod window_width, (* x or col *)
pos / window_width (* y or row *)
The yellow regions are headers that start a new item, and the black regions at the bottom of the window show gaps inserted to keep the messages contiguous.
A note on alignment
On most modern architectures, even though memory is byte-addressable, the CPU fetches multiple bytes at a time, usually some multiple of its word size (4 or 8), and there is a sometimes severe performance penalty for fetching memory at an address that is not aligned, i.e. not a multiple of the memory access size. The performance penalty can be even worse if you are attempting to implement a "lockless" data structure using atomic load and store operations.
Since high performance is not an objective for me, I won't pursue alignment in my initial implementation. However, the implementation described thus far could be made to align data items with relatively modest changes to how the offset field is calculated in the reserve function. Aligning the headers themselves can be done by updating the advance and commit functions to always round the new head and tail positions up to the next alignment boundary.
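Rounding a position up to an alignment boundary (a power of two) is a one-liner; something like this sketch could be applied to the new positions in advance and commit (round_up is not part of the implementation):

(* Sketch: round x up to the next multiple of align, where align is
   a power of two. With align = 8: 13 -> 16, and 16 -> 16. *)
let round_up ~align x = (x + align - 1) land (lnot (align - 1))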
High level API(s)
We've defined the core data structure. I'm pleased with how the core operations turned out. I have a nearly ideal (for my use cases) bounded queue in less than a page of code. But there are some sharp edges to these operations, and I want to build an abstraction on top of it that's more fool-proof. Since I plan to use these as message streams, it's useful to be able to wait for data or free space when reading or writing, respectively.
let wait_read r =
  while r.head = r.tail do Thread.yield () done

let wait_write r n =
  while succ r r.head - r.tail < n do Thread.yield () done
There are two problems with this approach:
- It's a busy loop and will cause high CPU usage.
- It's not thread-safe; other threads may be updating r.head or r.tail while it is fetching their current values.
In a single-reader single-writer scenario, where the reader and writer are threads in the same domain, only one thread may run at any given time. The worst that could happen is that a reader sees less new data than was actually available, or a writer sees less free space.
If the reader and writer are in separate domains, they may execute in parallel, and I do not know the semantics of fetching an int while it's being updated, or whether that could result in a corrupted value rather than just a stale one. To protect against this scenario, and to enable efficient waiting, I'll add a mutex and condition variable to the ring:
type ring = {
  ...
  m : Mutex.t;
  c : Condition.t;
}
let wait_read r =
  Mutex.lock r.m;
  while r.head = r.tail do Condition.wait r.c r.m done;
  Mutex.unlock r.m

let wait_write r n =
  Mutex.lock r.m;
  while succ r r.head - r.tail < n do Condition.wait r.c r.m done;
  Mutex.unlock r.m
The commit and advance functions need to be updated to wake up readers and writers:
let commit r len off =
  Mutex.lock r.m;
  ...
  if len > 0 then Condition.broadcast r.c;
  Mutex.unlock r.m

let advance r =
  Mutex.lock r.m;
  ...
  Condition.broadcast r.c;
  Mutex.unlock r.m
I use Condition.broadcast instead of Condition.signal to wake up all waiting readers or writers, to cover the case where a single reader does not consume all elements in the ring, or a single writer does not fill all free space. The mutex will still serialize their access to the ring.
The wait_read and wait_write functions, as they are, are useless, because they release the mutex after the ring is ready. The mutex must remain held until the reader or writer has accomplished its goal and removed or added data.
Since the reason I am using a ring buffer at all is to provide re-usable storage and reduce allocations and copying, I want to expose an API that allows readers to access message contents without copying, and writers to marshal data directly into the ring rather than copying it in from their own buffer.
let read r cb =
  Mutex.lock r.m;
  while r.head = r.tail do Condition.wait r.c r.m done;
  let len, off = header r in
  let v = Fun.protect ~finally:(fun () -> Mutex.unlock r.m)
      (fun () -> cb r.data ((r.head + off) land r.mask) len)
  in
  advance r;
  v

let write r atleast cb =
  let rec wait r n =
    let len, off as slot = reserve r n in
    if len >= n then slot
    else (Condition.wait r.c r.m; wait r n)
  in
  Mutex.lock r.m;
  let len, off = wait r atleast in
  let written = Fun.protect ~finally:(fun () -> Mutex.unlock r.m)
      (fun () -> cb r.data ((r.tail + off) land r.mask) len)
  in
  commit r written off
The read and write functions take callbacks which are given views into the buffer. The function signatures are chosen to look very much like the read(2) and write(2) system calls, so that you could do something like:
(* Fill the buffer with data from a file *)
Ring.write ring min_size (Unix.read_bigarray fd)
(* Write the next item in the ring to a file descriptor *)
let n = Ring.read ring (Unix.write_bigarray fd)
If the functions raise an exception, the ring is unchanged. This gives the caller a chance to retry without losing the message or committing garbage data. The Fun.protect function ensures that the ring is unlocked after the callback runs, even if an error is encountered.
I am still experimenting with the higher-level APIs. The underlying data structure is so simple that I will probably copy it directly into my projects, tweak it as needed, and custom-tailor an API specifically for that project. For example, I am working on a networked file system where it would be useful to "fuse" specific chains of protocol requests into something more efficient; putting them in a ring buffer and then exposing an API to iterate over the next N items without de-queueing them would be useful there.
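A sketch of what that might look like, reusing the primitives above (header_at and iter_queued are hypothetical names, not part of the module):

(* Sketch: visit each queued item without moving head. header_at
   parses a (length, offset) header at an arbitrary position. *)
let header_at r pos =
  get_uint16_le r (pos + 0), get_uint16_le r (pos + 2)

let iter_queued f r =
  let pos = ref r.head in
  while !pos <> r.tail do
    let length, offset = header_at r !pos in
    f r.%{!pos + offset; length}; (* a view, not a copy *)
    pos := !pos + offset + length
  done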
While I am pretty happy with how this all turned out, the irony is that, while working on this module, I've decided that I don't really need it in the original use case, and it will probably end up in the bin marked "Note for further use".
See also

- Build log: DHCP for ipvlan mode l2 devices (Feb 2025)
- Interfacing OCaml with netlink and C