author     Sven Gothel <[email protected]>    2023-01-02 07:46:23 +0100
committer  Sven Gothel <[email protected]>    2023-01-02 07:46:23 +0100
commit     1a7985e17d9ba9a9627ee66d4aca660636d3f8e6 (patch)
tree       3f4c8d1b4fb7801ed90048420cab34b8d219ae99 /include/jau
parent     dc0a26783a2c2e0a5912302d00216f99b009d3b5 (diff)
jau::ringbuffer: Add 'End of Stream' (EOS) concept and utilize it for jau::io::ByteInStream_[URL|Feed]'s available() and read() operation. (tag: v1.1.1)
jau::ringbuffer: Add 'End of Stream' (EOS) concept to unblock all read-operations from this point onwards and to unblock a potentially currently blocked reader thread.

The EOS concept is required to allow blocking operations from a consumer (reader) without knowledge of the content-size, having the producer (writer) signal EOS at the end and thereby unblock all read-operations.

Further, after unblocking a potentially blocked reader-thread, the failure criterion is now checked as in the `timeout` case. This allows a read-thread to be unblocked (interrupted) after the writer has written the last chunk and hence to pick up this last written data.

+++

Utilize the ringbuffer EOS concept for jau::io::ByteInStream_[URL|Feed]'s available() and read() operation, i.e. both methods utilize blocking operations. After the producer (URL: our Curl consume callback; Feed: manual set_eof()) marks EOS on the ringbuffer, all blocking read-operations are permanently unblocked and a currently blocked read-thread is also interrupted.

+++

Passed test_bytestream01 and test_iostream01 while causing full CPU load via `stress --cpu $(getconf _NPROCESSORS_ONLN)`.
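Beyond the patch text itself, a minimal sketch of the intended producer/consumer pattern may help. Only set_end_of_input() and the `fractions_i64::zero` (wait without timeout) semantic are taken from this change; the template arguments, the capacity constructor and the bulk putBlocking()/getBlocking() signatures are assumptions about the surrounding jau::ringbuffer API.

    #include <jau/ringbuffer.hpp>

    #include <cstdint>
    #include <thread>
    #include <vector>

    // Sketch: the producer fills the ringbuffer in chunks and finally marks EOS via
    // set_end_of_input(); the consumer drains it with blocking bulk reads until the
    // buffer is empty and EOS has been reached.
    static void eos_producer_consumer() {
        jau::ringbuffer<uint8_t, size_t> rb(4096);            // assumed: capacity constructor

        std::thread producer([&rb]() {
            for(int i=0; i<10; ++i) {
                std::vector<uint8_t> chunk(256, uint8_t(i));
                // assumed bulk blocking put; fractions_i64::zero waits w/o timeout (see patch)
                if( !rb.putBlocking(chunk.data(), chunk.data()+chunk.size(), jau::fractions_i64::zero) ) {
                    break;
                }
            }
            rb.set_end_of_input(true);                         // unblock all read-operations from now on
        });

        std::vector<uint8_t> buf(512);
        size_t total = 0;
        for(;;) {
            // assumed bulk blocking get: returns the available bytes, possibly fewer than
            // requested once EOS has been set, and 0 once drained after EOS
            const size_t got = rb.getBlocking(buf.data(), buf.size(), /*min_count=*/1, jau::fractions_i64::zero);
            if( 0 == got ) { break; }
            total += got;
        }
        producer.join();
        (void)total;
    }

After the producer has marked EOS, the consumer's final getBlocking() calls return whatever is still buffered and subsequently 0, matching the unblocking behaviour described in the message above.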
Diffstat (limited to 'include/jau')
-rw-r--r--  include/jau/byte_stream.hpp    2
-rw-r--r--  include/jau/io_util.hpp        2
-rw-r--r--  include/jau/ringbuffer.hpp    88
3 files changed, 66 insertions, 26 deletions
diff --git a/include/jau/byte_stream.hpp b/include/jau/byte_stream.hpp
index 4790a9a..d65e5f7 100644
--- a/include/jau/byte_stream.hpp
+++ b/include/jau/byte_stream.hpp
@@ -722,7 +722,7 @@ namespace jau::io {
return m_parent.available(n);
}
- void clear(iostate state = iostate::goodbit) noexcept override { m_parent.clear( state ); }
+ void clear(const iostate state = iostate::goodbit) noexcept override { m_parent.clear( state ); }
iostate rdstate() const noexcept override { return m_parent.rdstate(); }
std::string id() const noexcept override { return m_parent.id(); }
diff --git a/include/jau/io_util.hpp b/include/jau/io_util.hpp
index 74ae97e..bf63ee1 100644
--- a/include/jau/io_util.hpp
+++ b/include/jau/io_util.hpp
@@ -1,6 +1,6 @@
/*
* Author: Sven Gothel <[email protected]>
- * Copyright (c) 2021 Gothel Software e.K.
+ * Copyright (c) 2021-2023 Gothel Software e.K.
* Copyright (c) 2021 ZAFENA AB
*
* Permission is hereby granted, free of charge, to any person obtaining
diff --git a/include/jau/ringbuffer.hpp b/include/jau/ringbuffer.hpp
index 80f8f6e..2782747 100644
--- a/include/jau/ringbuffer.hpp
+++ b/include/jau/ringbuffer.hpp
@@ -1,6 +1,6 @@
/*
* Author: Sven Gothel <[email protected]>
- * Copyright (c) 2020 Gothel Software e.K.
+ * Copyright (c) 2020-2023 Gothel Software e.K.
* Copyright (c) 2020 ZAFENA AB
*
* Permission is hereby granted, free of charge, to any person obtaining
@@ -127,11 +127,27 @@ namespace jau {
* - {@link #get() get*(..)} operations concurrently from multiple threads.
* - {@link #put() put*(..)} producer and {@link #get() get*(..)} consumer threads can be different or the same.
*
+ * #### Interruption of Consumer and Producer
+ * To allow an application to unblock a potentially blocked producer (writer)
+ * or consumer (reader) thread once,
+ * one can call interruptWriter() or interruptReader() respectively.
+ *
+ * #### Marking End of Input Stream (EOS)
+ * To allow an application to mark the end of the input stream,
+ * i.e. the point at which the producer (writer) has completed filling the ringbuffer,
+ * one can call set_end_of_input().
+ *
+ * Calling set_end_of_input(true) will unblock all read-operations from this point onwards.
+ * A potentially currently blocked reader thread is also interrupted and hence unblocked.
+ *
* #### See also
* - Sequentially Consistent (SC) ordering or SC-DRF (data race free) <https://en.cppreference.com/w/cpp/atomic/memory_order#Sequentially-consistent_ordering>
* - std::memory_order <https://en.cppreference.com/w/cpp/atomic/memory_order>
* - jau::sc_atomic_critical
* - setMultiPCEnabled()
+ * - interruptReader()
+ * - interruptWriter()
+ * - set_end_of_input()
*
* @anchor ringbuffer_ntt_params
* ### Non-Type Template Parameter (NTTP) controlling Value_type memory
@@ -214,6 +230,7 @@ class ringbuffer {
jau::relaxed_atomic_bool interrupted_read = false;
jau::relaxed_atomic_bool interrupted_write = false;
+ jau::relaxed_atomic_bool end_of_input = false;
allocator_type alloc_inst;
@@ -267,12 +284,12 @@ class ringbuffer {
Size_type waitForElementsImpl(const Size_type min_count, const fraction_i64& timeout, bool& timeout_occurred) noexcept {
timeout_occurred = false;
Size_type available = size();
- if( min_count > available && min_count < capacityPlusOne ) {
+ if( available < min_count && min_count < capacityPlusOne && !end_of_input ) {
interrupted_read = false;
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
available = size();
const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
- while( !interrupted_read && min_count > available ) {
+ while( !interrupted_read && !end_of_input && min_count > available ) {
if( fractions_i64::zero == timeout ) {
cvWrite.wait(lockWrite);
available = size();
@@ -285,7 +302,7 @@ class ringbuffer {
}
}
}
- if( interrupted_read ) {
+ if( interrupted_read ) { // interruption or end_of_input may happen after delivering last data chunk
interrupted_read = false;
}
}
@@ -467,11 +484,11 @@ class ringbuffer {
const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
Size_type localReadPos = oldReadPos;
if( localReadPos == writePos ) {
- if( blocking ) {
+ if( blocking && !end_of_input ) {
interrupted_read = false;
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
- while( !interrupted_read && localReadPos == writePos ) {
+ while( !interrupted_read && !end_of_input && localReadPos == writePos ) {
if( fractions_i64::zero == timeout ) {
cvWrite.wait(lockWrite);
} else {
@@ -482,9 +499,11 @@ class ringbuffer {
}
}
}
- if( interrupted_read ) {
+ if( interrupted_read || end_of_input ) {
interrupted_read = false;
- return false;
+ if( localReadPos == writePos ) { // interruption or end_of_input may happen after delivering last data chunk
+ return false;
+ }
}
} else {
return false;
@@ -511,11 +530,11 @@ class ringbuffer {
const Size_type oldReadPos = readPos; // SC-DRF acquire atomic readPos, sync'ing with putImpl
Size_type localReadPos = oldReadPos;
if( localReadPos == writePos ) {
- if( blocking ) {
+ if( blocking && !end_of_input ) {
interrupted_read = false;
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
- while( !interrupted_read && localReadPos == writePos ) {
+ while( !interrupted_read && !end_of_input && localReadPos == writePos ) {
if( fractions_i64::zero == timeout ) {
cvWrite.wait(lockWrite);
} else {
@@ -526,9 +545,11 @@ class ringbuffer {
}
}
}
- if( interrupted_read ) {
+ if( interrupted_read || end_of_input ) {
interrupted_read = false;
- return false;
+ if( localReadPos == writePos ) { // interruption or end_of_input may happen after delivering last data chunk
+ return false;
+ }
}
} else {
return false;
@@ -576,12 +597,12 @@ class ringbuffer {
Size_type localReadPos = oldReadPos;
Size_type available = size();
if( min_count > available ) {
- if( blocking ) {
+ if( blocking && !end_of_input ) {
interrupted_read = false;
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
available = size();
const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
- while( !interrupted_read && min_count > available ) {
+ while( !interrupted_read && !end_of_input && min_count > available ) {
if( fractions_i64::zero == timeout ) {
cvWrite.wait(lockWrite);
available = size();
@@ -594,9 +615,11 @@ class ringbuffer {
}
}
}
- if( interrupted_read ) {
+ if( interrupted_read || end_of_input ) {
interrupted_read = false;
- return 0;
+ if( min_count > available ) { // interruption or end_of_input may happen after delivering last data chunk
+ return 0;
+ }
}
} else {
return 0;
@@ -678,12 +701,12 @@ class ringbuffer {
Size_type localReadPos = oldReadPos;
Size_type available = size();
if( count > available ) {
- if( blocking ) {
+ if( blocking && !end_of_input ) {
interrupted_read = false;
std::unique_lock<std::mutex> lockWrite(syncWrite); // SC-DRF w/ putImpl via same lock
available = size();
const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
- while( !interrupted_read && count > available ) {
+ while( !interrupted_read && !end_of_input && count > available ) {
if( fractions_i64::zero == timeout ) {
cvWrite.wait(lockWrite);
available = size();
@@ -696,9 +719,11 @@ class ringbuffer {
}
}
}
- if( interrupted_read ) {
+ if( interrupted_read || end_of_input ) {
interrupted_read = false;
- return 0;
+ if( count > available ) { // interruption or end_of_input may happen after delivering last data chunk
+ return 0;
+ }
}
} else {
count = available; // drop all available for non-blocking
@@ -1303,6 +1328,7 @@ class ringbuffer {
closeImpl(zeromem);
interrupted_read = true;
interrupted_write = true;
+ end_of_input = true;
} else {
std::unique_lock<std::mutex> lockRead(syncRead, std::defer_lock); // same for *this instance!
std::unique_lock<std::mutex> lockWrite(syncWrite, std::defer_lock);
@@ -1311,6 +1337,7 @@ class ringbuffer {
closeImpl(zeromem);
interrupted_write = true;
interrupted_read = true;
+ end_of_input = true;
}
// have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
cvRead.notify_all(); // notify waiting writer
@@ -1412,16 +1439,29 @@ class ringbuffer {
}
/**
- * Interrupt a potentially blocked reader.
+ * Interrupt a potentially blocked reader once.
*
- * Call this method if intended to abort reading and to interrupt the reader thread's potentially blocked read-access call.
+ * Call this method to unblock a potentially blocked reader thread once.
*/
void interruptReader() noexcept { interrupted_read = true; cvWrite.notify_all(); }
/**
- * Interrupt a potentially blocked writer.
+ * Set `End of Input` from the writer thread, unblocking all read-operations and a potentially currently blocked reader thread.
+ *
+ * Calling this method with `true` once all input data has been written will unblock all read-operations from this point onwards.
+ * A potentially currently blocked reader thread is also interrupted and hence unblocked.
+ */
+ void set_end_of_input(const bool v=true) noexcept {
+ end_of_input = v;
+ if( v ) {
+ cvWrite.notify_all();
+ }
+ }
+
+ /**
+ * Interrupt a potentially blocked writer once.
*
- * Call this method if intended to abort writing and to interrupt the writing thread's potentially blocked write-access call.
+ * Call this method to unblock a potentially blocked writer thread once.
*/
void interruptWriter() noexcept { interrupted_write = true; cvRead.notify_all(); }
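To round off the new control methods, a hedged sketch contrasting the one-shot interruptReader() with the permanent set_end_of_input() follows; the single-element getBlocking() signature, the template arguments and the capacity constructor are assumptions, only the three control methods themselves appear in this header.

    #include <jau/ringbuffer.hpp>

    #include <cstdint>
    #include <thread>

    // Sketch: a reader blocks on an empty ringbuffer; the controlling thread may either
    // abort it once via interruptReader() or declare the input complete via
    // set_end_of_input(), which unblocks this and every future read-operation.
    static void abort_vs_eos() {
        jau::ringbuffer<uint8_t, size_t> rb(1024);             // assumed: capacity constructor

        std::thread reader([&rb]() {
            uint8_t b;
            // assumed single-element blocking get: returns false when interrupted
            // or when EOS has been set and no further data is pending
            while( rb.getBlocking(b, jau::fractions_i64::zero) ) {
                /* consume b */
            }
        });

        // one-shot: unblock the currently blocked reader exactly once
        // rb.interruptReader();

        // permanent: input is complete, unblock all read-operations from now on
        rb.set_end_of_input(true);

        reader.join();
    }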