diff options
author | Sven Gothel <[email protected]> | 2022-05-01 07:23:09 +0200 |
---|---|---|
committer | Sven Gothel <[email protected]> | 2022-05-01 07:23:09 +0200 |
commit | 6dfc0cbf5e3f530f6268c5a636a901da3c4bc657 (patch) | |
tree | 7a1498c2a8bbe7f544731b9d427ab3f10ead15e6 | |
parent | 1cbc33b38e34a060848a729d8e38c1c29ab87310 (diff) |
Adopt fraction and fraction_timespec, use wait_until() w/ out-of-loop absolute timeout_time, avoiding deadlocks
Classes affected incl. API change using fraction for relative time
- latch
- ringbuffer
- service_runner
- simple_timer
-rw-r--r-- | README.md | 10 | ||||
-rw-r--r-- | include/jau/latch.hpp | 66 | ||||
-rw-r--r-- | include/jau/ringbuffer.hpp | 201 | ||||
-rw-r--r-- | include/jau/service_runner.hpp | 24 | ||||
-rw-r--r-- | include/jau/simple_timer.hpp | 41 | ||||
-rw-r--r-- | src/service_runner.cpp | 20 | ||||
-rw-r--r-- | src/simple_timer.cpp | 51 | ||||
-rw-r--r-- | test/test_latch01.cpp | 4 | ||||
-rw-r--r-- | test/test_lfringbuffer11.cpp | 7 | ||||
-rw-r--r-- | test/test_lfringbuffer12.cpp | 7 | ||||
-rw-r--r-- | test/test_lfringbuffer13.cpp | 7 | ||||
-rw-r--r-- | test/test_lfringbuffer_a.hpp | 9 |
12 files changed, 232 insertions, 215 deletions
@@ -125,6 +125,16 @@ a Raspi-arm64, Raspi-armhf or PC-amd64 target image. * First stable release (TODO) +**0.9.0** + +* Add getMonotonicTime() & getWallClockTime(), returning fraction_timespec +* Add sleep_for(), sleep_until() and wait_for() & wait_until() using fraction, fraction_timespec, as well as choice over clock type. +* Introduce new types: fraction, fraction_timespec; its constants & literals as well adoption in latch, ringbuffer, service_runner and simple_timer. +* int_math.hpp: Add Integer overflow aware arithmetic, use Integer Overflow Builtins if available (GCC + Clang) +* Refine sign(), invert_sign(), abs() and digits10() template funcs: Better type and signed/unsigned variant for invert_sign() and abs() +* Add stdint literals in namespace `jau::int_literals`, e.g. `3_i64` for `(int64_t)3` for all stdint signed and unsigned 8-64 bit wide types +* Always add libatomic (will be required for new fraction) + **0.8.6** * Enhance `service_runner` and fix `simple_timer`, i.e. end waiting if stopped diff --git a/include/jau/latch.hpp b/include/jau/latch.hpp index 93b9a49..d2a6fae 100644 --- a/include/jau/latch.hpp +++ b/include/jau/latch.hpp @@ -30,6 +30,8 @@ #include <condition_variable> #include <jau/ordered_atomic.hpp> +#include <jau/fraction_type.hpp> +#include <jau/basic_types.hpp> namespace jau { @@ -147,22 +149,19 @@ namespace jau { * * If the internal counter is zero already, returns immediately. * - * Implementation uses `std::chrono::steady_clock::now()`. + * Implementation uses wait_for() w/ a monotonic clock and fraction_i64. * * Extension of std::latch. * - * @tparam Rep - * @tparam Period - * @param timeout_duration maximum time duration to spend waiting + * @param timeout_duration maximum duration in fractions of seconds to wait * @return true if internal counter has reached zero, otherwise a timeout has occurred. */ - template<typename Rep, typename Period> - bool wait_for(const std::chrono::duration<Rep, Period>& timeout_duration) const noexcept { + bool wait_for(const fraction_i64& timeout_duration) const noexcept { if( 0 < count ) { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now() + timeout_duration; std::unique_lock<std::mutex> lock(mtx_cd); + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout_duration); while( 0 < count ) { - std::cv_status s = cv.wait_until(lock, t0); + std::cv_status s = wait_until(cv, lock, timeout_time); if( 0 == count ) { return true; } @@ -175,26 +174,28 @@ namespace jau { } /** - * Blocks the calling thread until the internal counter reaches 0 or the given timeout duration has expired. + * Atomically decrements the internal counter by n and (if necessary) blocks the calling thread until the counter reaches zero + * or the given timeout duration has expired. * - * If the internal counter is zero already, returns immediately. + * Equivalent to `count_down(n); wait(timeout_duration);`. * - * Implementation uses `std::chrono::steady_clock::now()`. + * Implementation uses `std::chrono::steady_clock::now()` and fraction_i64. * * Extension of std::latch. * - * @param timeout_ms maximum time duration to spend waiting in milliseconds + * @param timeout_duration maximum duration in fractions of seconds to wait + * @param n the value by which the internal counter is decreased, defaults to 1 * @return true if internal counter has reached zero, otherwise a timeout has occurred. */ - bool wait_for(const size_t timeout_ms) const noexcept { - return wait_for(std::chrono::milliseconds(timeout_ms)); + bool arrive_and_wait_for(const fraction_i64& timeout_duration, const size_t n = 1) noexcept { + count_down(n); + return wait_for(timeout_duration); } /** - * Atomically decrements the internal counter by n and (if necessary) blocks the calling thread until the counter reaches zero - * or the given timeout duration has expired. + * Blocks the calling thread until the internal counter reaches 0 or the given timeout duration has expired. * - * Equivalent to `count_down(n); wait(timeout_duration);`. + * If the internal counter is zero already, returns immediately. * * Implementation uses `std::chrono::steady_clock::now()`. * @@ -202,14 +203,25 @@ namespace jau { * * @tparam Rep * @tparam Period - * @param timeout_duration maximum time duration to spend waiting - * @param n the value by which the internal counter is decreased, defaults to 1 + * @param timeout_duration maximum duration to wait * @return true if internal counter has reached zero, otherwise a timeout has occurred. */ template<typename Rep, typename Period> - bool arrive_and_wait_for(const std::chrono::duration<Rep, Period>& timeout_duration, const size_t n = 1) noexcept { - count_down(n); - return wait_for(timeout_duration); + bool wait_for(const std::chrono::duration<Rep, Period>& timeout_duration) const noexcept { + if( 0 < count ) { + std::unique_lock<std::mutex> lock(mtx_cd); + std::chrono::steady_clock::time_point timeout_time = std::chrono::steady_clock::now() + timeout_duration; + while( 0 < count ) { + std::cv_status s = cv.wait_until(lock, timeout_time ); + if( 0 == count ) { + return true; + } + if( std::cv_status::timeout == s ) { + return false; + } + } + } + return true; } /** @@ -222,12 +234,16 @@ namespace jau { * * Extension of std::latch. * - * @param timeout_ms maximum time duration to spend waiting in milliseconds + * @tparam Rep + * @tparam Period + * @param timeout_duration maximum duration to wait * @param n the value by which the internal counter is decreased, defaults to 1 * @return true if internal counter has reached zero, otherwise a timeout has occurred. */ - bool arrive_and_wait_for(const size_t timeout_ms, const size_t n = 1) noexcept { - return arrive_and_wait_for(std::chrono::milliseconds(timeout_ms), n); + template<typename Rep, typename Period> + bool arrive_and_wait_for(const std::chrono::duration<Rep, Period>& timeout_duration, const size_t n = 1) noexcept { + count_down(n); + return wait_for(timeout_duration); } }; diff --git a/include/jau/ringbuffer.hpp b/include/jau/ringbuffer.hpp index 47397eb..7672ece 100644 --- a/include/jau/ringbuffer.hpp +++ b/include/jau/ringbuffer.hpp @@ -41,6 +41,7 @@ #include <jau/debug.hpp> #include <jau/basic_types.hpp> #include <jau/ordered_atomic.hpp> +#include <jau/fraction_type.hpp> #include <jau/callocator.hpp> namespace jau { @@ -248,18 +249,18 @@ class ringbuffer { } } - Size_type waitForElementsImpl(const Size_type min_count, const int timeoutMS) noexcept { + Size_type waitForElementsImpl(const Size_type min_count, const fraction_i64& timeout) noexcept { Size_type available = size(); if( min_count > available ) { std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock available = size(); + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( min_count > available ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvWrite.wait(lockWrite); available = size(); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time ); available = size(); if( std::cv_status::timeout == s && min_count > available ) { return available; @@ -270,18 +271,18 @@ class ringbuffer { return available; } - Size_type waitForFreeSlotsImpl(const Size_type min_count, const int timeoutMS) noexcept { + Size_type waitForFreeSlotsImpl(const Size_type min_count, const fraction_i64& timeout) noexcept { Size_type available = freeSlots(); if( min_count > available ) { std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock available = freeSlots(); + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( min_count > available ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvRead.wait(lockRead); available = freeSlots(); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvRead, lockRead, timeout_time ); available = freeSlots(); if( std::cv_status::timeout == s && min_count > available ) { return available; @@ -403,7 +404,7 @@ class ringbuffer { } } - bool peekImpl(Value_type& dest, const bool blocking, const int timeoutMS) noexcept { + bool peekImpl(Value_type& dest, const bool blocking, const fraction_i64& timeout) noexcept { if( !std::is_copy_constructible_v<Value_type> ) { ABORT("Value_type is not copy constructible"); return false; @@ -413,12 +414,12 @@ class ringbuffer { if( localReadPos == writePos ) { if( blocking ) { std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( localReadPos == writePos ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvWrite.wait(lockWrite); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time ); if( std::cv_status::timeout == s && localReadPos == writePos ) { return false; } @@ -441,18 +442,18 @@ class ringbuffer { return true; } - bool moveOutImpl(Value_type& dest, const bool blocking, const int timeoutMS) noexcept { + bool moveOutImpl(Value_type& dest, const bool blocking, const fraction_i64& timeout) noexcept { const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl Size_type localReadPos = oldReadPos; if( localReadPos == writePos ) { if( blocking ) { std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( localReadPos == writePos ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvWrite.wait(lockWrite); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time ); if( std::cv_status::timeout == s && localReadPos == writePos ) { return false; } @@ -488,7 +489,7 @@ class ringbuffer { return true; } - Size_type moveOutImpl(Value_type *dest, const Size_type dest_len, const Size_type min_count_, const bool blocking, const int timeoutMS) noexcept { + Size_type moveOutImpl(Value_type *dest, const Size_type dest_len, const Size_type min_count_, const bool blocking, const fraction_i64& timeout) noexcept { const Size_type min_count = std::min(dest_len, min_count_); Value_type *iter_out = dest; @@ -506,13 +507,13 @@ class ringbuffer { if( blocking ) { std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock available = size(); + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( min_count > available ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvWrite.wait(lockWrite); available = size(); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time ); available = size(); if( std::cv_status::timeout == s && min_count > available ) { return 0; @@ -583,7 +584,7 @@ class ringbuffer { return count; } - Size_type dropImpl (Size_type count, const bool blocking, const int timeoutMS) noexcept { + Size_type dropImpl (Size_type count, const bool blocking, const fraction_i64& timeout) noexcept { if( count >= capacityPlusOne ) { if( blocking ) { return 0; @@ -601,13 +602,13 @@ class ringbuffer { if( blocking ) { std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock available = size(); + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( count > available ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvWrite.wait(lockWrite); available = size(); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvWrite.wait_until(lockWrite, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvWrite, lockWrite, timeout_time ); available = size(); if( std::cv_status::timeout == s && count > available ) { return 0; @@ -665,18 +666,18 @@ class ringbuffer { return count; } - bool moveIntoImpl(Value_type &&e, const bool blocking, const int timeoutMS) noexcept { + bool moveIntoImpl(Value_type &&e, const bool blocking, const fraction_i64& timeout) noexcept { Size_type localWritePos = writePos; // SC-DRF acquire atomic writePos, sync'ing with getImpl localWritePos = (localWritePos + 1) % capacityPlusOne; if( localWritePos == readPos ) { if( blocking ) { std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( localWritePos == readPos ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvRead.wait(lockRead); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvRead, lockRead, timeout_time ); if( std::cv_status::timeout == s && localWritePos == readPos ) { return false; } @@ -703,7 +704,7 @@ class ringbuffer { return true; } - bool copyIntoImpl(const Value_type &e, const bool blocking, const int timeoutMS) noexcept { + bool copyIntoImpl(const Value_type &e, const bool blocking, const fraction_i64& timeout) noexcept { if( !std::is_copy_constructible_v<Value_type> ) { ABORT("Value_type is not copy constructible"); return false; @@ -713,12 +714,12 @@ class ringbuffer { if( localWritePos == readPos ) { if( blocking ) { std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( localWritePos == readPos ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvRead.wait(lockRead); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvRead, lockRead, timeout_time ); if( std::cv_status::timeout == s && localWritePos == readPos ) { return false; } @@ -745,7 +746,7 @@ class ringbuffer { return true; } - bool copyIntoImpl(const Value_type *first, const Value_type* last, const bool blocking, const int timeoutMS) noexcept { + bool copyIntoImpl(const Value_type *first, const Value_type* last, const bool blocking, const fraction_i64& timeout) noexcept { if( !std::is_copy_constructible_v<Value_type> ) { ABORT("Value_type is not copy constructible"); return false; @@ -766,13 +767,13 @@ class ringbuffer { if( blocking ) { std::unique_lock<std::mutex> lockRead(syncRead); // SC-DRF w/ getImpl via same lock available = freeSlots(); + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); while( total_count > available ) { - if( 0 == timeoutMS ) { + if( fractions_i64::zero == timeout ) { cvRead.wait(lockRead); available = freeSlots(); } else { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - std::cv_status s = cvRead.wait_until(lockRead, t0 + std::chrono::milliseconds(timeoutMS)); + std::cv_status s = wait_until(cvRead, lockRead, timeout_time ); available = freeSlots(); if( std::cv_status::timeout == s && total_count > available ) { return false; @@ -1074,15 +1075,15 @@ class ringbuffer { * for subsequent get() and getBlocking(). * * @param min_count minimum number of put slots - * @param timeoutMS + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return the number of put elements, available for get() and getBlocking() */ - Size_type waitForElements(const Size_type min_count, const int timeoutMS) noexcept { + Size_type waitForElements(const Size_type min_count, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl - return waitForElementsImpl(min_count, timeoutMS); + return waitForElementsImpl(min_count, timeout); } else { - return waitForElementsImpl(min_count, timeoutMS); + return waitForElementsImpl(min_count, timeout); } } @@ -1091,15 +1092,15 @@ class ringbuffer { * for subsequent put() and putBlocking(). * * @param min_count minimum number of free slots - * @param timeoutMS + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return the number of free slots, available for put() and putBlocking() */ - Size_type waitForFreeSlots(const Size_type min_count, const int timeoutMS) noexcept { + Size_type waitForFreeSlots(const Size_type min_count, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl - return waitForFreeSlotsImpl(min_count, timeoutMS); + return waitForFreeSlotsImpl(min_count, timeout); } else { - return waitForFreeSlotsImpl(min_count, timeoutMS); + return waitForFreeSlotsImpl(min_count, timeout); } } @@ -1118,9 +1119,9 @@ class ringbuffer { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); - dropImpl (capacityPlusOne-1, false /* blocking */, 0 /* timeoutMS */); + dropImpl (capacityPlusOne-1, false /* blocking */, fractions_i64::zero); } else { - dropImpl (capacityPlusOne-1, false /* blocking */, 0 /* timeoutMS */); + dropImpl (capacityPlusOne-1, false /* blocking */, fractions_i64::zero); } } @@ -1229,28 +1230,25 @@ class ringbuffer { bool peek(Value_type& result) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl - return peekImpl(result, false, 0); + return peekImpl(result, false, fractions_i64::zero); } else { - return peekImpl(result, false, 0); + return peekImpl(result, false, fractions_i64::zero); } } /** * Peeks the next element at the read position w/o modifying pointer, but with blocking. * - * `timeoutMS` defaults to zero, - * i.e. infinitive blocking until an element available via put.<br> - * Otherwise this methods blocks for the given milliseconds. - * * @param result storage for the resulting value if successful, otherwise unchanged. + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return true if successful, otherwise false. */ - bool peekBlocking(Value_type& result, const int timeoutMS=0) noexcept { + bool peekBlocking(Value_type& result, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl - return peekImpl(result, true, timeoutMS); + return peekImpl(result, true, timeout); } else { - return peekImpl(result, true, timeoutMS); + return peekImpl(result, true, timeout); } } @@ -1267,9 +1265,9 @@ class ringbuffer { bool get(Value_type& result) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl - return moveOutImpl(result, false, 0); + return moveOutImpl(result, false, fractions_i64::zero); } else { - return moveOutImpl(result, false, 0); + return moveOutImpl(result, false, fractions_i64::zero); } } @@ -1278,19 +1276,16 @@ class ringbuffer { * * The ring buffer slot will be released and its value moved to the caller's `result` storage, if successful. * - * `timeoutMS` defaults to zero, - * i.e. infinitive blocking until an element available via put.<br> - * Otherwise this methods blocks for the given milliseconds. - * * @param result storage for the resulting value if successful, otherwise unchanged. + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return true if successful, otherwise false. */ - bool getBlocking(Value_type& result, const int timeoutMS=0) noexcept { + bool getBlocking(Value_type& result, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl - return moveOutImpl(result, true, timeoutMS); + return moveOutImpl(result, true, timeout); } else { - return moveOutImpl(result, true, timeoutMS); + return moveOutImpl(result, true, timeout); } } @@ -1309,9 +1304,9 @@ class ringbuffer { Size_type get(Value_type *dest, const Size_type dest_len, const Size_type min_count) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl - return moveOutImpl(dest, dest_len, min_count, false, 0); + return moveOutImpl(dest, dest_len, min_count, false, fractions_i64::zero); } else { - return moveOutImpl(dest, dest_len, min_count, false, 0); + return moveOutImpl(dest, dest_len, min_count, false, fractions_i64::zero); } } @@ -1320,22 +1315,18 @@ class ringbuffer { * * The ring buffer slots will be released and its value moved to the caller's `dest` storage, if successful. * - * `timeoutMS` defaults to zero, - * i.e. infinitive blocking until an element available via put.<br> - * Otherwise this methods blocks for the given milliseconds. - * * @param dest pointer to first storage element of `dest_len` consecutive elements to store the values, if successful. * @param dest_len number of consecutive elements in `dest`, hence maximum number of elements to return. * @param min_count minimum number of consecutive elements to return - * @param timeoutMS + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return actual number of elements returned */ - Size_type getBlocking(Value_type *dest, const Size_type dest_len, const Size_type min_count, const int timeoutMS=0) noexcept { + Size_type getBlocking(Value_type *dest, const Size_type dest_len, const Size_type min_count, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead); // acquire syncMultiRead, _not_ sync'ing w/ putImpl - return moveOutImpl(dest, dest_len, min_count, true, timeoutMS); + return moveOutImpl(dest, dest_len, min_count, true, timeout); } else { - return moveOutImpl(dest, dest_len, min_count, true, timeoutMS); + return moveOutImpl(dest, dest_len, min_count, true, timeout); } } @@ -1353,9 +1344,9 @@ class ringbuffer { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); - return dropImpl(max_count, false, 0); + return dropImpl(max_count, false, fractions_i64::zero); } else { - return dropImpl(max_count, false, 0); + return dropImpl(max_count, false, fractions_i64::zero); } } @@ -1363,24 +1354,21 @@ class ringbuffer { * Drops exactly `count` oldest enqueued elements, * will block until they become available. * - * `timeoutMS` defaults to zero, - * i.e. infinitive blocking until an element available via put.<br> - * Otherwise this methods blocks for the given milliseconds. - * * In `count` elements are not available to drop even after * blocking for `timeoutMS`, no element will be dropped. * * @param count number of elements to drop from ringbuffer. + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return true if successful, otherwise false */ - bool dropBlocking(const Size_type count, const int timeoutMS=0) noexcept { + bool dropBlocking(const Size_type count, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiRead(syncMultiRead, std::defer_lock); // utilize std::lock(r, w), allowing mixed order waiting on read/write ops std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite, std::defer_lock); // otherwise RAII-style relinquish via destructor std::lock(lockMultiRead, lockMultiWrite); - return 0 != dropImpl(count, true, timeoutMS); + return 0 != dropImpl(count, true, timeout); } else { - return 0 != dropImpl(count, true, timeoutMS); + return 0 != dropImpl(count, true, timeout); } } @@ -1396,27 +1384,25 @@ class ringbuffer { bool put(Value_type && e) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl - return moveIntoImpl(std::move(e), false, 0); + return moveIntoImpl(std::move(e), false, fractions_i64::zero); } else { - return moveIntoImpl(std::move(e), false, 0); + return moveIntoImpl(std::move(e), false, fractions_i64::zero); } } /** * Enqueues the given element by moving it into this ringbuffer storage. * - * `timeoutMS` defaults to zero, - * i.e. infinitive blocking until a free slot becomes available via get.<br> - * Otherwise this methods blocks for the given milliseconds. - * + * @param e + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return true if successful, otherwise false in case timeout occurred or otherwise. */ - bool putBlocking(Value_type && e, const int timeoutMS=0) noexcept { + bool putBlocking(Value_type && e, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl - return moveIntoImpl(std::move(e), true, timeoutMS); + return moveIntoImpl(std::move(e), true, timeout); } else { - return moveIntoImpl(std::move(e), true, timeoutMS); + return moveIntoImpl(std::move(e), true, timeout); } } @@ -1432,27 +1418,24 @@ class ringbuffer { bool put(const Value_type & e) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl - return copyIntoImpl(e, false, 0); + return copyIntoImpl(e, false, fractions_i64::zero); } else { - return copyIntoImpl(e, false, 0); + return copyIntoImpl(e, false, fractions_i64::zero); } } /** * Enqueues the given element by copying it into this ringbuffer storage. * - * `timeoutMS` defaults to zero, - * i.e. infinitive blocking until a free slot becomes available via get.<br> - * Otherwise this methods blocks for the given milliseconds. - * + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return true if successful, otherwise false in case timeout occurred or otherwise. */ - bool putBlocking(const Value_type & e, const int timeoutMS=0) noexcept { + bool putBlocking(const Value_type & e, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl - return copyIntoImpl(e, true, timeoutMS); + return copyIntoImpl(e, true, timeout); } else { - return copyIntoImpl(e, true, timeoutMS); + return copyIntoImpl(e, true, timeout); } } @@ -1470,30 +1453,26 @@ class ringbuffer { bool put(const Value_type *first, const Value_type* last) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl - return copyIntoImpl(first, last, false, 0); + return copyIntoImpl(first, last, false, fractions_i64::zero); } else { - return copyIntoImpl(first, last, false, 0); + return copyIntoImpl(first, last, false, fractions_i64::zero); } } /** * Enqueues the given range of consecutive elementa by copying it into this ringbuffer storage. * - * `timeoutMS` defaults to zero, - * i.e. infinitive blocking until a free slot becomes available via get.<br> - * Otherwise this methods blocks for the given milliseconds. - * * @param first pointer to first consecutive element to range of value_type [first, last) * @param last pointer to last consecutive element to range of value_type [first, last) - * @param timeoutMS + * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely * @return true if successful, otherwise false in case timeout occurred or otherwise. */ - bool putBlocking(const Value_type *first, const Value_type* last, const int timeoutMS=0) noexcept { + bool putBlocking(const Value_type *first, const Value_type* last, const fraction_i64& timeout) noexcept { if( multi_pc_enabled ) { std::unique_lock<std::mutex> lockMultiWrite(syncMultiWrite); // acquire syncMultiWrite, _not_ sync'ing w/ getImpl - return copyIntoImpl(first, last, true, timeoutMS); + return copyIntoImpl(first, last, true, timeout); } else { - return copyIntoImpl(first, last, true, timeoutMS); + return copyIntoImpl(first, last, true, timeout); } } }; diff --git a/include/jau/service_runner.hpp b/include/jau/service_runner.hpp index a55378d..3c0b858 100644 --- a/include/jau/service_runner.hpp +++ b/include/jau/service_runner.hpp @@ -37,8 +37,8 @@ #include <algorithm> #include <jau/cpp_lang_util.hpp> -#include <jau/basic_types.hpp> #include <jau/ordered_atomic.hpp> +#include <jau/fraction_type.hpp> #include <jau/function_def.hpp> namespace jau { @@ -57,9 +57,9 @@ namespace jau { std::string name_; /** - * Maximum time in milliseconds to wait for a thread shutdown or zero to wait infinitely. + * Maximum duration in fractions of seconds to wait for service to stop at stop() and join(), where fractions_i64::zero waits infinitely */ - nsize_t service_shutdown_timeout_ms_; + fraction_i64 service_shutdown_timeout_; Callback service_work; Callback service_init_locked; @@ -110,13 +110,13 @@ namespace jau { * start() shall be issued to kick off this service. * * @param name service name - * @param service_shutdown_timeout_ms maximum time in milliseconds to wait for stop() and join(), where zero waits infinitely + * @param service_shutdown_timeout maximum duration in fractions of seconds to wait for service to stop at stop() and join(), where fractions_i64::zero waits infinitely * @param service_work service working function * @param service_init_locked optional service init function, lifecycle mutex is locked * @param service_end_locked optional service end function, lifecycle mutex is locked */ service_runner(const std::string& name, - const nsize_t service_shutdown_timeout_ms, + const fraction_i64& service_shutdown_timeout, Callback service_work, Callback service_init_locked = Callback(), Callback service_end_locked = Callback()) noexcept; @@ -134,11 +134,11 @@ namespace jau { const std::string& name() const noexcept { return name_; } /** - * Returns maximum time in milliseconds to wait for stop() and join(), where zero waits infinitely. + * Returns maximum duration in fractions of seconds to wait for service to stop at stop() and join(), where fractions_i64::zero waits infinitely * @see stop() * @see join() */ - nsize_t service_shutdown_timeout_ms() const noexcept { return service_shutdown_timeout_ms_; } + fraction_i64 service_shutdown_timeout() const noexcept { return service_shutdown_timeout_; } /** * Return the thread-id of this service service thread, zero if not running. @@ -211,7 +211,7 @@ namespace jau { * @see cv_shall_stop() * @see stop() * @see join() - * @see service_shutdown_timeout_ms() + * @see service_shutdown_timeout() */ void start() noexcept; @@ -221,7 +221,7 @@ namespace jau { * If called from the service thread, method just issues set_shall_stop() without blocking, * otherwise methods blocks the current thread until service is stopped. * - * Maximum blocked wait period is optionally limited by service_shutdown_timeout_ms(). + * Maximum blocked wait period is optionally limited by service_shutdown_timeout(). * * Method attempts to stop the service thread * - by flagging `shall stop` via set_shall_stop() @@ -241,7 +241,7 @@ namespace jau { * @see cv_shall_stop() * @see start() * @see join() - * @see service_shutdown_timeout_ms() + * @see service_shutdown_timeout() * @see singleton_sighandler() */ bool stop() noexcept; @@ -250,7 +250,7 @@ namespace jau { * Blocks the current thread until service is stopped * or returns immediately if not running or called from the service thread. * - * Maximum blocked wait period is optionally limited by service_shutdown_timeout_ms(). + * Maximum blocked wait period is optionally limited by service_shutdown_timeout(). * * @returns true if thread has been stopped or false if timeout has been hit * @see is_running() @@ -260,7 +260,7 @@ namespace jau { * @see cv_shall_stop() * @see start() * @see stop() - * @see service_shutdown_timeout_ms() + * @see service_shutdown_timeout() */ bool join() noexcept; diff --git a/include/jau/simple_timer.hpp b/include/jau/simple_timer.hpp index ad14625..ed561b2 100644 --- a/include/jau/simple_timer.hpp +++ b/include/jau/simple_timer.hpp @@ -31,6 +31,7 @@ #include <thread> #include <jau/ordered_atomic.hpp> +#include <jau/fraction_type.hpp> #include <jau/function_def.hpp> #include <jau/service_runner.hpp> @@ -48,25 +49,31 @@ namespace jau { typedef simple_timer& Timer0_ref; /** - * User defined timer function using millisecond granularity. + * User defined timer function using custom granularity via fraction_i64. * * Function gets invoked for each timer event, - * i.e. after reaching the duration in milliseconds set earlier. + * i.e. after reaching the duration set earlier. * - * @return duration in milliseconds for the next timer event or zero to end the timer thread. + * @return duration in fractions of seconds for the next timer event or zero to end the timer thread. */ - typedef FunctionDef<nsize_t, Timer0_ref> Timer_func_ms; + typedef FunctionDef<fraction_i64, Timer0_ref> Timer_func; private: service_runner timer_service; std::mutex mtx_timerfunc; - Timer_func_ms timer_func_ms; - jau::relaxed_atomic_nsize_t duration_ms; + Timer_func timer_func; + // Note: Requires libatomic with libstdc++10 + sc_atomic_fraction_i64 duration; void timer_work(service_runner& sr_ref); public: - simple_timer(const std::string& name, const nsize_t service_shutdown_timeout_ms) noexcept; + /** + * Constructs a new service + * @param name thread name of this service + * @param service_shutdown_timeout maximum duration in fractions of seconds to wait for service to stop at stop(), where fractions_i64::zero waits infinitely + */ + simple_timer(const std::string& name, const fraction_i64& service_shutdown_timeout) noexcept; /** * No copy constructor nor move constructor. @@ -92,29 +99,29 @@ namespace jau { /** * Returns true if timer shall stop. * - * This flag can be used by the Timer_func_ms function to determine whether to skip lengthly tasks. + * This flag can be used by the Timer_func function to determine whether to skip lengthly tasks. */ bool shall_stop() const noexcept { return timer_service.shall_stop(); } /** - * Start the timer with given user Timer_func_ms function and initial duration in milliseconds. + * Start the timer with given user Timer_func function and initial duration. * - * @param duration_ms_ initial timer duration until next timer event in milliseconds - * @param tofunc user Timer_func_ms to be called on next timer event + * @param duration_ initial timer duration in fractions of seconds until next timer event + * @param tofunc user Timer_func to be called on next timer event * @return true if timer has been started, otherwise false implies timer is already running. */ - bool start(nsize_t duration_ms_, Timer_func_ms tofunc) noexcept; + bool start(const fraction_i64& duration_, Timer_func tofunc) noexcept; /** - * Start or update the timer with given user Timer_func_ms function and initial duration in milliseconds. + * Start or update the timer with given user Timer_func function and initial duration. * * This is faster than calling stop() and start(), however, - * an already started timer user Timer_func_ms will proceed. + * an already started timer user Timer_func will proceed. * - * @param duration_ms_ initial timer duration until next timer event in milliseconds - * @param tofunc user Timer_func_ms to be called on next timer event + * @param duration_ initial timer duration in fractions of seconds until next timer event + * @param tofunc user Timer_func to be called on next timer event */ - void start_or_update(nsize_t duration_ms_, Timer_func_ms tofunc) noexcept; + void start_or_update(const fraction_i64& duration_, Timer_func tofunc) noexcept; /** * Stop timer, see service_runner::stop() diff --git a/src/service_runner.cpp b/src/service_runner.cpp index ba5942a..1a01d77 100644 --- a/src/service_runner.cpp +++ b/src/service_runner.cpp @@ -114,12 +114,12 @@ bool service_runner::remove_sighandler() noexcept { } service_runner::service_runner(const std::string& name__, - nsize_t service_shutdown_timeout_ms__, + const fraction_i64& service_shutdown_timeout, Callback service_work_, Callback service_init_locked_, Callback service_end_locked_) noexcept : name_(name__), - service_shutdown_timeout_ms_(service_shutdown_timeout_ms__), + service_shutdown_timeout_(service_shutdown_timeout), service_work(service_work_), service_init_locked(service_init_locked_), service_end_locked(service_end_locked_), @@ -192,11 +192,11 @@ bool service_runner::stop() noexcept { } // Ensure the reader thread has ended, no runaway-thread using *this instance after destruction result = true; + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(service_shutdown_timeout_); while( true == running && result ) { std::cv_status s { std::cv_status::no_timeout }; - if( 0 < service_shutdown_timeout_ms_ ) { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - s = cv_init.wait_until(lock, t0 + std::chrono::milliseconds(service_shutdown_timeout_ms_)); + if( fractions_i64::zero < service_shutdown_timeout_ ) { + s = wait_until(cv_init, lock, timeout_time ); } else { cv_init.wait(lock); } @@ -218,7 +218,7 @@ bool service_runner::stop() noexcept { bool service_runner::join() noexcept { DBG_PRINT("%s::join: Begin: %s", name_.c_str(), toString().c_str()); - std::unique_lock<std::mutex> lockReader(mtx_lifecycle); // RAII-style acquire and relinquish via destructor + std::unique_lock<std::mutex> lock(mtx_lifecycle); // RAII-style acquire and relinquish via destructor const bool is_service = thread_id_ == pthread_self(); DBG_PRINT("%s::join: is_service %d, %s", name_.c_str(), is_service, toString().c_str()); @@ -227,13 +227,13 @@ bool service_runner::join() noexcept { if( !is_service ) { // Ensure the reader thread has ended, no runaway-thread using *this instance after destruction result = true; + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(service_shutdown_timeout_); while( true == running && result ) { std::cv_status s { std::cv_status::no_timeout }; - if( 0 < service_shutdown_timeout_ms_ ) { - std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - s = cv_init.wait_until(lockReader, t0 + std::chrono::milliseconds(service_shutdown_timeout_ms_)); + if( fractions_i64::zero < service_shutdown_timeout_ ) { + s = wait_until(cv_init, lock, timeout_time ); } else { - cv_init.wait(lockReader); + cv_init.wait(lock); } if( std::cv_status::timeout == s && true == running ) { ERR_PRINT("%s::join: Timeout (force !running): %s", name_.c_str(), toString().c_str()); diff --git a/src/simple_timer.cpp b/src/simple_timer.cpp index 710f5fe..703b3b6 100644 --- a/src/simple_timer.cpp +++ b/src/simple_timer.cpp @@ -30,58 +30,57 @@ void simple_timer::timer_work(service_runner& sr_ref) { if( !sr_ref.shall_stop() ) { // non-blocking sleep in regards to stop() std::unique_lock<std::mutex> lock(sr_ref.mtx_shall_stop()); // RAII-style acquire and relinquish via destructor - jau::nsize_t sleep_left_ms = duration_ms; - while( !sr_ref.shall_stop() && 0 < sleep_left_ms ) { - const std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); - const std::cv_status s = sr_ref.cv_shall_stop().wait_until(lock, t0 + std::chrono::milliseconds(sleep_left_ms)); - const jau::nsize_t slept = std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::steady_clock::now() - t0 ).count(); - sleep_left_ms = sleep_left_ms >= slept ? sleep_left_ms - slept : 0; + bool overflow = false; + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(duration, &overflow); + if( overflow ) { + sr_ref.set_shall_stop(); // bail out + } + std::cv_status s { std::cv_status::no_timeout }; + while( !sr_ref.shall_stop() && std::cv_status::timeout != s ) { + s = wait_until( sr_ref.cv_shall_stop(), lock, timeout_time ); if( std::cv_status::timeout == s && !sr_ref.shall_stop() ) { - // Made it through whole period w/o being stopped nor spurious wakeups - // This branch is only for documentation purposes, as shall_stop is being tested - sleep_left_ms = 0; - duration_ms = 0; + duration = fractions_i64::zero; } } } - Timer_func_ms tf; + Timer_func tf; { std::unique_lock<std::mutex> lockReader(mtx_timerfunc); // RAII-style acquire and relinquish via destructor - tf = timer_func_ms; + tf = timer_func; } if( !tf.isNullType() && !sr_ref.shall_stop() ) { - duration_ms = tf(*this); + duration = tf(*this); } else { - duration_ms = 0; + duration = fractions_i64::zero; } - if( 0 == duration_ms ) { + if( fractions_i64::zero == duration.load() ) { sr_ref.set_shall_stop(); } } -simple_timer::simple_timer(const std::string& name, const nsize_t service_shutdown_timeout_ms) noexcept -: timer_service(name, service_shutdown_timeout_ms, jau::bindMemberFunc(this, &simple_timer::timer_work)), - timer_func_ms(), duration_ms(0) +simple_timer::simple_timer(const std::string& name, const fraction_i64& service_shutdown_timeout) noexcept +: timer_service(name, service_shutdown_timeout, jau::bindMemberFunc(this, &simple_timer::timer_work)), + timer_func(), duration() {} -bool simple_timer::start(nsize_t duration_ms_, Timer_func_ms tofunc) noexcept { +bool simple_timer::start(const fraction_i64& duration_, Timer_func tofunc) noexcept { if( is_running() ) { return false; } - timer_func_ms = tofunc; - duration_ms = duration_ms_; + timer_func = tofunc; + duration = duration_; timer_service.start(); return true; } -void simple_timer::start_or_update(nsize_t duration_ms_, Timer_func_ms tofunc) noexcept { +void simple_timer::start_or_update(const fraction_i64& duration_, Timer_func tofunc) noexcept { if( is_running() ) { std::unique_lock<std::mutex> lockReader(mtx_timerfunc); // RAII-style acquire and relinquish via destructor - timer_func_ms = tofunc; - duration_ms = duration_ms_; + timer_func = tofunc; + duration = duration_; } else { - timer_func_ms = tofunc; - duration_ms = duration_ms_; + timer_func = tofunc; + duration = duration_; timer_service.start(); } } diff --git a/test/test_latch01.cpp b/test/test_latch01.cpp index 63cf836..eb178db 100644 --- a/test/test_latch01.cpp +++ b/test/test_latch01.cpp @@ -37,6 +37,8 @@ #include <jau/latch.hpp> using namespace jau; +using namespace jau::fractions_i64_literals; +using namespace jau::int_literals; class TestLatch01 { private: @@ -83,7 +85,7 @@ class TestLatch01 { for(size_t i=0; i<count; i++) { tasks[i] = std::thread(&TestLatch01::something, this, std::ref(completion)); } - REQUIRE_MSG("complete", true == completion.arrive_and_wait_for(10000 /* timeout_ms */) ); + REQUIRE_MSG("complete", true == completion.arrive_and_wait_for(10_s) ); REQUIRE_MSG("zero", 0 == completion.value()); REQUIRE_MSG("8", count == my_counter); diff --git a/test/test_lfringbuffer11.cpp b/test/test_lfringbuffer11.cpp index 7a7a709..ab503c0 100644 --- a/test/test_lfringbuffer11.cpp +++ b/test/test_lfringbuffer11.cpp @@ -36,6 +36,7 @@ #include <jau/ringbuffer.hpp> using namespace jau; +using namespace jau::fractions_i64_literals; typedef uint8_t IntegralType; typedef uint8_t TrivialType; @@ -75,7 +76,7 @@ class TestRingbuffer11 { // INFO_STR(msg+": Created / " + rb->toString()); for(jau::nsize_t i=0; i<len; i++) { TrivialType svI = 0; - REQUIRE_MSG("not empty at read #"+std::to_string(i+1)+": "+rb->toString()+", elem "+std::to_string(svI), rb->getBlocking(svI)); + REQUIRE_MSG("not empty at read #"+std::to_string(i+1)+": "+rb->toString()+", elem "+std::to_string(svI), rb->getBlocking(svI, fractions_i64::zero)); // INFO_STR("Got "+std::to_string(svI->intValue())+" / " + rb->toString()); } // INFO_STR(msg+": Dies / " + rb->toString()); @@ -89,7 +90,7 @@ class TestRingbuffer11 { // INFO_STR, INFO: Not thread safe yet // INFO_STR(msg+": Created / " + rb->toString()); std::vector<TrivialType> array(len); - REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb->toString(), len==rb->getBlocking( &(*array.begin()), len, len) ); + REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb->toString(), len==rb->getBlocking( &(*array.begin()), len, len, 0_s) ); for(jau::nsize_t i=0; i<len; i++) { TrivialType svI = array[i]; @@ -108,7 +109,7 @@ class TestRingbuffer11 { for(jau::nsize_t i=0; i<len; i++) { IntegralType vI( ( startValue + (IntegralType)i ) % integral_modulus ); // INFO_STR("Putting "+std::to_string(vI)+" ... / " + rb->toString()); - rb->putBlocking( vI ); + rb->putBlocking( vI, 0_s ); } // INFO_STR(msg+": Dies / " + rb->toString()); (void)msg; diff --git a/test/test_lfringbuffer12.cpp b/test/test_lfringbuffer12.cpp index 6404fda..fc3eeb1 100644 --- a/test/test_lfringbuffer12.cpp +++ b/test/test_lfringbuffer12.cpp @@ -36,6 +36,7 @@ #include <jau/ringbuffer.hpp> using namespace jau; +using namespace jau::fractions_i64_literals; typedef jau::snsize_t IntegralType; @@ -94,7 +95,7 @@ class TestRingbuffer12 { // INFO_STR(msg+": Created / " + rb->toString()); for(jau::nsize_t i=0; i<len; i++) { TrivialType svI; - REQUIRE_MSG("not empty at read #"+std::to_string(i+1)+": "+rb->toString(), rb->getBlocking(svI)); + REQUIRE_MSG("not empty at read #"+std::to_string(i+1)+": "+rb->toString(), rb->getBlocking(svI, 0_s)); // INFO_STR("Got "+std::to_string(svI->intValue())+" / " + rb->toString()); } // INFO_STR(msg+": Dies / " + rb->toString()); @@ -108,7 +109,7 @@ class TestRingbuffer12 { // INFO_STR, INFO: Not thread safe yet // INFO_STR(msg+": Created / " + rb->toString()); std::vector<TrivialType> array(len); - REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb->toString(), len==rb->getBlocking( &(*array.begin()), len, len) ); + REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb->toString(), len==rb->getBlocking( &(*array.begin()), len, len, 0_s) ); for(jau::nsize_t i=0; i<len; i++) { TrivialType svI = array[i]; @@ -127,7 +128,7 @@ class TestRingbuffer12 { for(jau::nsize_t i=0; i<len; i++) { Integer vI(startValue+i); // INFO_STR("Putting "+std::to_string(vI->intValue())+" ... / " + rb->toString()); - rb->putBlocking( vI ); + rb->putBlocking( vI, 0_s ); } // INFO_STR(msg+": Dies / " + rb->toString()); (void)msg; diff --git a/test/test_lfringbuffer13.cpp b/test/test_lfringbuffer13.cpp index c64c276..039ca78 100644 --- a/test/test_lfringbuffer13.cpp +++ b/test/test_lfringbuffer13.cpp @@ -36,6 +36,7 @@ #include <jau/ringbuffer.hpp> using namespace jau; +using namespace jau::fractions_i64_literals; typedef jau::snsize_t IntegralType; @@ -88,7 +89,7 @@ class TestRingbuffer13 { // INFO_STR(msg+": Created / " + rb->toString()); for(jau::nsize_t i=0; i<len; i++) { SharedType svI; - REQUIRE_MSG("not empty at read #"+std::to_string(i+1)+": "+rb->toString(), rb->getBlocking(svI)); + REQUIRE_MSG("not empty at read #"+std::to_string(i+1)+": "+rb->toString(), rb->getBlocking(svI, 0_s)); // INFO_STR("Got "+std::to_string(svI->intValue())+" / " + rb->toString()); } // INFO_STR(msg+": Dies / " + rb->toString()); @@ -102,7 +103,7 @@ class TestRingbuffer13 { // INFO_STR, INFO: Not thread safe yet // INFO_STR(msg+": Created / " + rb->toString()); std::vector<SharedType> array(len); - REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb->toString(), len==rb->getBlocking( &(*array.begin()), len, len) ); + REQUIRE_MSG("get-range of "+std::to_string(array.size())+" elem in "+rb->toString(), len==rb->getBlocking( &(*array.begin()), len, len, 0_s) ); for(jau::nsize_t i=0; i<len; i++) { SharedType svI = array[i]; @@ -121,7 +122,7 @@ class TestRingbuffer13 { for(jau::nsize_t i=0; i<len; i++) { Integer * vI = new Integer(startValue+i); // INFO_STR("Putting "+std::to_string(vI->intValue())+" ... / " + rb->toString()); - rb->putBlocking( SharedType( vI ) ); + rb->putBlocking( SharedType( vI ), 0_s ); } // INFO_STR(msg+": Dies / " + rb->toString()); (void)msg; diff --git a/test/test_lfringbuffer_a.hpp b/test/test_lfringbuffer_a.hpp index 4c88e8c..7e4a6c4 100644 --- a/test/test_lfringbuffer_a.hpp +++ b/test/test_lfringbuffer_a.hpp @@ -37,6 +37,7 @@ #include <jau/ringbuffer.hpp> using namespace jau; +using namespace jau::fractions_i64_literals; template<typename Value_type> Value_type getDefault(); @@ -107,7 +108,7 @@ class TestRingbuffer_A { for(jau::nsize_t i=0; i<dest_len; i++) { Value_type svI = getDefault<Value_type>(); - REQUIRE_MSG("not empty at read #"+std::to_string(i)+" / "+std::to_string(dest_len), rb.getBlocking(svI)); + REQUIRE_MSG("not empty at read #"+std::to_string(i)+" / "+std::to_string(dest_len), rb.getBlocking(svI, 0_s)); REQUIRE_MSG("value at read #"+std::to_string(i)+" / "+std::to_string(dest_len)+" @ "+std::to_string(startValue), startValue+(Integral_type)i == getValue<Integral_type, Value_type>(svI)); } REQUIRE_MSG("free slots after reading "+std::to_string(dest_len)+": "+rb.toString(), rb.freeSlots()>= dest_len); @@ -159,7 +160,7 @@ class TestRingbuffer_A { REQUIRE_MSG("capacity at read "+std::to_string(dest_len)+" elems: "+rb.toString(), capacity >= dest_len); std::vector<Value_type> array(dest_len); - const jau::nsize_t count = rb.getBlocking( &(*array.begin()), dest_len, min_count); + const jau::nsize_t count = rb.getBlocking( &(*array.begin()), dest_len, min_count, 0_s); REQUIRE_MSG("get-range >= min_count / "+std::to_string(array.size())+" of "+rb.toString(), min_count <= count); for(jau::nsize_t i=0; i<count; i++) { @@ -340,12 +341,12 @@ class TestRingbuffer_A { while(count1 < element_count || count2 < element_count) { Value_type svI = getDefault<Value_type>(); if( count1 < element_count ) { - REQUIRE_MSG("not empty at read.1 #"+std::to_string(count1)+" / "+std::to_string(element_count), rb1.getBlocking(svI)); + REQUIRE_MSG("not empty at read.1 #"+std::to_string(count1)+" / "+std::to_string(element_count), rb1.getBlocking(svI, 0_s)); REQUIRE_MSG("value at read.1 #"+std::to_string(count1)+" / "+std::to_string(element_count), (Integral_type)count1 == getValue<Integral_type, Value_type>(svI)); ++count1; } if( count2 < element_count ) { - REQUIRE_MSG("not empty at read.2 #"+std::to_string(count2)+" / "+std::to_string(element_count), rb2.getBlocking(svI)); + REQUIRE_MSG("not empty at read.2 #"+std::to_string(count2)+" / "+std::to_string(element_count), rb2.getBlocking(svI, 0_s)); REQUIRE_MSG("value at read.2 #"+std::to_string(count2)+" / "+std::to_string(element_count), (Integral_type)count2 == getValue<Integral_type, Value_type>(svI)); ++count2; } |