A simple, wait-free message queue
A few weeks ago I was on the lookout for a wait-free-ish multi-producer-single-consumer (MPSC) queue. I needed one both for my new game, in which I have an asset streaming thread, and also potentially for work where we have a bunch of hardware peripherals that need to be controlled from multiple threads. In general these MPSC queues are very useful when you have a component that can only run single-threaded, but you want to send it commands from multiple threads.
For both of these use cases I currently use a simple mutex-protected circular queue. But the workloads are quite bursty, and the lock is heavily contended during a burst, so I think an actual concurrent queue would help.
Mutex locking queue
This is pretty much the simplest thread-safe queue you can make:
#include <mutex>
#include <condition_variable>
using namespace std;

#define CAPACITY 4096  // power of two, so the unsigned cursors wrap cleanly

struct Queue {
    mutex lock;
    condition_variable not_full;   // signalled when a slot frees up
    condition_variable not_empty;  // signalled when an item arrives
    unsigned write_cursor = 0;     // unsigned: overflow wraps instead of being undefined
    unsigned read_cursor = 0;
    int count = 0;                 // number of items currently stored
    int items[CAPACITY];
};

void enqueue(Queue *queue, int item) {
    unique_lock lock(queue->lock);
    while (queue->count == CAPACITY)  // full: sleep until the reader makes room
        queue->not_full.wait(lock);
    ++(queue->count);
    queue->items[(queue->write_cursor++) % CAPACITY] = item;
    queue->not_empty.notify_one();
}

int dequeue(Queue *queue) {
    unique_lock lock(queue->lock);
    while (queue->count == 0)  // empty: sleep until a writer adds an item
        queue->not_empty.wait(lock);
    --(queue->count);
    int item = queue->items[(queue->read_cursor++) % CAPACITY];
    queue->not_full.notify_one();
    return item;
}
Any number of threads can safely enqueue or dequeue to it, but not in parallel. That can be perfectly fine if you don’t do a lot of enqueues and dequeues, or if it takes a lot of time to produce and consume messages. But in my case, producing a message takes basically no time at all and so multiple threads are constantly barraging the queue with messages. The latency for enqueues was starting to get unacceptable, so I really wanted something better, with a faster enqueue.
Design considerations
What I needed was a queue that can safely be written from multiple threads, in parallel, while another thread reads from it.
In my use cases, the only job of the reading thread is to read messages from the queue and respond accordingly. So when the queue is empty, the reader should yield to the OS scheduler until there’s work to do.
The queue should have a fixed size allocated up front. A dynamically resizing queue seems nice, until you think about the worst case where the reader thread crashes, or can’t keep up with the writer threads. Then the queue will grow indefinitely and bring the whole computer down with it.
The queue must be consistent: if Enqueue(A) finishes before Enqueue(B) starts, then A must end up in the queue before B. Obviously if two enqueues are running in parallel, then the order in which they end up is ambiguous (it’s a race condition). But from a single thread’s perspective, all enqueues must end up in sequence. Re-ordering the messages would be buggy in many scenarios. For example, reordering [load a.wav, unload a.wav] to [unload a.wav, load a.wav] is clearly a bug.
If the queue is full, writer threads should block and yield to the OS. I don’t want to have to think about the possibility of an enqueue failing. I don’t want to busy-wait either. I fully agree with Bruce Dawson that busy-waiting is almost always a bad idea. I also don’t want to replace older messages from the queue. That leaves yielding as the only sane option. Plus there are some nifty tools that we’ll see later that can make this very efficient.
Enqueues should be as fast as possible, and ideally wait-free, meaning that they always complete in a bounded number of steps regardless of what other threads are doing. That’s impossible to achieve if we block when the queue is full, since the time spent blocking depends on how quickly the reader clears up space. But it should be possible to make the enqueue wait-free if the queue isn’t full.
I want a drop-in replacement for the locking queue. Any sort of thread-local storage or thread registration is unacceptable as it would make the surrounding code much more complicated.
My workloads are very “bursty”: the queue remains empty for long periods of time, then a new level needs to load, or the peripheral devices need to be reconfigured, and suddenly there are thousands of messages being enqueued. Messages take a while to process, so it usually takes a while before the queue drains again. It’s possible that a burst happens before the queue is drained, in which case the writers need to block until the reader catches up.
With all of that in mind, I went google-shopping for some existing queues.
Vyukov’s queue
Dmitry Vyukov’s MPSC queue initially looked promising. Dmitry says it’s wait-free for writers, and the code fits on a single page, but it’s apparently not linearizable. I couldn’t figure out exactly how it works even after looking at it for a few minutes. It’s only wait-free if you already have linked list nodes allocated and ready for insertion. But in reality, the nodes need to come from a freelist. I guess you could use a wait-free freelist. But wait-free stacks are very complicated and slow, so more likely you would use a lock-free stack, which is not wait-free.
Not what I’m looking for.
Bittman’s queue
Daniel Bittman’s MPSC queue looks almost exactly like what I needed. It’s wait-free (if the queue isn’t full), doesn’t store any extra data per element, it’s linearizable, and it looked relatively simple. It doesn’t yield to the OS on blocking, but it’s simple enough that I can easily adapt it.
Here’s a diagram of how enqueue/dequeue works:
Writers (red circles) bump up the queue’s recorded number of elements. If the queue happens to overflow here, the writers block. In the original implementation, the enqueue fails if the queue is full, but I changed it to block until the reader clears up some space. Once there’s enough space, the writers take turns acquiring a “ticket”. This ticket distributes the writers over all of the queue slots, and effectively grants exclusive ownership of the slot. The writers then store the data into their assigned slots, mark them as “full”, and notify the reader.
The reader (blue circle) sequentially cycles through all of the queue slots. It blocks until the current slot is marked “full” by the writers, then reads the data from the slot, and finally, marks the slot “empty” again. It then decrements the queue’s number of elements, and wakes any writers waiting for that.
Writes to the same cache line can only be executed by a single CPU core at a time, so the writer threads serialize themselves at least twice during an enqueue: Once when incrementing the queue count (the reader also writes to the queue count and so also serializes here), and once more to acquire the ticket. I expected this to make the queue scale kind of poorly with the number of writers. But I was still hoping it would be faster than a mutex.
Yielding to the OS could just be implemented as sleep_for(1ms), but there’s a much better solution: atomic_wait/atomic_notify were added in C++20, and this pair of functions is extremely versatile. atomic_wait yields a thread to the scheduler until an atomic value changes. atomic_notify wakes up the threads that are waiting for that value to change. So for our queue, the writers can yield until the queue’s length changes, and the reader can yield until a slot is marked “full”.
Technically atomic_wait/notify could just be implemented as a spin-lock. But in reality, it maps to WaitOnAddress/WakeByAddress on Windows, futex on Linux, and ulock_wait/ulock_wake on Mac.
Once I convinced myself I understand how the queue works, I implemented it. It came in at only 28 lines of code:
#include <atomic>
#include <cstdint>
using namespace std;
using enum memory_order;

#define CAPACITY 4096

struct Queue {
    atomic<uint32_t> count;         // stored items plus writers currently reserving room
    atomic<uint32_t> write_ticket;  // next slot a writer may claim
    uint32_t read_ticket = 0;       // next slot to read; only the reader touches it
    struct { atomic<bool> full; int item; } slots[CAPACITY];
};

void enqueue(Queue *queue, int item) {
    uint32_t count;
    // Reserve room. On overflow, undo the reservation and sleep until count changes.
    while ((count = queue->count.fetch_add(1, acquire)) >= CAPACITY) {
        queue->count.fetch_sub(1, release);
        queue->count.wait(count, relaxed);
    }
    // The ticket grants exclusive ownership of one slot.
    uint32_t ticket = queue->write_ticket.fetch_add(1, relaxed);
    auto *slot = &queue->slots[ticket % CAPACITY];
    slot->item = item;
    slot->full.store(true, release);  // publish the item to the reader
    slot->full.notify_one();
}

int dequeue(Queue *queue) {
    auto *slot = &queue->slots[queue->read_ticket++ % CAPACITY];
    slot->full.wait(false, acquire);  // sleep until a writer marks the slot full
    int item = slot->item;
    slot->full.store(false, relaxed);
    queue->count.fetch_sub(1, release);  // free the space, then wake blocked writers
    queue->count.notify_all();
    return item;
}
UPDATE: Thank you Alloth from the Handmade Network for pointing out a race condition in my previous implementation of enqueue, which occurred when the queue was full. I’ve corrected this mistake, and the above code should now be bug-free. The fix didn’t seem to have any impact on the charts below, so they still use data from before the fix.
I was all happy with this. I ran some test cases with a few threads, and everything looked fine. Then I ran the tests with more threads, and the queue seemed to deadlock. I waited on it for a few minutes, but nothing seemed to be happening so I broke into it to see where it was stuck. To my surprise, the queue hadn’t deadlocked, it was just taking an insane amount of time to complete the test. Sure enough I left it running for a few more seconds, and it finished without errors.
Yikes.
When the writer threads completely overwhelm the reader, the queue quickly backs up, and then the writers spend basically all of their time waiting for the queue count to decrease. I’m not quite sure why this causes such awful slowdowns, but I think it might go something like this:
- All writers increment the count, they see it’s overflowing the queue, and they go to sleep.
- The reader processes one item, and notifies the writers.
- One of the writers wakes up, enqueues an item, and then immediately starts enqueueing the next item and blocks.
- The other writers then wake up from the original notification, but they see the queue count is still overflowing and just go right back to sleep.
On Windows, atomic_wait maps to WaitOnAddress, which is internally implemented as a lock-free hash table that tracks which addresses are waited on by which threads. Since the writer threads all wait for the same address, this hash table probably does a ton of work constantly inserting and removing colliding entries. This is what it has to deal with:
for each writer
    atomically remove writer from hash table at &queue.count
    wake up the writer
    put the writer back to sleep
    atomically insert writer into hash table at &queue.count
I don’t exactly know how Windows’ lock-free hash table is implemented, but it’s clearly slow for this workload. When I replaced all atomic_wait calls with _mm_pause, the pathological behavior was gone.
Oddly enough when I tested the queue on my AMD 5600X 6-core/12-thread CPU, the behavior wasn’t as bad:
It’s still very slow, but at least it didn’t lag my entire system for hours. And it also only triggered when the number of threads exceeded the physical thread count. This made me doubt if Windows’ lock-free hash table implementation was really to blame, or if it was some weird CPU interaction.
I also tried it on an Apple M1 8-thread machine. There was no slowdown on that at all:
The queue was very slow compared with the others, and it seemed to scale terribly, but there was no pathological behavior. How strange.
In either case, Bittman’s queue seems to offer almost no advantages for my use case. I definitely want to keep yielding to the OS when the queue is full, so removing the atomic_wait is not an option. And I want a queue that scales well on ARM processors, and this one doesn’t seem to do that. That’s probably because of those synchronization points in the enqueue code. The threads are constantly invalidating each other’s cache lines.
So this queue ended up a no-go. I had to keep looking.
Rigtorp’s queue
Erik Rigtorp’s MPMC queue actually allows multiple concurrent writers and readers. It’s another wait-free-ish queue, it stores an extra integer per slot, only does a single atomic add per enqueue, and the writers and readers usually don’t touch the same cache lines. The queue is apparently also used in Frostbite and some low latency trading software, so it’s definitely doing something right.
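Keeping writers and readers off each other’s cache lines is usually a matter of alignment. The sketch below is not taken from Erik’s code: `PaddedQueueHeader` and `CACHE_LINE` are hypothetical names, and the 64-byte line size is an assumption that holds on typical x86-64 parts but not everywhere.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumption: 64-byte cache lines. Other CPUs may use a different size.
constexpr std::size_t CACHE_LINE = 64;

struct PaddedQueueHeader {
    alignas(CACHE_LINE) std::atomic<uint32_t> write_ticket{0};  // hammered by writers
    alignas(CACHE_LINE) uint32_t read_ticket = 0;               // touched only by the reader
};

static_assert(alignof(PaddedQueueHeader) == CACHE_LINE, "header starts on a line boundary");
static_assert(sizeof(PaddedQueueHeader) == 2 * CACHE_LINE, "each cursor gets its own line");

// Byte distance between the two cursors of a given header.
std::size_t cursor_distance(const PaddedQueueHeader &header) {
    auto *w = reinterpret_cast<const unsigned char *>(&header.write_ticket);
    auto *r = reinterpret_cast<const unsigned char *>(&header.read_ticket);
    return static_cast<std::size_t>(r - w);
}
```

With this layout the reader bumping `read_ticket` no longer invalidates the line that the writers fight over. Contention between writers on `write_ticket` itself remains, since that is true sharing.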
Erik’s implementation is a bit complex, but the core of the algorithm is pretty simple:
Even though this flowchart is a lot denser than the one from Bittman’s queue, I actually found it way easier to understand. That’s mostly because of how symmetric the writer and reader steps are. It triggers some happy pathway in my brain.
The writers start off by serializing at a ticket dispenser. The ticket tells them both which queue slot to write into, and also in which turn of the queue they should write. Every time the queue wraps around, the turn increases by one. The ticket gives writers exclusive access to a queue slot, but only during a particular turn. The threads then block until it’s their turn, store the value in the slot, and then increment the reader turn counter to publish the value. The readers do basically the exact same thing, and when they’re done reading the value, they increment the writer turn counter.
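The ticket-to-slot mapping is plain integer arithmetic. Here is a small sketch of it; `Assignment` and `assign` are illustrative names, not part of anyone’s implementation.

```cpp
#include <cassert>
#include <cstdint>

struct Assignment {
    uint32_t slot;  // index into the slot array
    uint32_t turn;  // how many times the queue wrapped around before this ticket
};

// Splits a ticket into the slot it owns and the turn in which it owns it.
Assignment assign(uint32_t ticket, uint32_t capacity) {
    assert(capacity > 0);
    return Assignment{ticket % capacity, ticket / capacity};
}
```

With a capacity of 4, tickets 0 to 3 map to slots 0 to 3 in turn 0, tickets 4 to 7 map to the same slots in turn 1, and so on. Two tickets that share a slot always differ in their turn, which is what lets the later writer wait for the earlier item to be consumed.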
Because I only needed to support a single reader, I could trim down some of the reader logic. With that modification, the flowchart looks like this:
The beautiful symmetry is gone, but the flowchart is at least less dense. The only major modification here is that instead of storing a reader turn counter per slot, we can just store a flag to mark if the slot is occupied or not. But otherwise it works exactly the same.
Since the writers block on different slot turn counters, I hoped that this queue would avoid the pathological WaitOnAddress behavior that we saw earlier. Also, because the writers only serialize on the ticket dispenser, I expected this queue to scale quite a bit better than Bittman’s queue.
The code came in at 33 lines of C++:
#include <atomic>
#include <cstdint>
using namespace std;
using enum memory_order;

#define CAPACITY 4096

struct Queue {
    atomic<uint32_t> write_ticket;    // ticket dispenser shared by all writers
    uint32_t read_ticket = 0;         // only the single reader touches this
    uint32_t capacity = CAPACITY;     // number of slots in use
    struct {
        atomic<uint8_t> turn;         // which writer turn currently owns the slot
        atomic<bool> full;            // set by the writer, cleared by the reader
        int item;
    } slots[CAPACITY];
};

void enqueue(Queue *queue, int item) {
    uint32_t ticket = queue->write_ticket.fetch_add(1, relaxed);
    auto *slot = &queue->slots[ticket % queue->capacity];
    uint8_t turn = (uint8_t)(ticket / queue->capacity);
    uint8_t current_turn;
    // Sleep until the reader has advanced this slot to our turn.
    while ((current_turn = slot->turn.load(acquire)) != turn)
        slot->turn.wait(current_turn, relaxed);
    slot->item = item;
    slot->full.store(true, release);  // publish the item to the reader
    slot->full.notify_one();
}

int dequeue(Queue *queue) {
    uint32_t ticket = queue->read_ticket++;
    auto *slot = &queue->slots[ticket % queue->capacity];
    uint8_t turn = (uint8_t)(ticket / queue->capacity);
    slot->full.wait(false, acquire);  // sleep until a writer fills the slot
    int item = slot->item;
    slot->full.store(false, relaxed);
    slot->turn.store(turn + 1, release);  // hand the slot to the next turn's writer
    slot->turn.notify_all();
    return item;
}
This passed all of my torture tests, and there was no unexpected slowdown on any of the machines I tested on, thankfully.
Benchmarks
The code I was using to test the queues already included knobs to set the number of writer threads, the queue capacity, and the size of the “burst” - that is, how many items are enqueued as quickly as possible before a break. So the tests did double duty as a benchmarking suite.
I was mainly concerned with throughput and writer latency in cases where the number of enqueued items was smaller than the queue capacity. I call this a “small burst”. This is the average case for me, and since the queue never gets filled, the writers should never have to block.
I also care about “large bursts” - when the number of enqueued items exceeds the queue capacity, and the writers need to block while the reader drains the queue. This is not a common case, and it’s entirely preventable in all cases - but it does still happen from time to time, and I don’t want it to be more power hungry or slower than what a locking queue does in that case. I think that the large burst tests also give a measure of how the queue would perform under a sustained workload.
For small burst tests, the queue capacity was set to 1,048,576 (= 2^20), and the total number of enqueues was 1,000,000. For large burst tests, the queue capacity was 65,536 (= 2^16), but the number of enqueues was also 1,000,000. So the burst size is actually the same for the small and large burst tests, it’s the capacity that changes. But this also matches my use case, where the queue workload is more or less fixed, but I can vary the queue capacity.
The tests consist of N writer threads enqueueing BURST_SIZE / N items as quickly as possible. The “items” in this case are 4-byte integers, something that’s more or less free to copy. The reader sits in another loop and dequeues BURST_SIZE items, and it also performs various bug checks on the items and the queue. In my actual use case, processing messages from the queue takes a lot longer than producing them, so I think it’s appropriate that this is somewhat reflected in the benchmark. Anyway, this whole process repeats 100 times for a total of 100,000,000 enqueues and dequeues.
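The shape of such a test can be sketched as follows. This is not the actual benchmark code: `StandInQueue` is a hypothetical mutex-based placeholder so the example is self-contained, `run_burst` is a made-up name, and the only “bug check” shown is the per-writer ordering requirement from the design section.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Placeholder bounded queue; swap in any queue with blocking enqueue/dequeue.
struct StandInQueue {
    explicit StandInQueue(std::size_t capacity) : capacity(capacity) {}
    void enqueue(uint32_t item) {
        std::unique_lock<std::mutex> guard(lock);
        not_full.wait(guard, [&] { return items.size() < capacity; });
        items.push_back(item);
        not_empty.notify_one();
    }
    uint32_t dequeue() {
        std::unique_lock<std::mutex> guard(lock);
        not_empty.wait(guard, [&] { return !items.empty(); });
        uint32_t item = items.front();
        items.pop_front();
        not_full.notify_one();
        return item;
    }
    std::mutex lock;
    std::condition_variable not_full, not_empty;
    std::deque<uint32_t> items;
    std::size_t capacity;
};

// Each item packs the writer id into the top 8 bits and a per-writer
// sequence number into the low 24 bits. Returns true if every writer's
// items were dequeued in the order that writer enqueued them.
bool run_burst(uint32_t writers, uint32_t burst_size, std::size_t capacity) {
    assert(writers > 0 && writers <= 256 && burst_size % writers == 0);
    const uint32_t per_writer = burst_size / writers;
    assert(per_writer <= 0x1000000);
    StandInQueue queue(capacity);

    std::vector<std::thread> threads;
    for (uint32_t id = 0; id < writers; ++id)
        threads.emplace_back([&queue, id, per_writer] {
            for (uint32_t seq = 0; seq < per_writer; ++seq)
                queue.enqueue((id << 24) | seq);
        });

    // The calling thread acts as the single reader.
    std::vector<uint32_t> next_seq(writers, 0);
    bool ok = true;
    for (uint32_t i = 0; i < burst_size; ++i) {
        uint32_t item = queue.dequeue();
        uint32_t id = item >> 24, seq = item & 0xFFFFFF;
        if (id >= writers || seq != next_seq[id]++) ok = false;
    }
    for (auto &thread : threads) thread.join();
    return ok;
}
```

Choosing a capacity smaller than the burst size exercises the blocking path, and a capacity at least as large as the burst exercises the non-blocking one.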
I ran all the benchmarks on an Intel 10900K 10-core/20-thread machine, an AMD 5600X with 6 cores/12 threads, and an Apple M1 with 4P/4E cores. But I’m only showing the results from the Intel machine, since it has the most threads and is therefore the most interesting.
You can download my benchmarking code and raw data here.
Small burst results
Here’s the total throughput for a small burst of items:
Here I measured the time it takes to enqueue and dequeue all 100,000,000 items. As you can see, Rigtorp’s queue is consistently 2-3x faster. As expected, the locking queue gets slower the more threads you throw at it, since they’re all fighting over the mutex. That doesn’t happen with Rigtorp’s queue; its speed is mostly constant regardless of the number of threads. This is expected, since the entire system is bottlenecked by the single reader thread.
The spike on the graph where the number of threads was 3 can easily be explained. 3 threads = 1 reader + 2 writers. So this is the first time when writer threads have to synchronize with each other on the ticket dispenser.
It’s good to know the system as a whole doesn’t slow down when switching to this queue, but that wasn’t my primary concern. Let’s now take a look at enqueue latency, which is what I’m really after:
The graph shows the average number of time stamp counter cycles needed to enqueue a single item. The locking queue’s latency is an order of magnitude higher at higher thread counts, but the Rigtorp queue’s latency scales much less drastically. Notice how having more than 20 threads doesn’t affect the latency at all. This is because the latency is only determined by the number of threads that need to synchronize amongst themselves, but on a 20-thread CPU, only 20 threads can ever be running in parallel. There’s even a small drop in latency when we reach the highest thread count, because then the operating system starts scheduling some threads out - and during the context switch there are fewer parallel threads trying to enqueue.
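Collecting such per-enqueue numbers takes very little code. The sketch below is an assumption about how one might do it, not the benchmark’s actual instrumentation: it substitutes `std::chrono::steady_clock` for the time stamp counter so that it stays portable, which means the unit is nanoseconds rather than cycles. `LatencyStats` and `timed` are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdint>

// Running totals for one writer thread. Keeping one instance per thread
// avoids adding shared state to the code being measured.
struct LatencyStats {
    uint64_t samples = 0;
    uint64_t total_ns = 0;
    uint64_t max_ns = 0;

    void record(uint64_t ns) {
        ++samples;
        total_ns += ns;
        max_ns = std::max(max_ns, ns);
    }
    double average_ns() const {
        assert(samples > 0);
        return static_cast<double>(total_ns) / static_cast<double>(samples);
    }
};

// Times a single call of `operation` and adds the result to `stats`.
template <typename Operation>
void timed(LatencyStats &stats, Operation &&operation) {
    auto start = std::chrono::steady_clock::now();
    operation();
    auto stop = std::chrono::steady_clock::now();
    stats.record(static_cast<uint64_t>(
        std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start).count()));
}
```

A writer would wrap each call, for example `timed(stats, [&] { enqueue(&queue, item); })`, and the per-thread stats would be merged once the burst is over.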
Let’s also look at the maximum latency for an enqueue:
This is the logarithm of the highest measured time in cycles needed to complete an enqueue, over all writer threads. And the Rigtorp queue’s maximum latency is an order of magnitude lower. Note that this is the worst case latency during the entire test of 100,000,000 enqueues. And the worst case for the Rigtorp queue is when the scheduler pre-empts the thread in the middle of an enqueue and later switches back to it. That’s 2 context switches. For the locking queue the worst case is when other threads hoard the queue’s lock for a long time.
I’m not quite sure what those spikes at 2, 4, and 14 threads are. My best guess is that at some point during the 100,000,000 enqueues, the scheduler randomly pre-empted the same writer thread twice during an enqueue. Although this doesn’t match the 10x increase in latency.
Just for fun, I also recorded the total number of context switches during the entire test:
And we can see that the Rigtorp queue writers experience almost an order of magnitude fewer context switches. I’m pretty sure the spike at 3 threads (2 writers) is just random noise. I was honestly kind of hoping that these numbers would correlate with the spikes we saw in the maximum enqueue latency. But that isn’t the case. It does explain the spike in the throughput graph from earlier though, if you were wondering about that.
Overall I’m very happy with these results. Rigtorp’s queue completely stomps on the locking queue when there’s a small burst of enqueues. Now let’s also see what happens in a “large burst” scenario.
Large burst results
Here’s the total throughput, this time for a large burst of items that completely fill up the queue a couple of times over:
As expected, the Rigtorp queue is still faster than the locking queue, but the difference is smaller for a sustained workload than for a small burst of items. Remember there’s only 1 reading thread, and the throughput here is mostly limited by that. And you can see that around 10 threads, the throughput plummets and plateaus. I’m guessing up to that point the writers were not completely limited by the reader.
Let’s now take a look at average enqueue latency:
Again, the latency is completely limited by there being only a single reader. And so we see the latency quickly shoot up at around 10 threads. At that point the latency keeps increasing the more threads you add due to the increased contention for empty slots.
I find this graph quite interesting. For the Rigtorp queue, the maximum latency is pretty much constant across the board, even past 20 threads. I would have expected the maximum latency to skyrocket past that point, just like it does for the locking queue, because that’s when the threads should start being constantly switched in-and-out by the scheduler. I’m not sure why that doesn’t happen.
Anyway, again just for fun, here’s the number of context switches experienced by the writer threads during the entire benchmark.
Surprisingly, it’s 3 orders of magnitude smaller for the Rigtorp queue! That could explain why the maximum latency was so low even past 20 threads.
I wasn’t expecting much from these large burst / sustained workload results. I thought Rigtorp’s queue would perform about as well as the locking queue, and that’s roughly what happened. Rigtorp’s queue still outpaces the locking queue even in this workload, but the differences aren’t as substantial as for short bursts of items.
Conclusion
A modified, MPSC Rigtorp queue seems to be better than a locking queue for every relevant metric I can think of. Even when the queue clogs up in a large burst of enqueues, it still has better throughput and latency. It also doesn’t needlessly waste CPU cycles by spinning when the queue is full.
I’ve switched to using this queue for my game engine, and also at work, and it did help reduce jitter when the queue activity was high. Most importantly, I could stop worrying about the queue latency when writing code for the producer threads. I can now confidently assume that an enqueue is just a slightly more expensive function call, which frees me up to worry about other stuff instead.
A wait-free-ish MPSC queue is a pretty nice tool to have in your arsenal. I’ve added this one to my collection of snippets. And you can find the C++ version here.