/*
 * Author: Sven Gothel
 * Copyright (c) 2020 Gothel Software e.K.
 * Copyright (c) 2020 ZAFENA AB
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

#ifndef LFRINGBUFFER_HPP_
#define LFRINGBUFFER_HPP_

#include <cstddef>
#include <cstdio>
#include <string>
#include <vector>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <algorithm>

#include "BasicTypes.hpp"

#include "Ringbuffer.hpp"

namespace direct_bt {

/**
 * Simple implementation of {@link Ringbuffer},
 * exposing lock-free
 * {@link #get() get*(..)} and {@link #put(Object) put*(..)} methods.
 *

* Implementation utilizes the Always Keep One Slot Open, * hence implementation maintains an internal array of capacity plus one! *

*

* Implementation is thread safe if: *

* <ul>
*   <li>{@link #get() get*(..)} operations are issued from multiple threads.</li>
*   <li>{@link #put(Object) put*(..)} operations are issued from multiple threads.</li>
*   <li>The {@link #get() get*(..)} and {@link #put(Object) put*(..)} thread may be the same.</li>
* </ul>
*
*

*

* The following methods acquire both the global multi-read and multi-write mutexes:
* <ul>
*   <li>reset(..), both overloads</li>
*   <li>{@link #clear()}</li>
*   <li>drop(int)</li>
*   <li>recapacity(int)</li>
* </ul>
*
*

*

* Characteristics: *

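* <ul>
*   <li>Read position points to the last read element.</li>
*   <li>Write position points to the last written element.</li>
* </ul>
* <table border="1">
*   <tr><td>Empty</td><td>writePos == readPos</td><td>size == 0</td></tr>
*   <tr><td>Full</td><td>writePos == readPos - 1 (modulo capacity + 1)</td><td>size == capacity</td></tr>
* </table>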
*

*/ template class LFRingbuffer : public Ringbuffer { private: std::mutex syncRead, syncMultiRead; std::mutex syncWrite, syncMultiWrite; std::condition_variable cvRead; std::condition_variable cvWrite; /* final */ int capacityPlusOne; // not final due to grow /* final */ T * array; // not final due to grow std::atomic readPos; std::atomic writePos; std::atomic size; T * newArray(const int count) { return new T[count]; } void freeArray(T * a) { delete[] a; } void cloneFrom(const bool allocArrayAndCapacity, const LFRingbuffer & source) { if( allocArrayAndCapacity ) { capacityPlusOne = source.capacityPlusOne; if( nullptr != array ) { freeArray(array, true); } array = newArray(capacityPlusOne); } else if( capacityPlusOne != source.capacityPlusOne ) { throw InternalError("capacityPlusOne not equal: this "+toString()+", source "+source.toString(), E_FILE_LINE); } readPos = source.readPos; writePos = source.writePos; size = source.size; int localWritePos = readPos; for(int i=0; i capacityPlusOne-1 ) { throw IllegalArgumentException("copyFrom array length "+std::to_string(copyFromCount)+" > capacity "+toString(), E_FILE_LINE); } int localWritePos = writePos; for(int i=0; i lockMultiRead(syncMultiRead); // RAII-style acquire and relinquish via destructor int localReadPos = readPos; if( localReadPos == writePos ) { if( blocking ) { std::unique_lock lockRead(syncRead); // RAII-style acquire and relinquish via destructor while( localReadPos == writePos ) { if( 0 == timeoutMS ) { cvRead.wait(lockRead); } else { std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS)); if( std::cv_status::timeout == s && localReadPos == writePos ) { return nullelem; } } } } else { return nullelem; } } localReadPos = (localReadPos + 1) % capacityPlusOne; T r = array[localReadPos]; if( !peek ) { array[localReadPos] = nullelem; { std::unique_lock lockWrite(syncWrite); // RAII-style acquire 
and relinquish via destructor size--; readPos = localReadPos; cvWrite.notify_all(); // notify waiting putter } } return r; } int dropImpl (const int count) { // locks ringbuffer completely (read/write), hence no need for local copy nor wait/sync etc std::unique_lock lockMultiRead(syncMultiRead); // RAII-style acquire and relinquish via destructor std::unique_lock lockMultiWrite(syncMultiWrite); // ditto const int dropCount = std::min(count, size.load()); if( 0 == dropCount ) { return 0; } for(int i=0; i lockMultiWrite(syncMultiWrite); // RAII-style acquire and relinquish via destructor int localWritePos = writePos; localWritePos = (localWritePos + 1) % capacityPlusOne; if( localWritePos == readPos ) { if( blocking ) { std::unique_lock lockWrite(syncWrite); // RAII-style acquire and relinquish via destructor while( localWritePos == readPos ) { if( 0 == timeoutMS ) { cvWrite.wait(lockWrite); } else { std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS)); if( std::cv_status::timeout == s && localWritePos == readPos ) { return false; } } } } else { return false; } } if( !sameRef ) { array[localWritePos] = e; } { std::unique_lock lockRead(syncRead); // RAII-style acquire and relinquish via destructor size++; writePos = localWritePos; cvRead.notify_all(); // notify waiting getter } return true; } public: std::string toString() const override { const std::string es = isEmpty() ? ", empty" : ""; const std::string fs = isFull() ? ", full" : ""; return "LFRingbuffer[size "+std::to_string(size)+" / "+std::to_string(capacityPlusOne-1)+ ", writePos "+std::to_string(writePos)+", readPos "+std::to_string(readPos)+es+fs+"]"; } void dump(FILE *stream, std::string prefix) const override { fprintf(stream, "%s %s {\n", prefix.c_str(), toString().c_str()); for(int i=0; i * Example for a 10 element Integer array: *
         *  Integer[] source = new Integer[10];
         *  // fill source with content ..
         *  Ringbuffer rb = new LFRingbuffer(source);
         * 
*

*

* {@link #isFull()} returns true on the newly created full ring buffer. *

*

* Implementation will allocate an internal array with size of array copyFrom plus one, * and copy all elements from array copyFrom into the internal array. *

* @param copyFrom mandatory source array determining ring buffer's net {@link #capacity()} and initial content. * @throws IllegalArgumentException if copyFrom is nullptr */ LFRingbuffer(const std::vector & copyFrom) /* throws IllegalArgumentException */ : capacityPlusOne(copyFrom.size() + 1), array(newArray(capacityPlusOne)), readPos(0), writePos(0), size(0) { resetImpl(copyFrom.data(), copyFrom.size()); } LFRingbuffer(const T * copyFrom, const int copyFromSize) /* throws IllegalArgumentException */ : capacityPlusOne(copyFromSize + 1), array(newArray(capacityPlusOne)), readPos(0), writePos(0), size(0) { resetImpl(copyFrom, copyFromSize); } /** * Create an empty ring buffer instance w/ the given net capacity. *

* Example for a 10 element Integer array: *

         *  Ringbuffer rb = new LFRingbuffer(10, Integer[].class);
         * 
*

*

* {@link #isEmpty()} returns true on the newly created empty ring buffer. *

*

* Implementation will allocate an internal array of size capacity plus one. *

* @param arrayType the array type of the created empty internal array. * @param capacity the initial net capacity of the ring buffer */ LFRingbuffer(const int capacity) : capacityPlusOne(capacity + 1), array(newArray(capacityPlusOne)), readPos(0), writePos(0), size(0) { } ~LFRingbuffer() { freeArray(array); } LFRingbuffer(const LFRingbuffer &_source) noexcept : capacityPlusOne(_source.capacityPlusOne), array(newArray(capacityPlusOne)), readPos(0), writePos(0), size(0) { std::unique_lock lockMultiReadS(_source.syncMultiRead); std::unique_lock lockMultiWriteS(_source.syncMultiWrite); std::unique_lock lockMultiRead(syncMultiRead); std::unique_lock lockMultiWrite(syncMultiWrite); cloneFrom(false, _source); } LFRingbuffer& operator=(const LFRingbuffer &_source) { std::unique_lock lockMultiReadS(_source.syncMultiRead); std::unique_lock lockMultiWriteS(_source.syncMultiWrite); std::unique_lock lockMultiRead(syncMultiRead); std::unique_lock lockMultiWrite(syncMultiWrite); if( this == &_source ) { return *this; } if( capacityPlusOne != _source.capacityPlusOne ) { cloneFrom(true, _source); } else { resetImpl(nullptr, 0 /* empty, nothing to copy */ ); // clear cloneFrom(false, _source); } return *this; } LFRingbuffer(LFRingbuffer &&o) noexcept = default; LFRingbuffer& operator=(LFRingbuffer &&o) noexcept = default; int capacity() const override { return capacityPlusOne-1; } void clear() override { std::unique_lock lockMultiRead(syncMultiRead); // RAII-style acquire and relinquish via destructor std::unique_lock lockMultiWrite(syncMultiWrite); // ditto resetImpl(nullptr, 0 /* empty, nothing to copy */ ); } void reset(const T * copyFrom, const int copyFromCount) override { std::unique_lock lockMultiRead(syncMultiRead); // RAII-style acquire and relinquish via destructor std::unique_lock lockMultiWrite(syncMultiWrite); // ditto resetImpl(copyFrom, copyFromCount); } void reset(const std::vector & copyFrom) override { std::unique_lock lockMultiRead(syncMultiRead); // RAII-style 
acquire and relinquish via destructor std::unique_lock lockMultiWrite(syncMultiWrite); // ditto resetImpl(copyFrom.data(), copyFrom.size()); } int getSize() const override { return size; } int getFreeSlots() const override { return capacityPlusOne - 1 - size; } bool isEmpty() const override { return writePos == readPos; /* 0 == size */ } bool isFull() const override { return ( writePos + 1 ) % capacityPlusOne == readPos ; /* capacityPlusOne - 1 == size */; } T get() override { return getImpl(false, false, 0); } T getBlocking(const int timeoutMS=0) override /* throws InterruptedException */ { return getImpl(true, false, timeoutMS); } T peek() override { return getImpl(false, true, 0); } T peekBlocking(const int timeoutMS=0) override /* throws InterruptedException */ { return getImpl(true, true, timeoutMS); } int drop(const int count) override { return dropImpl(count); } bool put(const T & e) override { return putImpl(e, false, false, 0); } bool putBlocking(const T & e, const int timeoutMS=0) override { return !putImpl(e, false, true, timeoutMS); } bool putSame() override { return putImpl(nullelem, true, false, 0); } bool putSameBlocking(const int timeoutMS=0) override { return putImpl(nullelem, true, true, timeoutMS); } void waitForFreeSlots(const int count) override /* throws InterruptedException */ { std::unique_lock lockMultiWrite(syncMultiWrite); // RAII-style acquire and relinquish via destructor std::unique_lock lockRead(syncRead); // RAII-style acquire and relinquish via destructor while( capacityPlusOne - 1 - size < count ) { cvRead.wait(lockRead); } } void recapacity(const int newCapacity) override { std::unique_lock lockMultiRead(syncMultiRead); std::unique_lock lockMultiWrite(syncMultiWrite); if( capacityPlusOne == newCapacity+1 ) { return; } if( size > newCapacity ) { throw IllegalArgumentException("amount "+std::to_string(newCapacity)+" < size, "+toString(), E_FILE_LINE); } if( 0 > newCapacity ) { throw IllegalArgumentException("amount 
"+std::to_string(newCapacity)+" < 0, "+toString(), E_FILE_LINE); } // save current data int oldCapacityPlusOne = capacityPlusOne; T * oldArray = array; int oldReadPos = readPos; // new blank resized array capacityPlusOne = newCapacity + 1; array = newArray(capacityPlusOne); readPos = 0; writePos = 0; const int _size = size.load(); // fast access // copy saved data if( nullptr != oldArray && 0 < _size ) { int localWritePos = writePos; for(int i=0; i<_size; i++) { localWritePos = (localWritePos + 1) % capacityPlusOne; oldReadPos = (oldReadPos + 1) % oldCapacityPlusOne; array[localWritePos] = oldArray[oldReadPos]; } writePos = localWritePos; } freeArray(oldArray); // and release } }; } /* namespace direct_bt */ #endif /* LFRINGBUFFER_HPP_ */