diff options
author | Sven Gothel <[email protected]> | 2020-12-13 21:41:43 +0100 |
---|---|---|
committer | Sven Gothel <[email protected]> | 2020-12-13 21:41:43 +0100 |
commit | 19f1d05cda6a809541f9b17397dd3830b2e3f322 (patch) | |
tree | 12da7968cef8b3cf2d40a3ab6e0def9f05686b0b /include/jau/ringbuffer.hpp | |
parent | 3a89fd951921ef4d145d6ba5b47ff9e9168bddb7 (diff) |
ringbuffer.hpp: Drop interface ringbuffer_if; Add move-operations for put*(..); use move-operation for get*(..)
As we only have one implementation of the ringbuffer, drop the need for the virtual function table for efficancy.
The added move-operations allows using types w/o copy-constructor, e.g. std::unique_ptr<T>,
and using std::move for get*(..) operations allows for more efficancy.
Diffstat (limited to 'include/jau/ringbuffer.hpp')
-rw-r--r-- | include/jau/ringbuffer.hpp | 272 |
1 files changed, 226 insertions, 46 deletions
diff --git a/include/jau/ringbuffer.hpp b/include/jau/ringbuffer.hpp index 893b027..0a7b859 100644 --- a/include/jau/ringbuffer.hpp +++ b/include/jau/ringbuffer.hpp @@ -26,6 +26,7 @@ #ifndef JAU_RINGBUFFER_HPP_ #define JAU_RINGBUFFER_HPP_ +#include <type_traits> #include <atomic> #include <memory> #include <mutex> @@ -39,13 +40,12 @@ #include <jau/debug.hpp> #include <jau/basic_types.hpp> -#include <jau/ringbuffer_if.hpp> #include <jau/ordered_atomic.hpp> namespace jau { /** - * Simple implementation of {@link ringbuffer_if}, + * Ring buffer implementation, a.k.a circular buffer, * exposing <i>lock-free</i> * {@link #get() get*(..)} and {@link #put(Object) put*(..)} methods. * <p> @@ -85,7 +85,7 @@ namespace jau { * - std::memory_order <https://en.cppreference.com/w/cpp/atomic/memory_order> * </pre> */ -template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuffer : public ringbuffer_if<T, Size_type> { +template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuffer { private: /** SC atomic integral scalar jau::nsize_t. Memory-Model (MM) guaranteed sequential consistency (SC) between acquire (read) and release (write) */ typedef ordered_atomic<Size_type, std::memory_order::memory_order_seq_cst> sc_atomic_Size_type; @@ -175,7 +175,7 @@ template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuf } } - T getImpl(const bool blocking, const bool peek, const int timeoutMS) noexcept { + T moveOutImpl(const bool blocking, const int timeoutMS) noexcept { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl @@ -199,22 +199,51 @@ template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuf } } localReadPos = (localReadPos + 1) % capacityPlusOne; - T r = array[localReadPos]; // SC-DRF - if( !peek ) { - array[localReadPos] = nullelem; - { - std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock - size--; - readPos = localReadPos; // SC-DRF release atomic readPos - cvWrite.notify_all(); // notify waiting putter + T r = std::move( array[localReadPos] ); // SC-DRF + array[localReadPos] = nullelem; + { + std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock + size--; + readPos = localReadPos; // SC-DRF release atomic readPos + cvWrite.notify_all(); // notify waiting putter + } + return r; + } + + T peekImpl(const bool blocking, const int timeoutMS) noexcept { + if( !std::is_copy_constructible_v<T> ) { + ABORT("T is not copy constructible"); + return nullelem; + } + std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl + + const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl + Size_type localReadPos = oldReadPos; + if( localReadPos == writePos ) { + if( blocking ) { + std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ putImpl via same lock + while( localReadPos == writePos ) { + if( 0 == timeoutMS ) { + cvRead.wait(lockRead); + } else { + std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); + std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS)); + if( std::cv_status::timeout == s && localReadPos == writePos ) { + return nullelem; + } + } + } + } else { + return nullelem; } - } else { - readPos = oldReadPos; // SC-DRF release atomic readPos (complete acquire-release even @ peek) } + localReadPos = (localReadPos + 1) % capacityPlusOne; + T r = array[localReadPos]; // SC-DRF + readPos = oldReadPos; // SC-DRF release atomic readPos (complete acquire-release even @ peek) return r; } - bool putImpl(const T &e, const bool sameRef, const bool blocking, const int timeoutMS) noexcept { + bool moveIntoImpl(T &&e, const bool blocking, const int timeoutMS) noexcept { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiRead, _not_ sync'ing w/ getImpl Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl @@ -237,9 +266,44 @@ template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuf return false; } } - if( !sameRef ) { - array[localWritePos] = e; // SC-DRF + array[localWritePos] = std::move(e); // SC-DRF + { + std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock + size++; + writePos = localWritePos; // SC-DRF release atomic writePos + cvRead.notify_all(); // notify waiting getter } + return true; + } + + bool copyIntoImpl(const T &e, const bool blocking, const int timeoutMS) noexcept { + if( !std::is_copy_constructible_v<T> ) { + ABORT("T is not copy constructible"); + return false; + } + std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiRead, _not_ sync'ing w/ getImpl + + Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl + localWritePos = (localWritePos + 1) % capacityPlusOne; + if( localWritePos == readPos ) { + if( blocking ) { + std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ getImpl via same lock + while( localWritePos == readPos ) { + if( 0 == timeoutMS ) { + cvWrite.wait(lockWrite); + } else { + std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); + std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS)); + if( std::cv_status::timeout == s && localWritePos == readPos ) { + return false; + } + } + } + } else { + return false; + } + } + array[localWritePos] = e; // SC-DRF { std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock size++; @@ -269,14 +333,16 @@ template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuf } public: - std::string toString() const noexcept override { + /** Returns a short string representation incl. size/capacity and internal r/w index (impl. dependent). */ + std::string toString() const noexcept { const std::string es = isEmpty() ? ", empty" : ""; const std::string fs = isFull() ? ", full" : ""; return "ringbuffer<?>[size "+std::to_string(size)+" / "+std::to_string(capacityPlusOne-1)+ ", writePos "+std::to_string(writePos)+", readPos "+std::to_string(readPos)+es+fs+"]"; } - void dump(FILE *stream, std::string prefix) const noexcept override { + /** Debug functionality - Dumps the contents of the internal array. */ + void dump(FILE *stream, std::string prefix) const noexcept { fprintf(stream, "%s %s {\n", prefix.c_str(), toString().c_str()); for(Size_type i=0; i<capacityPlusOne; i++) { // fprintf(stream, "\t[%d]: %p\n", i, array[i].get()); // FIXME @@ -376,74 +442,182 @@ template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuf ringbuffer(ringbuffer &&o) noexcept = default; ringbuffer& operator=(ringbuffer &&o) noexcept = default; - Size_type capacity() const noexcept override { return capacityPlusOne-1; } + /** Returns the net capacity of this ring buffer. */ + Size_type capacity() const noexcept { return capacityPlusOne-1; } - void clear() noexcept override { + /** + * Releasing all elements by assigning <code>null</code>. + * <p> + * {@link #isEmpty()} will return <code>true</code> and + * {@link #getSize()} will return <code>0</code> after calling this method. + * </p> + */ + void clear() noexcept { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); clearImpl(); } - void reset(const T * copyFrom, const Size_type copyFromCount) noexcept override { + /** + * {@link #clear()} all elements and add all <code>copyFrom</code> elements thereafter. + * @param copyFrom Mandatory array w/ length {@link #capacity()} to be copied into the internal array. + */ + void reset(const T * copyFrom, const Size_type copyFromCount) noexcept { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); resetImpl(copyFrom, copyFromCount); } - void reset(const std::vector<T> & copyFrom) noexcept override { + void reset(const std::vector<T> & copyFrom) noexcept { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); resetImpl(copyFrom.data(), copyFrom.size()); } - Size_type getSize() const noexcept override { return size; } + /** Returns the number of elements in this ring buffer. */ + Size_type getSize() const noexcept { return size; } - Size_type getFreeSlots() const noexcept override { return capacityPlusOne - 1 - size; } + /** Returns the number of free slots available to put. */ + Size_type getFreeSlots() const noexcept { return capacityPlusOne - 1 - size; } - bool isEmpty() const noexcept override { return 0 == size; /* writePos == readPos */ } + /** Returns true if this ring buffer is empty, otherwise false. */ + bool isEmpty() const noexcept { return 0 == size; /* writePos == readPos */ } bool isEmpty2() const noexcept { return writePos == readPos; /* 0 == size */ } - bool isFull() const noexcept override { return capacityPlusOne - 1 <= size; /* ( writePos + 1 ) % capacityPlusOne == readPos <==> capacityPlusOne - 1 == size */; } + /** Returns true if this ring buffer is full, otherwise false. */ + bool isFull() const noexcept { return capacityPlusOne - 1 <= size; /* ( writePos + 1 ) % capacityPlusOne == readPos <==> capacityPlusOne - 1 == size */; } bool isFull2() const noexcept { return ( writePos + 1 ) % capacityPlusOne == readPos; /* capacityPlusOne - 1 == size */; } - T get() noexcept override { return getImpl(false, false, 0); } + /** + * Dequeues the oldest enqueued element if available, otherwise null. + * <p> + * The returned ring buffer slot will be set to <code>null</code> to release the reference + * and move ownership to the caller. + * </p> + * <p> + * Method is non blocking and returns immediately;. + * </p> + * @return the oldest put element if available, otherwise null. + */ + T get() noexcept { + return moveOutImpl(false, 0); + } - T getBlocking(const int timeoutMS=0) noexcept override { - return getImpl(true, false, timeoutMS); + /** + * Dequeues the oldest enqueued element. + * <p> + * The returned ring buffer slot will be set to <code>null</code> to release the reference + * and move ownership to the caller. + * </p> + * <p> + * <code>timeoutMS</code> defaults to zero, + * i.e. infinitive blocking until an element available via put.<br> + * Otherwise this methods blocks for the given milliseconds. + * </p> + * @return the oldest put element or <code>null</code> if timeout occurred. + * @throws InterruptedException + */ + T getBlocking(const int timeoutMS=0) noexcept { + return moveOutImpl(true, timeoutMS); } - T peek() noexcept override { - return getImpl(false, true, 0); + /** + * Peeks the next element at the read position w/o modifying pointer, nor blocking. + * @return <code>null</code> if empty, otherwise the element which would be read next. + */ + T peek() noexcept { + return peekImpl(false, 0); } - T peekBlocking(const int timeoutMS=0) noexcept override { - return getImpl(true, true, timeoutMS); + /** + * Peeks the next element at the read position w/o modifying pointer, but with blocking. + * <p> + * <code>timeoutMS</code> defaults to zero, + * i.e. infinitive blocking until an element available via put.<br> + * Otherwise this methods blocks for the given milliseconds. + * </p> + * @return <code>null</code> if empty or timeout occurred, otherwise the element which would be read next. + */ + T peekBlocking(const int timeoutMS=0) noexcept { + return peekImpl(true, timeoutMS); } - Size_type drop(const Size_type count) noexcept override { + /** + * Drops up to {@code count} oldest enqueued elements. + * <p> + * Method is non blocking and returns immediately;. + * </p> + * @param count maximum number of elements to drop from ringbuffer. + * @return actual number of dropped elements. + */ + Size_type drop(const Size_type count) noexcept { return dropImpl(count); } - bool put(const T & e) noexcept override { - return putImpl(e, false, false, 0); + /** + * Enqueues the given element by moving it into this ringbuffer storage. + * <p> + * Returns true if successful, otherwise false in case buffer is full. + * </p> + * <p> + * Method is non blocking and returns immediately;. + * </p> + */ + bool put(T && e) noexcept { + return moveIntoImpl(std::move(e), false, 0); } - bool putBlocking(const T & e, const int timeoutMS=0) noexcept override { - return !putImpl(e, false, true, timeoutMS); + /** + * Enqueues the given element by moving it into this ringbuffer storage. + * <p> + * <code>timeoutMS</code> defaults to zero, + * i.e. infinitive blocking until a free slot becomes available via get.<br> + * Otherwise this methods blocks for the given milliseconds. + * </p> + * <p> + * Returns true if successful, otherwise false in case timeout occurred. + * </p> + */ + bool putBlocking(T && e, const int timeoutMS=0) noexcept { + return moveIntoImpl(std::move(e), true, timeoutMS); } - bool putSame() noexcept override { - return putImpl(nullelem, true, false, 0); + /** + * Enqueues the given element by copying it into this ringbuffer storage. + * <p> + * Returns true if successful, otherwise false in case buffer is full. + * </p> + * <p> + * Method is non blocking and returns immediately;. + * </p> + */ + bool put(const T & e) noexcept { + return copyIntoImpl(e, false, 0); } - bool putSameBlocking(const int timeoutMS=0) noexcept override { - return putImpl(nullelem, true, true, timeoutMS); + /** + * Enqueues the given element by copying it into this ringbuffer storage. + * <p> + * <code>timeoutMS</code> defaults to zero, + * i.e. infinitive blocking until a free slot becomes available via get.<br> + * Otherwise this methods blocks for the given milliseconds. + * </p> + * <p> + * Returns true if successful, otherwise false in case timeout occurred. + * </p> + */ + bool putBlocking(const T & e, const int timeoutMS=0) noexcept { + return copyIntoImpl(e, true, timeoutMS); } - void waitForFreeSlots(const Size_type count) noexcept override { + /** + * Blocks until at least <code>count</code> free slots become available. + * @throws InterruptedException + */ + void waitForFreeSlots(const Size_type count) noexcept { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiWrite, lockRead); @@ -453,7 +627,13 @@ template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuf } } - void recapacity(const Size_type newCapacity) override { + /** + * Resizes this ring buffer's capacity. + * <p> + * New capacity must be greater than current size. + * </p> + */ + void recapacity(const Size_type newCapacity) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); @@ -483,7 +663,7 @@ template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuf for(Size_type i=0; i<_size; i++) { localWritePos = (localWritePos + 1) % capacityPlusOne; oldReadPos = (oldReadPos + 1) % oldCapacityPlusOne; - array[localWritePos] = oldArray[oldReadPos]; + array[localWritePos] = std::move( oldArray[oldReadPos] ); } writePos = localWritePos; } |