diff --git a/src/seq/seq_queue.c b/src/seq/seq_queue.c index c04f262d..8a4fbc99 100644 --- a/src/seq/seq_queue.c +++ b/src/seq/seq_queue.c @@ -1,18 +1,45 @@ /*************************************************************************\ Copyright (c) 2010-2015 Helmholtz-Zentrum Berlin f. Materialien und Energie GmbH, Germany (HZB) +Copyright (c) 2026 ITER Organization This file is distributed subject to a Software License Agreement found in the file LICENSE that is included with this distribution. \*************************************************************************/ #include "seq.h" #include "seq_debug.h" +#include "epicsVersion.h" + +/* + * Use epicsAtomic if available (EPICS >= 3.15) + */ +#ifndef VERSION_INT +# define VERSION_INT(V,R,M,P) ( ((V)<<24) | ((R)<<16) | ((M)<<8) | (P)) +#endif +#ifndef EPICS_VERSION_INT +# define EPICS_VERSION_INT VERSION_INT(EPICS_VERSION, EPICS_REVISION, EPICS_MODIFICATION, EPICS_PATCH_LEVEL) +#endif + +#if EPICS_VERSION_INT >= VERSION_INT(3,15,0,0) +#include "epicsAtomic.h" +#define HAS_ATOMICS +#endif + +/* Fallbacks for older EPICS Base or when atomics are not available */ +#ifndef HAS_ATOMICS +#define epicsAtomicGetSizeT(p) (*(p)) +#define epicsAtomicSetSizeT(p,v) (*(p) = (v)) +#define epicsAtomicGetIntT(p) (*(p)) +#define epicsAtomicSetIntT(p,v) (*(p) = (v)) +#define epicsAtomicReadMemoryBarrier() +#define epicsAtomicWriteMemoryBarrier() +#endif struct seqQueue { size_t wr; size_t rd; size_t numElems; size_t elemSize; - boolean overflow; + int overflow; /* Use int for atomic access */ epicsMutexId mutex; char *buffer; }; @@ -23,8 +50,8 @@ epicsShareFunc boolean seqQueueInvariant(QUEUE q) && q->elemSize > 0 && q->numElems > 0 && q->numElems <= seqQueueMaxNumElems - && q->rd < q->numElems - && q->wr < q->numElems; + && epicsAtomicGetSizeT(&q->rd) < q->numElems + && epicsAtomicGetSizeT(&q->wr) < q->numElems; } epicsShareFunc QUEUE seqQueueCreate(size_t numElems, size_t elemSize) @@ -51,7 +78,7 @@ epicsShareFunc QUEUE seqQueueCreate(size_t numElems, size_t elemSize) free(q); return 0; } - DEBUG("%s:%d:calloc(%u,%u)\n",__FILE__,__LINE__,numElems, elemSize); + DEBUG("%s:%d:calloc(%u,%u)\n",__FILE__,__LINE__,(unsigned)numElems, (unsigned)elemSize); q->buffer = (char *)calloc(numElems, elemSize); if (!q->buffer) { errlogSevPrintf(errlogFatal, "seqQueueCreate: out of memory\n"); @@ -67,13 +94,14 @@ epicsShareFunc QUEUE seqQueueCreate(size_t numElems, size_t elemSize) } q->elemSize = elemSize; q->numElems = numElems; - q->overflow = FALSE; + q->overflow = 0; q->rd = q->wr = 0; return q; } epicsShareFunc void seqQueueDestroy(QUEUE q) { + if (!q) return; epicsMutexDestroy(q->mutex); free(q->buffer); free(q); @@ -86,22 +114,44 @@ epicsShareFunc boolean seqQueueGet(QUEUE q, void *value) epicsShareFunc boolean seqQueueGetF(QUEUE q, seqQueueFunc *get, void *arg) { +#ifdef HAS_ATOMICS + size_t rd = epicsAtomicGetSizeT(&q->rd); + size_t wr = epicsAtomicGetSizeT(&q->wr); + + /* Lock-free fast path for Single-Consumer Get */ + if (wr != rd) { + epicsAtomicReadMemoryBarrier(); + get(arg, q->buffer + rd * q->elemSize, q->elemSize); + /* Ensure the data is read before we update the read index. + This prevents a producer from overwriting the element before + the consumer has finished copying it. */ + epicsAtomicWriteMemoryBarrier(); + epicsAtomicSetSizeT(&q->rd, (rd + 1) % q->numElems); + return FALSE; + } +#endif + + /* Mutex path for when wr == rd (empty or overflow) + OR if we don't have atomics */ + epicsMutexLock(q->mutex); if (q->wr == q->rd) { if (!q->overflow) { + epicsMutexUnlock(q->mutex); return TRUE; } - epicsMutexLock(q->mutex); get(arg, q->buffer + q->rd * q->elemSize, q->elemSize); /* check again, a put might have intervened */ - if (q->wr == q->rd && q->overflow) - q->overflow = FALSE; - else - q->rd = (q->rd + 1) % q->numElems; - epicsMutexUnlock(q->mutex); + if (q->wr == q->rd && q->overflow) { + epicsAtomicSetIntT(&q->overflow, 0); + } else { + epicsAtomicSetSizeT(&q->rd, (q->rd + 1) % q->numElems); + } } else { + /* Can happen if wr moved after our lock-free check */ get(arg, q->buffer + q->rd * q->elemSize, q->elemSize); - q->rd = (q->rd + 1) % q->numElems; + epicsAtomicSetSizeT(&q->rd, (q->rd + 1) % q->numElems); } + epicsMutexUnlock(q->mutex); return FALSE; } @@ -113,46 +163,50 @@ epicsShareFunc boolean seqQueuePut(QUEUE q, const void *value) epicsShareFunc boolean seqQueuePutF(QUEUE q, seqQueueFunc *put, const void *arg) { boolean r = FALSE; + size_t rd, wr; - if (q->overflow || (q->wr + 1) % q->numElems == q->rd) { - epicsMutexLock(q->mutex); - if ((q->wr + 1) % q->numElems == q->rd) { - if (q->overflow) { - r = TRUE; /* we will overwrite the last element */ - } - q->overflow = TRUE; + /* Always use mutex for Put to support Multi-Producer and safely handle overflow. */ + epicsMutexLock(q->mutex); + rd = epicsAtomicGetSizeT(&q->rd); + wr = epicsAtomicGetSizeT(&q->wr); + + if (q->overflow || (wr + 1) % q->numElems == rd) { + if ((wr + 1) % q->numElems == rd) { + if (q->overflow) r = TRUE; + epicsAtomicSetIntT(&q->overflow, 1); } else if (q->overflow) { - /* we had a get since the last put, so - can now eliminate overflow flag and instead - increment the write pointer */ - q->wr = (q->wr + 1) % q->numElems; - if ((q->wr + 1) % q->numElems != q->rd) { - q->overflow = FALSE; + /* A get happened, move wr forward */ + wr = (wr + 1) % q->numElems; + epicsAtomicSetSizeT(&q->wr, wr); + if ((wr + 1) % q->numElems != rd) { + epicsAtomicSetIntT(&q->overflow, 0); } } - put(q->buffer + q->wr * q->elemSize, arg, q->elemSize); - if (!q->overflow) { - q->wr = (q->wr + 1) % q->numElems; - } - epicsMutexUnlock(q->mutex); - } else { - put(q->buffer + q->wr * q->elemSize, arg, q->elemSize); - q->wr = (q->wr + 1) % q->numElems; } + + put(q->buffer + wr * q->elemSize, arg, q->elemSize); + + if (!epicsAtomicGetIntT(&q->overflow)) { + epicsAtomicWriteMemoryBarrier(); + epicsAtomicSetSizeT(&q->wr, (wr + 1) % q->numElems); + } + epicsMutexUnlock(q->mutex); return r; } epicsShareFunc void seqQueueFlush(QUEUE q) { epicsMutexLock(q->mutex); - q->rd = q->wr; - q->overflow = FALSE; + epicsAtomicSetSizeT(&q->rd, epicsAtomicGetSizeT(&q->wr)); + epicsAtomicSetIntT(&q->overflow, 0); epicsMutexUnlock(q->mutex); } static size_t used(const QUEUE q) { - return (q->numElems + q->wr - q->rd) % q->numElems + (q->overflow ? 1 : 0); + size_t rd = epicsAtomicGetSizeT(&q->rd); + size_t wr = epicsAtomicGetSizeT(&q->wr); + return (q->numElems + wr - rd) % q->numElems + (epicsAtomicGetIntT(&q->overflow) ? 1 : 0); } epicsShareFunc size_t seqQueueFree(const QUEUE q) @@ -167,12 +221,16 @@ epicsShareFunc size_t seqQueueUsed(const QUEUE q) epicsShareFunc boolean seqQueueIsEmpty(const QUEUE q) { - return q->wr == q->rd && !q->overflow; + size_t rd = epicsAtomicGetSizeT(&q->rd); + size_t wr = epicsAtomicGetSizeT(&q->wr); + return wr == rd && !epicsAtomicGetIntT(&q->overflow); } epicsShareFunc boolean seqQueueIsFull(const QUEUE q) { - return (q->wr + 1) % q->numElems == q->rd && q->overflow; + size_t rd = epicsAtomicGetSizeT(&q->rd); + size_t wr = epicsAtomicGetSizeT(&q->wr); + return (wr + 1) % q->numElems == rd && epicsAtomicGetIntT(&q->overflow); } epicsShareFunc size_t seqQueueNumElems(const QUEUE q) diff --git a/test/unit/queueTest.c b/test/unit/queueTest.c index 73aa6dfc..a9bc1263 100644 --- a/test/unit/queueTest.c +++ b/test/unit/queueTest.c @@ -5,6 +5,7 @@ Copyright (c) 2008 UChicago Argonne LLC, as Operator of Argonne National Laboratory. Copyright (c) 2010-2015 Helmholtz-Zentrum Berlin f. Materialien und Energie GmbH, Germany (HZB) +Copyright (c) 2026 ITER Organization This file is distributed subject to a Software License Agreement found in file LICENSE that is included with this distribution. \*************************************************************************/ @@ -13,6 +14,7 @@ in file LICENSE that is included with this distribution. #include "epicsEvent.h" #include "epicsUnitTest.h" #include "testMain.h" +#include typedef unsigned long long ELEM; @@ -33,32 +35,40 @@ static void check(QUEUE q, size_t expectedFree) testOk(isFull == expectedFull, "Full: %d == %d", isFull, expectedFull); } -static epicsEventId wdone, rdone, ready; +static epicsEventId wdone, rdone; static const int threadTestIterations = 1000000; static const size_t threadTestMaxNumElems = 20; -static int readerLost, writerLost; +static volatile int readerLost, writerLost; + +typedef struct { + int seq; + int inv_seq; +} THREAD_ELEM; static void readerTask(void *arg) { QUEUE q = (QUEUE)arg; - string data; + THREAD_ELEM elem; boolean empty; int i, j; for (i = 0; i < threadTestIterations; i++) { do { - empty = seqQueueGet(q, data); + empty = seqQueueGet(q, &elem); } while (empty); - j = atoi(data); - if (j= %d", j, i); } - if (j>threadTestIterations) { - testAbort("%d<=%d", j, threadTestIterations); + if (j >= threadTestIterations) { + testAbort("Sequence error: received %d, expected < %d", j, threadTestIterations); } - readerLost+=(j-i); + readerLost += (j - i); i = j; } epicsEventWait(wdone); @@ -68,13 +78,14 @@ static void readerTask(void *arg) static void writerTask(void *arg) { QUEUE q = (QUEUE)arg; - string data; + THREAD_ELEM elem; boolean full; int i; for (i = 0; i < threadTestIterations; i++) { - sprintf(data, "%d", i); - full = seqQueuePut(q, data); + elem.seq = i; + elem.inv_seq = ~i; + full = seqQueuePut(q, &elem); if (full) writerLost++; } epicsEventSignal(wdone); @@ -142,11 +153,10 @@ MAIN(queueTest) testDiag("concurrent queueTest with numElems=%u", (unsigned)numElems); - q = seqQueueCreate(numElems, sizeof(string)); + q = seqQueueCreate(numElems, sizeof(THREAD_ELEM)); wdone = epicsEventCreate(epicsEventEmpty); rdone = epicsEventCreate(epicsEventEmpty); - ready = epicsEventCreate(epicsEventEmpty); - if (!wdone || !rdone || !ready) { + if (!wdone || !rdone) { testAbort("epicsEventCreate failed"); } readerLost = writerLost = 0; @@ -162,11 +172,9 @@ MAIN(queueTest) seqQueueDestroy(q); testPass("ok"); + epicsEventDestroy(wdone); + epicsEventDestroy(rdone); } - epicsEventDestroy(wdone); - epicsEventDestroy(rdone); - epicsEventDestroy(ready); - return testDone(); }