aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSven Gothel <[email protected]>2021-06-19 03:56:30 +0200
committerSven Gothel <[email protected]>2021-06-19 03:56:30 +0200
commit4d5c2ff4886fa3e6a693ee32883d0fafb998c1a1 (patch)
treea5ad1bac130d70f12970ece15ce63cd0ba1592b5
parent68b051c15b77ecc5a3421c42e51105b87f67802a (diff)
ringbuffer::get() and ::getBlocking(): Pass size of dest buffer and minimum count to wait for, return actual received elementsv0.4.0
Receiving `min(dest_len, getSize()>=min_count)` elements is most efficient for receiver code. This is aligned with new/changed 'waitForElements(..)' and 'waitForFreeSlots(..)' methods.
-rw-r--r--include/jau/ringbuffer.hpp47
-rw-r--r--test/test_lfringbuffer01.cpp2
-rw-r--r--test/test_lfringbuffer02.cpp2
-rw-r--r--test/test_lfringbuffer03.cpp2
-rw-r--r--test/test_lfringbuffer11.cpp2
-rw-r--r--test/test_lfringbuffer12.cpp2
-rw-r--r--test/test_lfringbuffer13.cpp2
7 files changed, 33 insertions, 26 deletions
diff --git a/include/jau/ringbuffer.hpp b/include/jau/ringbuffer.hpp
index 2c6eaff..05edec6 100644
--- a/include/jau/ringbuffer.hpp
+++ b/include/jau/ringbuffer.hpp
@@ -349,23 +349,27 @@ class ringbuffer {
return r;
}
- bool moveOutImpl(Value_type *dest, const Size_type count, const bool blocking, const int timeoutMS) noexcept {
+ Size_type moveOutImpl(Value_type *dest, const Size_type dest_len, const Size_type min_count_, const bool blocking, const int timeoutMS) noexcept {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
+ const Size_type min_count = std::min(dest_len, min_count_);
Value_type *iter_out = dest;
- if( count >= capacityPlusOne ) {
- return false;
+ if( min_count >= capacityPlusOne ) {
+ return 0;
+ }
+ if( 0 == min_count ) {
+ return 0;
}
const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
Size_type localReadPos = oldReadPos;
Size_type available = getSize();
- if( count > available ) {
+ if( min_count > available ) {
if( blocking ) {
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
available = getSize();
- while( count > available ) {
+ while( min_count > available ) {
if( 0 == timeoutMS ) {
cvWrite.wait(lockWrite);
available = getSize();
@@ -373,15 +377,17 @@ class ringbuffer {
std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS));
available = getSize();
- if( std::cv_status::timeout == s && count > available ) {
- return false;
+ if( std::cv_status::timeout == s && min_count > available ) {
+ return 0;
}
}
}
} else {
- return false;
+ return 0;
}
}
+ const Size_type count = std::min(dest_len, available);
+
/**
* Empty [RW][][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ] ; W==R
* Avail [ ][ ][R][.][.][.][.][W][ ][ ][ ][ ][ ][ ][ ] ; W > R
@@ -444,7 +450,7 @@ class ringbuffer {
readPos = localReadPos; // SC-DRF release atomic readPos
cvRead.notify_all(); // notify waiting putter
}
- return true;
+ return count;
}
bool dropImpl (const Size_type count, const bool blocking, const int timeoutMS) noexcept {
@@ -1057,7 +1063,7 @@ class ringbuffer {
}
/**
- * Dequeues the oldest enqueued count elements by copying them into the given consecutive 'dest' storage.
+ * Dequeues the oldest enqueued `min(dest_len, getSize()>=min_count)` elements by copying them into the given consecutive 'dest' storage.
* <p>
* The returned ring buffer slot will be set to <code>nullelem</code> to release the reference
* and move ownership to the caller.
@@ -1066,15 +1072,16 @@ class ringbuffer {
* Method is non blocking and returns immediately;.
* </p>
* @param dest pointer to first storage element of `count` consecutive elements.
- * @param count number of consecutive elements to get
- * @return true if successful, otherwise false
+ * @param dest_len number of consecutive elements in dest and maximum number of elements to get
+ * @param min_count minimum number of consecutive elements to get
+ * @return actual number of elements received
*/
- bool get(Value_type *dest, const Size_type count) noexcept {
- return moveOutImpl(dest, count, false, 0);
+ Size_type get(Value_type *dest, const Size_type dest_len, const Size_type min_count) noexcept {
+ return moveOutImpl(dest, dest_len, min_count, false, 0);
}
/**
- * Dequeues the oldest enqueued count elements by copying them into the given consecutive 'dest' storage.
+ * Dequeues the oldest enqueued `min(dest_len, getSize()>=min_count)` elements by copying them into the given consecutive 'dest' storage.
* <p>
* The returned ring buffer slot will be set to <code>nullelem</code> to release the reference
* and move ownership to the caller.
@@ -1085,13 +1092,13 @@ class ringbuffer {
* Otherwise this methods blocks for the given milliseconds.
* </p>
* @param dest pointer to first storage element of `count` consecutive elements.
- * @param count number of consecutive elements to get
+ * @param dest_len number of consecutive elements in dest and maximum number of elements to get
+ * @param min_count minimum number of consecutive elements to get
* @param timeoutMS
- * @return true if successful, otherwise false
- * @return true if successful, otherwise false in case timeout occurred or otherwise.
+ * @return actual number of elements received
*/
- bool getBlocking(Value_type *dest, const Size_type count, const int timeoutMS=0) noexcept {
- return moveOutImpl(dest, count, true, timeoutMS);
+ Size_type getBlocking(Value_type *dest, const Size_type dest_len, const Size_type min_count, const int timeoutMS=0) noexcept {
+ return moveOutImpl(dest, dest_len, min_count, true, timeoutMS);
}
/**
diff --git a/test/test_lfringbuffer01.cpp b/test/test_lfringbuffer01.cpp
index e6a297c..06b03c3 100644
--- a/test/test_lfringbuffer01.cpp
+++ b/test/test_lfringbuffer01.cpp
@@ -113,7 +113,7 @@ class TestRingbuffer01 {
REQUIRE_MSG("not empty "+rb.toString(), !rb.isEmpty());
std::vector<TrivialType> array(len);
- REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb.toString(), rb.get( &(*array.begin()), len) );
+ REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb.toString(), len==rb.get( &(*array.begin()), len, len) );
REQUIRE_MSG("size "+rb.toString(), preSize-len == rb.getSize());
REQUIRE_MSG("free slots after reading "+std::to_string(len)+": "+rb.toString(), rb.getFreeSlots()>= len);
diff --git a/test/test_lfringbuffer02.cpp b/test/test_lfringbuffer02.cpp
index 07e4ac2..39d2c23 100644
--- a/test/test_lfringbuffer02.cpp
+++ b/test/test_lfringbuffer02.cpp
@@ -134,7 +134,7 @@ class TestRingbuffer02 {
REQUIRE_MSG("not empty "+rb.toString(), !rb.isEmpty());
std::vector<TrivialType> array(len);
- REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb.toString(), rb.get( &(*array.begin()), len) );
+ REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb.toString(), len==rb.get( &(*array.begin()), len, len) );
REQUIRE_MSG("size "+rb.toString(), preSize-len == rb.getSize());
REQUIRE_MSG("free slots after reading "+std::to_string(len)+": "+rb.toString(), rb.getFreeSlots()>= len);
diff --git a/test/test_lfringbuffer03.cpp b/test/test_lfringbuffer03.cpp
index 3cd1bc1..71926fe 100644
--- a/test/test_lfringbuffer03.cpp
+++ b/test/test_lfringbuffer03.cpp
@@ -134,7 +134,7 @@ class TestRingbuffer03 {
REQUIRE_MSG("not empty "+rb.toString(), !rb.isEmpty());
std::vector<SharedType> array(len);
- REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb.toString(), rb.get( &(*array.begin()), len) );
+ REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb.toString(), len==rb.get( &(*array.begin()), len, len) );
REQUIRE_MSG("size "+rb.toString(), preSize-len == rb.getSize());
REQUIRE_MSG("free slots after reading "+std::to_string(len)+": "+rb.toString(), rb.getFreeSlots()>= len);
diff --git a/test/test_lfringbuffer11.cpp b/test/test_lfringbuffer11.cpp
index cd48764..ea4adbc 100644
--- a/test/test_lfringbuffer11.cpp
+++ b/test/test_lfringbuffer11.cpp
@@ -89,7 +89,7 @@ class TestRingbuffer11 {
// INFO_STR, INFO: Not thread safe yet
// INFO_STR(msg+": Created / " + rb->toString());
std::vector<TrivialType> array(len);
- REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb->toString(), rb->getBlocking( &(*array.begin()), len) );
+ REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb->toString(), len==rb->getBlocking( &(*array.begin()), len, len) );
for(jau::nsize_t i=0; i<len; i++) {
TrivialType svI = array[i];
diff --git a/test/test_lfringbuffer12.cpp b/test/test_lfringbuffer12.cpp
index 6e9ba88..734cb43 100644
--- a/test/test_lfringbuffer12.cpp
+++ b/test/test_lfringbuffer12.cpp
@@ -108,7 +108,7 @@ class TestRingbuffer12 {
// INFO_STR, INFO: Not thread safe yet
// INFO_STR(msg+": Created / " + rb->toString());
std::vector<TrivialType> array(len);
- REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb->toString(), rb->getBlocking( &(*array.begin()), len) );
+ REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb->toString(), len==rb->getBlocking( &(*array.begin()), len, len) );
for(jau::nsize_t i=0; i<len; i++) {
TrivialType svI = array[i];
diff --git a/test/test_lfringbuffer13.cpp b/test/test_lfringbuffer13.cpp
index f7626db..dbb39a3 100644
--- a/test/test_lfringbuffer13.cpp
+++ b/test/test_lfringbuffer13.cpp
@@ -104,7 +104,7 @@ class TestRingbuffer13 {
// INFO_STR, INFO: Not thread safe yet
// INFO_STR(msg+": Created / " + rb->toString());
std::vector<SharedType> array(len);
- REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb->toString(), rb->getBlocking( &(*array.begin()), len) );
+ REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb->toString(), len==rb->getBlocking( &(*array.begin()), len, len) );
for(jau::nsize_t i=0; i<len; i++) {
SharedType svI = array[i];