diff options
author | Sven Gothel <[email protected]> | 2022-05-01 07:23:09 +0200 |
---|---|---|
committer | Sven Gothel <[email protected]> | 2022-05-01 07:23:09 +0200 |
commit | 6dfc0cbf5e3f530f6268c5a636a901da3c4bc657 (patch) | |
tree | 7a1498c2a8bbe7f544731b9d427ab3f10ead15e6 /include/jau | |
parent | 1cbc33b38e34a060848a729d8e38c1c29ab87310 (diff) |
Adopt fraction and fraction_timespec, use wait_until() w/ out-of-loop absolute timeout_time, avoiding deadlocks
Classes affected incl. API change using fraction for relative time
- latch
- ringbuffer
- service_runner
- simple_timer
Diffstat (limited to 'include/jau')
-rw-r--r-- | include/jau/latch.hpp | 66 | ||||
-rw-r--r-- | include/jau/ringbuffer.hpp | 201 | ||||
-rw-r--r-- | include/jau/service_runner.hpp | 24 | ||||
-rw-r--r-- | include/jau/simple_timer.hpp | 41 |
4 files changed, 167 insertions, 165 deletions
diff --git a/include/jau/latch.hpp b/include/jau/latch.hpp index 93b9a49..d2a6fae 100644 --- a/include/jau/latch.hpp +++ b/include/jau/latch.hpp @@ -30,6 +30,8 @@ #include <condition_variable> #include <jau/ordered_atomic.hpp> +#include <jau/fraction_type.hpp> +#include <jau/basic_types.hpp> namespace jau { @@ -147,22 +149,19 @@ namespace jau { * * If the internal counter is zero already, returns immediately. * - * Implementation uses `std::chrono::steady_clock::now()`. + * Implementation uses wait_for() w/ a monotonic clock and fraction_i64. * * Extension of std::latch. * - * @tparam Rep - * @tparam Period - * @param timeout_duration maximum time duration to spend waiting + * @param timeout_duration maximum duration in fractions of seconds to wait * @return true if internal counter has reached zero, otherwise a timeout has occurred. */ - template<typename Rep, typename Period> - bool wait_for(const std::chrono::duration<Rep, Period>& timeout_duration) const noexcept { + bool wait_for(const fraction_i64& timeout_duration) const noexcept { if( 0 < count ) { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now() + timeout_duration; std::unique_lock<std::mutex> lock(mtx_cd); + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout_duration); while( 0 < count ) { - std::cv_status s = cv.wait_until(lock, t0); + std::cv_status s = wait_until(cv, lock, timeout_time); if( 0 == count ) { return true; } @@ -175,26 +174,28 @@ namespace jau { } /** - * Blocks the calling thread until the internal counter reaches 0 or the given timeout duration has expired. + * Atomically decrements the internal counter by n and (if necessary) blocks the calling thread until the counter reaches zero + * or the given timeout duration has expired. * - * If the internal counter is zero already, returns immediately. + * Equivalent to `count_down(n); wait(timeout_duration);`. * - * Implementation uses `std::chrono::steady_clock::now()`. + * Implementation uses `std::chrono::steady_clock::now()` and fraction_i64. * * Extension of std::latch. * - * @param timeout_ms maximum time duration to spend waiting in milliseconds + * @param timeout_duration maximum duration in fractions of seconds to wait + * @param n the value by which the internal counter is decreased, defaults to 1 * @return true if internal counter has reached zero, otherwise a timeout has occurred. */ - bool wait_for(const size_t timeout_ms) const noexcept { - return wait_for(std::chrono::milliseconds(timeout_ms)); + bool arrive_and_wait_for(const fraction_i64& timeout_duration, const size_t n = 1) noexcept { + count_down(n); + return wait_for(timeout_duration); } /** - * Atomically decrements the internal counter by n and (if necessary) blocks the calling thread until the counter reaches zero - * or the given timeout duration has expired. + * Blocks the calling thread until the internal counter reaches 0 or the given timeout duration has expired. * - * Equivalent to `count_down(n); wait(timeout_duration);`. + * If the internal counter is zero already, returns immediately. * * Implementation uses `std::chrono::steady_clock::now()`. * @@ -202,14 +203,25 @@ namespace jau { * * @tparam Rep * @tparam Period - * @param timeout_duration maximum time duration to spend waiting - * @param n the value by which the internal counter is decreased, defaults to 1 + * @param timeout_duration maximum duration to wait * @return true if internal counter has reached zero, otherwise a timeout has occurred. */ template<typename Rep, typename Period> - bool arrive_and_wait_for(const std::chrono::duration<Rep, Period>& timeout_duration, const size_t n = 1) noexcept { - count_down(n); - return wait_for(timeout_duration); + bool wait_for(const std::chrono::duration<Rep, Period>& timeout_duration) const noexcept { + if( 0 < count ) { + std::unique_lock<std::mutex> lock(mtx_cd); + std::chrono::steady_clock::time_point timeout_time = std::chrono::steady_clock::now() + timeout_duration; + while( 0 < count ) { + std::cv_status s = cv.wait_until(lock, timeout_time ); + if( 0 == count ) { + return true; + } + if( std::cv_status::timeout == s ) { + return false; + } + } + } + return true; } /** @@ -222,12 +234,16 @@ namespace jau { * * Extension of std::latch. * - * @param timeout_ms maximum time duration to spend waiting in milliseconds + * @tparam Rep + * @tparam Period + * @param timeout_duration maximum duration to wait * @param n the value by which the internal counter is decreased, defaults to 1 * @return true if internal counter has reached zero, otherwise a timeout has occurred. */ - bool arrive_and_wait_for(const size_t timeout_ms, const size_t n = 1) noexcept { - return arrive_and_wait_for(std::chrono::milliseconds(timeout_ms), n); + template<typename Rep, typename Period> + bool arrive_and_wait_for(const std::chrono::duration<Rep, Period>& timeout_duration, const size_t n = 1) noexcept { + count_down(n); + return wait_for(timeout_duration); } }; diff --git a/include/jau/ringbuffer.hpp b/include/jau/ringbuffer.hpp index 47397eb..7672ece 100644 --- a/include/jau/ringbuffer.hpp +++ b/include/jau/ringbuffer.hpp @@ -41,6 +41,7 @@ #include <jau/debug.hpp> #include <jau/basic_types.hpp> #include <jau/ordered_atomic.hpp> +#include <jau/fraction_type.hpp> #include <jau/callocator.hpp> namespace jau { @@ -248,18 +249,18 @@ class ringbuffer { } } - Size_type waitForElementsImpl(const Size_type min_count, const int timeoutMS) noexcept { + Size_type waitForElementsImpl(const Size_type min_count, const fraction_i64& timeout) noexcept { Size_type available = size(); if( min_count > available ) { std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock available = size(); + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( min_count > available ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvWrite.wait(lockWrite); available = size(); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time ); available = size(); if( std::cv_status::timeout == s && min_count > available ) { return available; @@ -270,18 +271,18 @@ class ringbuffer { return available; } - Size_type waitForFreeSlotsImpl(const Size_type min_count, const int timeoutMS) noexcept { + Size_type waitForFreeSlotsImpl(const Size_type min_count, const fraction_i64& timeout) noexcept { Size_type available = freeSlots(); if( min_count > available ) { std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock available = freeSlots(); + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( min_count > available ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvRead.wait(lockRead); available = freeSlots(); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvRead, lockRead, timeout_time ); available = freeSlots(); if( std::cv_status::timeout == s && min_count > available ) { return available; @@ -403,7 +404,7 @@ class ringbuffer { } } - bool peekImpl(Value_type& dest, const bool blocking, const int timeoutMS) noexcept { + bool peekImpl(Value_type& dest, const bool blocking, const fraction_i64& timeout) noexcept { if( !std::is_copy_constructible_v<Value_type> ) { ABORT("Value_type is not copy constructible"); return false; @@ -413,12 +414,12 @@ class ringbuffer { if( localReadPos == writePos ) { if( blocking ) { std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( localReadPos == writePos ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvWrite.wait(lockWrite); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time ); if( std::cv_status::timeout == s && localReadPos == writePos ) { return false; } @@ -441,18 +442,18 @@ class ringbuffer { return true; } - bool moveOutImpl(Value_type& dest, const bool blocking, const int timeoutMS) noexcept { + bool moveOutImpl(Value_type& dest, const bool blocking, const fraction_i64& timeout) noexcept { const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl Size_type localReadPos = oldReadPos; if( localReadPos == writePos ) { if( blocking ) { std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( localReadPos == writePos ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvWrite.wait(lockWrite); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time ); if( std::cv_status::timeout == s && localReadPos == writePos ) { return false; } @@ -488,7 +489,7 @@ class ringbuffer { return true; } - Size_type moveOutImpl(Value_type *dest, const Size_type dest_len, const Size_type min_count_, const bool blocking, const int timeoutMS) noexcept { + Size_type moveOutImpl(Value_type *dest, const Size_type dest_len, const Size_type min_count_, const bool blocking, const fraction_i64& timeout) noexcept { const Size_type min_count = std::min(dest_len, min_count_); Value_type *iter_out = dest; @@ -506,13 +507,13 @@ class ringbuffer { if( blocking ) { std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock available = size(); + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( min_count > available ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvWrite.wait(lockWrite); available = size(); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time ); available = size(); if( std::cv_status::timeout == s && min_count > available ) { return 0; @@ -583,7 +584,7 @@ class ringbuffer { return count; } - Size_type dropImpl (Size_type count, const bool blocking, const int timeoutMS) noexcept { + Size_type dropImpl (Size_type count, const bool blocking, const fraction_i64& timeout) noexcept { if( count >= capacityPlusOne ) { if( blocking ) { return 0; @@ -601,13 +602,13 @@ class ringbuffer { if( blocking ) { std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock available = size(); + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( count > available ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvWrite.wait(lockWrite); available = size(); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time ); available = size(); if( std::cv_status::timeout == s && count > available ) { return 0; @@ -665,18 +666,18 @@ class ringbuffer { return count; } - bool moveIntoImpl(Value_type &&e, const bool blocking, const int timeoutMS) noexcept { + bool moveIntoImpl(Value_type &&e, const bool blocking, const fraction_i64& timeout) noexcept { Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl localWritePos = (localWritePos + 1) % capacityPlusOne; if( localWritePos == readPos ) { if( blocking ) { std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( localWritePos == readPos ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvRead.wait(lockRead); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvRead, lockRead, timeout_time ); if( std::cv_status::timeout == s && localWritePos == readPos ) { return false; } @@ -703,7 +704,7 @@ class ringbuffer { return true; } - bool copyIntoImpl(const Value_type &e, const bool blocking, const int timeoutMS) noexcept { + bool copyIntoImpl(const Value_type &e, const bool blocking, const fraction_i64& timeout) noexcept { if( !std::is_copy_constructible_v<Value_type> ) { ABORT("Value_type is not copy constructible"); return false; @@ -713,12 +714,12 @@ class ringbuffer { if( localWritePos == readPos ) { if( blocking ) { std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( localWritePos == readPos ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvRead.wait(lockRead); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvRead, lockRead, timeout_time ); if( std::cv_status::timeout == s && localWritePos == readPos ) { return false; } @@ -745,7 +746,7 @@ class ringbuffer { return true; } - bool copyIntoImpl(const Value_type *first, const Value_type* last, const bool blocking, const int timeoutMS) noexcept { + bool copyIntoImpl(const Value_type *first, const Value_type* last, const bool blocking, const fraction_i64& timeout) noexcept { if( !std::is_copy_constructible_v<Value_type> ) { ABORT("Value_type is not copy constructible"); return false; @@ -766,13 +767,13 @@ class ringbuffer { if( blocking ) { std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock available = freeSlots(); + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( total_count > available ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvRead.wait(lockRead); available = freeSlots(); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvRead, lockRead, timeout_time ); available = freeSlots(); if( std::cv_status::timeout == s && total_count > available ) { return false; @@ -1074,15 +1075,15 @@ class ringbuffer { * for subsequent get() and getBlocking(). * * @param min_count minimum number of put slots - * @param timeoutMS + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return the number of put elements, available for get() and getBlocking() */ - Size_type waitForElements(const Size_type min_count, const int timeoutMS) noexcept { + Size_type waitForElements(const Size_type min_count, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl - return waitForElementsImpl(min_count, timeoutMS); + return waitForElementsImpl(min_count, timeout); } else { - return waitForElementsImpl(min_count, timeoutMS); + return waitForElementsImpl(min_count, timeout); } } @@ -1091,15 +1092,15 @@ class ringbuffer { * for subsequent put() and putBlocking(). * * @param min_count minimum number of free slots - * @param timeoutMS + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return the number of free slots, available for put() and putBlocking() */ - Size_type waitForFreeSlots(const Size_type min_count, const int timeoutMS) noexcept { + Size_type waitForFreeSlots(const Size_type min_count, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl - return waitForFreeSlotsImpl(min_count, timeoutMS); + return waitForFreeSlotsImpl(min_count, timeout); } else { - return waitForFreeSlotsImpl(min_count, timeoutMS); + return waitForFreeSlotsImpl(min_count, timeout); } } @@ -1118,9 +1119,9 @@ class ringbuffer { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); - dropImpl (capacityPlusOne-1, false /* blocking */, 0 /* timeoutMS */); + dropImpl (capacityPlusOne-1, false /* blocking */, fractions_i64::zero); } else { - dropImpl (capacityPlusOne-1, false /* blocking */, 0 /* timeoutMS */); + dropImpl (capacityPlusOne-1, false /* blocking */, fractions_i64::zero); } } @@ -1229,28 +1230,25 @@ class ringbuffer { bool peek(Value_type& result) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl - return peekImpl(result, false, 0); + return peekImpl(result, false, fractions_i64::zero); } else { - return peekImpl(result, false, 0); + return peekImpl(result, false, fractions_i64::zero); } } /** * Peeks the next element at the read position w/o modifying pointer, but with blocking. * - * `timeoutMS` defaults to zero, - * i.e. infinitive blocking until an element available via put.<br> - * Otherwise this methods blocks for the given milliseconds. - * * @param result storage for the resulting value if successful, otherwise unchanged. + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return true if successful, otherwise false. */ - bool peekBlocking(Value_type& result, const int timeoutMS=0) noexcept { + bool peekBlocking(Value_type& result, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl - return peekImpl(result, true, timeoutMS); + return peekImpl(result, true, timeout); } else { - return peekImpl(result, true, timeoutMS); + return peekImpl(result, true, timeout); } } @@ -1267,9 +1265,9 @@ class ringbuffer { bool get(Value_type& result) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl - return moveOutImpl(result, false, 0); + return moveOutImpl(result, false, fractions_i64::zero); } else { - return moveOutImpl(result, false, 0); + return moveOutImpl(result, false, fractions_i64::zero); } } @@ -1278,19 +1276,16 @@ class ringbuffer { * * The ring buffer slot will be released and its value moved to the caller's `result` storage, if successful. * - * `timeoutMS` defaults to zero, - * i.e. infinitive blocking until an element available via put.<br> - * Otherwise this methods blocks for the given milliseconds. - * * @param result storage for the resulting value if successful, otherwise unchanged. + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return true if successful, otherwise false. */ - bool getBlocking(Value_type& result, const int timeoutMS=0) noexcept { + bool getBlocking(Value_type& result, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl - return moveOutImpl(result, true, timeoutMS); + return moveOutImpl(result, true, timeout); } else { - return moveOutImpl(result, true, timeoutMS); + return moveOutImpl(result, true, timeout); } } @@ -1309,9 +1304,9 @@ class ringbuffer { Size_type get(Value_type *dest, const Size_type dest_len, const Size_type min_count) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl - return moveOutImpl(dest, dest_len, min_count, false, 0); + return moveOutImpl(dest, dest_len, min_count, false, fractions_i64::zero); } else { - return moveOutImpl(dest, dest_len, min_count, false, 0); + return moveOutImpl(dest, dest_len, min_count, false, fractions_i64::zero); } } @@ -1320,22 +1315,18 @@ class ringbuffer { * * The ring buffer slots will be released and its value moved to the caller's `dest` storage, if successful. * - * `timeoutMS` defaults to zero, - * i.e. infinitive blocking until an element available via put.<br> - * Otherwise this methods blocks for the given milliseconds. - * * @param dest pointer to first storage element of `dest_len` consecutive elements to store the values, if successful. * @param dest_len number of consecutive elements in `dest`, hence maximum number of elements to return. * @param min_count minimum number of consecutive elements to return - * @param timeoutMS + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return actual number of elements returned */ - Size_type getBlocking(Value_type *dest, const Size_type dest_len, const Size_type min_count, const int timeoutMS=0) noexcept { + Size_type getBlocking(Value_type *dest, const Size_type dest_len, const Size_type min_count, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl - return moveOutImpl(dest, dest_len, min_count, true, timeoutMS); + return moveOutImpl(dest, dest_len, min_count, true, timeout); } else { - return moveOutImpl(dest, dest_len, min_count, true, timeoutMS); + return moveOutImpl(dest, dest_len, min_count, true, timeout); } } @@ -1353,9 +1344,9 @@ class ringbuffer { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); - return dropImpl(max_count, false, 0); + return dropImpl(max_count, false, fractions_i64::zero); } else { - return dropImpl(max_count, false, 0); + return dropImpl(max_count, false, fractions_i64::zero); } } @@ -1363,24 +1354,21 @@ class ringbuffer { * Drops exactly `count` oldest enqueued elements, * will block until they become available. * - * `timeoutMS` defaults to zero, - * i.e. infinitive blocking until an element available via put.<br> - * Otherwise this methods blocks for the given milliseconds. - * * In `count` elements are not available to drop even after * blocking for `timeoutMS`, no element will be dropped. * * @param count number of elements to drop from ringbuffer. + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return true if successful, otherwise false */ - bool dropBlocking(const Size_type count, const int timeoutMS=0) noexcept { + bool dropBlocking(const Size_type count, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); - return 0 != dropImpl(count, true, timeoutMS); + return 0 != dropImpl(count, true, timeout); } else { - return 0 != dropImpl(count, true, timeoutMS); + return 0 != dropImpl(count, true, timeout); } } @@ -1396,27 +1384,25 @@ class ringbuffer { bool put(Value_type && e) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl - return moveIntoImpl(std::move(e), false, 0); + return moveIntoImpl(std::move(e), false, fractions_i64::zero); } else { - return moveIntoImpl(std::move(e), false, 0); + return moveIntoImpl(std::move(e), false, fractions_i64::zero); } } /** * Enqueues the given element by moving it into this ringbuffer storage. * - * `timeoutMS` defaults to zero, - * i.e. infinitive blocking until a free slot becomes available via get.<br> - * Otherwise this methods blocks for the given milliseconds. - * + * @param e + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return true if successful, otherwise false in case timeout occurred or otherwise. */ - bool putBlocking(Value_type && e, const int timeoutMS=0) noexcept { + bool putBlocking(Value_type && e, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl - return moveIntoImpl(std::move(e), true, timeoutMS); + return moveIntoImpl(std::move(e), true, timeout); } else { - return moveIntoImpl(std::move(e), true, timeoutMS); + return moveIntoImpl(std::move(e), true, timeout); } } @@ -1432,27 +1418,24 @@ class ringbuffer { bool put(const Value_type & e) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl - return copyIntoImpl(e, false, 0); + return copyIntoImpl(e, false, fractions_i64::zero); } else { - return copyIntoImpl(e, false, 0); + return copyIntoImpl(e, false, fractions_i64::zero); } } /** * Enqueues the given element by copying it into this ringbuffer storage. * - * `timeoutMS` defaults to zero, - * i.e. infinitive blocking until a free slot becomes available via get.<br> - * Otherwise this methods blocks for the given milliseconds. - * + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return true if successful, otherwise false in case timeout occurred or otherwise. */ - bool putBlocking(const Value_type & e, const int timeoutMS=0) noexcept { + bool putBlocking(const Value_type & e, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl - return copyIntoImpl(e, true, timeoutMS); + return copyIntoImpl(e, true, timeout); } else { - return copyIntoImpl(e, true, timeoutMS); + return copyIntoImpl(e, true, timeout); } } @@ -1470,30 +1453,26 @@ class ringbuffer { bool put(const Value_type *first, const Value_type* last) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl - return copyIntoImpl(first, last, false, 0); + return copyIntoImpl(first, last, false, fractions_i64::zero); } else { - return copyIntoImpl(first, last, false, 0); + return copyIntoImpl(first, last, false, fractions_i64::zero); } } /** * Enqueues the given range of consecutive elementa by copying it into this ringbuffer storage. * - * `timeoutMS` defaults to zero, - * i.e. infinitive blocking until a free slot becomes available via get.<br> - * Otherwise this methods blocks for the given milliseconds. - * * @param first pointer to first consecutive element to range of value_type [first, last) * @param last pointer to last consecutive element to range of value_type [first, last) - * @param timeoutMS + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return true if successful, otherwise false in case timeout occurred or otherwise. */ - bool putBlocking(const Value_type *first, const Value_type* last, const int timeoutMS=0) noexcept { + bool putBlocking(const Value_type *first, const Value_type* last, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl - return copyIntoImpl(first, last, true, timeoutMS); + return copyIntoImpl(first, last, true, timeout); } else { - return copyIntoImpl(first, last, true, timeoutMS); + return copyIntoImpl(first, last, true, timeout); } } }; diff --git a/include/jau/service_runner.hpp b/include/jau/service_runner.hpp index a55378d..3c0b858 100644 --- a/include/jau/service_runner.hpp +++ b/include/jau/service_runner.hpp @@ -37,8 +37,8 @@ #include <algorithm> #include <jau/cpp_lang_util.hpp> -#include <jau/basic_types.hpp> #include <jau/ordered_atomic.hpp> +#include <jau/fraction_type.hpp> #include <jau/function_def.hpp> namespace jau { @@ -57,9 +57,9 @@ namespace jau { std::string name_; /** - * Maximum time in milliseconds to wait for a thread shutdown or zero to wait infinitely. + * Maximum duration in fractions of seconds to wait for service to stop at stop() and join(), where fractions_i64::zero waits infinitely */ - nsize_t service_shutdown_timeout_ms_; + fraction_i64 service_shutdown_timeout_; Callback service_work; Callback service_init_locked; @@ -110,13 +110,13 @@ namespace jau { * start() shall be issued to kick off this service. * * @param name service name - * @param service_shutdown_timeout_ms maximum time in milliseconds to wait for stop() and join(), where zero waits infinitely + * @param service_shutdown_timeout maximum duration in fractions of seconds to wait for service to stop at stop() and join(), where fractions_i64::zero waits infinitely * @param service_work service working function * @param service_init_locked optional service init function, lifecycle mutex is locked * @param service_end_locked optional service end function, lifecycle mutex is locked */ service_runner(const std::string& name, - const nsize_t service_shutdown_timeout_ms, + const fraction_i64& service_shutdown_timeout, Callback service_work, Callback service_init_locked = Callback(), Callback service_end_locked = Callback()) noexcept; @@ -134,11 +134,11 @@ namespace jau { const std::string& name() const noexcept { return name_; } /** - * Returns maximum time in milliseconds to wait for stop() and join(), where zero waits infinitely. + * Returns maximum duration in fractions of seconds to wait for service to stop at stop() and join(), where fractions_i64::zero waits infinitely * @see stop() * @see join() */ - nsize_t service_shutdown_timeout_ms() const noexcept { return service_shutdown_timeout_ms_; } + fraction_i64 service_shutdown_timeout() const noexcept { return service_shutdown_timeout_; } /** * Return the thread-id of this service service thread, zero if not running. @@ -211,7 +211,7 @@ namespace jau { * @see cv_shall_stop() * @see stop() * @see join() - * @see service_shutdown_timeout_ms() + * @see service_shutdown_timeout() */ void start() noexcept; @@ -221,7 +221,7 @@ namespace jau { * If called from the service thread, method just issues set_shall_stop() without blocking, * otherwise methods blocks the current thread until service is stopped. * - * Maximum blocked wait period is optionally limited by service_shutdown_timeout_ms(). + * Maximum blocked wait period is optionally limited by service_shutdown_timeout(). * * Method attempts to stop the service thread * - by flagging `shall stop` via set_shall_stop() @@ -241,7 +241,7 @@ namespace jau { * @see cv_shall_stop() * @see start() * @see join() - * @see service_shutdown_timeout_ms() + * @see service_shutdown_timeout() * @see singleton_sighandler() */ bool stop() noexcept; @@ -250,7 +250,7 @@ namespace jau { * Blocks the current thread until service is stopped * or returns immediately if not running or called from the service thread. * - * Maximum blocked wait period is optionally limited by service_shutdown_timeout_ms(). + * Maximum blocked wait period is optionally limited by service_shutdown_timeout(). * * @returns true if thread has been stopped or false if timeout has been hit * @see is_running() @@ -260,7 +260,7 @@ namespace jau { * @see cv_shall_stop() * @see start() * @see stop() - * @see service_shutdown_timeout_ms() + * @see service_shutdown_timeout() */ bool join() noexcept; diff --git a/include/jau/simple_timer.hpp b/include/jau/simple_timer.hpp index ad14625..ed561b2 100644 --- a/include/jau/simple_timer.hpp +++ b/include/jau/simple_timer.hpp @@ -31,6 +31,7 @@ #include <thread> #include <jau/ordered_atomic.hpp> +#include <jau/fraction_type.hpp> #include <jau/function_def.hpp> #include <jau/service_runner.hpp> @@ -48,25 +49,31 @@ namespace jau { typedef simple_timer& Timer0_ref; /** - * User defined timer function using millisecond granularity. + * User defined timer function using custom granularity via fraction_i64. * * Function gets invoked for each timer event, - * i.e. after reaching the duration in milliseconds set earlier. + * i.e. after reaching the duration set earlier. * - * @return duration in milliseconds for the next timer event or zero to end the timer thread. + * @return duration in fractions of seconds for the next timer event or zero to end the timer thread. */ - typedef FunctionDef<nsize_t, Timer0_ref> Timer_func_ms; + typedef FunctionDef<fraction_i64, Timer0_ref> Timer_func; private: service_runner timer_service; std::mutex mtx_timerfunc; - Timer_func_ms timer_func_ms; - jau::relaxed_atomic_nsize_t duration_ms; + Timer_func timer_func; + // Note: Requires libatomic with libstdc++10 + sc_atomic_fraction_i64 duration; void timer_work(service_runner& sr_ref); public: - simple_timer(const std::string& name, const nsize_t service_shutdown_timeout_ms) noexcept; + /** + * Constructs a new service + * @param name thread name of this service + * @param service_shutdown_timeout maximum duration in fractions of seconds to wait for service to stop at stop(), where fractions_i64::zero waits infinitely + */ + simple_timer(const std::string& name, const fraction_i64& service_shutdown_timeout) noexcept; /** * No copy constructor nor move constructor. @@ -92,29 +99,29 @@ namespace jau { /** * Returns true if timer shall stop. * - * This flag can be used by the Timer_func_ms function to determine whether to skip lengthly tasks. + * This flag can be used by the Timer_func function to determine whether to skip lengthly tasks. */ bool shall_stop() const noexcept { return timer_service.shall_stop(); } /** - * Start the timer with given user Timer_func_ms function and initial duration in milliseconds. + * Start the timer with given user Timer_func function and initial duration. * - * @param duration_ms_ initial timer duration until next timer event in milliseconds - * @param tofunc user Timer_func_ms to be called on next timer event + * @param duration_ initial timer duration in fractions of seconds until next timer event + * @param tofunc user Timer_func to be called on next timer event * @return true if timer has been started, otherwise false implies timer is already running. */ - bool start(nsize_t duration_ms_, Timer_func_ms tofunc) noexcept; + bool start(const fraction_i64& duration_, Timer_func tofunc) noexcept; /** - * Start or update the timer with given user Timer_func_ms function and initial duration in milliseconds. + * Start or update the timer with given user Timer_func function and initial duration. * * This is faster than calling stop() and start(), however, - * an already started timer user Timer_func_ms will proceed. + * an already started timer user Timer_func will proceed. * - * @param duration_ms_ initial timer duration until next timer event in milliseconds - * @param tofunc user Timer_func_ms to be called on next timer event + * @param duration_ initial timer duration in fractions of seconds until next timer event + * @param tofunc user Timer_func to be called on next timer event */ - void start_or_update(nsize_t duration_ms_, Timer_func_ms tofunc) noexcept; + void start_or_update(const fraction_i64& duration_, Timer_func tofunc) noexcept; /** * Stop timer, see service_runner::stop() |