/* * Author: Sven Gothel * Copyright (c) 2020 Gothel Software e.K. * Copyright (c) 2020 ZAFENA AB * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the * "Software"), to deal in the Software without restriction, including * without limitation the rights to use, copy, modify, merge, publish, * distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so, subject to * the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */

#ifndef JAU_RINGBUFFER_HPP_
#define JAU_RINGBUFFER_HPP_

// Standard library. NOTE(review): the original header names were lost in
// extraction; this list is restored from the std:: names used below
// (mutex, condition_variable, chrono, min, string, vector, fprintf, ...).
// Verify against the upstream file.
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

// Project headers, providing ordered_atomic, ringbuffer_if,
// InternalError / IllegalArgumentException / E_FILE_LINE and jau::nsize_t.
#include <jau/basic_types.hpp>
#include <jau/ordered_atomic.hpp>
#include <jau/ringbuffer_if.hpp>

namespace jau {

/**
 * Simple implementation of {@link ringbuffer_if},
 * exposing lock-free {@link #get() get*(..)} and {@link #put(Object) put*(..)} methods.
 *
 * <p>
 * Implementation utilizes the <i>Always Keep One Slot Open</i> pattern,
 * hence the implementation maintains an internal array of <i>capacity plus one</i>!
 * </p>
 * <p>
 * Implementation is thread safe if:
 * <ul>
 *   <li>{@link #get() get*(..)} operations from multiple threads.</li>
 *   <li>{@link #put(Object) put*(..)} operations from multiple threads.</li>
 *   <li>{@link #get() get*(..)} and {@link #put(Object) put*(..)} thread may be the same.</li>
 * </ul>
 * </p>
 * <p>
 * Following methods acquire the global multi-read _and_ -write mutex:
 * <ul>
 *   <li>{@link #resetFull(Object[])}</li>
 *   <li>{@link #clear()}</li>
 *   <li>{@link #growEmptyBuffer(Object[])}</li>
 * </ul>
 * </p>
 * <p>
 * Characteristics:
 * <ul>
 *   <li>Read position points to the last read element.</li>
 *   <li>Write position points to the last written element.</li>
 * </ul>
 * <table border="1">
 *   <tr><td>Empty</td><td>writePos == readPos</td><td>size == 0</td></tr>
 *   <tr><td>Full</td><td>writePos == readPos - 1</td><td>size == capacity</td></tr>
 * </table>
 * </p>
 * See also:
 * - Sequentially Consistent (SC) ordering or SC-DRF (data race free):
 *   https://en.cppreference.com/w/cpp/atomic/memory_order#Sequentially-consistent_ordering
 * - std::memory_order: https://en.cppreference.com/w/cpp/atomic/memory_order
 *
*/ template class ringbuffer : public ringbuffer_if { private: /** SC atomic integral scalar jau::nsize_t. Memory-Model (MM) guaranteed sequential consistency (SC) between acquire (read) and release (write) */ typedef ordered_atomic sc_atomic_Size_type; /** Relaxed non-SC atomic integral scalar jau::nsize_t. Memory-Model (MM) only guarantees the atomic value, _no_ sequential consistency (SC) between acquire (read) and release (write). */ typedef ordered_atomic relaxed_atomic_Size_type; std::mutex syncRead, syncMultiRead; // Memory-Model (MM) guaranteed sequential consistency (SC) between acquire and release std::mutex syncWrite, syncMultiWrite; // ditto std::condition_variable cvRead; std::condition_variable cvWrite; /* final */ Size_type capacityPlusOne; // not final due to grow /* final */ T * array; // Synchronized due to MM's data-race-free SC (SC-DRF) between [atomic] acquire/release sc_atomic_Size_type readPos; // Memory-Model (MM) guaranteed sequential consistency (SC) between acquire (read) and release (write) sc_atomic_Size_type writePos; // ditto relaxed_atomic_Size_type size; // Non-SC atomic size, only atomic value itself is synchronized. 
T * newArray(const Size_type count) noexcept { return new T[count]; } void freeArray(T * a) noexcept { delete[] a; } void cloneFrom(const bool allocArrayAndCapacity, const ringbuffer & source) noexcept { if( allocArrayAndCapacity ) { capacityPlusOne = source.capacityPlusOne; if( nullptr != array ) { freeArray(array, true); } array = newArray(capacityPlusOne); } else if( capacityPlusOne != source.capacityPlusOne ) { throw InternalError("capacityPlusOne not equal: this "+toString()+", source "+source.toString(), E_FILE_LINE); } readPos = source.readPos; writePos = source.writePos; size = source.size; Size_type localWritePos = readPos; for(Size_type i=0; i capacityPlusOne-1 ) { // new blank resized array capacityPlusOne = copyFromCount + 1; array = newArray(capacityPlusOne); readPos = 0; writePos = 0; } Size_type localWritePos = writePos; for(Size_type i=0; i lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl Size_type localReadPos = oldReadPos; if( localReadPos == writePos ) { if( blocking ) { std::unique_lock lockRead(syncRead); // SC-DRF w/ putImpl via same lock while( localReadPos == writePos ) { if( 0 == timeoutMS ) { cvRead.wait(lockRead); } else { std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS)); if( std::cv_status::timeout == s && localReadPos == writePos ) { return nullelem; } } } } else { return nullelem; } } localReadPos = (localReadPos + 1) % capacityPlusOne; T r = array[localReadPos]; // SC-DRF if( !peek ) { array[localReadPos] = nullelem; { std::unique_lock lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock size--; readPos = localReadPos; // SC-DRF release atomic readPos cvWrite.notify_all(); // notify waiting putter } } else { readPos = oldReadPos; // SC-DRF release atomic readPos (complete acquire-release 
even @ peek) } return r; } bool putImpl(const T &e, const bool sameRef, const bool blocking, const int timeoutMS) noexcept { std::unique_lock lockMultiWrite(syncMultiWrite); // acquire syncMultiRead, _not_ sync'ing w/ getImpl Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl localWritePos = (localWritePos + 1) % capacityPlusOne; if( localWritePos == readPos ) { if( blocking ) { std::unique_lock lockWrite(syncWrite); // SC-DRF w/ getImpl via same lock while( localWritePos == readPos ) { if( 0 == timeoutMS ) { cvWrite.wait(lockWrite); } else { std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS)); if( std::cv_status::timeout == s && localWritePos == readPos ) { return false; } } } } else { return false; } } if( !sameRef ) { array[localWritePos] = e; // SC-DRF } { std::unique_lock lockRead(syncRead); // SC-DRF w/ getImpl via same lock size++; writePos = localWritePos; // SC-DRF release atomic writePos cvRead.notify_all(); // notify waiting getter } return true; } Size_type dropImpl (const Size_type count) noexcept { // locks ringbuffer completely (read/write), hence no need for local copy nor wait/sync etc std::unique_lock lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); const Size_type dropCount = std::min(count, size.load()); if( 0 == dropCount ) { return 0; } for(Size_type i=0; i[size "+std::to_string(size)+" / "+std::to_string(capacityPlusOne-1)+ ", writePos "+std::to_string(writePos)+", readPos "+std::to_string(readPos)+es+fs+"]"; } void dump(FILE *stream, std::string prefix) const noexcept override { fprintf(stream, "%s %s {\n", prefix.c_str(), toString().c_str()); for(Size_type 
i=0; i * Example for a 10 element Integer array: *
         *  Integer[] source = new Integer[10];
         *  // fill source with content ..
         *  ringbuffer rb = new ringbuffer(source);
         * 
*

*

* {@link #isFull()} returns true on the newly created full ring buffer. *

*

* Implementation will allocate an internal array with size of array copyFrom plus one, * and copy all elements from array copyFrom into the internal array. *

* @param copyFrom mandatory source array determining ring buffer's net {@link #capacity()} and initial content. * @throws IllegalArgumentException if copyFrom is nullptr */ ringbuffer(const std::vector & copyFrom) noexcept : capacityPlusOne(copyFrom.size() + 1), array(newArray(capacityPlusOne)), readPos(0), writePos(0), size(0) { resetImpl(copyFrom.data(), copyFrom.size()); } ringbuffer(const T * copyFrom, const Size_type copyFromSize) noexcept : capacityPlusOne(copyFromSize + 1), array(newArray(capacityPlusOne)), readPos(0), writePos(0), size(0) { resetImpl(copyFrom, copyFromSize); } /** * Create an empty ring buffer instance w/ the given net capacity. *

* Example for a 10 element Integer array: *

         *  ringbuffer rb = new ringbuffer(10, Integer[].class);
         * 
*

*

* {@link #isEmpty()} returns true on the newly created empty ring buffer. *

*

* Implementation will allocate an internal array of size capacity plus one. *

* @param arrayType the array type of the created empty internal array. * @param capacity the initial net capacity of the ring buffer */ ringbuffer(const Size_type capacity) noexcept : capacityPlusOne(capacity + 1), array(newArray(capacityPlusOne)), readPos(0), writePos(0), size(0) { } ~ringbuffer() noexcept { freeArray(array); } ringbuffer(const ringbuffer &_source) noexcept : capacityPlusOne(_source.capacityPlusOne), array(newArray(capacityPlusOne)), readPos(0), writePos(0), size(0) { std::unique_lock lockMultiReadS(_source.syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock lockMultiWriteS(_source.syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiReadS, lockMultiWriteS); // *this instance does not exist yet cloneFrom(false, _source); } ringbuffer& operator=(const ringbuffer &_source) noexcept { std::unique_lock lockMultiReadS(_source.syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock lockMultiWriteS(_source.syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::unique_lock lockMultiRead(syncMultiRead, std::defer_lock); // same for *this instance! 
std::unique_lock lockMultiWrite(syncMultiWrite, std::defer_lock); std::lock(lockMultiReadS, lockMultiWriteS, lockMultiRead, lockMultiWrite); if( this == &_source ) { return *this; } if( capacityPlusOne != _source.capacityPlusOne ) { cloneFrom(true, _source); } else { clearImpl(); // clear cloneFrom(false, _source); } return *this; } ringbuffer(ringbuffer &&o) noexcept = default; ringbuffer& operator=(ringbuffer &&o) noexcept = default; Size_type capacity() const noexcept override { return capacityPlusOne-1; } void clear() noexcept override { std::unique_lock lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); clearImpl(); } void reset(const T * copyFrom, const Size_type copyFromCount) noexcept override { std::unique_lock lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); resetImpl(copyFrom, copyFromCount); } void reset(const std::vector & copyFrom) noexcept override { std::unique_lock lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); resetImpl(copyFrom.data(), copyFrom.size()); } Size_type getSize() const noexcept override { return size; } Size_type getFreeSlots() const noexcept override { return capacityPlusOne - 1 - size; } bool isEmpty() const noexcept override { return 0 == size; /* writePos == readPos */ } bool isEmpty2() const noexcept { return writePos == readPos; /* 0 == size */ } 
bool isFull() const noexcept override { return capacityPlusOne - 1 <= size; /* ( writePos + 1 ) % capacityPlusOne == readPos <==> capacityPlusOne - 1 == size */; } bool isFull2() const noexcept { return ( writePos + 1 ) % capacityPlusOne == readPos; /* capacityPlusOne - 1 == size */; } T get() noexcept override { return getImpl(false, false, 0); } T getBlocking(const int timeoutMS=0) noexcept override { return getImpl(true, false, timeoutMS); } T peek() noexcept override { return getImpl(false, true, 0); } T peekBlocking(const int timeoutMS=0) noexcept override { return getImpl(true, true, timeoutMS); } Size_type drop(const Size_type count) noexcept override { return dropImpl(count); } bool put(const T & e) noexcept override { return putImpl(e, false, false, 0); } bool putBlocking(const T & e, const int timeoutMS=0) noexcept override { return !putImpl(e, false, true, timeoutMS); } bool putSame() noexcept override { return putImpl(nullelem, true, false, 0); } bool putSameBlocking(const int timeoutMS=0) noexcept override { return putImpl(nullelem, true, true, timeoutMS); } void waitForFreeSlots(const Size_type count) noexcept override { std::unique_lock lockMultiWrite(syncMultiWrite, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock lockRead(syncRead, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiWrite, lockRead); while( capacityPlusOne - 1 - size < count ) { cvRead.wait(lockRead); } } void recapacity(const Size_type newCapacity) override { std::unique_lock lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); const Size_type _size = size; // fast access if( capacityPlusOne == newCapacity+1 ) { return; } if( _size > newCapacity ) { throw 
IllegalArgumentException("amount "+std::to_string(newCapacity)+" < size, "+toString(), E_FILE_LINE); } // save current data Size_type oldCapacityPlusOne = capacityPlusOne; T * oldArray = array; Size_type oldReadPos = readPos; // new blank resized array capacityPlusOne = newCapacity + 1; array = newArray(capacityPlusOne); readPos = 0; writePos = 0; // copy saved data if( nullptr != oldArray && 0 < _size ) { Size_type localWritePos = writePos; for(Size_type i=0; i<_size; i++) { localWritePos = (localWritePos + 1) % capacityPlusOne; oldReadPos = (oldReadPos + 1) % oldCapacityPlusOne; array[localWritePos] = oldArray[oldReadPos]; } writePos = localWritePos; } freeArray(oldArray); // and release } }; } /* namespace jau */ #endif /* JAU_RINGBUFFER_HPP_ */