From 1a7985e17d9ba9a9627ee66d4aca660636d3f8e6 Mon Sep 17 00:00:00 2001 From: Sven Gothel Date: Mon, 2 Jan 2023 07:46:23 +0100 Subject: jau::ringbuffer: Add 'End of Stream' (EOS) concept and utilize it for jau::io::ByteInStream_[URL|Feed]'s available() and read() operation. jau::ringbuffer: Add 'End of Stream' (EOS) concept to unblock all read-operations from this point onwards and unblocking a potentially currently blocked reader thread. The EOS concept is required to allow blocking operations from a consumer (reader) w/o knowledge of the content-size, having the producer (writer) signaling EOS at the end and unblocking all read-operations. Fuerther, after unblocking a potentially blocker reader-thread, the failure criteria is now being checked as for the `timeout` case. This allows a read-thread being unblocked (interrupted) after writing the last chunk and hence picking up this last written data. +++ Utilize ringbuffer EOS concept for jau::io::ByteInStream_[URL|Feed]'s available() and read() operation, i.e. both methods utilize blocking operations. After the producer (URL: Our Curl consume callback; Feed: Manual set_eof()) marks EOS on the ringbuffer, all blocking read-operations are permanently unblocked and a current read-thread also interrupted. +++ Passed test_bytestream01 and test_iostream01 while causing full CPU load via `stress --cpu $(getconf _NPROCESSORS_ONLN)`. --- include/jau/byte_stream.hpp | 2 +- include/jau/io_util.hpp | 2 +- include/jau/ringbuffer.hpp | 88 ++++++++++++++++++++++++++++++++------------- 3 files changed, 66 insertions(+), 26 deletions(-) (limited to 'include/jau') diff --git a/include/jau/byte_stream.hpp b/include/jau/byte_stream.hpp index 4790a9a..d65e5f7 100644 --- a/include/jau/byte_stream.hpp +++ b/include/jau/byte_stream.hpp @@ -722,7 +722,7 @@ namespace jau::io { return m_parent.available(n); } - void clear(iostate state = iostate::goodbit) noexcept override { m_parent.clear( state ); } + void clear(const iostate state = iostate::goodbit) noexcept override { m_parent.clear( state ); } iostate rdstate() const noexcept override { return m_parent.rdstate(); } std::string id() const noexcept override { return m_parent.id(); } diff --git a/include/jau/io_util.hpp b/include/jau/io_util.hpp index 74ae97e..bf63ee1 100644 --- a/include/jau/io_util.hpp +++ b/include/jau/io_util.hpp @@ -1,6 +1,6 @@ /* * Author: Sven Gothel - * Copyright (c) 2021 Gothel Software e.K. + * Copyright (c) 2021-2023 Gothel Software e.K. * Copyright (c) 2021 ZAFENA AB * * Permission is hereby granted, free of charge, to any person obtaining diff --git a/include/jau/ringbuffer.hpp b/include/jau/ringbuffer.hpp index 80f8f6e..2782747 100644 --- a/include/jau/ringbuffer.hpp +++ b/include/jau/ringbuffer.hpp @@ -1,6 +1,6 @@ /* * Author: Sven Gothel - * Copyright (c) 2020 Gothel Software e.K. + * Copyright (c) 2020-2023 Gothel Software e.K. * Copyright (c) 2020 ZAFENA AB * * Permission is hereby granted, free of charge, to any person obtaining @@ -127,11 +127,27 @@ namespace jau { * - {@link #get() get*(..)} operations concurrently from multiple threads. * - {@link #put() put*(..)} producer and {@link #get() get*(..)} consumer threads can be different or the same. * + * #### Interruption of Consumer and Producer + * To allow an application to unblock a potentially blocked producer (writer) + * or consumer (reader) thread once, + * one can call interruptWriter() or interruptReader() respectively. + * + * #### Marking End of Input Stream (EOS) + * To allow an application to mark the end of input stream, + * i.e. the producer (write) has completed filling the ringbuffer, + * one can call set_end_of_input(). + * + * Calling set_end_of_input(true) will unblock all read-operations from this point onwards. + * A potentially currently blocked reader thread is also interrupted and hence unblocked. + * * #### See also * - Sequentially Consistent (SC) ordering or SC-DRF (data race free) * - std::memory_order * - jau::sc_atomic_critical * - setMultiPCEnabled() + * - interruptReader() + * - interruptWriter() + * - set_end_of_input() * * @anchor ringbuffer_ntt_params * ### Non-Type Template Parameter (NTTP) controlling Value_type memory @@ -214,6 +230,7 @@ class ringbuffer { jau::relaxed_atomic_bool interrupted_read = false; jau::relaxed_atomic_bool interrupted_write = false; + jau::relaxed_atomic_bool end_of_input = false; allocator_type alloc_inst; @@ -267,12 +284,12 @@ class ringbuffer { Size_type waitForElementsImpl(const Size_type min_count, const fraction_i64& timeout, bool& timeout_occurred) noexcept { timeout_occurred = false; Size_type available = size(); - if( min_count > available && min_count < capacityPlusOne ) { + if( available < min_count && min_count < capacityPlusOne && !end_of_input ) { interrupted_read = false; std::unique_lock lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock available = size(); const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); - while( !interrupted_read && min_count > available ) { + while( !interrupted_read && !end_of_input && min_count > available ) { if( fractions_i64::zero == timeout ) { cvWrite.wait(lockWrite); available = size(); @@ -285,7 +302,7 @@ class ringbuffer { } } } - if( interrupted_read ) { + if( interrupted_read ) { // interruption or end_of_input may happen after delivering last data chunk interrupted_read = false; } } @@ -467,11 +484,11 @@ class ringbuffer { const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl Size_type localReadPos = oldReadPos; if( localReadPos == writePos ) { - if( blocking ) { + if( blocking && !end_of_input ) { interrupted_read = false; std::unique_lock lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); - while( !interrupted_read && localReadPos == writePos ) { + while( !interrupted_read && !end_of_input && localReadPos == writePos ) { if( fractions_i64::zero == timeout ) { cvWrite.wait(lockWrite); } else { @@ -482,9 +499,11 @@ class ringbuffer { } } } - if( interrupted_read ) { + if( interrupted_read || end_of_input ) { interrupted_read = false; - return false; + if( localReadPos == writePos ) { // interruption or end_of_input may happen after delivering last data chunk + return false; + } } } else { return false; @@ -511,11 +530,11 @@ class ringbuffer { const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl Size_type localReadPos = oldReadPos; if( localReadPos == writePos ) { - if( blocking ) { + if( blocking && !end_of_input ) { interrupted_read = false; std::unique_lock lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); - while( !interrupted_read && localReadPos == writePos ) { + while( !interrupted_read && !end_of_input && localReadPos == writePos ) { if( fractions_i64::zero == timeout ) { cvWrite.wait(lockWrite); } else { @@ -526,9 +545,11 @@ class ringbuffer { } } } - if( interrupted_read ) { + if( interrupted_read || end_of_input ) { interrupted_read = false; - return false; + if( localReadPos == writePos ) { // interruption or end_of_input may happen after delivering last data chunk + return false; + } } } else { return false; @@ -576,12 +597,12 @@ class ringbuffer { Size_type localReadPos = oldReadPos; Size_type available = size(); if( min_count > available ) { - if( blocking ) { + if( blocking && !end_of_input ) { interrupted_read = false; std::unique_lock lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock available = size(); const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); - while( !interrupted_read && min_count > available ) { + while( !interrupted_read && !end_of_input && min_count > available ) { if( fractions_i64::zero == timeout ) { cvWrite.wait(lockWrite); available = size(); @@ -594,9 +615,11 @@ class ringbuffer { } } } - if( interrupted_read ) { + if( interrupted_read || end_of_input ) { interrupted_read = false; - return 0; + if( min_count > available ) { // interruption or end_of_input may happen after delivering last data chunk + return 0; + } } } else { return 0; @@ -678,12 +701,12 @@ class ringbuffer { Size_type localReadPos = oldReadPos; Size_type available = size(); if( count > available ) { - if( blocking ) { + if( blocking && !end_of_input ) { interrupted_read = false; std::unique_lock lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock available = size(); const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); - while( !interrupted_read && count > available ) { + while( !interrupted_read && !end_of_input && count > available ) { if( fractions_i64::zero == timeout ) { cvWrite.wait(lockWrite); available = size(); @@ -696,9 +719,11 @@ class ringbuffer { } } } - if( interrupted_read ) { + if( interrupted_read || end_of_input ) { interrupted_read = false; - return 0; + if( count > available ) { // interruption or end_of_input may happen after delivering last data chunk + return 0; + } } } else { count = available; // drop all available for non-blocking @@ -1303,6 +1328,7 @@ class ringbuffer { closeImpl(zeromem); interrupted_read = true; interrupted_write = true; + end_of_input = true; } else { std::unique_lock lockRead(syncRead, std::defer_lock); // same for *this instance! std::unique_lock lockWrite(syncWrite, std::defer_lock); @@ -1311,6 +1337,7 @@ class ringbuffer { closeImpl(zeromem); interrupted_write = true; interrupted_read = true; + end_of_input = true; } // have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread. cvRead.notify_all(); // notify waiting writer @@ -1412,16 +1439,29 @@ class ringbuffer { } /** - * Interrupt a potentially blocked reader. + * Interrupt a potentially blocked reader once. * - * Call this method if intended to abort reading and to interrupt the reader thread's potentially blocked read-access call. + * Call this method to unblock a potentially blocked reader thread once. */ void interruptReader() noexcept { interrupted_read = true; cvWrite.notify_all(); } /** - * Interrupt a potentially blocked writer. + * Set `End of Input` from writer thread, unblocking all read-operations and a potentially currently blocked reader thread. + * + * Call this method with `true` after concluding writing input data will unblock all read-operations from this point onwards. + * A potentially currently blocked reader thread is also interrupted and hence unblocked. + */ + void set_end_of_input(const bool v=true) noexcept { + end_of_input = v; + if( v ) { + cvWrite.notify_all(); + } + } + + /** + * Interrupt a potentially blocked writer once. * - * Call this method if intended to abort writing and to interrupt the writing thread's potentially blocked write-access call. + * Call this method to unblock a potentially blocked writer thread once. */ void interruptWriter() noexcept { interrupted_write = true; cvRead.notify_all(); } -- cgit v1.2.3