Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 96 additions & 38 deletions src/seq/seq_queue.c
Original file line number Diff line number Diff line change
@@ -1,18 +1,45 @@
/*************************************************************************\
Copyright (c) 2010-2015 Helmholtz-Zentrum Berlin f. Materialien
und Energie GmbH, Germany (HZB)
Copyright (c) 2026 ITER Organization
This file is distributed subject to a Software License Agreement found
in the file LICENSE that is included with this distribution.
\*************************************************************************/
#include "seq.h"
#include "seq_debug.h"
#include "epicsVersion.h"

/*
* Use epicsAtomic if available (EPICS >= 3.15)
*/
#ifndef VERSION_INT
# define VERSION_INT(V,R,M,P) ( ((V)<<24) | ((R)<<16) | ((M)<<8) | (P))
#endif
#ifndef EPICS_VERSION_INT
# define EPICS_VERSION_INT VERSION_INT(EPICS_VERSION, EPICS_REVISION, EPICS_MODIFICATION, EPICS_PATCH_LEVEL)
#endif

#if EPICS_VERSION_INT >= VERSION_INT(3,15,0,0)
#include "epicsAtomic.h"
#define HAS_ATOMICS
#endif

/* Fallbacks for older EPICS Base or when atomics are not available */
#ifndef HAS_ATOMICS
#define epicsAtomicGetSizeT(p) (*(p))
#define epicsAtomicSetSizeT(p,v) (*(p) = (v))
#define epicsAtomicGetIntT(p) (*(p))
#define epicsAtomicSetIntT(p,v) (*(p) = (v))
#define epicsAtomicReadMemoryBarrier()
#define epicsAtomicWriteMemoryBarrier()
#endif

struct seqQueue {
size_t wr;
size_t rd;
size_t numElems;
size_t elemSize;
boolean overflow;
int overflow; /* Use int for atomic access */
epicsMutexId mutex;
char *buffer;
};
Expand All @@ -23,8 +50,8 @@ epicsShareFunc boolean seqQueueInvariant(QUEUE q)
&& q->elemSize > 0
&& q->numElems > 0
&& q->numElems <= seqQueueMaxNumElems
&& q->rd < q->numElems
&& q->wr < q->numElems;
&& epicsAtomicGetSizeT(&q->rd) < q->numElems
&& epicsAtomicGetSizeT(&q->wr) < q->numElems;
}

epicsShareFunc QUEUE seqQueueCreate(size_t numElems, size_t elemSize)
Expand All @@ -51,7 +78,7 @@ epicsShareFunc QUEUE seqQueueCreate(size_t numElems, size_t elemSize)
free(q);
return 0;
}
DEBUG("%s:%d:calloc(%u,%u)\n",__FILE__,__LINE__,numElems, elemSize);
DEBUG("%s:%d:calloc(%u,%u)\n",__FILE__,__LINE__,(unsigned)numElems, (unsigned)elemSize);
q->buffer = (char *)calloc(numElems, elemSize);
if (!q->buffer) {
errlogSevPrintf(errlogFatal, "seqQueueCreate: out of memory\n");
Expand All @@ -67,13 +94,14 @@ epicsShareFunc QUEUE seqQueueCreate(size_t numElems, size_t elemSize)
}
q->elemSize = elemSize;
q->numElems = numElems;
q->overflow = FALSE;
q->overflow = 0;
q->rd = q->wr = 0;
return q;
}

epicsShareFunc void seqQueueDestroy(QUEUE q)
{
if (!q) return;
epicsMutexDestroy(q->mutex);
free(q->buffer);
free(q);
Expand All @@ -86,22 +114,44 @@ epicsShareFunc boolean seqQueueGet(QUEUE q, void *value)

epicsShareFunc boolean seqQueueGetF(QUEUE q, seqQueueFunc *get, void *arg)
{
#ifdef HAS_ATOMICS
size_t rd = epicsAtomicGetSizeT(&q->rd);
size_t wr = epicsAtomicGetSizeT(&q->wr);

/* Lock-free fast path for Single-Consumer Get */
if (wr != rd) {
epicsAtomicReadMemoryBarrier();
get(arg, q->buffer + rd * q->elemSize, q->elemSize);
/* Ensure the data is read before we update the read index.
This prevents a producer from overwriting the element before
the consumer has finished copying it. */
epicsAtomicWriteMemoryBarrier();
epicsAtomicSetSizeT(&q->rd, (rd + 1) % q->numElems);
return FALSE;
}
#endif

/* Mutex path for when wr == rd (empty or overflow)
OR if we don't have atomics */
epicsMutexLock(q->mutex);
if (q->wr == q->rd) {
if (!q->overflow) {
epicsMutexUnlock(q->mutex);
return TRUE;
}
epicsMutexLock(q->mutex);
get(arg, q->buffer + q->rd * q->elemSize, q->elemSize);
/* check again, a put might have intervened */
if (q->wr == q->rd && q->overflow)
q->overflow = FALSE;
else
q->rd = (q->rd + 1) % q->numElems;
epicsMutexUnlock(q->mutex);
if (q->wr == q->rd && q->overflow) {
epicsAtomicSetIntT(&q->overflow, 0);
} else {
epicsAtomicSetSizeT(&q->rd, (q->rd + 1) % q->numElems);
}
} else {
/* Can happen if wr moved after our lock-free check */
get(arg, q->buffer + q->rd * q->elemSize, q->elemSize);
q->rd = (q->rd + 1) % q->numElems;
epicsAtomicSetSizeT(&q->rd, (q->rd + 1) % q->numElems);
}
epicsMutexUnlock(q->mutex);
return FALSE;
}

Expand All @@ -113,46 +163,50 @@ epicsShareFunc boolean seqQueuePut(QUEUE q, const void *value)
epicsShareFunc boolean seqQueuePutF(QUEUE q, seqQueueFunc *put, const void *arg)
{
boolean r = FALSE;
size_t rd, wr;

if (q->overflow || (q->wr + 1) % q->numElems == q->rd) {
epicsMutexLock(q->mutex);
if ((q->wr + 1) % q->numElems == q->rd) {
if (q->overflow) {
r = TRUE; /* we will overwrite the last element */
}
q->overflow = TRUE;
/* Always use mutex for Put to support Multi-Producer and safely handle overflow. */
epicsMutexLock(q->mutex);
rd = epicsAtomicGetSizeT(&q->rd);
wr = epicsAtomicGetSizeT(&q->wr);

if (q->overflow || (wr + 1) % q->numElems == rd) {
if ((wr + 1) % q->numElems == rd) {
if (q->overflow) r = TRUE;
epicsAtomicSetIntT(&q->overflow, 1);
} else if (q->overflow) {
/* we had a get since the last put, so
can now eliminate overflow flag and instead
increment the write pointer */
q->wr = (q->wr + 1) % q->numElems;
if ((q->wr + 1) % q->numElems != q->rd) {
q->overflow = FALSE;
/* A get happened, move wr forward */
wr = (wr + 1) % q->numElems;
epicsAtomicSetSizeT(&q->wr, wr);
if ((wr + 1) % q->numElems != rd) {
epicsAtomicSetIntT(&q->overflow, 0);
}
}
put(q->buffer + q->wr * q->elemSize, arg, q->elemSize);
if (!q->overflow) {
q->wr = (q->wr + 1) % q->numElems;
}
epicsMutexUnlock(q->mutex);
} else {
put(q->buffer + q->wr * q->elemSize, arg, q->elemSize);
q->wr = (q->wr + 1) % q->numElems;
}

put(q->buffer + wr * q->elemSize, arg, q->elemSize);

if (!epicsAtomicGetIntT(&q->overflow)) {
epicsAtomicWriteMemoryBarrier();
epicsAtomicSetSizeT(&q->wr, (wr + 1) % q->numElems);
}
epicsMutexUnlock(q->mutex);
return r;
}

epicsShareFunc void seqQueueFlush(QUEUE q)
{
epicsMutexLock(q->mutex);
q->rd = q->wr;
q->overflow = FALSE;
epicsAtomicSetSizeT(&q->rd, epicsAtomicGetSizeT(&q->wr));
epicsAtomicSetIntT(&q->overflow, 0);
epicsMutexUnlock(q->mutex);
}

static size_t used(const QUEUE q)
{
return (q->numElems + q->wr - q->rd) % q->numElems + (q->overflow ? 1 : 0);
size_t rd = epicsAtomicGetSizeT(&q->rd);
size_t wr = epicsAtomicGetSizeT(&q->wr);
return (q->numElems + wr - rd) % q->numElems + (epicsAtomicGetIntT(&q->overflow) ? 1 : 0);
}

epicsShareFunc size_t seqQueueFree(const QUEUE q)
Expand All @@ -167,12 +221,16 @@ epicsShareFunc size_t seqQueueUsed(const QUEUE q)

epicsShareFunc boolean seqQueueIsEmpty(const QUEUE q)
{
return q->wr == q->rd && !q->overflow;
size_t rd = epicsAtomicGetSizeT(&q->rd);
size_t wr = epicsAtomicGetSizeT(&q->wr);
return wr == rd && !epicsAtomicGetIntT(&q->overflow);
}

epicsShareFunc boolean seqQueueIsFull(const QUEUE q)
{
return (q->wr + 1) % q->numElems == q->rd && q->overflow;
size_t rd = epicsAtomicGetSizeT(&q->rd);
size_t wr = epicsAtomicGetSizeT(&q->wr);
return (wr + 1) % q->numElems == rd && epicsAtomicGetIntT(&q->overflow);
}

epicsShareFunc size_t seqQueueNumElems(const QUEUE q)
Expand Down
48 changes: 28 additions & 20 deletions test/unit/queueTest.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Copyright (c) 2008 UChicago Argonne LLC, as Operator of Argonne
National Laboratory.
Copyright (c) 2010-2015 Helmholtz-Zentrum Berlin f. Materialien
und Energie GmbH, Germany (HZB)
Copyright (c) 2026 ITER Organization
This file is distributed subject to a Software License Agreement found
in file LICENSE that is included with this distribution.
\*************************************************************************/
Expand All @@ -13,6 +14,7 @@ in file LICENSE that is included with this distribution.
#include "epicsEvent.h"
#include "epicsUnitTest.h"
#include "testMain.h"
#include <string.h>

typedef unsigned long long ELEM;

Expand All @@ -33,32 +35,40 @@ static void check(QUEUE q, size_t expectedFree)
testOk(isFull == expectedFull, "Full: %d == %d", isFull, expectedFull);
}

static epicsEventId wdone, rdone, ready;
static epicsEventId wdone, rdone;

static const int threadTestIterations = 1000000;
static const size_t threadTestMaxNumElems = 20;

static int readerLost, writerLost;
static volatile int readerLost, writerLost;

typedef struct {
int seq;
int inv_seq;
} THREAD_ELEM;

static void readerTask(void *arg)
{
QUEUE q = (QUEUE)arg;
string data;
THREAD_ELEM elem;
boolean empty;
int i, j;

for (i = 0; i < threadTestIterations; i++) {
do {
empty = seqQueueGet(q, data);
empty = seqQueueGet(q, &elem);
} while (empty);
j = atoi(data);
if (j<i) {
testAbort("%d<=%d", i, j);
j = elem.seq;
if (j != ~elem.inv_seq) {
testAbort("Data corruption detected: seq=%d, inv_seq=%d (at i=%d)", j, elem.inv_seq, i);
}
if (j < i) {
testAbort("Sequence error: received %d, expected >= %d", j, i);
}
if (j>threadTestIterations) {
testAbort("%d<=%d", j, threadTestIterations);
if (j >= threadTestIterations) {
testAbort("Sequence error: received %d, expected < %d", j, threadTestIterations);
}
readerLost+=(j-i);
readerLost += (j - i);
i = j;
}
epicsEventWait(wdone);
Expand All @@ -68,13 +78,14 @@ static void readerTask(void *arg)
static void writerTask(void *arg)
{
QUEUE q = (QUEUE)arg;
string data;
THREAD_ELEM elem;
boolean full;
int i;

for (i = 0; i < threadTestIterations; i++) {
sprintf(data, "%d", i);
full = seqQueuePut(q, data);
elem.seq = i;
elem.inv_seq = ~i;
full = seqQueuePut(q, &elem);
if (full) writerLost++;
}
epicsEventSignal(wdone);
Expand Down Expand Up @@ -142,11 +153,10 @@ MAIN(queueTest)

testDiag("concurrent queueTest with numElems=%u", (unsigned)numElems);

q = seqQueueCreate(numElems, sizeof(string));
q = seqQueueCreate(numElems, sizeof(THREAD_ELEM));
wdone = epicsEventCreate(epicsEventEmpty);
rdone = epicsEventCreate(epicsEventEmpty);
ready = epicsEventCreate(epicsEventEmpty);
if (!wdone || !rdone || !ready) {
if (!wdone || !rdone) {
testAbort("epicsEventCreate failed");
}
readerLost = writerLost = 0;
Expand All @@ -162,11 +172,9 @@ MAIN(queueTest)

seqQueueDestroy(q);
testPass("ok");
epicsEventDestroy(wdone);
epicsEventDestroy(rdone);
}

epicsEventDestroy(wdone);
epicsEventDestroy(rdone);
epicsEventDestroy(ready);

return testDone();
}
Loading