aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSven Gothel <[email protected]>2023-01-02 07:46:23 +0100
committerSven Gothel <[email protected]>2023-01-02 07:46:23 +0100
commit1a7985e17d9ba9a9627ee66d4aca660636d3f8e6 (patch)
tree3f4c8d1b4fb7801ed90048420cab34b8d219ae99
parentdc0a26783a2c2e0a5912302d00216f99b009d3b5 (diff)
jau::ringbuffer: Add 'End of Stream' (EOS) concept and utilize it for jau::io::ByteInStream_[URL|Feed]'s available() and read() operation.v1.1.1
jau::ringbuffer: Add 'End of Stream' (EOS) concept to unblock all read-operations from this point onwards and unblocking a potentially currently blocked reader thread. The EOS concept is required to allow blocking operations from a consumer (reader) w/o knowledge of the content-size, having the producer (writer) signaling EOS at the end and unblocking all read-operations. Fuerther, after unblocking a potentially blocker reader-thread, the failure criteria is now being checked as for the `timeout` case. This allows a read-thread being unblocked (interrupted) after writing the last chunk and hence picking up this last written data. +++ Utilize ringbuffer EOS concept for jau::io::ByteInStream_[URL|Feed]'s available() and read() operation, i.e. both methods utilize blocking operations. After the producer (URL: Our Curl consume callback; Feed: Manual set_eof()) marks EOS on the ringbuffer, all blocking read-operations are permanently unblocked and a current read-thread also interrupted. +++ Passed test_bytestream01 and test_iostream01 while causing full CPU load via `stress --cpu $(getconf _NPROCESSORS_ONLN)`.
-rw-r--r--CHANGES.md6
-rw-r--r--include/jau/byte_stream.hpp2
-rw-r--r--include/jau/io_util.hpp2
-rw-r--r--include/jau/ringbuffer.hpp88
-rw-r--r--src/byte_stream.cpp18
-rw-r--r--src/io_util.cpp29
6 files changed, 98 insertions, 47 deletions
diff --git a/CHANGES.md b/CHANGES.md
index a3608ff..fbc5149 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -4,6 +4,12 @@
## Changes
+**1.1.1**
+* Fixes
+ - `jau::io::ByteInStream_[URL|Feed]` utilize blocking read-operations w/o knowledge of content-size until producer (curl consumer, manual feeder) signals EOS, see ringbuffer changes.
+* Features
+ - `jau::ringbuffer`: Add 'End of Stream' (EOS) concept to unblock all read-operations from this point onwards and unblocking a potentially currently blocked reader thread.
+
**1.1.0**
* Fixes
- JNI: `Java_org_jau_sys_Clock_get[Monotonic|WallClock]TimeImpl()`: Avoid GetPrimitiveArrayCritical(), which occasionally hangs on system call `::clock_gettime()`
diff --git a/include/jau/byte_stream.hpp b/include/jau/byte_stream.hpp
index 4790a9a..d65e5f7 100644
--- a/include/jau/byte_stream.hpp
+++ b/include/jau/byte_stream.hpp
@@ -722,7 +722,7 @@ namespace jau::io {
return m_parent.available(n);
}
- void clear(iostate state = iostate::goodbit) noexcept override { m_parent.clear( state ); }
+ void clear(const iostate state = iostate::goodbit) noexcept override { m_parent.clear( state ); }
iostate rdstate() const noexcept override { return m_parent.rdstate(); }
std::string id() const noexcept override { return m_parent.id(); }
diff --git a/include/jau/io_util.hpp b/include/jau/io_util.hpp
index 74ae97e..bf63ee1 100644
--- a/include/jau/io_util.hpp
+++ b/include/jau/io_util.hpp
@@ -1,6 +1,6 @@
/*
* Author: Sven Gothel <[email protected]>
- * Copyright (c) 2021 Gothel Software e.K.
+ * Copyright (c) 2021-2023 Gothel Software e.K.
* Copyright (c) 2021 ZAFENA AB
*
* Permission is hereby granted, free of charge, to any person obtaining
diff --git a/include/jau/ringbuffer.hpp b/include/jau/ringbuffer.hpp
index 80f8f6e..2782747 100644
--- a/include/jau/ringbuffer.hpp
+++ b/include/jau/ringbuffer.hpp
@@ -1,6 +1,6 @@
/*
* Author: Sven Gothel <[email protected]>
- * Copyright (c) 2020 Gothel Software e.K.
+ * Copyright (c) 2020-2023 Gothel Software e.K.
* Copyright (c) 2020 ZAFENA AB
*
* Permission is hereby granted, free of charge, to any person obtaining
@@ -127,11 +127,27 @@ namespace jau {
* - {@link #get() get*(..)} operations concurrently from multiple threads.
* - {@link #put() put*(..)} producer and {@link #get() get*(..)} consumer threads can be different or the same.
*
+ * #### Interruption of Consumer and Producer
+ * To allow an application to unblock a potentially blocked producer (writer)
+ * or consumer (reader) thread once,
+ * one can call interruptWriter() or interruptReader() respectively.
+ *
+ * #### Marking End of Input Stream (EOS)
+ * To allow an application to mark the end of input stream,
+ * i.e. the producer (write) has completed filling the ringbuffer,
+ * one can call set_end_of_input().
+ *
+ * Calling set_end_of_input(true) will unblock all read-operations from this point onwards.
+ * A potentially currently blocked reader thread is also interrupted and hence unblocked.
+ *
* #### See also
* - Sequentially Consistent (SC) ordering or SC-DRF (data race free) <https://en.cppreference.com/w/cpp/atomic/memory_order#Sequentially-consistent_ordering>
* - std::memory_order <https://en.cppreference.com/w/cpp/atomic/memory_order>
* - jau::sc_atomic_critical
* - setMultiPCEnabled()
+ * - interruptReader()
+ * - interruptWriter()
+ * - set_end_of_input()
*
* @anchor ringbuffer_ntt_params
* ### Non-Type Template Parameter (NTTP) controlling Value_type memory
@@ -214,6 +230,7 @@ class ringbuffer {
jau::relaxed_atomic_bool interrupted_read = false;
jau::relaxed_atomic_bool interrupted_write = false;
+ jau::relaxed_atomic_bool end_of_input = false;
allocator_type alloc_inst;
@@ -267,12 +284,12 @@ class ringbuffer {
Size_type waitForElementsImpl(const Size_type min_count, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
timeout_occurred = false;
Size_type available = size();
- if( min_count > available && min_count < capacityPlusOne ) {
+ if( available < min_count && min_count < capacityPlusOne && !end_of_input ) {
interrupted_read = false;
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
available = size();
const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
- while( !interrupted_read && min_count > available ) {
+ while( !interrupted_read && !end_of_input && min_count > available ) {
if( fractions_i64::zero == timeout ) {
cvWrite.wait(lockWrite);
available = size();
@@ -285,7 +302,7 @@ class ringbuffer {
}
}
}
- if( interrupted_read ) {
+ if( interrupted_read ) { // interruption or end_of_input may happen after delivering last data chunk
interrupted_read = false;
}
}
@@ -467,11 +484,11 @@ class ringbuffer {
const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
Size_type localReadPos = oldReadPos;
if( localReadPos == writePos ) {
- if( blocking ) {
+ if( blocking && !end_of_input ) {
interrupted_read = false;
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
- while( !interrupted_read && localReadPos == writePos ) {
+ while( !interrupted_read && !end_of_input && localReadPos == writePos ) {
if( fractions_i64::zero == timeout ) {
cvWrite.wait(lockWrite);
} else {
@@ -482,9 +499,11 @@ class ringbuffer {
}
}
}
- if( interrupted_read ) {
+ if( interrupted_read || end_of_input ) {
interrupted_read = false;
- return false;
+ if( localReadPos == writePos ) { // interruption or end_of_input may happen after delivering last data chunk
+ return false;
+ }
}
} else {
return false;
@@ -511,11 +530,11 @@ class ringbuffer {
const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
Size_type localReadPos = oldReadPos;
if( localReadPos == writePos ) {
- if( blocking ) {
+ if( blocking && !end_of_input ) {
interrupted_read = false;
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
- while( !interrupted_read && localReadPos == writePos ) {
+ while( !interrupted_read && !end_of_input && localReadPos == writePos ) {
if( fractions_i64::zero == timeout ) {
cvWrite.wait(lockWrite);
} else {
@@ -526,9 +545,11 @@ class ringbuffer {
}
}
}
- if( interrupted_read ) {
+ if( interrupted_read || end_of_input ) {
interrupted_read = false;
- return false;
+ if( localReadPos == writePos ) { // interruption or end_of_input may happen after delivering last data chunk
+ return false;
+ }
}
} else {
return false;
@@ -576,12 +597,12 @@ class ringbuffer {
Size_type localReadPos = oldReadPos;
Size_type available = size();
if( min_count > available ) {
- if( blocking ) {
+ if( blocking && !end_of_input ) {
interrupted_read = false;
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
available = size();
const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
- while( !interrupted_read && min_count > available ) {
+ while( !interrupted_read && !end_of_input && min_count > available ) {
if( fractions_i64::zero == timeout ) {
cvWrite.wait(lockWrite);
available = size();
@@ -594,9 +615,11 @@ class ringbuffer {
}
}
}
- if( interrupted_read ) {
+ if( interrupted_read || end_of_input ) {
interrupted_read = false;
- return 0;
+ if( min_count > available ) { // interruption or end_of_input may happen after delivering last data chunk
+ return 0;
+ }
}
} else {
return 0;
@@ -678,12 +701,12 @@ class ringbuffer {
Size_type localReadPos = oldReadPos;
Size_type available = size();
if( count > available ) {
- if( blocking ) {
+ if( blocking && !end_of_input ) {
interrupted_read = false;
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
available = size();
const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
- while( !interrupted_read && count > available ) {
+ while( !interrupted_read && !end_of_input && count > available ) {
if( fractions_i64::zero == timeout ) {
cvWrite.wait(lockWrite);
available = size();
@@ -696,9 +719,11 @@ class ringbuffer {
}
}
}
- if( interrupted_read ) {
+ if( interrupted_read || end_of_input ) {
interrupted_read = false;
- return 0;
+ if( count > available ) { // interruption or end_of_input may happen after delivering last data chunk
+ return 0;
+ }
}
} else {
count = available; // drop all available for non-blocking
@@ -1303,6 +1328,7 @@ class ringbuffer {
closeImpl(zeromem);
interrupted_read = true;
interrupted_write = true;
+ end_of_input = true;
} else {
std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
@@ -1311,6 +1337,7 @@ class ringbuffer {
closeImpl(zeromem);
interrupted_write = true;
interrupted_read = true;
+ end_of_input = true;
}
// have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
cvRead.notify_all(); // notify waiting writer
@@ -1412,16 +1439,29 @@ class ringbuffer {
}
/**
- * Interrupt a potentially blocked reader.
+ * Interrupt a potentially blocked reader once.
*
- * Call this method if intended to abort reading and to interrupt the reader thread's potentially blocked read-access call.
+ * Call this method to unblock a potentially blocked reader thread once.
*/
void interruptReader() noexcept { interrupted_read = true; cvWrite.notify_all(); }
/**
- * Interrupt a potentially blocked writer.
+ * Set `End of Input` from writer thread, unblocking all read-operations and a potentially currently blocked reader thread.
+ *
+ * Call this method with `true` after concluding writing input data will unblock all read-operations from this point onwards.
+ * A potentially currently blocked reader thread is also interrupted and hence unblocked.
+ */
+ void set_end_of_input(const bool v=true) noexcept {
+ end_of_input = v;
+ if( v ) {
+ cvWrite.notify_all();
+ }
+ }
+
+ /**
+ * Interrupt a potentially blocked writer once.
*
- * Call this method if intended to abort writing and to interrupt the writing thread's potentially blocked write-access call.
+ * Call this method to unblock a potentially blocked writer thread once.
*/
void interruptWriter() noexcept { interrupted_write = true; cvRead.notify_all(); }
diff --git a/src/byte_stream.cpp b/src/byte_stream.cpp
index eb0d951..bb94a57 100644
--- a/src/byte_stream.cpp
+++ b/src/byte_stream.cpp
@@ -1,6 +1,6 @@
/*
* Author: Sven Gothel <[email protected]>
- * Copyright (c) 2021 Gothel Software e.K.
+ * Copyright (c) 2021-2023 Gothel Software e.K.
*
* ByteInStream, ByteInStream_SecMemory and ByteInStream_istream are derived from Botan under same license:
* - Copyright (c) 1999-2007 Jack Lloyd
@@ -377,6 +377,7 @@ bool ByteInStream_URL::available(size_t n) noexcept {
return false;
}
// I/O still in progress, we have to poll until data is available or timeout
+ // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
bool timeout_occured;
const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
if( avail < n ) {
@@ -404,9 +405,8 @@ size_t ByteInStream_URL::read(void* out, size_t length) noexcept {
return 0;
}
bool timeout_occured = false;
- const size_t got = m_has_content_length ?
- m_buffer.getBlocking(static_cast<uint8_t*>(out), length, 1, m_timeout, timeout_occured) :
- m_buffer.get(static_cast<uint8_t*>(out), length, 1);
+ // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
+ const size_t got = m_buffer.getBlocking(static_cast<uint8_t*>(out), length, 1, m_timeout, timeout_occured);
m_bytes_consumed += got;
if( timeout_occured ) {
setstate_impl( iostate::timeout );
@@ -493,6 +493,7 @@ bool ByteInStream_Feed::available(size_t n) noexcept {
return false;
}
// I/O still in progress, we have to poll until data is available or timeout
+ // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
bool timeout_occured;
const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
if( avail < n ) {
@@ -519,9 +520,8 @@ size_t ByteInStream_Feed::read(void* out, size_t length) noexcept {
return 0;
}
bool timeout_occured = false;
- const size_t got = m_has_content_length ?
- m_buffer.getBlocking(static_cast<uint8_t*>(out), length, 1, m_timeout, timeout_occured) :
- m_buffer.get(static_cast<uint8_t*>(out), length, 1);
+ // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
+ const size_t got = m_buffer.getBlocking(static_cast<uint8_t*>(out), length, 1, m_timeout, timeout_occured);
m_bytes_consumed += got;
if( timeout_occured ) {
setstate_impl( iostate::timeout );
@@ -530,7 +530,7 @@ size_t ByteInStream_Feed::read(void* out, size_t length) noexcept {
}
m_buffer.interruptWriter();
}
- // DBG_PRINT("ByteInStream_Feed::read: size %zu/%zu bytes, %s", got, length, to_string_int().c_str() );
+ // DBG_PRINT("ByteInStream_Feed::read: size %zu/%zu bytes, timeout_occured %d, %s", got, length, timeout_occured, to_string_int().c_str() );
return got;
}
@@ -579,7 +579,7 @@ bool ByteInStream_Feed::write(uint8_t in[], size_t length, const jau::fraction_i
void ByteInStream_Feed::set_eof(const async_io_result_t result) noexcept {
m_result = result;
- interruptReader(); // FIXME: ???
+ m_buffer.set_end_of_input(true); // still considering last data, also irqs blocking ringbuffer reader
}
std::string ByteInStream_Feed::to_string_int() const noexcept {
diff --git a/src/io_util.cpp b/src/io_util.cpp
index a9754dd..320508b 100644
--- a/src/io_util.cpp
+++ b/src/io_util.cpp
@@ -1,6 +1,6 @@
/*
* Author: Sven Gothel <[email protected]>
- * Copyright (c) 2021 Gothel Software e.K.
+ * Copyright (c) 2021-2023 Gothel Software e.K.
* Copyright (c) 2021 ZAFENA AB
*
* Permission is hereby granted, free of charge, to any person obtaining
@@ -62,13 +62,13 @@ uint64_t jau::io::read_stream(ByteInStream& in,
uint64_t total = 0;
bool has_more;
do {
- if( in.available(1) ) { // at least one byte to stream ..
+ if( in.available(1) ) { // at least one byte to stream, also considers eof
buffer.resize(buffer.capacity());
const uint64_t got = in.read(buffer.data(), buffer.capacity());
buffer.resize(got);
total += got;
- has_more = 1 <= got && in.good() && ( !in.has_content_size() || total < in.content_size() );
+ has_more = 1 <= got && !in.fail() && ( !in.has_content_size() || total < in.content_size() );
try {
if( !consumer_fn(buffer, !has_more) ) {
break; // end streaming
@@ -88,7 +88,7 @@ uint64_t jau::io::read_stream(ByteInStream& in,
static uint64_t _read_buffer(ByteInStream& in,
secure_vector<uint8_t>& buffer) noexcept {
- if( in.available(1) ) { // at least one byte to stream ..
+ if( in.available(1) ) { // at least one byte to stream, also considers eof
buffer.resize(buffer.capacity());
const uint64_t got = in.read(buffer.data(), buffer.capacity());
buffer.resize(got);
@@ -111,7 +111,7 @@ uint64_t jau::io::read_stream(ByteInStream& in,
{
uint64_t got = _read_buffer(in, *buffers[idx]);
total_read += got;
- eof_read = 0 == got || !in.good() || ( in.has_content_size() && total_read >= in.content_size() );
+ eof_read = 0 == got || in.fail() || ( in.has_content_size() && total_read >= in.content_size() );
eof[idx] = eof_read;
++idx;
}
@@ -131,7 +131,7 @@ uint64_t jau::io::read_stream(ByteInStream& in,
if( !eof_read ) {
uint64_t got = _read_buffer(in, *buffers[idx]);
total_read += got;
- eof_read = 0 == got || !in.good() || ( in.has_content_size() && total_read >= in.content_size() );
+ eof_read = 0 == got || in.fail() || ( in.has_content_size() && total_read >= in.content_size() );
eof[idx] = eof_read;
if( 0 == got ) {
// read-ahead eof propagation if read zero bytes,
@@ -486,6 +486,10 @@ struct curl_glue2_t {
buffer.interruptReader();
header_sync.notify_complete();
}
+ void set_end_of_input() noexcept {
+ buffer.set_end_of_input(true);
+ header_sync.notify_complete();
+ }
};
static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept {
@@ -495,7 +499,7 @@ static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void
// user abort!
DBG_PRINT("consume_header_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s",
cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() );
- cg->interrupt_all();
+ cg->set_end_of_input();
return 0;
}
@@ -506,7 +510,7 @@ static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void
if( 400 <= v ) {
IRQ_PRINT("response_code: %ld", v);
cg->result = async_io_result_t::FAILED;
- cg->interrupt_all();
+ cg->set_end_of_input();
return 0;
} else {
DBG_PRINT("consume_header_curl2.0 response_code: %ld", v);
@@ -548,7 +552,7 @@ static size_t consume_data_curl2(char *ptr, size_t size, size_t nmemb, void *use
// user abort!
DBG_PRINT("consume_data_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s",
cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() );
- cg->interrupt_all();
+ cg->set_end_of_input();
return 0;
}
@@ -576,7 +580,7 @@ static size_t consume_data_curl2(char *ptr, size_t size, size_t nmemb, void *use
DBG_PRINT("consume_data_curl2 Failed put: total %" PRIi64 ", result %d, timeout %d, rb %s",
cg->total_read.load(), cg->result.load(), timeout_occured, cg->buffer.toString().c_str() );
if( timeout_occured ) {
- cg->interrupt_all();
+ cg->set_end_of_input();
}
return 0;
}
@@ -586,6 +590,7 @@ static size_t consume_data_curl2(char *ptr, size_t size, size_t nmemb, void *use
cg->has_content_length ? cg->total_read >= cg->content_length : false;
if( is_final ) {
cg->result = async_io_result_t::SUCCESS;
+ cg->set_end_of_input();
}
if( false ) {
@@ -710,7 +715,7 @@ static void read_url_stream_thread(const char *url, std::unique_ptr<curl_glue2_t
errout:
cg->result = async_io_result_t::FAILED;
- cg->interrupt_all();
+ cg->set_end_of_input();
cleanup:
if( nullptr != curl_handle ) {
@@ -738,7 +743,7 @@ std::unique_ptr<std::thread> jau::io::read_url_stream(const std::string& url,
#endif // USE_LIBCURL
result = io::async_io_result_t::FAILED;
header_sync.notify_complete();
- buffer.interruptReader();
+ buffer.set_end_of_input(true);
const std::string_view scheme = uri_tk::get_scheme(url);
DBG_PRINT("Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
std::string(scheme).c_str(), jau::to_string(uri_tk::supported_protocols(), ",").c_str());