The title describes pretty well what the algorithm is for. It will be used for a realtime inter-process communication library that only uses shared memory for exchanging data. For realtime systems, it's usually more important to have the current data than a lossless communication and there it gets tricky. The function producer_force_put
should discard the oldest message when the producer adds a new one and the queue is already full. This is just a proof of concept, so code style, variable names and API may change. I want to focus on the correctness of the algorithm and find ways to verify it. For testing I started to write unit tests and using fibers for some race conditions. Code and test can be found here:
https://github.com/mausys/message-queue.git
But testing alone is not enough because, the order of memory accesses depends on compiler and CPU. Are there other ways to verify the correctness of a wait-free algorithm?
Implementation:
#include <stdbool.h> #include <stdint.h> #include <limits.h> #include <stdlib.h> #include <stdatomic.h> #include <assert.h> typedef struct producer producer_t; typedef struct consumer consumer_t; typedef unsigned int index_t; typedef atomic_uint atomic_index_t; #define INDEX_END UINT_MAX #define CONSUMED_FLAG ((index_t)1 << (UINT_WIDTH - 1)) #define ORIGIN_MASK CONSUMED_FLAG #define INDEX_MASK (~ORIGIN_MASK) typedef struct msgq { unsigned n; size_t msg_size; uintptr_t msgs_buffer; /* producer and consumer can change the tail * the MSB shows who has last modified the tail */ atomic_index_t *tail; /* head is only written by producer and only used * in consumer_get_head */ atomic_index_t *head; /* circular queue for ordering the messages, * initialized simple as queue[i] = (i + 1) % n, * but due to overruns might get scrambled. * only producer can modify the queue */ atomic_index_t *queue; } msgq_t; typedef struct producer { msgq_t msgq; index_t head; /* last message in chain that can be used by consumer, chain[head] is always INDEX_END */ index_t current; /* message used by producer, will become head */ index_t overrun; /* message used by consumer when tail moved away by producer, will become current when released by consumer */ } producer_t; typedef struct consumer { msgq_t msgq; index_t current; } consumer_t; static void* get_message(const msgq_t *msgq, index_t index) { if (index >= msgq->n) { return NULL; } return (void*)(msgq->msgs_buffer + (index * msgq->msg_size)); } static index_t get_next(msgq_t *msgq, index_t current) { return atomic_load(&msgq->queue[current]); } /* set the current message as head */ static index_t append_msg(producer_t *producer) { msgq_t *msgq = &producer->msgq; index_t next = get_next(msgq, producer->current); /* current message is the new end of chain*/ atomic_store(&msgq->queue[producer->current], INDEX_END); if (producer->head == INDEX_END) { /* first message */ atomic_store(msgq->tail, producer->current); } else { /* append current message to the chain */ atomic_store(&msgq->queue[producer->head], producer->current); } producer->head = producer->current; /* announce the new head for consumer_get_head */ atomic_store(msgq->head, producer->head); return next; } static bool producer_move_tail(producer_t *producer, index_t tail) { msgq_t *msgq = &producer->msgq; index_t next = get_next(msgq, tail & INDEX_MASK); return atomic_compare_exchange_weak(producer->msgq.tail, &tail, next); } /* try to jump over tail blocked by consumer */ static void producer_overrun(producer_t *producer, index_t tail) { msgq_t *msgq = &producer->msgq; index_t new_current = get_next(msgq, tail & INDEX_MASK); /* next */ index_t new_tail = get_next(msgq, new_current); /* after next */ /* if atomic_compare_exchange_weak fails expected will be overwritten */ index_t expected = tail; if (atomic_compare_exchange_weak(producer->msgq.tail, &expected, new_tail)) { producer->current = new_current; producer->overrun = tail & INDEX_MASK; } else { /* consumer just released tail, so use it */ producer->current = tail & INDEX_MASK; } } /* inserts the current message into the queue and * if the queue is full, discard the last message that is not * used by consumer. Returns pointer to new message */ void* producer_force_put(producer_t *producer) { msgq_t *msgq = &producer->msgq; if (producer->current == INDEX_END) { producer->current = 0; return get_message(msgq, producer->current); } index_t next = append_msg(producer); index_t tail = atomic_load(msgq->tail); bool consumed = !!(tail & CONSUMED_FLAG); bool full = (next == (tail & INDEX_MASK)); /* only for testing */ index_t old_current = producer->current; if (producer->overrun != INDEX_END) { /* we overran the consumer and moved the tail, use overran message as * soon as the consumer releases it */ if (consumed) { /* consumer released overrun message, so we can use it */ /* requeue overrun */ atomic_store(&msgq->queue[producer->overrun], next); producer->current = producer->overrun; producer->overrun = INDEX_END; } else { /* consumer still blocks overran message, move the tail again, * because the message queue is still full */ if (producer_move_tail(producer, tail)) { producer->current = tail & INDEX_MASK; } else { /* consumer just released overrun message, so we can use it */ /* requeue overrun */ atomic_store(&msgq->queue[producer->overrun], next); producer->current = producer->overrun; producer->overrun = INDEX_END; } } } else { /* no previous overrun, use next or after next message */ if (!full) { /* message queue not full, simply use next */ producer->current = next; } else { if (!consumed) { /* message queue is full, but no message is consumed yet, so try to move tail */ if (producer_move_tail(producer, tail)) { producer->current = tail & INDEX_MASK; } else { /* consumer just started and consumed tail if consumer already moved on, we will use tail */ producer_overrun(producer, tail | CONSUMED_FLAG); } } else { /* overrun the consumer, if the consumer keeps tail*/ producer_overrun(producer, tail); } } } assert(old_current != producer->current); return get_message(msgq, producer->current); } void* consumer_get_head(consumer_t *consumer) { msgq_t *msgq = &consumer->msgq; for (;;) { index_t tail = atomic_fetch_or(msgq->tail, CONSUMED_FLAG); if (tail == INDEX_END) { /* or CONSUMED_FLAG doesn't change INDEX_END*/ return NULL; } index_t head = atomic_load(msgq->head); tail |= CONSUMED_FLAG; if (atomic_compare_exchange_weak(msgq->tail, &tail, head | CONSUMED_FLAG)) { /* only accept head if producer didn't move tail, * otherwise the producer could fill the whole queue and the head could be the * producers current message */ consumer->current = head; break; } } return get_message(msgq, consumer->current); } void* consumer_get_tail(consumer_t *consumer) { msgq_t *msgq = &consumer->msgq; index_t tail = atomic_fetch_or(msgq->tail, CONSUMED_FLAG); if (tail == INDEX_END) return NULL; if (tail == (consumer->current | CONSUMED_FLAG)) { /* try to get next message */ index_t next = get_next(msgq, consumer->current); if (next != INDEX_END) { if (atomic_compare_exchange_weak(msgq->tail, &tail, next | CONSUMED_FLAG)) { consumer->current = next; } else { /* producer just moved tail, use it */ consumer->current = atomic_fetch_or(msgq->tail, CONSUMED_FLAG); } } } else { /* producer moved tail, use it*/ consumer->current = tail; } if (consumer->current == INDEX_END) { /* nothing produced yet */ return NULL; } return get_message(msgq, consumer->current); } static void msgq_init(msgq_t *msgq, msgq_shm_t *shm) { *msgq = (msgq_t) { .n = shm->n, .msg_size = shm->msg_size, .tail = msgq_shm_get_tail(shm), .head = msgq_shm_get_head(shm), .queue = msgq_shm_get_list(shm), .msgs_buffer = msgq_shm_get_buffer(shm), }; } producer_t *producer_new(msgq_shm_t *shm) { producer_t *producer = malloc(sizeof(producer_t)); if (!producer) return NULL; msgq_init(&producer->msgq, shm); producer->current = INDEX_END; producer->overrun = INDEX_END; producer->head = INDEX_END; return producer; } void producer_delete(producer_t *producer) { free(producer); } consumer_t* consumer_new(msgq_shm_t *shm) { consumer_t *consumer = malloc(sizeof(consumer_t)); if (!consumer) return NULL; msgq_init(&consumer->msgq, shm); consumer->current = INDEX_END; return consumer; } void consumer_delete(consumer_t *consumer) { free(consumer); }