5
\$\begingroup\$

The title describes pretty well what the algorithm is for. It will be used for a realtime inter-process communication library that only uses shared memory for exchanging data. For realtime systems, it's usually more important to have the current data than a lossless communication and there it gets tricky. The function producer_force_put should discard the oldest message when the producer adds a new one and the queue is already full. This is just a proof of concept, so code style, variable names and API may change. I want to focus on the correctness of the algorithm and find ways to verify it. For testing I started to write unit tests and using fibers for some race conditions. Code and test can be found here:

https://github.com/mausys/message-queue.git

But testing alone is not enough because, the order of memory accesses depends on compiler and CPU. Are there other ways to verify the correctness of a wait-free algorithm?

Implementation:

 #include <stdbool.h> #include <stdint.h> #include <limits.h> #include <stdlib.h> #include <stdatomic.h> #include <assert.h> typedef struct producer producer_t; typedef struct consumer consumer_t; typedef unsigned int index_t; typedef atomic_uint atomic_index_t; #define INDEX_END UINT_MAX #define CONSUMED_FLAG ((index_t)1 << (UINT_WIDTH - 1)) #define ORIGIN_MASK CONSUMED_FLAG #define INDEX_MASK (~ORIGIN_MASK) typedef struct msgq { unsigned n; size_t msg_size; uintptr_t msgs_buffer; /* producer and consumer can change the tail * the MSB shows who has last modified the tail */ atomic_index_t *tail; /* head is only written by producer and only used * in consumer_get_head */ atomic_index_t *head; /* circular queue for ordering the messages, * initialized simple as queue[i] = (i + 1) % n, * but due to overruns might get scrambled. * only producer can modify the queue */ atomic_index_t *queue; } msgq_t; typedef struct producer { msgq_t msgq; index_t head; /* last message in chain that can be used by consumer, chain[head] is always INDEX_END */ index_t current; /* message used by producer, will become head */ index_t overrun; /* message used by consumer when tail moved away by producer, will become current when released by consumer */ } producer_t; typedef struct consumer { msgq_t msgq; index_t current; } consumer_t; static void* get_message(const msgq_t *msgq, index_t index) { if (index >= msgq->n) { return NULL; } return (void*)(msgq->msgs_buffer + (index * msgq->msg_size)); } static index_t get_next(msgq_t *msgq, index_t current) { return atomic_load(&msgq->queue[current]); } /* set the current message as head */ static index_t append_msg(producer_t *producer) { msgq_t *msgq = &producer->msgq; index_t next = get_next(msgq, producer->current); /* current message is the new end of chain*/ atomic_store(&msgq->queue[producer->current], INDEX_END); if (producer->head == INDEX_END) { /* first message */ atomic_store(msgq->tail, producer->current); } else { /* append current message to the chain */ atomic_store(&msgq->queue[producer->head], producer->current); } producer->head = producer->current; /* announce the new head for consumer_get_head */ atomic_store(msgq->head, producer->head); return next; } static bool producer_move_tail(producer_t *producer, index_t tail) { msgq_t *msgq = &producer->msgq; index_t next = get_next(msgq, tail & INDEX_MASK); return atomic_compare_exchange_weak(producer->msgq.tail, &tail, next); } /* try to jump over tail blocked by consumer */ static void producer_overrun(producer_t *producer, index_t tail) { msgq_t *msgq = &producer->msgq; index_t new_current = get_next(msgq, tail & INDEX_MASK); /* next */ index_t new_tail = get_next(msgq, new_current); /* after next */ /* if atomic_compare_exchange_weak fails expected will be overwritten */ index_t expected = tail; if (atomic_compare_exchange_weak(producer->msgq.tail, &expected, new_tail)) { producer->current = new_current; producer->overrun = tail & INDEX_MASK; } else { /* consumer just released tail, so use it */ producer->current = tail & INDEX_MASK; } } /* inserts the current message into the queue and * if the queue is full, discard the last message that is not * used by consumer. Returns pointer to new message */ void* producer_force_put(producer_t *producer) { msgq_t *msgq = &producer->msgq; if (producer->current == INDEX_END) { producer->current = 0; return get_message(msgq, producer->current); } index_t next = append_msg(producer); index_t tail = atomic_load(msgq->tail); bool consumed = !!(tail & CONSUMED_FLAG); bool full = (next == (tail & INDEX_MASK)); /* only for testing */ index_t old_current = producer->current; if (producer->overrun != INDEX_END) { /* we overran the consumer and moved the tail, use overran message as * soon as the consumer releases it */ if (consumed) { /* consumer released overrun message, so we can use it */ /* requeue overrun */ atomic_store(&msgq->queue[producer->overrun], next); producer->current = producer->overrun; producer->overrun = INDEX_END; } else { /* consumer still blocks overran message, move the tail again, * because the message queue is still full */ if (producer_move_tail(producer, tail)) { producer->current = tail & INDEX_MASK; } else { /* consumer just released overrun message, so we can use it */ /* requeue overrun */ atomic_store(&msgq->queue[producer->overrun], next); producer->current = producer->overrun; producer->overrun = INDEX_END; } } } else { /* no previous overrun, use next or after next message */ if (!full) { /* message queue not full, simply use next */ producer->current = next; } else { if (!consumed) { /* message queue is full, but no message is consumed yet, so try to move tail */ if (producer_move_tail(producer, tail)) { producer->current = tail & INDEX_MASK; } else { /* consumer just started and consumed tail if consumer already moved on, we will use tail */ producer_overrun(producer, tail | CONSUMED_FLAG); } } else { /* overrun the consumer, if the consumer keeps tail*/ producer_overrun(producer, tail); } } } assert(old_current != producer->current); return get_message(msgq, producer->current); } void* consumer_get_head(consumer_t *consumer) { msgq_t *msgq = &consumer->msgq; for (;;) { index_t tail = atomic_fetch_or(msgq->tail, CONSUMED_FLAG); if (tail == INDEX_END) { /* or CONSUMED_FLAG doesn't change INDEX_END*/ return NULL; } index_t head = atomic_load(msgq->head); tail |= CONSUMED_FLAG; if (atomic_compare_exchange_weak(msgq->tail, &tail, head | CONSUMED_FLAG)) { /* only accept head if producer didn't move tail, * otherwise the producer could fill the whole queue and the head could be the * producers current message */ consumer->current = head; break; } } return get_message(msgq, consumer->current); } void* consumer_get_tail(consumer_t *consumer) { msgq_t *msgq = &consumer->msgq; index_t tail = atomic_fetch_or(msgq->tail, CONSUMED_FLAG); if (tail == INDEX_END) return NULL; if (tail == (consumer->current | CONSUMED_FLAG)) { /* try to get next message */ index_t next = get_next(msgq, consumer->current); if (next != INDEX_END) { if (atomic_compare_exchange_weak(msgq->tail, &tail, next | CONSUMED_FLAG)) { consumer->current = next; } else { /* producer just moved tail, use it */ consumer->current = atomic_fetch_or(msgq->tail, CONSUMED_FLAG); } } } else { /* producer moved tail, use it*/ consumer->current = tail; } if (consumer->current == INDEX_END) { /* nothing produced yet */ return NULL; } return get_message(msgq, consumer->current); } static void msgq_init(msgq_t *msgq, msgq_shm_t *shm) { *msgq = (msgq_t) { .n = shm->n, .msg_size = shm->msg_size, .tail = msgq_shm_get_tail(shm), .head = msgq_shm_get_head(shm), .queue = msgq_shm_get_list(shm), .msgs_buffer = msgq_shm_get_buffer(shm), }; } producer_t *producer_new(msgq_shm_t *shm) { producer_t *producer = malloc(sizeof(producer_t)); if (!producer) return NULL; msgq_init(&producer->msgq, shm); producer->current = INDEX_END; producer->overrun = INDEX_END; producer->head = INDEX_END; return producer; } void producer_delete(producer_t *producer) { free(producer); } consumer_t* consumer_new(msgq_shm_t *shm) { consumer_t *consumer = malloc(sizeof(consumer_t)); if (!consumer) return NULL; msgq_init(&consumer->msgq, shm); consumer->current = INDEX_END; return consumer; } void consumer_delete(consumer_t *consumer) { free(consumer); } 
\$\endgroup\$

    2 Answers 2

    6
    \$\begingroup\$

    design document

    We need to understand the constraints you're working within, so you should write them down. There are some comments in the implementation, which are helpful, but that's no substitute for presenting the use case and theory of operation at a high level.

    In concurrency and in crypto, the usual rule of thumb is "don't reinvent the wheel", because likely you will do so badly. Better to exploit mature, well tested building blocks.

    It would help to know the name of at least one RT OS this library is targeting. Producing an element with possible overwrite, due to slow consumer, is a pretty commonly desired feature. We see a core library implement it here, for example:
    https://github.com/RIOT-OS/RIOT/blob/master/core/lib/include/ringbuffer.h#L68

    If the libraries available to you are somehow unsuitable or lacking in features, it's important to write down those details. Otherwise a maintenance engineer might come along in a few months, survey the codebase, and conclude that it's simpler to rip out some code and replace it with calls to a well-tested publicly available library.

    learning

    If you deliberately wish to reinvent the wheel as a learning exercise, then state that explicitly. It makes a big difference to the engineering tradeoffs.

    scheduling

    If instead of OS scheduled threads we're relying on app scheduled fibers, well, that's a pretty important assumption to document. Presumably benchmark results would be one of the considerations informing such a design decision, but we don't see any performance discussion or observed timings in the codebase.

    It's possible that this fiber library is mature, solid, and well tested. But there's scant evidence of that in the source file or associated documentation files.

    It's not obvious to me that atomic_load() is a big win over protecting variables with CV or mutex.

    extra identifier

    nit: naming ORIGIN_MASK is perhaps not worth it. Consider just using a (~CONSUMED_FLAG) expression instead.

    good comments

    Thank you for including many very helpful comments in the code. Ideally we would see several datastructure invariants listed in the design document, and then the comments would echo those, assuring us that the invariants always hold.

    I would also like to see a paragraph describing an overrun event. Given that producer will adjust head on each overrun, I don't think there's any interesting difference between single element overrun, multiple overrun, single wrap, or multiple wrap around events prior to consumer making its next request. But an example would help to put such concerns to rest.

    statistics

    Consider maintaining counters for overrun and wraparound events. An application might like to syslog them at hourly intervals, for example. Automated unit tests which deliberately consume "too slowly" could use the counters to verify that the desired rates had been achieved.

    cache line

    It appears to me that both of these go in the same cache line, which might bounce around from core to core to maintain cache coherency:

    typedef struct msgq { ... atomic_index_t *tail; ... atomic_index_t *head; 

    Consider padding, so they're in distinct cache lines. Benchmarking results should inform any such changes.

    If your production workload will include diverse threads / fibers besides just the producer and consumer, be sure to include such "distractor" threads in your benchmark workload, as well.

    interleavings

    other ways to verify the correctness

    The SPIN model checker offers one approach for testing diverse interleavings of producer and consumer.

    Just as there are "malloc debug" libraries, there are debug schedulers, that deliberately choose diverse interleavings during a test run. The fiber library you're currently linking against does not appear to support such a debug mode.

    \$\endgroup\$
    2
    • \$\begingroup\$Thank you very much for your response. The algorithm is intended primary for a linux library. I was looking for an existing algorithm, but couldn't find one. The example you mentioned is a simple ringbuffer, but I'm looking for a message queue. The fiber library is neither mature nor solid, nor well tested. Its sole purpose is to test the algorithm. As I wrote, the library is for inter-process communication, so producer and consumer are running in different processes and all the atomic variables and messages are located in a shared memory and different cache lines.\$\endgroup\$
      – mausys
      Commented2 days ago
    • \$\begingroup\$Thank you for the hint with SPIN. I will look into it.\$\endgroup\$
      – mausys
      Commented2 days ago
    5
    \$\begingroup\$

    Be careful claiming it is wait-free

    You claim your queue is wait-free, but you have a potentially unbounded for-loop doing a compare-exchange operation. At the very least, this means consumer_get_head()'s time complexity is actually \$O(N)\$. Sure, you are unlikely to hit that case, but the whole point of a wait-free algorithm is that you never have to wait.

    Confusing API

    The API is very confusing. As you mention in the comments, the producer already has to know the pointer to the first free element, and then producer_force_put() will mark that element as produced and returns a pointer to the next free element. And something similar is going on with the consumer functions. For me, that was very unexpected. It also means the caller now shares some of the bookkeeping.

    I would rather have a function producer_get_head() that returns a pointer to the first free element, and a producer_force_get_head() to force it to free up an element if there is none, and then a producer_put() that returns void and that merely marks the element as produced and increments the head index.

    \$\endgroup\$
    1
    • 1
      \$\begingroup\$Thank you for your response. That's not how the algorithm works, producer_force_put adds a message to the queue which pointers was retrieved on the previous call. The pointer returned is for the empty spot. consumer_get_tail has no for loop and consumer_get_head can only be stuck in the loop if the producer keeps creating messages very fast with high scheduling priority, but then something is already wrong in the system.\$\endgroup\$
      – mausys
      Commented2 days ago

    Start asking to get answers

    Find the answer to your question by asking.

    Ask question

    Explore related questions

    See similar questions with these tags.