/*
* Author: Sven Gothel
* Copyright (c) 2020 Gothel Software e.K.
* Copyright (c) 2020 ZAFENA AB
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef LFRINGBUFFER_HPP_
#define LFRINGBUFFER_HPP_
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "BasicTypes.hpp"
#include "Ringbuffer.hpp"
namespace direct_bt {
/**
* Simple implementation of {@link Ringbuffer},
* exposing lock-free
* {@link #get() get*(..)} and {@link #put(Object) put*(..)} methods.
*
* Implementation utilizes the "Always Keep One Slot Open" scheme,
* hence it maintains an internal array of size capacity plus one!
*
*
* Implementation is thread safe if:
*
* - {@link #get() get*(..)} operations from multiple threads.
* - {@link #put(Object) put*(..)} operations from multiple threads.
* - {@link #get() get*(..)} and {@link #put(Object) put*(..)} thread may be the same.
*
*
*
* The following methods acquire both the global multi-read and multi-write mutexes:
*
* - {@link #reset(const T*, int) reset(..)}
* - {@link #clear()}
* - {@link #drop(int)}
* - {@link #recapacity(int)}
*
*
*
* Characteristics:
*
* - Read position points to the last read element.
* - Write position points to the last written element.
*
*
* Empty | writePos == readPos | size == 0 |
* Full | writePos == readPos - 1 | size == capacity |
*
*
*/
template class LFRingbuffer : public Ringbuffer {
private:
std::mutex syncRead, syncMultiRead;
std::mutex syncWrite, syncMultiWrite;
std::condition_variable cvRead;
std::condition_variable cvWrite;
/* final */ int capacityPlusOne; // not final due to grow
/* final */ T * array; // not final due to grow
std::atomic readPos;
std::atomic writePos;
std::atomic size;
/**
 * Allocates a fresh array of `count` elements using `new T[]`.
 *
 * @param count number of elements to allocate
 * @return pointer to the newly allocated array; to be released via freeArray()
 */
T * newArray(const int count) {
    T * const fresh = new T[count];
    return fresh;
}
/**
 * Releases the given array using `delete[]`.
 *
 * @param a array to release; expected to originate from `new[]`, e.g. newArray().
 *          Passing nullptr is a no-op as per `delete[]` semantics.
 */
void freeArray(T * a) {
delete[] a;
}
void cloneFrom(const bool allocArrayAndCapacity, const LFRingbuffer & source) {
if( allocArrayAndCapacity ) {
capacityPlusOne = source.capacityPlusOne;
if( nullptr != array ) {
freeArray(array, true);
}
array = newArray(capacityPlusOne);
} else if( capacityPlusOne != source.capacityPlusOne ) {
throw InternalError("capacityPlusOne not equal: this "+toString()+", source "+source.toString(), E_FILE_LINE);
}
readPos = source.readPos;
writePos = source.writePos;
size = source.size;
int localWritePos = readPos;
for(int i=0; i capacityPlusOne-1 ) {
throw IllegalArgumentException("copyFrom array length "+std::to_string(copyFromCount)+" > capacity "+toString(), E_FILE_LINE);
}
int localWritePos = writePos;
for(int i=0; i lockMultiRead(syncMultiRead); // RAII-style acquire and relinquish via destructor
int localReadPos = readPos;
if( localReadPos == writePos ) {
if( blocking ) {
std::unique_lock lockRead(syncRead); // RAII-style acquire and relinquish via destructor
while( localReadPos == writePos ) {
if( 0 == timeoutMS ) {
cvRead.wait(lockRead);
} else {
std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS));
if( std::cv_status::timeout == s && localReadPos == writePos ) {
return nullelem;
}
}
}
} else {
return nullelem;
}
}
localReadPos = (localReadPos + 1) % capacityPlusOne;
T r = array[localReadPos];
if( !peek ) {
array[localReadPos] = nullelem;
{
std::unique_lock lockWrite(syncWrite); // RAII-style acquire and relinquish via destructor
size--;
readPos = localReadPos;
cvWrite.notify_all(); // notify waiting putter
}
}
return r;
}
int dropImpl (const int count) {
// locks ringbuffer completely (read/write), hence no need for local copy nor wait/sync etc
std::unique_lock lockMultiRead(syncMultiRead); // RAII-style acquire and relinquish via destructor
std::unique_lock lockMultiWrite(syncMultiWrite); // ditto
const int dropCount = std::min(count, size.load());
if( 0 == dropCount ) {
return 0;
}
for(int i=0; i lockMultiWrite(syncMultiWrite); // RAII-style acquire and relinquish via destructor
int localWritePos = writePos;
localWritePos = (localWritePos + 1) % capacityPlusOne;
if( localWritePos == readPos ) {
if( blocking ) {
std::unique_lock lockWrite(syncWrite); // RAII-style acquire and relinquish via destructor
while( localWritePos == readPos ) {
if( 0 == timeoutMS ) {
cvWrite.wait(lockWrite);
} else {
std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS));
if( std::cv_status::timeout == s && localWritePos == readPos ) {
return false;
}
}
}
} else {
return false;
}
}
if( !sameRef ) {
array[localWritePos] = e;
}
{
std::unique_lock lockRead(syncRead); // RAII-style acquire and relinquish via destructor
size++;
writePos = localWritePos;
cvRead.notify_all(); // notify waiting getter
}
return true;
}
public:
/**
 * Returns a one-line description of this ring buffer:
 * current size, net capacity, write position and read position,
 * followed by an `empty` and/or `full` marker where applicable.
 */
std::string toString() const override {
    std::string res("LFRingbuffer>[size ");
    res.append(std::to_string(size));
    res.append(" / ").append(std::to_string(capacityPlusOne-1));
    res.append(", writePos ").append(std::to_string(writePos));
    res.append(", readPos ").append(std::to_string(readPos));
    if( isEmpty() ) {
        res.append(", empty");
    }
    if( isFull() ) {
        res.append(", full");
    }
    res.append("]");
    return res;
}
void dump(FILE *stream, std::string prefix) const override {
fprintf(stream, "%s %s {\n", prefix.c_str(), toString().c_str());
for(int i=0; i
* Example for a 10 element Integer array:
*
* Integer[] source = new Integer[10];
* // fill source with content ..
* Ringbuffer rb = new LFRingbuffer(source);
*
*
*
* {@link #isFull()} returns true on the newly created full ring buffer.
*
*
* Implementation will allocate an internal array with size of array copyFrom plus one,
* and copy all elements from array copyFrom into the internal array.
*
* @param copyFrom mandatory source array determining ring buffer's net {@link #capacity()} and initial content.
* @throws IllegalArgumentException if copyFrom is nullptr
*/
// Vector variant: the net capacity equals copyFrom.size(), the internal array holds one slot more.
// The positions and size start at zero; the content is then handed to resetImpl().
LFRingbuffer(const std::vector & copyFrom) /* throws IllegalArgumentException */
: capacityPlusOne(copyFrom.size() + 1), array(newArray(capacityPlusOne)),
readPos(0), writePos(0), size(0)
{
// NOTE(review): resetImpl() is presumed to copy the elements and update writePos/size - confirm.
resetImpl(copyFrom.data(), copyFrom.size());
}
/**
 * Creates a ring buffer from a raw source array.
 *
 * The net capacity equals copyFromSize, the internal array holds one slot more.
 * Positions and size start at zero; the content is then handed to resetImpl().
 *
 * @param copyFrom source array determining the initial content
 * @param copyFromSize number of elements in copyFrom, also the net capacity
 */
LFRingbuffer(const T * copyFrom, const int copyFromSize) /* throws IllegalArgumentException */
: capacityPlusOne(copyFromSize + 1), array(newArray(capacityPlusOne)),
readPos(0), writePos(0), size(0)
{
// NOTE(review): resetImpl() is presumed to copy the elements and update writePos/size - confirm.
resetImpl(copyFrom, copyFromSize);
}
/**
 * Create an empty ring buffer instance w/ the given net capacity.
 *
 * Example: constructing with a capacity of 10 yields a ring buffer
 * whose {@link #capacity()} returns 10.
 *
 * {@link #isEmpty()} returns true on the newly created empty ring buffer,
 * since read and write position both start at zero.
 *
 * Implementation will allocate an internal array of size capacity plus one.
 *
 * @param capacity the initial net capacity of the ring buffer
 */
LFRingbuffer(const int capacity)
: capacityPlusOne(capacity + 1), array(newArray(capacityPlusOne)),
readPos(0), writePos(0), size(0)
{ }
/**
 * Destructor, releases the internal array via freeArray().
 */
~LFRingbuffer() {
freeArray(array);
}
/**
 * Copy constructor.
 *
 * Allocates an internal array matching the source's capacityPlusOne,
 * then acquires the source's and this instance's multi-read and multi-write mutexes
 * and clones the source state via cloneFrom() without reallocating.
 *
 * NOTE(review): declared noexcept although newArray() uses `new`, and cloneFrom()
 * contains a throw for a capacity mismatch - confirm that termination is intended.
 * NOTE(review): the source's capacityPlusOne is read in the initializer list,
 * i.e. before the source mutexes are held - confirm no concurrent recapacity() is possible.
 *
 * @param _source the ring buffer to copy
 */
LFRingbuffer(const LFRingbuffer &_source) noexcept
: capacityPlusOne(_source.capacityPlusOne), array(newArray(capacityPlusOne)),
readPos(0), writePos(0), size(0)
{
// Source mutexes first, then our own: same order as used by operator=.
std::unique_lock lockMultiReadS(_source.syncMultiRead);
std::unique_lock lockMultiWriteS(_source.syncMultiWrite);
std::unique_lock lockMultiRead(syncMultiRead);
std::unique_lock lockMultiWrite(syncMultiWrite);
cloneFrom(false, _source);
}
/**
 * Copy assignment.
 *
 * Acquires the source's and this instance's multi-read and multi-write mutexes,
 * then clones the source state. If the capacities differ, the internal array
 * is reallocated by cloneFrom(); otherwise this buffer is cleared first and
 * the source state is cloned into the existing array.
 *
 * @param _source the ring buffer to copy
 * @return reference to this instance
 */
LFRingbuffer& operator=(const LFRingbuffer &_source) {
    // Self-assignment must be detected before any locking:
    // source and destination mutexes would be the same objects, and locking
    // a std::mutex already owned by the calling thread is undefined behavior (deadlock).
    if( this == &_source ) {
        return *this;
    }
    std::unique_lock<std::mutex> lockMultiReadS(_source.syncMultiRead);
    std::unique_lock<std::mutex> lockMultiWriteS(_source.syncMultiWrite);
    std::unique_lock<std::mutex> lockMultiRead(syncMultiRead);
    std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite);

    if( capacityPlusOne != _source.capacityPlusOne ) {
        cloneFrom(true, _source);
    } else {
        resetImpl(nullptr, 0 /* empty, nothing to copy */ ); // clear
        cloneFrom(false, _source);
    }
    return *this;
}
// Defaulted move construction and move assignment.
// NOTE(review): this class holds std::mutex and std::condition_variable members, which are
// neither copyable nor movable; these defaulted operations are therefore expected to end up
// deleted, with moves falling back to the copy operations - confirm this is intended.
LFRingbuffer(LFRingbuffer &&o) noexcept = default;
LFRingbuffer& operator=(LFRingbuffer &&o) noexcept = default;
/** Returns the net capacity, i.e. the internal array size minus the one slot kept open. */
int capacity() const override { return capacityPlusOne-1; }
void clear() override {
std::unique_lock lockMultiRead(syncMultiRead); // RAII-style acquire and relinquish via destructor
std::unique_lock lockMultiWrite(syncMultiWrite); // ditto
resetImpl(nullptr, 0 /* empty, nothing to copy */ );
}
void reset(const T * copyFrom, const int copyFromCount) override {
std::unique_lock lockMultiRead(syncMultiRead); // RAII-style acquire and relinquish via destructor
std::unique_lock lockMultiWrite(syncMultiWrite); // ditto
resetImpl(copyFrom, copyFromCount);
}
/**
 * Resets this ring buffer using the given vector.
 *
 * Holds both the multi-read and multi-write mutex for the duration of the call
 * and delegates to resetImpl() with the vector's data pointer and size.
 *
 * @param copyFrom source vector passed on to resetImpl()
 */
void reset(const std::vector & copyFrom) override {
std::unique_lock lockMultiRead(syncMultiRead); // RAII-style acquire and relinquish via destructor
std::unique_lock lockMultiWrite(syncMultiWrite); // ditto
resetImpl(copyFrom.data(), copyFrom.size());
}
/** Returns the current value of the atomic element counter; no mutex is acquired. */
int getSize() const override { return size; }
/** Returns the number of free slots, i.e. net capacity minus the current size; no mutex is acquired. */
int getFreeSlots() const override {
    const int netCapacity = capacityPlusOne - 1;
    return netCapacity - size;
}
/** Returns true if write and read position are equal, which denotes the empty state; no mutex is acquired. */
bool isEmpty() const override { return writePos == readPos; /* 0 == size */ }
bool isFull() const override { return ( writePos + 1 ) % capacityPlusOne == readPos ; /* capacityPlusOne - 1 == size */; }
/**
 * Non-blocking dequeue of the oldest element.
 *
 * @return the dequeued element, or nullelem if the ring buffer is empty
 */
T get() override {
    const bool blocking = false;
    const bool peekOnly = false;
    return getImpl(blocking, peekOnly, 0);
}
/**
 * Blocking dequeue of the oldest element.
 *
 * @param timeoutMS maximum wait time in milliseconds, zero waits without timeout
 * @return the dequeued element, or nullelem if the timeout expired while still empty
 */
T getBlocking(const int timeoutMS=0) override /* throws InterruptedException */ {
    const bool blocking = true;
    const bool peekOnly = false;
    return getImpl(blocking, peekOnly, timeoutMS);
}
/**
 * Non-blocking look at the oldest element without removing it.
 *
 * @return the oldest element, or nullelem if the ring buffer is empty
 */
T peek() override {
    const bool blocking = false;
    const bool peekOnly = true;
    return getImpl(blocking, peekOnly, 0);
}
/**
 * Blocking look at the oldest element without removing it.
 *
 * @param timeoutMS maximum wait time in milliseconds, zero waits without timeout
 * @return the oldest element, or nullelem if the timeout expired while still empty
 */
T peekBlocking(const int timeoutMS=0) override /* throws InterruptedException */ {
    const bool blocking = true;
    const bool peekOnly = true;
    return getImpl(blocking, peekOnly, timeoutMS);
}
/**
 * Drops up to `count` of the oldest elements by delegating to dropImpl().
 *
 * @param count maximum number of elements to drop
 * @return the value returned by dropImpl(), i.e. the number of elements dropped
 */
int drop(const int count) override {
    const int dropped = dropImpl(count);
    return dropped;
}
/**
 * Non-blocking enqueue of the given element.
 *
 * @param e the element to store
 * @return true if the element was enqueued, false if the ring buffer is full
 */
bool put(const T & e) override {
    const bool sameRef = false;
    const bool blocking = false;
    return putImpl(e, sameRef, blocking, 0);
}
/**
 * Blocking enqueue of the given element.
 *
 * The result of putImpl() is returned as-is, consistent with put(),
 * putSame() and putSameBlocking(): true denotes success.
 *
 * @param e the element to store
 * @param timeoutMS maximum wait time in milliseconds, zero waits without timeout
 * @return true if the element was enqueued, false if the timeout expired while still full
 */
bool putBlocking(const T & e, const int timeoutMS=0) override {
    return putImpl(e, false, true, timeoutMS);
}
/**
 * Non-blocking enqueue that keeps the element already stored in the next slot,
 * i.e. only the write position and size are advanced.
 *
 * @return true if the write position was advanced, false if the ring buffer is full
 */
bool putSame() override {
    const bool sameRef = true;
    const bool blocking = false;
    return putImpl(nullelem, sameRef, blocking, 0);
}
/**
 * Blocking enqueue that keeps the element already stored in the next slot,
 * i.e. only the write position and size are advanced.
 *
 * @param timeoutMS maximum wait time in milliseconds, zero waits without timeout
 * @return true if the write position was advanced, false if the timeout expired while still full
 */
bool putSameBlocking(const int timeoutMS=0) override {
    const bool sameRef = true;
    const bool blocking = true;
    return putImpl(nullelem, sameRef, blocking, timeoutMS);
}
void waitForFreeSlots(const int count) override /* throws InterruptedException */ {
std::unique_lock lockMultiWrite(syncMultiWrite); // RAII-style acquire and relinquish via destructor
std::unique_lock lockRead(syncRead); // RAII-style acquire and relinquish via destructor
while( capacityPlusOne - 1 - size < count ) {
cvRead.wait(lockRead);
}
}
void recapacity(const int newCapacity) override {
std::unique_lock lockMultiRead(syncMultiRead);
std::unique_lock lockMultiWrite(syncMultiWrite);
if( capacityPlusOne == newCapacity+1 ) {
return;
}
if( size > newCapacity ) {
throw IllegalArgumentException("amount "+std::to_string(newCapacity)+" < size, "+toString(), E_FILE_LINE);
}
if( 0 > newCapacity ) {
throw IllegalArgumentException("amount "+std::to_string(newCapacity)+" < 0, "+toString(), E_FILE_LINE);
}
// save current data
int oldCapacityPlusOne = capacityPlusOne;
T * oldArray = array;
int oldReadPos = readPos;
// new blank resized array
capacityPlusOne = newCapacity + 1;
array = newArray(capacityPlusOne);
readPos = 0;
writePos = 0;
const int _size = size.load(); // fast access
// copy saved data
if( nullptr != oldArray && 0 < _size ) {
int localWritePos = writePos;
for(int i=0; i<_size; i++) {
localWritePos = (localWritePos + 1) % capacityPlusOne;
oldReadPos = (oldReadPos + 1) % oldCapacityPlusOne;
array[localWritePos] = oldArray[oldReadPos];
}
writePos = localWritePos;
}
freeArray(oldArray); // and release
}
};
} /* namespace direct_bt */
#endif /* LFRINGBUFFER_HPP_ */