diff options
Diffstat (limited to 'include/jau/ringbuffer.hpp')
-rw-r--r-- | include/jau/ringbuffer.hpp | 81 |
1 files changed, 40 insertions, 41 deletions
diff --git a/include/jau/ringbuffer.hpp b/include/jau/ringbuffer.hpp index 3c2646b..92ee799 100644 --- a/include/jau/ringbuffer.hpp +++ b/include/jau/ringbuffer.hpp @@ -92,13 +92,13 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf std::condition_variable cvRead; std::condition_variable cvWrite; - /* final */ int capacityPlusOne; // not final due to grow + /* final */ size_t capacityPlusOne; // not final due to grow /* final */ T * array; // Synchronized due to MM's data-race-free SC (SC-DRF) between [atomic] acquire/release - sc_atomic_int readPos; // Memory-Model (MM) guaranteed sequential consistency (SC) between acquire (read) and release (write) - sc_atomic_int writePos; // ditto - relaxed_atomic_int size; // Non-SC atomic size, only atomic value itself is synchronized. + sc_atomic_size_t readPos; // Memory-Model (MM) guaranteed sequential consistency (SC) between acquire (read) and release (write) + sc_atomic_size_t writePos; // ditto + relaxed_atomic_size_t size; // Non-SC atomic size, only atomic value itself is synchronized. - T * newArray(const int count) noexcept { + T * newArray(const size_t count) noexcept { return new T[count]; } void freeArray(T * a) noexcept { @@ -119,8 +119,8 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf readPos = source.readPos; writePos = source.writePos; size = source.size; - int localWritePos = readPos; - for(int i=0; i<size; i++) { + size_t localWritePos = readPos; + for(size_t i=0; i<size; i++) { localWritePos = (localWritePos + 1) % capacityPlusOne; array[localWritePos] = source.array[localWritePos]; } @@ -131,10 +131,10 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf void clearImpl() noexcept { // clear all elements, zero size - const int _size = size; // fast access + const size_t _size = size; // fast access if( 0 < _size ) { - int localReadPos = readPos; - for(int i=0; i<_size; i++) { + size_t localReadPos = readPos; + for(size_t i=0; i<_size; i++) { localReadPos = (localReadPos + 1) % capacityPlusOne; array[localReadPos] = nullelem; } @@ -147,7 +147,7 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf } } - void resetImpl(const T * copyFrom, const int copyFromCount) noexcept { + void resetImpl(const T * copyFrom, const size_t copyFromCount) noexcept { clearImpl(); // fill with copyFrom elements @@ -159,8 +159,8 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf readPos = 0; writePos = 0; } - int localWritePos = writePos; - for(int i=0; i<copyFromCount; i++) { + size_t localWritePos = writePos; + for(size_t i=0; i<copyFromCount; i++) { localWritePos = (localWritePos + 1) % capacityPlusOne; array[localWritePos] = copyFrom[i]; size++; @@ -172,9 +172,9 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf T getImpl(const bool blocking, const bool peek, const int timeoutMS) noexcept { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl - const int oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl - int localReadPos = oldReadPos; - if( localReadPos == writePos ) { // SC-DRF acquire atomic writePos, sync'ing with putImpl + const size_t oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl + size_t localReadPos = oldReadPos; + if( localReadPos == writePos ) { if( blocking ) { std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ putImpl via same lock while( localReadPos == writePos ) { @@ -211,9 +211,9 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf bool putImpl(const T &e, const bool sameRef, const bool blocking, const int timeoutMS) noexcept { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiRead, _not_ sync'ing w/ getImpl - int localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl + size_t localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl localWritePos = (localWritePos + 1) % capacityPlusOne; - if( localWritePos == readPos ) { // SC-DRF acquire atomic readPos, sync'ing with getImpl + if( localWritePos == readPos ) { if( blocking ) { std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ getImpl via same lock while( localWritePos == readPos ) { @@ -243,17 +243,17 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf return true; } - int dropImpl (const int count) noexcept { + size_t dropImpl (const size_t count) noexcept { // locks ringbuffer completely (read/write), hence no need for local copy nor wait/sync etc std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); - const int dropCount = std::min(count, size.load()); + const size_t dropCount = std::min(count, size.load()); if( 0 == dropCount ) { return 0; } - for(int i=0; i<dropCount; i++) { + for(size_t i=0; i<dropCount; i++) { readPos = (readPos + 1) % capacityPlusOne; // T r = array[localReadPos]; array[readPos] = nullelem; @@ -272,7 +272,7 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf void dump(FILE *stream, std::string prefix) const noexcept override { fprintf(stream, "%s %s {\n", prefix.c_str(), toString().c_str()); - for(int i=0; i<capacityPlusOne; i++) { + for(size_t i=0; i<capacityPlusOne; i++) { // fprintf(stream, "\t[%d]: %p\n", i, array[i].get()); // FIXME } fprintf(stream, "}\n"); @@ -305,7 +305,7 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf resetImpl(copyFrom.data(), copyFrom.size()); } - ringbuffer(const T * copyFrom, const int copyFromSize) noexcept + ringbuffer(const T * copyFrom, const size_t copyFromSize) noexcept : capacityPlusOne(copyFromSize + 1), array(newArray(capacityPlusOne)), readPos(0), writePos(0), size(0) { @@ -329,7 +329,7 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf * @param arrayType the array type of the created empty internal array. * @param capacity the initial net capacity of the ring buffer */ - ringbuffer(const int capacity) noexcept + ringbuffer(const size_t capacity) noexcept : capacityPlusOne(capacity + 1), array(newArray(capacityPlusOne)), readPos(0), writePos(0), size(0) { } @@ -370,7 +370,7 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf ringbuffer(ringbuffer &&o) noexcept = default; ringbuffer& operator=(ringbuffer &&o) noexcept = default; - int capacity() const noexcept override { return capacityPlusOne-1; } + size_t capacity() const noexcept override { return capacityPlusOne-1; } void clear() noexcept override { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops @@ -379,7 +379,7 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf clearImpl(); } - void reset(const T * copyFrom, const int copyFromCount) noexcept override { + void reset(const T * copyFrom, const size_t copyFromCount) noexcept override { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); @@ -393,13 +393,15 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf resetImpl(copyFrom.data(), copyFrom.size()); } - int getSize() const noexcept override { return size; } + size_t getSize() const noexcept override { return size; } - int getFreeSlots() const noexcept override { return capacityPlusOne - 1 - size; } + size_t getFreeSlots() const noexcept override { return capacityPlusOne - 1 - size; } - bool isEmpty() const noexcept override { return writePos == readPos; /* 0 == size */ } + bool isEmpty() const noexcept override { return 0 == size; /* writePos == readPos */ } + bool isEmpty2() const noexcept { return writePos == readPos; /* 0 == size */ } - bool isFull() const noexcept override { return ( writePos + 1 ) % capacityPlusOne == readPos ; /* capacityPlusOne - 1 == size */; } + bool isFull() const noexcept override { return capacityPlusOne - 1 <= size; /* ( writePos + 1 ) % capacityPlusOne == readPos <==> capacityPlusOne - 1 == size */; } + bool isFull2() const noexcept { return ( writePos + 1 ) % capacityPlusOne == readPos; /* capacityPlusOne - 1 == size */; } T get() noexcept override { return getImpl(false, false, 0); } @@ -415,7 +417,7 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf return getImpl(true, true, timeoutMS); } - int drop(const int count) noexcept override { + size_t drop(const size_t count) noexcept override { return dropImpl(count); } @@ -435,7 +437,7 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf return putImpl(nullelem, true, true, timeoutMS); } - void waitForFreeSlots(const int count) noexcept override { + void waitForFreeSlots(const size_t count) noexcept override { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiWrite, lockRead); @@ -445,11 +447,11 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf } } - void recapacity(const int newCapacity) override { + void recapacity(const size_t newCapacity) override { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); - const int _size = size; // fast access + const size_t _size = size; // fast access if( capacityPlusOne == newCapacity+1 ) { return; @@ -457,14 +459,11 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf if( _size > newCapacity ) { throw IllegalArgumentException("amount "+std::to_string(newCapacity)+" < size, "+toString(), E_FILE_LINE); } - if( 0 > newCapacity ) { - throw IllegalArgumentException("amount "+std::to_string(newCapacity)+" < 0, "+toString(), E_FILE_LINE); - } // save current data - int oldCapacityPlusOne = capacityPlusOne; + size_t oldCapacityPlusOne = capacityPlusOne; T * oldArray = array; - int oldReadPos = readPos; + size_t oldReadPos = readPos; // new blank resized array capacityPlusOne = newCapacity + 1; @@ -474,8 +473,8 @@ template <typename T, std::nullptr_t nullelem> class ringbuffer : public ringbuf // copy saved data if( nullptr != oldArray && 0 < _size ) { - int localWritePos = writePos; - for(int i=0; i<_size; i++) { + size_t localWritePos = writePos; + for(size_t i=0; i<_size; i++) { localWritePos = (localWritePos + 1) % capacityPlusOne; oldReadPos = (oldReadPos + 1) % oldCapacityPlusOne; array[localWritePos] = oldArray[oldReadPos]; |