author     Sven Gothel <[email protected]>    2023-01-02 07:46:23 +0100
committer  Sven Gothel <[email protected]>    2023-01-02 07:46:23 +0100
commit     1a7985e17d9ba9a9627ee66d4aca660636d3f8e6 (patch)
tree       3f4c8d1b4fb7801ed90048420cab34b8d219ae99
parent     dc0a26783a2c2e0a5912302d00216f99b009d3b5 (diff)
jau::ringbuffer: Add 'End of Stream' (EOS) concept and utilize it for jau::io::ByteInStream_[URL|Feed]'s available() and read() operation. (tag: v1.1.1)
jau::ringbuffer: Add 'End of Stream' (EOS) concept, unblocking all read-operations from this point onwards
and also unblocking a potentially currently blocked reader thread.
The EOS concept is required to allow blocking read-operations by a consumer (reader) without knowledge of the content size,
with the producer (writer) signaling EOS at the end and thereby unblocking all read-operations.
Further, after unblocking a potentially blocked reader thread,
the failure criterion is now checked as in the `timeout` case.
This allows a reader thread that is unblocked (interrupted) after the last chunk has been written
to still pick up this last written data.
+++
Utilize the ringbuffer EOS concept for jau::io::ByteInStream_[URL|Feed]'s available() and read() operations,
i.e. both methods now utilize blocking ringbuffer operations.
After the producer (URL: our curl consume callback; Feed: manual set_eof()) marks EOS on the ringbuffer,
all blocking read-operations are permanently unblocked and a currently blocked reader thread is also interrupted.
+++
Passed test_bytestream01 and test_iostream01
under full CPU load produced via `stress --cpu $(getconf _NPROCESSORS_ONLN)`.
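For illustration only (not part of this commit): a minimal producer/consumer sketch of the EOS hand-off described above. The `jau::ringbuffer<uint8_t, size_t>` capacity constructor and the range `putBlocking(first, last, timeout)` overload are assumptions about the surrounding library API; `getBlocking()` and `set_end_of_input()` follow the signatures visible in the diff below.

```cpp
// Illustrative sketch only: the capacity constructor and putBlocking(first, last, timeout)
// are assumed; set_end_of_input() and getBlocking() are the operations of this commit.
#include <jau/ringbuffer.hpp>

#include <cstdint>
#include <thread>
#include <vector>

int main() {
    jau::ringbuffer<uint8_t, size_t> rb(4096);   // assumed capacity constructor

    std::thread producer([&rb]() {
        std::vector<uint8_t> chunk(256, 0x42);
        for(int i = 0; i < 10; ++i) {
            // assumed range overload; blocks while the ringbuffer is full
            rb.putBlocking(chunk.data(), chunk.data() + chunk.size(), jau::fractions_i64::zero);
        }
        rb.set_end_of_input(true);               // EOS: unblocks all pending and future reads
    });

    std::vector<uint8_t> out(512);
    size_t total = 0;
    bool timeout_occurred = false;
    for(;;) {
        // blocks until >= 1 byte is available, EOS is set, or the reader is interrupted;
        // zero timeout means wait without deadline, as in the ringbuffer wait logic below
        const size_t got = rb.getBlocking(out.data(), out.size(), 1,
                                          jau::fractions_i64::zero, timeout_occurred);
        if( 0 == got ) { break; }                // EOS reached and ringbuffer drained
        total += got;
    }
    producer.join();
    (void)total;                                 // expect 10 * 256 bytes consumed
    return 0;
}
```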
-rw-r--r--  CHANGES.md                   |  6
-rw-r--r--  include/jau/byte_stream.hpp  |  2
-rw-r--r--  include/jau/io_util.hpp      |  2
-rw-r--r--  include/jau/ringbuffer.hpp   | 88
-rw-r--r--  src/byte_stream.cpp          | 18
-rw-r--r--  src/io_util.cpp              | 29
6 files changed, 98 insertions, 47 deletions
diff --git a/CHANGES.md b/CHANGES.md
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -4,6 +4,12 @@ ## Changes
+**1.1.1**
+* Fixes
+  - `jau::io::ByteInStream_[URL|Feed]` utilize blocking read-operations w/o knowledge of content-size until producer (curl consumer, manual feeder) signals EOS, see ringbuffer changes.
+* Features
+  - `jau::ringbuffer`: Add 'End of Stream' (EOS) concept to unblock all read-operations from this point onwards and unblocking a potentially currently blocked reader thread.
+
 **1.1.0**
 * Fixes
   - JNI: `Java_org_jau_sys_Clock_get[Monotonic|WallClock]TimeImpl()`: Avoid GetPrimitiveArrayCritical(), which occasionally hangs on system call `::clock_gettime()`
diff --git a/include/jau/byte_stream.hpp b/include/jau/byte_stream.hpp
index 4790a9a..d65e5f7 100644
--- a/include/jau/byte_stream.hpp
+++ b/include/jau/byte_stream.hpp
@@ -722,7 +722,7 @@ namespace jau::io {
             return m_parent.available(n);
         }
 
-        void clear(iostate state = iostate::goodbit) noexcept override { m_parent.clear( state ); }
+        void clear(const iostate state = iostate::goodbit) noexcept override { m_parent.clear( state ); }
         iostate rdstate() const noexcept override { return m_parent.rdstate(); }
         std::string id() const noexcept override { return m_parent.id(); }
diff --git a/include/jau/io_util.hpp b/include/jau/io_util.hpp
index 74ae97e..bf63ee1 100644
--- a/include/jau/io_util.hpp
+++ b/include/jau/io_util.hpp
@@ -1,6 +1,6 @@
 /*
  * Author: Sven Gothel <[email protected]>
- * Copyright (c) 2021 Gothel Software e.K.
+ * Copyright (c) 2021-2023 Gothel Software e.K.
  * Copyright (c) 2021 ZAFENA AB
  *
  * Permission is hereby granted, free of charge, to any person obtaining
diff --git a/include/jau/ringbuffer.hpp b/include/jau/ringbuffer.hpp
index 80f8f6e..2782747 100644
--- a/include/jau/ringbuffer.hpp
+++ b/include/jau/ringbuffer.hpp
@@ -1,6 +1,6 @@
 /*
  * Author: Sven Gothel <[email protected]>
- * Copyright (c) 2020 Gothel Software e.K.
+ * Copyright (c) 2020-2023 Gothel Software e.K.
  * Copyright (c) 2020 ZAFENA AB
 *
 * Permission is hereby granted, free of charge, to any person obtaining
@@ -127,11 +127,27 @@ namespace jau {
 * - {@link #get() get*(..)} operations concurrently from multiple threads.
 * - {@link #put() put*(..)} producer and {@link #get() get*(..)} consumer threads can be different or the same.
 *
+ * #### Interruption of Consumer and Producer
+ * To allow an application to unblock a potentially blocked producer (writer)
+ * or consumer (reader) thread once,
+ * one can call interruptWriter() or interruptReader() respectively.
+ *
+ * #### Marking End of Input Stream (EOS)
+ * To allow an application to mark the end of input stream,
+ * i.e. the producer (write) has completed filling the ringbuffer,
+ * one can call set_end_of_input().
+ *
+ * Calling set_end_of_input(true) will unblock all read-operations from this point onwards.
+ * A potentially currently blocked reader thread is also interrupted and hence unblocked.
+ *
 * #### See also
 * - Sequentially Consistent (SC) ordering or SC-DRF (data race free) <https://en.cppreference.com/w/cpp/atomic/memory_order#Sequentially-consistent_ordering>
 * - std::memory_order <https://en.cppreference.com/w/cpp/atomic/memory_order>
 * - jau::sc_atomic_critical
 * - setMultiPCEnabled()
+ * - interruptReader()
+ * - interruptWriter()
+ * - set_end_of_input()
 *
 * @anchor ringbuffer_ntt_params
 * ### Non-Type Template Parameter (NTTP) controlling Value_type memory
@@ -214,6 +230,7 @@ class ringbuffer {
         jau::relaxed_atomic_bool interrupted_read = false;
         jau::relaxed_atomic_bool interrupted_write = false;
+        jau::relaxed_atomic_bool end_of_input = false;
 
         allocator_type alloc_inst;
@@ -267,12 +284,12 @@ class ringbuffer {
         Size_type waitForElementsImpl(const Size_type min_count, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
             timeout_occurred = false;
             Size_type available = size();
-            if( min_count > available && min_count < capacityPlusOne ) {
+            if( available < min_count && min_count < capacityPlusOne && !end_of_input ) {
                 interrupted_read = false;
                 std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
                 available = size();
                 const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
-                while( !interrupted_read && min_count > available ) {
+                while( !interrupted_read && !end_of_input && min_count > available ) {
                     if( fractions_i64::zero == timeout ) {
                         cvWrite.wait(lockWrite);
                         available = size();
@@ -285,7 +302,7 @@ class ringbuffer {
                     }
                 }
             }
-            if( interrupted_read ) {
+            if( interrupted_read ) { // interruption or end_of_input may happen after delivering last data chunk
                 interrupted_read = false;
             }
         }
@@ -467,11 +484,11 @@ class ringbuffer {
             const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
             Size_type localReadPos = oldReadPos;
             if( localReadPos == writePos ) {
-                if( blocking ) {
+                if( blocking && !end_of_input ) {
                     interrupted_read = false;
                     std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
                     const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
-                    while( !interrupted_read && localReadPos == writePos ) {
+                    while( !interrupted_read && !end_of_input && localReadPos == writePos ) {
                         if( fractions_i64::zero == timeout ) {
                             cvWrite.wait(lockWrite);
                         } else {
@@ -482,9 +499,11 @@ class ringbuffer {
                         }
                     }
                 }
-                if( interrupted_read ) {
+                if( interrupted_read || end_of_input ) {
                     interrupted_read = false;
-                    return false;
+                    if( localReadPos == writePos ) { // interruption or end_of_input may happen after delivering last data chunk
+                        return false;
+                    }
                 }
             } else {
                 return false;
@@ -511,11 +530,11 @@ class ringbuffer {
             const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
             Size_type localReadPos = oldReadPos;
             if( localReadPos == writePos ) {
-                if( blocking ) {
+                if( blocking && !end_of_input ) {
                     interrupted_read = false;
                     std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
                     const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
-                    while( !interrupted_read && localReadPos == writePos ) {
+                    while( !interrupted_read && !end_of_input && localReadPos == writePos ) {
                         if( fractions_i64::zero == timeout ) {
                             cvWrite.wait(lockWrite);
                         } else {
@@ -526,9 +545,11 @@ class ringbuffer {
                         }
                     }
                 }
-                if( interrupted_read ) {
+                if( interrupted_read || end_of_input ) {
                     interrupted_read = false;
-                    return false;
+                    if( localReadPos == writePos ) { // interruption or end_of_input may happen after delivering last data chunk
+                        return false;
+                    }
                 }
             } else {
                 return false;
@@ -576,12 +597,12 @@ class ringbuffer {
             Size_type localReadPos = oldReadPos;
             Size_type available = size();
             if( min_count > available ) {
-                if( blocking ) {
+                if( blocking && !end_of_input ) {
                     interrupted_read = false;
                     std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
                     available = size();
                     const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
-                    while( !interrupted_read && min_count > available ) {
+                    while( !interrupted_read && !end_of_input && min_count > available ) {
                         if( fractions_i64::zero == timeout ) {
                             cvWrite.wait(lockWrite);
                             available = size();
@@ -594,9 +615,11 @@ class ringbuffer {
                         }
                     }
                 }
-                if( interrupted_read ) {
+                if( interrupted_read || end_of_input ) {
                     interrupted_read = false;
-                    return 0;
+                    if( min_count > available ) { // interruption or end_of_input may happen after delivering last data chunk
+                        return 0;
+                    }
                 }
             } else {
                 return 0;
@@ -678,12 +701,12 @@ class ringbuffer {
             Size_type localReadPos = oldReadPos;
             Size_type available = size();
             if( count > available ) {
-                if( blocking ) {
+                if( blocking && !end_of_input ) {
                     interrupted_read = false;
                     std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
                     available = size();
                     const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
-                    while( !interrupted_read && count > available ) {
+                    while( !interrupted_read && !end_of_input && count > available ) {
                         if( fractions_i64::zero == timeout ) {
                             cvWrite.wait(lockWrite);
                             available = size();
@@ -696,9 +719,11 @@ class ringbuffer {
                         }
                     }
                 }
-                if( interrupted_read ) {
+                if( interrupted_read || end_of_input ) {
                     interrupted_read = false;
-                    return 0;
+                    if( count > available ) { // interruption or end_of_input may happen after delivering last data chunk
+                        return 0;
+                    }
                 }
             } else {
                 count = available; // drop all available for non-blocking
@@ -1303,6 +1328,7 @@ class ringbuffer {
                 closeImpl(zeromem);
                 interrupted_read = true;
                 interrupted_write = true;
+                end_of_input = true;
             } else {
                 std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
                 std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
@@ -1311,6 +1337,7 @@ class ringbuffer {
                 closeImpl(zeromem);
                 interrupted_write = true;
                 interrupted_read = true;
+                end_of_input = true;
             }
             // have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
             cvRead.notify_all(); // notify waiting writer
@@ -1412,16 +1439,29 @@ class ringbuffer {
         }
 
         /**
-         * Interrupt a potentially blocked reader.
+         * Interrupt a potentially blocked reader once.
          *
-         * Call this method if intended to abort reading and to interrupt the reader thread's potentially blocked read-access call.
+         * Call this method to unblock a potentially blocked reader thread once.
         */
         void interruptReader() noexcept { interrupted_read = true; cvWrite.notify_all(); }
 
         /**
-         * Interrupt a potentially blocked writer.
+         * Set `End of Input` from writer thread, unblocking all read-operations and a potentially currently blocked reader thread.
+         *
+         * Call this method with `true` after concluding writing input data will unblock all read-operations from this point onwards.
+         * A potentially currently blocked reader thread is also interrupted and hence unblocked.
+         */
+        void set_end_of_input(const bool v=true) noexcept {
+            end_of_input = v;
+            if( v ) {
+                cvWrite.notify_all();
+            }
+        }
+
+        /**
+         * Interrupt a potentially blocked writer once.
          *
-         * Call this method if intended to abort writing and to interrupt the writing thread's potentially blocked write-access call.
+         * Call this method to unblock a potentially blocked writer thread once.
         */
         void interruptWriter() noexcept { interrupted_write = true; cvRead.notify_all(); }
diff --git a/src/byte_stream.cpp b/src/byte_stream.cpp
index eb0d951..bb94a57 100644
--- a/src/byte_stream.cpp
+++ b/src/byte_stream.cpp
@@ -1,6 +1,6 @@
 /*
  * Author: Sven Gothel <[email protected]>
- * Copyright (c) 2021 Gothel Software e.K.
+ * Copyright (c) 2021-2023 Gothel Software e.K.
 *
 * ByteInStream, ByteInStream_SecMemory and ByteInStream_istream are derived from Botan under same license:
 * - Copyright (c) 1999-2007 Jack Lloyd
@@ -377,6 +377,7 @@ bool ByteInStream_URL::available(size_t n) noexcept {
         return false;
     }
     // I/O still in progress, we have to poll until data is available or timeout
+    // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
     bool timeout_occured;
     const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
     if( avail < n ) {
@@ -404,9 +405,8 @@ size_t ByteInStream_URL::read(void* out, size_t length) noexcept {
         return 0;
     }
     bool timeout_occured = false;
-    const size_t got = m_has_content_length ?
-                       m_buffer.getBlocking(static_cast<uint8_t*>(out), length, 1, m_timeout, timeout_occured) :
-                       m_buffer.get(static_cast<uint8_t*>(out), length, 1);
+    // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
+    const size_t got = m_buffer.getBlocking(static_cast<uint8_t*>(out), length, 1, m_timeout, timeout_occured);
     m_bytes_consumed += got;
     if( timeout_occured ) {
         setstate_impl( iostate::timeout );
@@ -493,6 +493,7 @@ bool ByteInStream_Feed::available(size_t n) noexcept {
         return false;
     }
     // I/O still in progress, we have to poll until data is available or timeout
+    // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
     bool timeout_occured;
     const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
     if( avail < n ) {
@@ -519,9 +520,8 @@ size_t ByteInStream_Feed::read(void* out, size_t length) noexcept {
         return 0;
     }
     bool timeout_occured = false;
-    const size_t got = m_has_content_length ?
-                       m_buffer.getBlocking(static_cast<uint8_t*>(out), length, 1, m_timeout, timeout_occured) :
-                       m_buffer.get(static_cast<uint8_t*>(out), length, 1);
+    // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
+    const size_t got = m_buffer.getBlocking(static_cast<uint8_t*>(out), length, 1, m_timeout, timeout_occured);
     m_bytes_consumed += got;
     if( timeout_occured ) {
         setstate_impl( iostate::timeout );
@@ -530,7 +530,7 @@ size_t ByteInStream_Feed::read(void* out, size_t length) noexcept {
         }
         m_buffer.interruptWriter();
     }
-    // DBG_PRINT("ByteInStream_Feed::read: size %zu/%zu bytes, %s", got, length, to_string_int().c_str() );
+    // DBG_PRINT("ByteInStream_Feed::read: size %zu/%zu bytes, timeout_occured %d, %s", got, length, timeout_occured, to_string_int().c_str() );
     return got;
 }
@@ -579,7 +579,7 @@ bool ByteInStream_Feed::write(uint8_t in[], size_t length, const jau::fraction_i
 void ByteInStream_Feed::set_eof(const async_io_result_t result) noexcept {
     m_result = result;
-    interruptReader(); // FIXME: ???
+    m_buffer.set_end_of_input(true); // still considering last data, also irqs blocking ringbuffer reader
 }
 
 std::string ByteInStream_Feed::to_string_int() const noexcept {
diff --git a/src/io_util.cpp b/src/io_util.cpp
index a9754dd..320508b 100644
--- a/src/io_util.cpp
+++ b/src/io_util.cpp
@@ -1,6 +1,6 @@
 /*
  * Author: Sven Gothel <[email protected]>
- * Copyright (c) 2021 Gothel Software e.K.
+ * Copyright (c) 2021-2023 Gothel Software e.K.
  * Copyright (c) 2021 ZAFENA AB
 *
 * Permission is hereby granted, free of charge, to any person obtaining
@@ -62,13 +62,13 @@ uint64_t jau::io::read_stream(ByteInStream& in,
     uint64_t total = 0;
     bool has_more;
     do {
-        if( in.available(1) ) { // at least one byte to stream ..
+        if( in.available(1) ) { // at least one byte to stream, also considers eof
             buffer.resize(buffer.capacity());
             const uint64_t got = in.read(buffer.data(), buffer.capacity());
             buffer.resize(got);
             total += got;
-            has_more = 1 <= got && in.good() && ( !in.has_content_size() || total < in.content_size() );
+            has_more = 1 <= got && !in.fail() && ( !in.has_content_size() || total < in.content_size() );
             try {
                 if( !consumer_fn(buffer, !has_more) ) {
                     break; // end streaming
@@ -88,7 +88,7 @@ uint64_t jau::io::read_stream(ByteInStream& in,
 static uint64_t _read_buffer(ByteInStream& in,
                              secure_vector<uint8_t>& buffer) noexcept {
-    if( in.available(1) ) { // at least one byte to stream ..
+    if( in.available(1) ) { // at least one byte to stream, also considers eof
         buffer.resize(buffer.capacity());
         const uint64_t got = in.read(buffer.data(), buffer.capacity());
         buffer.resize(got);
@@ -111,7 +111,7 @@ uint64_t jau::io::read_stream(ByteInStream& in,
     {
         uint64_t got = _read_buffer(in, *buffers[idx]);
         total_read += got;
-        eof_read = 0 == got || !in.good() || ( in.has_content_size() && total_read >= in.content_size() );
+        eof_read = 0 == got || in.fail() || ( in.has_content_size() && total_read >= in.content_size() );
         eof[idx] = eof_read;
         ++idx;
     }
@@ -131,7 +131,7 @@ uint64_t jau::io::read_stream(ByteInStream& in,
         if( !eof_read ) {
             uint64_t got = _read_buffer(in, *buffers[idx]);
             total_read += got;
-            eof_read = 0 == got || !in.good() || ( in.has_content_size() && total_read >= in.content_size() );
+            eof_read = 0 == got || in.fail() || ( in.has_content_size() && total_read >= in.content_size() );
             eof[idx] = eof_read;
             if( 0 == got ) {
                 // read-ahead eof propagation if read zero bytes,
@@ -486,6 +490,10 @@ struct curl_glue2_t {
         buffer.interruptReader();
         header_sync.notify_complete();
     }
+    void set_end_of_input() noexcept {
+        buffer.set_end_of_input(true);
+        header_sync.notify_complete();
+    }
 };
 
 static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept {
@@ -495,7 +499,7 @@ static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void
         // user abort!
         DBG_PRINT("consume_header_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s",
                   cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() );
-        cg->interrupt_all();
+        cg->set_end_of_input();
         return 0;
     }
@@ -506,7 +510,7 @@ static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void
         if( 400 <= v ) {
             IRQ_PRINT("response_code: %ld", v);
             cg->result = async_io_result_t::FAILED;
-            cg->interrupt_all();
+            cg->set_end_of_input();
             return 0;
         } else {
             DBG_PRINT("consume_header_curl2.0 response_code: %ld", v);
@@ -548,7 +552,7 @@ static size_t consume_data_curl2(char *ptr, size_t size, size_t nmemb, void *use
         // user abort!
        DBG_PRINT("consume_data_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s",
                  cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() );
-        cg->interrupt_all();
+        cg->set_end_of_input();
         return 0;
     }
@@ -576,7 +580,7 @@ static size_t consume_data_curl2(char *ptr, size_t size, size_t nmemb, void *use
         DBG_PRINT("consume_data_curl2 Failed put: total %" PRIi64 ", result %d, timeout %d, rb %s",
                   cg->total_read.load(), cg->result.load(), timeout_occured, cg->buffer.toString().c_str() );
         if( timeout_occured ) {
-            cg->interrupt_all();
+            cg->set_end_of_input();
         }
         return 0;
     }
@@ -586,6 +590,7 @@ static size_t consume_data_curl2(char *ptr, size_t size, size_t nmemb, void *use
                          cg->has_content_length ?
                          cg->total_read >= cg->content_length : false;
     if( is_final ) {
         cg->result = async_io_result_t::SUCCESS;
+        cg->set_end_of_input();
     }
 
     if( false ) {
@@ -710,7 +715,7 @@ static void read_url_stream_thread(const char *url, std::unique_ptr<curl_glue2_t
 
 errout:
     cg->result = async_io_result_t::FAILED;
-    cg->interrupt_all();
+    cg->set_end_of_input();
 
 cleanup:
     if( nullptr != curl_handle ) {
@@ -738,7 +743,7 @@ std::unique_ptr<std::thread> jau::io::read_url_stream(const std::string& url,
 #endif // USE_LIBCURL
         result = io::async_io_result_t::FAILED;
         header_sync.notify_complete();
-        buffer.interruptReader();
+        buffer.set_end_of_input(true);
         const std::string_view scheme = uri_tk::get_scheme(url);
         DBG_PRINT("Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
                   std::string(scheme).c_str(), jau::to_string(uri_tk::supported_protocols(), ",").c_str());
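For illustration only (not part of this commit): a sketch of the resulting `jau::io::ByteInStream_Feed` usage after this change, where the feeder pushes chunks of unknown total size and concludes with `set_eof()`, while the reader loops on blocking `read()` calls. The constructor arguments `(id, timeout)` are an assumption; `write()`, `set_eof()`, `read()` and `fail()` follow the signatures visible in the diff above.

```cpp
// Illustrative sketch only: the ByteInStream_Feed constructor (id, timeout) is assumed;
// write(), set_eof(), read() and fail() follow the signatures visible in the diff above.
#include <jau/byte_stream.hpp>

#include <cstdint>
#include <thread>
#include <vector>

int main() {
    jau::io::ByteInStream_Feed feed("demo-feed", jau::fractions_i64::zero); // assumed ctor(id, timeout)

    std::thread feeder([&feed]() {
        std::vector<uint8_t> chunk(1024, 0x5a);
        for(int i = 0; i < 8; ++i) {
            // producer side: blocking write into the feed's ringbuffer
            feed.write(chunk.data(), chunk.size(), jau::fractions_i64::zero);
        }
        // EOS: permanently unblocks available()/read(); the reader still receives the last chunk
        feed.set_eof(jau::io::async_io_result_t::SUCCESS);
    });

    std::vector<uint8_t> buf(4096);
    uint64_t total = 0;
    for(;;) {
        // blocks although no content-size is known; returns 0 once EOS is reached and drained
        const size_t got = feed.read(buf.data(), buf.size());
        if( 0 == got || feed.fail() ) { break; }
        total += got;
    }
    feeder.join();
    (void)total;
    return 0;
}
```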