summaryrefslogtreecommitdiffstats
path: root/include/jau
diff options
context:
space:
mode:
authorSven Gothel <[email protected]>2022-05-01 07:23:09 +0200
committerSven Gothel <[email protected]>2022-05-01 07:23:09 +0200
commit6dfc0cbf5e3f530f6268c5a636a901da3c4bc657 (patch)
tree7a1498c2a8bbe7f544731b9d427ab3f10ead15e6 /include/jau
parent1cbc33b38e34a060848a729d8e38c1c29ab87310 (diff)
Adopt fraction and fraction_timespec, use wait_until() w/ out-of-loop absolute timeout_time, avoiding deadlocks
Classes affected incl. API change using fraction for relative time - latch - ringbuffer - service_runner - simple_timer
Diffstat (limited to 'include/jau')
-rw-r--r--include/jau/latch.hpp66
-rw-r--r--include/jau/ringbuffer.hpp201
-rw-r--r--include/jau/service_runner.hpp24
-rw-r--r--include/jau/simple_timer.hpp41
4 files changed, 167 insertions, 165 deletions
diff --git a/include/jau/latch.hpp b/include/jau/latch.hpp
index 93b9a49..d2a6fae 100644
--- a/include/jau/latch.hpp
+++ b/include/jau/latch.hpp
@@ -30,6 +30,8 @@
#include <condition_variable>
#include <jau/ordered_atomic.hpp>
+#include <jau/fraction_type.hpp>
+#include <jau/basic_types.hpp>
namespace jau {
@@ -147,22 +149,19 @@ namespace jau {
*
* If the internal counter is zero already, returns immediately.
*
- * Implementation uses `std::chrono::steady_clock::now()`.
+ * Implementation uses wait_for() w/ a monotonic clock and fraction_i64.
*
* Extension of std::latch.
*
- * @tparam Rep
- * @tparam Period
- * @param timeout_duration maximum time duration to spend waiting
+ * @param timeout_duration maximum duration in fractions of seconds to wait
* @return true if internal counter has reached zero, otherwise a timeout has occurred.
*/
- template<typename Rep, typename Period>
- bool wait_for(const std::chrono::duration<Rep, Period>& timeout_duration) const noexcept {
+ bool wait_for(const fraction_i64& timeout_duration) const noexcept {
if( 0 < count ) {
- std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now() + timeout_duration;
std::unique_lock<std::mutex> lock(mtx_cd);
+ const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout_duration);
while( 0 < count ) {
- std::cv_status s = cv.wait_until(lock, t0);
+ std::cv_status s = wait_until(cv, lock, timeout_time);
if( 0 == count ) {
return true;
}
@@ -175,26 +174,28 @@ namespace jau {
}
/**
- * Blocks the calling thread until the internal counter reaches 0 or the given timeout duration has expired.
+ * Atomically decrements the internal counter by n and (if necessary) blocks the calling thread until the counter reaches zero
+ * or the given timeout duration has expired.
*
- * If the internal counter is zero already, returns immediately.
+ * Equivalent to `count_down(n); wait(timeout_duration);`.
*
- * Implementation uses `std::chrono::steady_clock::now()`.
+ * Implementation uses `std::chrono::steady_clock::now()` and fraction_i64.
*
* Extension of std::latch.
*
- * @param timeout_ms maximum time duration to spend waiting in milliseconds
+ * @param timeout_duration maximum duration in fractions of seconds to wait
+ * @param n the value by which the internal counter is decreased, defaults to 1
* @return true if internal counter has reached zero, otherwise a timeout has occurred.
*/
- bool wait_for(const size_t timeout_ms) const noexcept {
- return wait_for(std::chrono::milliseconds(timeout_ms));
+ bool arrive_and_wait_for(const fraction_i64& timeout_duration, const size_t n = 1) noexcept {
+ count_down(n);
+ return wait_for(timeout_duration);
}
/**
- * Atomically decrements the internal counter by n and (if necessary) blocks the calling thread until the counter reaches zero
- * or the given timeout duration has expired.
+ * Blocks the calling thread until the internal counter reaches 0 or the given timeout duration has expired.
*
- * Equivalent to `count_down(n); wait(timeout_duration);`.
+ * If the internal counter is zero already, returns immediately.
*
* Implementation uses `std::chrono::steady_clock::now()`.
*
@@ -202,14 +203,25 @@ namespace jau {
*
* @tparam Rep
* @tparam Period
- * @param timeout_duration maximum time duration to spend waiting
- * @param n the value by which the internal counter is decreased, defaults to 1
+ * @param timeout_duration maximum duration to wait
* @return true if internal counter has reached zero, otherwise a timeout has occurred.
*/
template<typename Rep, typename Period>
- bool arrive_and_wait_for(const std::chrono::duration<Rep, Period>& timeout_duration, const size_t n = 1) noexcept {
- count_down(n);
- return wait_for(timeout_duration);
+ bool wait_for(const std::chrono::duration<Rep, Period>& timeout_duration) const noexcept {
+ if( 0 < count ) {
+ std::unique_lock<std::mutex> lock(mtx_cd);
+ std::chrono::steady_clock::time_point timeout_time = std::chrono::steady_clock::now() + timeout_duration;
+ while( 0 < count ) {
+ std::cv_status s = cv.wait_until(lock, timeout_time );
+ if( 0 == count ) {
+ return true;
+ }
+ if( std::cv_status::timeout == s ) {
+ return false;
+ }
+ }
+ }
+ return true;
}
/**
@@ -222,12 +234,16 @@ namespace jau {
*
* Extension of std::latch.
*
- * @param timeout_ms maximum time duration to spend waiting in milliseconds
+ * @tparam Rep
+ * @tparam Period
+ * @param timeout_duration maximum duration to wait
* @param n the value by which the internal counter is decreased, defaults to 1
* @return true if internal counter has reached zero, otherwise a timeout has occurred.
*/
- bool arrive_and_wait_for(const size_t timeout_ms, const size_t n = 1) noexcept {
- return arrive_and_wait_for(std::chrono::milliseconds(timeout_ms), n);
+ template<typename Rep, typename Period>
+ bool arrive_and_wait_for(const std::chrono::duration<Rep, Period>& timeout_duration, const size_t n = 1) noexcept {
+ count_down(n);
+ return wait_for(timeout_duration);
}
};
diff --git a/include/jau/ringbuffer.hpp b/include/jau/ringbuffer.hpp
index 47397eb..7672ece 100644
--- a/include/jau/ringbuffer.hpp
+++ b/include/jau/ringbuffer.hpp
@@ -41,6 +41,7 @@
#include <jau/debug.hpp>
#include <jau/basic_types.hpp>
#include <jau/ordered_atomic.hpp>
+#include <jau/fraction_type.hpp>
#include <jau/callocator.hpp>
namespace jau {
@@ -248,18 +249,18 @@ class ringbuffer {
}
}
- Size_type waitForElementsImpl(const Size_type min_count, const int timeoutMS) noexcept {
+ Size_type waitForElementsImpl(const Size_type min_count, const fraction_i64& timeout) noexcept {
Size_type available = size();
if( min_count > available ) {
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
available = size();
+ const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
while( min_count > available ) {
- if( 0 == timeoutMS ) {
+ if( fractions_i64::zero == timeout ) {
cvWrite.wait(lockWrite);
available = size();
} else {
- std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
- std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS));
+ std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time );
available = size();
if( std::cv_status::timeout == s && min_count > available ) {
return available;
@@ -270,18 +271,18 @@ class ringbuffer {
return available;
}
- Size_type waitForFreeSlotsImpl(const Size_type min_count, const int timeoutMS) noexcept {
+ Size_type waitForFreeSlotsImpl(const Size_type min_count, const fraction_i64& timeout) noexcept {
Size_type available = freeSlots();
if( min_count > available ) {
std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
available = freeSlots();
+ const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
while( min_count > available ) {
- if( 0 == timeoutMS ) {
+ if( fractions_i64::zero == timeout ) {
cvRead.wait(lockRead);
available = freeSlots();
} else {
- std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
- std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS));
+ std::cv_status s = wait_until(cvRead, lockRead, timeout_time );
available = freeSlots();
if( std::cv_status::timeout == s && min_count > available ) {
return available;
@@ -403,7 +404,7 @@ class ringbuffer {
}
}
- bool peekImpl(Value_type& dest, const bool blocking, const int timeoutMS) noexcept {
+ bool peekImpl(Value_type& dest, const bool blocking, const fraction_i64& timeout) noexcept {
if( !std::is_copy_constructible_v<Value_type> ) {
ABORT("Value_type is not copy constructible");
return false;
@@ -413,12 +414,12 @@ class ringbuffer {
if( localReadPos == writePos ) {
if( blocking ) {
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
+ const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
while( localReadPos == writePos ) {
- if( 0 == timeoutMS ) {
+ if( fractions_i64::zero == timeout ) {
cvWrite.wait(lockWrite);
} else {
- std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
- std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS));
+ std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time );
if( std::cv_status::timeout == s && localReadPos == writePos ) {
return false;
}
@@ -441,18 +442,18 @@ class ringbuffer {
return true;
}
- bool moveOutImpl(Value_type& dest, const bool blocking, const int timeoutMS) noexcept {
+ bool moveOutImpl(Value_type& dest, const bool blocking, const fraction_i64& timeout) noexcept {
const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
Size_type localReadPos = oldReadPos;
if( localReadPos == writePos ) {
if( blocking ) {
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
+ const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
while( localReadPos == writePos ) {
- if( 0 == timeoutMS ) {
+ if( fractions_i64::zero == timeout ) {
cvWrite.wait(lockWrite);
} else {
- std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
- std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS));
+ std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time );
if( std::cv_status::timeout == s && localReadPos == writePos ) {
return false;
}
@@ -488,7 +489,7 @@ class ringbuffer {
return true;
}
- Size_type moveOutImpl(Value_type *dest, const Size_type dest_len, const Size_type min_count_, const bool blocking, const int timeoutMS) noexcept {
+ Size_type moveOutImpl(Value_type *dest, const Size_type dest_len, const Size_type min_count_, const bool blocking, const fraction_i64& timeout) noexcept {
const Size_type min_count = std::min(dest_len, min_count_);
Value_type *iter_out = dest;
@@ -506,13 +507,13 @@ class ringbuffer {
if( blocking ) {
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
available = size();
+ const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
while( min_count > available ) {
- if( 0 == timeoutMS ) {
+ if( fractions_i64::zero == timeout ) {
cvWrite.wait(lockWrite);
available = size();
} else {
- std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
- std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS));
+ std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time );
available = size();
if( std::cv_status::timeout == s && min_count > available ) {
return 0;
@@ -583,7 +584,7 @@ class ringbuffer {
return count;
}
- Size_type dropImpl (Size_type count, const bool blocking, const int timeoutMS) noexcept {
+ Size_type dropImpl (Size_type count, const bool blocking, const fraction_i64& timeout) noexcept {
if( count >= capacityPlusOne ) {
if( blocking ) {
return 0;
@@ -601,13 +602,13 @@ class ringbuffer {
if( blocking ) {
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
available = size();
+ const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
while( count > available ) {
- if( 0 == timeoutMS ) {
+ if( fractions_i64::zero == timeout ) {
cvWrite.wait(lockWrite);
available = size();
} else {
- std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
- std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS));
+ std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time );
available = size();
if( std::cv_status::timeout == s && count > available ) {
return 0;
@@ -665,18 +666,18 @@ class ringbuffer {
return count;
}
- bool moveIntoImpl(Value_type &&e, const bool blocking, const int timeoutMS) noexcept {
+ bool moveIntoImpl(Value_type &&e, const bool blocking, const fraction_i64& timeout) noexcept {
Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl
localWritePos = (localWritePos + 1) % capacityPlusOne;
if( localWritePos == readPos ) {
if( blocking ) {
std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
+ const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
while( localWritePos == readPos ) {
- if( 0 == timeoutMS ) {
+ if( fractions_i64::zero == timeout ) {
cvRead.wait(lockRead);
} else {
- std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
- std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS));
+ std::cv_status s = wait_until(cvRead, lockRead, timeout_time );
if( std::cv_status::timeout == s && localWritePos == readPos ) {
return false;
}
@@ -703,7 +704,7 @@ class ringbuffer {
return true;
}
- bool copyIntoImpl(const Value_type &e, const bool blocking, const int timeoutMS) noexcept {
+ bool copyIntoImpl(const Value_type &e, const bool blocking, const fraction_i64& timeout) noexcept {
if( !std::is_copy_constructible_v<Value_type> ) {
ABORT("Value_type is not copy constructible");
return false;
@@ -713,12 +714,12 @@ class ringbuffer {
if( localWritePos == readPos ) {
if( blocking ) {
std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
+ const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
while( localWritePos == readPos ) {
- if( 0 == timeoutMS ) {
+ if( fractions_i64::zero == timeout ) {
cvRead.wait(lockRead);
} else {
- std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
- std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS));
+ std::cv_status s = wait_until(cvRead, lockRead, timeout_time );
if( std::cv_status::timeout == s && localWritePos == readPos ) {
return false;
}
@@ -745,7 +746,7 @@ class ringbuffer {
return true;
}
- bool copyIntoImpl(const Value_type *first, const Value_type* last, const bool blocking, const int timeoutMS) noexcept {
+ bool copyIntoImpl(const Value_type *first, const Value_type* last, const bool blocking, const fraction_i64& timeout) noexcept {
if( !std::is_copy_constructible_v<Value_type> ) {
ABORT("Value_type is not copy constructible");
return false;
@@ -766,13 +767,13 @@ class ringbuffer {
if( blocking ) {
std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock
available = freeSlots();
+ const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
while( total_count > available ) {
- if( 0 == timeoutMS ) {
+ if( fractions_i64::zero == timeout ) {
cvRead.wait(lockRead);
available = freeSlots();
} else {
- std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
- std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS));
+ std::cv_status s = wait_until(cvRead, lockRead, timeout_time );
available = freeSlots();
if( std::cv_status::timeout == s && total_count > available ) {
return false;
@@ -1074,15 +1075,15 @@ class ringbuffer {
* for subsequent get() and getBlocking().
*
* @param min_count minimum number of put slots
- * @param timeoutMS
+ * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
* @return the number of put elements, available for get() and getBlocking()
*/
- Size_type waitForElements(const Size_type min_count, const int timeoutMS) noexcept {
+ Size_type waitForElements(const Size_type min_count, const fraction_i64& timeout) noexcept {
if( multi_pc_enabled ) {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
- return waitForElementsImpl(min_count, timeoutMS);
+ return waitForElementsImpl(min_count, timeout);
} else {
- return waitForElementsImpl(min_count, timeoutMS);
+ return waitForElementsImpl(min_count, timeout);
}
}
@@ -1091,15 +1092,15 @@ class ringbuffer {
* for subsequent put() and putBlocking().
*
* @param min_count minimum number of free slots
- * @param timeoutMS
+ * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
* @return the number of free slots, available for put() and putBlocking()
*/
- Size_type waitForFreeSlots(const Size_type min_count, const int timeoutMS) noexcept {
+ Size_type waitForFreeSlots(const Size_type min_count, const fraction_i64& timeout) noexcept {
if( multi_pc_enabled ) {
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl
- return waitForFreeSlotsImpl(min_count, timeoutMS);
+ return waitForFreeSlotsImpl(min_count, timeout);
} else {
- return waitForFreeSlotsImpl(min_count, timeoutMS);
+ return waitForFreeSlotsImpl(min_count, timeout);
}
}
@@ -1118,9 +1119,9 @@ class ringbuffer {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
std::lock(lockMultiRead, lockMultiWrite);
- dropImpl (capacityPlusOne-1, false /* blocking */, 0 /* timeoutMS */);
+ dropImpl (capacityPlusOne-1, false /* blocking */, fractions_i64::zero);
} else {
- dropImpl (capacityPlusOne-1, false /* blocking */, 0 /* timeoutMS */);
+ dropImpl (capacityPlusOne-1, false /* blocking */, fractions_i64::zero);
}
}
@@ -1229,28 +1230,25 @@ class ringbuffer {
bool peek(Value_type& result) noexcept {
if( multi_pc_enabled ) {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
- return peekImpl(result, false, 0);
+ return peekImpl(result, false, fractions_i64::zero);
} else {
- return peekImpl(result, false, 0);
+ return peekImpl(result, false, fractions_i64::zero);
}
}
/**
* Peeks the next element at the read position w/o modifying pointer, but with blocking.
*
- * `timeoutMS` defaults to zero,
- * i.e. infinitive blocking until an element available via put.<br>
- * Otherwise this methods blocks for the given milliseconds.
- *
* @param result storage for the resulting value if successful, otherwise unchanged.
+ * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
* @return true if successful, otherwise false.
*/
- bool peekBlocking(Value_type& result, const int timeoutMS=0) noexcept {
+ bool peekBlocking(Value_type& result, const fraction_i64& timeout) noexcept {
if( multi_pc_enabled ) {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
- return peekImpl(result, true, timeoutMS);
+ return peekImpl(result, true, timeout);
} else {
- return peekImpl(result, true, timeoutMS);
+ return peekImpl(result, true, timeout);
}
}
@@ -1267,9 +1265,9 @@ class ringbuffer {
bool get(Value_type& result) noexcept {
if( multi_pc_enabled ) {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
- return moveOutImpl(result, false, 0);
+ return moveOutImpl(result, false, fractions_i64::zero);
} else {
- return moveOutImpl(result, false, 0);
+ return moveOutImpl(result, false, fractions_i64::zero);
}
}
@@ -1278,19 +1276,16 @@ class ringbuffer {
*
* The ring buffer slot will be released and its value moved to the caller's `result` storage, if successful.
*
- * `timeoutMS` defaults to zero,
- * i.e. infinitive blocking until an element available via put.<br>
- * Otherwise this methods blocks for the given milliseconds.
- *
* @param result storage for the resulting value if successful, otherwise unchanged.
+ * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
* @return true if successful, otherwise false.
*/
- bool getBlocking(Value_type& result, const int timeoutMS=0) noexcept {
+ bool getBlocking(Value_type& result, const fraction_i64& timeout) noexcept {
if( multi_pc_enabled ) {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
- return moveOutImpl(result, true, timeoutMS);
+ return moveOutImpl(result, true, timeout);
} else {
- return moveOutImpl(result, true, timeoutMS);
+ return moveOutImpl(result, true, timeout);
}
}
@@ -1309,9 +1304,9 @@ class ringbuffer {
Size_type get(Value_type *dest, const Size_type dest_len, const Size_type min_count) noexcept {
if( multi_pc_enabled ) {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
- return moveOutImpl(dest, dest_len, min_count, false, 0);
+ return moveOutImpl(dest, dest_len, min_count, false, fractions_i64::zero);
} else {
- return moveOutImpl(dest, dest_len, min_count, false, 0);
+ return moveOutImpl(dest, dest_len, min_count, false, fractions_i64::zero);
}
}
@@ -1320,22 +1315,18 @@ class ringbuffer {
*
* The ring buffer slots will be released and its value moved to the caller's `dest` storage, if successful.
*
- * `timeoutMS` defaults to zero,
- * i.e. infinitive blocking until an element available via put.<br>
- * Otherwise this methods blocks for the given milliseconds.
- *
* @param dest pointer to first storage element of `dest_len` consecutive elements to store the values, if successful.
* @param dest_len number of consecutive elements in `dest`, hence maximum number of elements to return.
* @param min_count minimum number of consecutive elements to return
- * @param timeoutMS
+ * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
* @return actual number of elements returned
*/
- Size_type getBlocking(Value_type *dest, const Size_type dest_len, const Size_type min_count, const int timeoutMS=0) noexcept {
+ Size_type getBlocking(Value_type *dest, const Size_type dest_len, const Size_type min_count, const fraction_i64& timeout) noexcept {
if( multi_pc_enabled ) {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl
- return moveOutImpl(dest, dest_len, min_count, true, timeoutMS);
+ return moveOutImpl(dest, dest_len, min_count, true, timeout);
} else {
- return moveOutImpl(dest, dest_len, min_count, true, timeoutMS);
+ return moveOutImpl(dest, dest_len, min_count, true, timeout);
}
}
@@ -1353,9 +1344,9 @@ class ringbuffer {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
std::lock(lockMultiRead, lockMultiWrite);
- return dropImpl(max_count, false, 0);
+ return dropImpl(max_count, false, fractions_i64::zero);
} else {
- return dropImpl(max_count, false, 0);
+ return dropImpl(max_count, false, fractions_i64::zero);
}
}
@@ -1363,24 +1354,21 @@ class ringbuffer {
* Drops exactly `count` oldest enqueued elements,
* will block until they become available.
*
- * `timeoutMS` defaults to zero,
- * i.e. infinitive blocking until an element available via put.<br>
- * Otherwise this methods blocks for the given milliseconds.
- *
* In `count` elements are not available to drop even after
* blocking for `timeoutMS`, no element will be dropped.
*
* @param count number of elements to drop from ringbuffer.
+ * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
* @return true if successful, otherwise false
*/
- bool dropBlocking(const Size_type count, const int timeoutMS=0) noexcept {
+ bool dropBlocking(const Size_type count, const fraction_i64& timeout) noexcept {
if( multi_pc_enabled ) {
std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor
std::lock(lockMultiRead, lockMultiWrite);
- return 0 != dropImpl(count, true, timeoutMS);
+ return 0 != dropImpl(count, true, timeout);
} else {
- return 0 != dropImpl(count, true, timeoutMS);
+ return 0 != dropImpl(count, true, timeout);
}
}
@@ -1396,27 +1384,25 @@ class ringbuffer {
bool put(Value_type && e) noexcept {
if( multi_pc_enabled ) {
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl
- return moveIntoImpl(std::move(e), false, 0);
+ return moveIntoImpl(std::move(e), false, fractions_i64::zero);
} else {
- return moveIntoImpl(std::move(e), false, 0);
+ return moveIntoImpl(std::move(e), false, fractions_i64::zero);
}
}
/**
* Enqueues the given element by moving it into this ringbuffer storage.
*
- * `timeoutMS` defaults to zero,
- * i.e. infinitive blocking until a free slot becomes available via get.<br>
- * Otherwise this methods blocks for the given milliseconds.
- *
+ * @param e
+ * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
* @return true if successful, otherwise false in case timeout occurred or otherwise.
*/
- bool putBlocking(Value_type && e, const int timeoutMS=0) noexcept {
+ bool putBlocking(Value_type && e, const fraction_i64& timeout) noexcept {
if( multi_pc_enabled ) {
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl
- return moveIntoImpl(std::move(e), true, timeoutMS);
+ return moveIntoImpl(std::move(e), true, timeout);
} else {
- return moveIntoImpl(std::move(e), true, timeoutMS);
+ return moveIntoImpl(std::move(e), true, timeout);
}
}
@@ -1432,27 +1418,24 @@ class ringbuffer {
bool put(const Value_type & e) noexcept {
if( multi_pc_enabled ) {
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl
- return copyIntoImpl(e, false, 0);
+ return copyIntoImpl(e, false, fractions_i64::zero);
} else {
- return copyIntoImpl(e, false, 0);
+ return copyIntoImpl(e, false, fractions_i64::zero);
}
}
/**
* Enqueues the given element by copying it into this ringbuffer storage.
*
- * `timeoutMS` defaults to zero,
- * i.e. infinitive blocking until a free slot becomes available via get.<br>
- * Otherwise this methods blocks for the given milliseconds.
- *
+ * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
* @return true if successful, otherwise false in case timeout occurred or otherwise.
*/
- bool putBlocking(const Value_type & e, const int timeoutMS=0) noexcept {
+ bool putBlocking(const Value_type & e, const fraction_i64& timeout) noexcept {
if( multi_pc_enabled ) {
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl
- return copyIntoImpl(e, true, timeoutMS);
+ return copyIntoImpl(e, true, timeout);
} else {
- return copyIntoImpl(e, true, timeoutMS);
+ return copyIntoImpl(e, true, timeout);
}
}
@@ -1470,30 +1453,26 @@ class ringbuffer {
bool put(const Value_type *first, const Value_type* last) noexcept {
if( multi_pc_enabled ) {
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl
- return copyIntoImpl(first, last, false, 0);
+ return copyIntoImpl(first, last, false, fractions_i64::zero);
} else {
- return copyIntoImpl(first, last, false, 0);
+ return copyIntoImpl(first, last, false, fractions_i64::zero);
}
}
/**
* Enqueues the given range of consecutive elementa by copying it into this ringbuffer storage.
*
- * `timeoutMS` defaults to zero,
- * i.e. infinitive blocking until a free slot becomes available via get.<br>
- * Otherwise this methods blocks for the given milliseconds.
- *
* @param first pointer to first consecutive element to range of value_type [first, last)
* @param last pointer to last consecutive element to range of value_type [first, last)
- * @param timeoutMS
+ * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
* @return true if successful, otherwise false in case timeout occurred or otherwise.
*/
- bool putBlocking(const Value_type *first, const Value_type* last, const int timeoutMS=0) noexcept {
+ bool putBlocking(const Value_type *first, const Value_type* last, const fraction_i64& timeout) noexcept {
if( multi_pc_enabled ) {
std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl
- return copyIntoImpl(first, last, true, timeoutMS);
+ return copyIntoImpl(first, last, true, timeout);
} else {
- return copyIntoImpl(first, last, true, timeoutMS);
+ return copyIntoImpl(first, last, true, timeout);
}
}
};
diff --git a/include/jau/service_runner.hpp b/include/jau/service_runner.hpp
index a55378d..3c0b858 100644
--- a/include/jau/service_runner.hpp
+++ b/include/jau/service_runner.hpp
@@ -37,8 +37,8 @@
#include <algorithm>
#include <jau/cpp_lang_util.hpp>
-#include <jau/basic_types.hpp>
#include <jau/ordered_atomic.hpp>
+#include <jau/fraction_type.hpp>
#include <jau/function_def.hpp>
namespace jau {
@@ -57,9 +57,9 @@ namespace jau {
std::string name_;
/**
- * Maximum time in milliseconds to wait for a thread shutdown or zero to wait infinitely.
+ * Maximum duration in fractions of seconds to wait for service to stop at stop() and join(), where fractions_i64::zero waits infinitely
*/
- nsize_t service_shutdown_timeout_ms_;
+ fraction_i64 service_shutdown_timeout_;
Callback service_work;
Callback service_init_locked;
@@ -110,13 +110,13 @@ namespace jau {
* start() shall be issued to kick off this service.
*
* @param name service name
- * @param service_shutdown_timeout_ms maximum time in milliseconds to wait for stop() and join(), where zero waits infinitely
+ * @param service_shutdown_timeout maximum duration in fractions of seconds to wait for service to stop at stop() and join(), where fractions_i64::zero waits infinitely
* @param service_work service working function
* @param service_init_locked optional service init function, lifecycle mutex is locked
* @param service_end_locked optional service end function, lifecycle mutex is locked
*/
service_runner(const std::string& name,
- const nsize_t service_shutdown_timeout_ms,
+ const fraction_i64& service_shutdown_timeout,
Callback service_work,
Callback service_init_locked = Callback(),
Callback service_end_locked = Callback()) noexcept;
@@ -134,11 +134,11 @@ namespace jau {
const std::string& name() const noexcept { return name_; }
/**
- * Returns maximum time in milliseconds to wait for stop() and join(), where zero waits infinitely.
+ * Returns maximum duration in fractions of seconds to wait for service to stop at stop() and join(), where fractions_i64::zero waits infinitely
* @see stop()
* @see join()
*/
- nsize_t service_shutdown_timeout_ms() const noexcept { return service_shutdown_timeout_ms_; }
+ fraction_i64 service_shutdown_timeout() const noexcept { return service_shutdown_timeout_; }
/**
* Return the thread-id of this service service thread, zero if not running.
@@ -211,7 +211,7 @@ namespace jau {
* @see cv_shall_stop()
* @see stop()
* @see join()
- * @see service_shutdown_timeout_ms()
+ * @see service_shutdown_timeout()
*/
void start() noexcept;
@@ -221,7 +221,7 @@ namespace jau {
* If called from the service thread, method just issues set_shall_stop() without blocking,
* otherwise methods blocks the current thread until service is stopped.
*
- * Maximum blocked wait period is optionally limited by service_shutdown_timeout_ms().
+ * Maximum blocked wait period is optionally limited by service_shutdown_timeout().
*
* Method attempts to stop the service thread
* - by flagging `shall stop` via set_shall_stop()
@@ -241,7 +241,7 @@ namespace jau {
* @see cv_shall_stop()
* @see start()
* @see join()
- * @see service_shutdown_timeout_ms()
+ * @see service_shutdown_timeout()
* @see singleton_sighandler()
*/
bool stop() noexcept;
@@ -250,7 +250,7 @@ namespace jau {
* Blocks the current thread until service is stopped
* or returns immediately if not running or called from the service thread.
*
- * Maximum blocked wait period is optionally limited by service_shutdown_timeout_ms().
+ * Maximum blocked wait period is optionally limited by service_shutdown_timeout().
*
* @returns true if thread has been stopped or false if timeout has been hit
* @see is_running()
@@ -260,7 +260,7 @@ namespace jau {
* @see cv_shall_stop()
* @see start()
* @see stop()
- * @see service_shutdown_timeout_ms()
+ * @see service_shutdown_timeout()
*/
bool join() noexcept;
diff --git a/include/jau/simple_timer.hpp b/include/jau/simple_timer.hpp
index ad14625..ed561b2 100644
--- a/include/jau/simple_timer.hpp
+++ b/include/jau/simple_timer.hpp
@@ -31,6 +31,7 @@
#include <thread>
#include <jau/ordered_atomic.hpp>
+#include <jau/fraction_type.hpp>
#include <jau/function_def.hpp>
#include <jau/service_runner.hpp>
@@ -48,25 +49,31 @@ namespace jau {
typedef simple_timer& Timer0_ref;
/**
- * User defined timer function using millisecond granularity.
+ * User defined timer function using custom granularity via fraction_i64.
*
* Function gets invoked for each timer event,
- * i.e. after reaching the duration in milliseconds set earlier.
+ * i.e. after reaching the duration set earlier.
*
- * @return duration in milliseconds for the next timer event or zero to end the timer thread.
+ * @return duration in fractions of seconds for the next timer event or zero to end the timer thread.
*/
- typedef FunctionDef<nsize_t, Timer0_ref> Timer_func_ms;
+ typedef FunctionDef<fraction_i64, Timer0_ref> Timer_func;
private:
service_runner timer_service;
std::mutex mtx_timerfunc;
- Timer_func_ms timer_func_ms;
- jau::relaxed_atomic_nsize_t duration_ms;
+ Timer_func timer_func;
+ // Note: Requires libatomic with libstdc++10
+ sc_atomic_fraction_i64 duration;
void timer_work(service_runner& sr_ref);
public:
- simple_timer(const std::string& name, const nsize_t service_shutdown_timeout_ms) noexcept;
+ /**
+ * Constructs a new service
+ * @param name thread name of this service
+ * @param service_shutdown_timeout maximum duration in fractions of seconds to wait for service to stop at stop(), where fractions_i64::zero waits infinitely
+ */
+ simple_timer(const std::string& name, const fraction_i64& service_shutdown_timeout) noexcept;
/**
* No copy constructor nor move constructor.
@@ -92,29 +99,29 @@ namespace jau {
/**
* Returns true if timer shall stop.
*
- * This flag can be used by the Timer_func_ms function to determine whether to skip lengthly tasks.
+ * This flag can be used by the Timer_func function to determine whether to skip lengthly tasks.
*/
bool shall_stop() const noexcept { return timer_service.shall_stop(); }
/**
- * Start the timer with given user Timer_func_ms function and initial duration in milliseconds.
+ * Start the timer with given user Timer_func function and initial duration.
*
- * @param duration_ms_ initial timer duration until next timer event in milliseconds
- * @param tofunc user Timer_func_ms to be called on next timer event
+ * @param duration_ initial timer duration in fractions of seconds until next timer event
+ * @param tofunc user Timer_func to be called on next timer event
* @return true if timer has been started, otherwise false implies timer is already running.
*/
- bool start(nsize_t duration_ms_, Timer_func_ms tofunc) noexcept;
+ bool start(const fraction_i64& duration_, Timer_func tofunc) noexcept;
/**
- * Start or update the timer with given user Timer_func_ms function and initial duration in milliseconds.
+ * Start or update the timer with given user Timer_func function and initial duration.
*
* This is faster than calling stop() and start(), however,
- * an already started timer user Timer_func_ms will proceed.
+ * an already started timer user Timer_func will proceed.
*
- * @param duration_ms_ initial timer duration until next timer event in milliseconds
- * @param tofunc user Timer_func_ms to be called on next timer event
+ * @param duration_ initial timer duration in fractions of seconds until next timer event
+ * @param tofunc user Timer_func to be called on next timer event
*/
- void start_or_update(nsize_t duration_ms_, Timer_func_ms tofunc) noexcept;
+ void start_or_update(const fraction_i64& duration_, Timer_func tofunc) noexcept;
/**
* Stop timer, see service_runner::stop()