summaryrefslogtreecommitdiffstats
path: root/include/jau/ringbuffer.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'include/jau/ringbuffer.hpp')
-rw-r--r--include/jau/ringbuffer.hpp272
1 files changed, 226 insertions, 46 deletions
diff --git a/include/jau/ringbuffer.hpp b/include/jau/ringbuffer.hpp
index 893b027..0a7b859 100644
--- a/include/jau/ringbuffer.hpp
+++ b/include/jau/ringbuffer.hpp
@@ -26,6 +26,7 @@
#ifndef JAU_RINGBUFFER_HPP_
#define JAU_RINGBUFFER_HPP_
+#include <type_traits>
#include <atomic>
#include <memory>
#include <mutex>
@@ -39,13 +40,12 @@
#include <jau/debug.hpp>
#include <jau/basic_types.hpp>
-#include <jau/ringbuffer_if.hpp>
#include <jau/ordered_atomic.hpp>
namespace jau {
/**
- * Simple implementation of {@link ringbuffer_if},
+ * Ring buffer implementation, a.k.a circular buffer,
* exposing <i>lock-free</i>
* {@link #get() get*(..)} and {@link #put(Object) put*(..)} methods.
* <p>
@@ -85,7 +85,7 @@ namespace jau {
* - std::memory_order <https://en.cppreference.com/w/cpp/atomic/memory_order>
* </pre>
*/
-template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuffer : public ringbuffer_if<T, Size_type> {
+template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuffer {
private:
/** SC atomic integral scalar jau::nsize_t. Memory-Model (MM) guaranteed sequential consistency (SC) between acquire (read) and release (write) */
typedef ordered_atomic<Size_type, std::memory_order::memory_order_seq_cst> sc_atomic_Size_type;
@@ -175,7 +175,7 @@ template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuf
}
}
- T getImpl(const bool blocking, const bool peek, const int timeoutMS) noexcept {
+ T moveOutImpl(const bool blocking, const int timeoutMS) noexcept {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
@@ -199,22 +199,51 @@ template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuf
}
}
localReadPos = (localReadPos + 1) % capacityPlusOne;
- T r = array[localReadPos]; // SC-DRF
- if( !peek ) {
- array[localReadPos] = nullelem;
- {
- std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
- size--;
- readPos = localReadPos; // SC-DRF release atomic readPos
- cvWrite.notify_all(); // notify waiting putter
+ T r = std::move( array[localReadPos] ); // SC-DRF
+ array[localReadPos] = nullelem;
+ {
+ std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
+ size--;
+ readPos = localReadPos; // SC-DRF release atomic readPos
+ cvWrite.notify_all(); // notify waiting putter
+ }
+ return r;
+ }
+
+ T peekImpl(const bool blocking, const int timeoutMS) noexcept {
+ if( !std::is_copy_constructible_v<T> ) {
+ ABORT("T is not copy constructible");
+ return nullelem;
+ }
+ std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
+
+ const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
+ Size_type localReadPos = oldReadPos;
+ if( localReadPos == writePos ) {
+ if( blocking ) {
+ std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ putImpl via same lock
+ while( localReadPos == writePos ) {
+ if( 0 == timeoutMS ) {
+ cvRead.wait(lockRead);
+ } else {
+ std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
+ std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS));
+ if( std::cv_status::timeout == s && localReadPos == writePos ) {
+ return nullelem;
+ }
+ }
+ }
+ } else {
+ return nullelem;
}
- } else {
- readPos = oldReadPos; // SC-DRF release atomic readPos (complete acquire-release even @ peek)
}
+ localReadPos = (localReadPos + 1) % capacityPlusOne;
+ T r = array[localReadPos]; // SC-DRF
+ readPos = oldReadPos; // SC-DRF release atomic readPos (complete acquire-release even @ peek)
return r;
}
- bool putImpl(const T &e, const bool sameRef, const bool blocking, const int timeoutMS) noexcept {
+ bool moveIntoImpl(T &&e, const bool blocking, const int timeoutMS) noexcept {
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiRead, _not_ sync'ing w/ getImpl
Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl
@@ -237,9 +266,44 @@ template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuf
return false;
}
}
- if( !sameRef ) {
- array[localWritePos] = e; // SC-DRF
+ array[localWritePos] = std::move(e); // SC-DRF
+ {
+ std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
+ size++;
+ writePos = localWritePos; // SC-DRF release atomic writePos
+ cvRead.notify_all(); // notify waiting getter
}
+ return true;
+ }
+
+ bool copyIntoImpl(const T &e, const bool blocking, const int timeoutMS) noexcept {
+ if( !std::is_copy_constructible_v<T> ) {
+ ABORT("T is not copy constructible");
+ return false;
+ }
+ std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiRead, _not_ sync'ing w/ getImpl
+
+ Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl
+ localWritePos = (localWritePos + 1) % capacityPlusOne;
+ if( localWritePos == readPos ) {
+ if( blocking ) {
+ std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ getImpl via same lock
+ while( localWritePos == readPos ) {
+ if( 0 == timeoutMS ) {
+ cvWrite.wait(lockWrite);
+ } else {
+ std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
+ std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS));
+ if( std::cv_status::timeout == s && localWritePos == readPos ) {
+ return false;
+ }
+ }
+ }
+ } else {
+ return false;
+ }
+ }
+ array[localWritePos] = e; // SC-DRF
{
std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
size++;
@@ -269,14 +333,16 @@ template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuf
}
public:
- std::string toString() const noexcept override {
+ /** Returns a short string representation incl. size/capacity and internal r/w index (impl. dependent). */
+ std::string toString() const noexcept {
const std::string es = isEmpty() ? ", empty" : "";
const std::string fs = isFull() ? ", full" : "";
return "ringbuffer<?>[size "+std::to_string(size)+" / "+std::to_string(capacityPlusOne-1)+
", writePos "+std::to_string(writePos)+", readPos "+std::to_string(readPos)+es+fs+"]";
}
- void dump(FILE *stream, std::string prefix) const noexcept override {
+ /** Debug functionality - Dumps the contents of the internal array. */
+ void dump(FILE *stream, std::string prefix) const noexcept {
fprintf(stream, "%s %s {\n", prefix.c_str(), toString().c_str());
for(Size_type i=0; i<capacityPlusOne; i++) {
// fprintf(stream, "\t[%d]: %p\n", i, array[i].get()); // FIXME
@@ -376,74 +442,182 @@ template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuf
ringbuffer(ringbuffer &&o) noexcept = default;
ringbuffer& operator=(ringbuffer &&o) noexcept = default;
- Size_type capacity() const noexcept override { return capacityPlusOne-1; }
+ /** Returns the net capacity of this ring buffer. */
+ Size_type capacity() const noexcept { return capacityPlusOne-1; }
- void clear() noexcept override {
+ /**
+ * Releasing all elements by assigning <code>null</code>.
+ * <p>
+ * {@link #isEmpty()} will return <code>true</code> and
+ * {@link #getSize()} will return <code>0</code> after calling this method.
+ * </p>
+ */
+ void clear() noexcept {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
std::lock(lockMultiRead, lockMultiWrite);
clearImpl();
}
- void reset(const T * copyFrom, const Size_type copyFromCount) noexcept override {
+ /**
+ * {@link #clear()} all elements and add all <code>copyFrom</code> elements thereafter.
+ * @param copyFrom Mandatory array w/ length {@link #capacity()} to be copied into the internal array.
+ */
+ void reset(const T * copyFrom, const Size_type copyFromCount) noexcept {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
std::lock(lockMultiRead, lockMultiWrite);
resetImpl(copyFrom, copyFromCount);
}
- void reset(const std::vector<T> & copyFrom) noexcept override {
+ void reset(const std::vector<T> & copyFrom) noexcept {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
std::lock(lockMultiRead, lockMultiWrite);
resetImpl(copyFrom.data(), copyFrom.size());
}
- Size_type getSize() const noexcept override { return size; }
+ /** Returns the number of elements in this ring buffer. */
+ Size_type getSize() const noexcept { return size; }
- Size_type getFreeSlots() const noexcept override { return capacityPlusOne - 1 - size; }
+ /** Returns the number of free slots available to put. */
+ Size_type getFreeSlots() const noexcept { return capacityPlusOne - 1 - size; }
- bool isEmpty() const noexcept override { return 0 == size; /* writePos == readPos */ }
+ /** Returns true if this ring buffer is empty, otherwise false. */
+ bool isEmpty() const noexcept { return 0 == size; /* writePos == readPos */ }
bool isEmpty2() const noexcept { return writePos == readPos; /* 0 == size */ }
- bool isFull() const noexcept override { return capacityPlusOne - 1 <= size; /* ( writePos + 1 ) % capacityPlusOne == readPos <==> capacityPlusOne - 1 == size */; }
+ /** Returns true if this ring buffer is full, otherwise false. */
+ bool isFull() const noexcept { return capacityPlusOne - 1 <= size; /* ( writePos + 1 ) % capacityPlusOne == readPos <==> capacityPlusOne - 1 == size */; }
bool isFull2() const noexcept { return ( writePos + 1 ) % capacityPlusOne == readPos; /* capacityPlusOne - 1 == size */; }
- T get() noexcept override { return getImpl(false, false, 0); }
+ /**
+ * Dequeues the oldest enqueued element if available, otherwise null.
+ * <p>
+ * The returned ring buffer slot will be set to <code>null</code> to release the reference
+ * and move ownership to the caller.
+ * </p>
+ * <p>
+ * Method is non blocking and returns immediately;.
+ * </p>
+ * @return the oldest put element if available, otherwise null.
+ */
+ T get() noexcept {
+ return moveOutImpl(false, 0);
+ }
- T getBlocking(const int timeoutMS=0) noexcept override {
- return getImpl(true, false, timeoutMS);
+ /**
+ * Dequeues the oldest enqueued element.
+ * <p>
+ * The returned ring buffer slot will be set to <code>null</code> to release the reference
+ * and move ownership to the caller.
+ * </p>
+ * <p>
+ * <code>timeoutMS</code> defaults to zero,
+ * i.e. infinitive blocking until an element available via put.<br>
+ * Otherwise this methods blocks for the given milliseconds.
+ * </p>
+ * @return the oldest put element or <code>null</code> if timeout occurred.
+ * @throws InterruptedException
+ */
+ T getBlocking(const int timeoutMS=0) noexcept {
+ return moveOutImpl(true, timeoutMS);
}
- T peek() noexcept override {
- return getImpl(false, true, 0);
+ /**
+ * Peeks the next element at the read position w/o modifying pointer, nor blocking.
+ * @return <code>null</code> if empty, otherwise the element which would be read next.
+ */
+ T peek() noexcept {
+ return peekImpl(false, 0);
}
- T peekBlocking(const int timeoutMS=0) noexcept override {
- return getImpl(true, true, timeoutMS);
+ /**
+ * Peeks the next element at the read position w/o modifying pointer, but with blocking.
+ * <p>
+ * <code>timeoutMS</code> defaults to zero,
+ * i.e. infinitive blocking until an element available via put.<br>
+ * Otherwise this methods blocks for the given milliseconds.
+ * </p>
+ * @return <code>null</code> if empty or timeout occurred, otherwise the element which would be read next.
+ */
+ T peekBlocking(const int timeoutMS=0) noexcept {
+ return peekImpl(true, timeoutMS);
}
- Size_type drop(const Size_type count) noexcept override {
+ /**
+ * Drops up to {@code count} oldest enqueued elements.
+ * <p>
+ * Method is non blocking and returns immediately;.
+ * </p>
+ * @param count maximum number of elements to drop from ringbuffer.
+ * @return actual number of dropped elements.
+ */
+ Size_type drop(const Size_type count) noexcept {
return dropImpl(count);
}
- bool put(const T & e) noexcept override {
- return putImpl(e, false, false, 0);
+ /**
+ * Enqueues the given element by moving it into this ringbuffer storage.
+ * <p>
+ * Returns true if successful, otherwise false in case buffer is full.
+ * </p>
+ * <p>
+ * Method is non blocking and returns immediately;.
+ * </p>
+ */
+ bool put(T && e) noexcept {
+ return moveIntoImpl(std::move(e), false, 0);
}
- bool putBlocking(const T & e, const int timeoutMS=0) noexcept override {
- return !putImpl(e, false, true, timeoutMS);
+ /**
+ * Enqueues the given element by moving it into this ringbuffer storage.
+ * <p>
+ * <code>timeoutMS</code> defaults to zero,
+ * i.e. infinitive blocking until a free slot becomes available via get.<br>
+ * Otherwise this methods blocks for the given milliseconds.
+ * </p>
+ * <p>
+ * Returns true if successful, otherwise false in case timeout occurred.
+ * </p>
+ */
+ bool putBlocking(T && e, const int timeoutMS=0) noexcept {
+ return moveIntoImpl(std::move(e), true, timeoutMS);
}
- bool putSame() noexcept override {
- return putImpl(nullelem, true, false, 0);
+ /**
+ * Enqueues the given element by copying it into this ringbuffer storage.
+ * <p>
+ * Returns true if successful, otherwise false in case buffer is full.
+ * </p>
+ * <p>
+ * Method is non blocking and returns immediately;.
+ * </p>
+ */
+ bool put(const T & e) noexcept {
+ return copyIntoImpl(e, false, 0);
}
- bool putSameBlocking(const int timeoutMS=0) noexcept override {
- return putImpl(nullelem, true, true, timeoutMS);
+ /**
+ * Enqueues the given element by copying it into this ringbuffer storage.
+ * <p>
+ * <code>timeoutMS</code> defaults to zero,
+ * i.e. infinitive blocking until a free slot becomes available via get.<br>
+ * Otherwise this methods blocks for the given milliseconds.
+ * </p>
+ * <p>
+ * Returns true if successful, otherwise false in case timeout occurred.
+ * </p>
+ */
+ bool putBlocking(const T & e, const int timeoutMS=0) noexcept {
+ return copyIntoImpl(e, true, timeoutMS);
}
- void waitForFreeSlots(const Size_type count) noexcept override {
+ /**
+ * Blocks until at least <code>count</code> free slots become available.
+ * @throws InterruptedException
+ */
+ void waitForFreeSlots(const Size_type count) noexcept {
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // otherwise RAII-style relinquish via destructor
std::lock(lockMultiWrite, lockRead);
@@ -453,7 +627,13 @@ template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuf
}
}
- void recapacity(const Size_type newCapacity) override {
+ /**
+ * Resizes this ring buffer's capacity.
+ * <p>
+ * New capacity must be greater than current size.
+ * </p>
+ */
+ void recapacity(const Size_type newCapacity) {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
std::lock(lockMultiRead, lockMultiWrite);
@@ -483,7 +663,7 @@ template <typename T, std::nullptr_t nullelem, typename Size_type> class ringbuf
for(Size_type i=0; i<_size; i++) {
localWritePos = (localWritePos + 1) % capacityPlusOne;
oldReadPos = (oldReadPos + 1) % oldCapacityPlusOne;
- array[localWritePos] = oldArray[oldReadPos];
+ array[localWritePos] = std::move( oldArray[oldReadPos] );
}
writePos = localWritePos;
}