aboutsummaryrefslogtreecommitdiffstats
path: root/src/io_util.cpp
diff options
context:
space:
mode:
authorSven Gothel <[email protected]>2022-07-31 10:35:24 +0200
committerSven Gothel <[email protected]>2022-07-31 10:35:24 +0200
commit9f6e122a4df1ffd91e1c1ae92dc324ddd35df1df (patch)
tree2f13d06511d5afa0d7da90d227fabbc7da508e13 /src/io_util.cpp
parent2aa5582118521c9abd922d8a4cfd7439b8cfd419 (diff)
Add jau::io::read_stream() with double-buffered reading to ensure last consumer_fn() call gets is_final set if next buffer has eof() w/ zero bytes.
This behavior is required for certain operations, i.e. where we cannot have a zero-sized consumer_fn() call as is_final=true. Example: cipherpack using Botan API, where AEAD::finish() requires the last Tag bytes to be delivered.
Diffstat (limited to 'src/io_util.cpp')
-rw-r--r--src/io_util.cpp74
1 files changed, 72 insertions, 2 deletions
diff --git a/src/io_util.cpp b/src/io_util.cpp
index 38eda11..c66cafc 100644
--- a/src/io_util.cpp
+++ b/src/io_util.cpp
@@ -87,6 +87,76 @@ uint64_t jau::io::read_stream(ByteInStream& in,
return total;
}
+static uint64_t _read_buffer(ByteInStream& in,
+ secure_vector<uint8_t>& buffer) noexcept {
+ if( in.check_available(1) ) { // at least one byte to stream ..
+ buffer.resize(buffer.capacity());
+ const uint64_t got = in.read(buffer.data(), buffer.capacity());
+ buffer.resize(got);
+ return got;
+ }
+ return 0;
+}
+
+uint64_t jau::io::read_stream(ByteInStream& in,
+ secure_vector<uint8_t>& buffer1, secure_vector<uint8_t>& buffer2,
+ StreamConsumerFunc consumer_fn) noexcept {
+ secure_vector<uint8_t>* buffers[] = { &buffer1, &buffer2 };
+ bool eof[] = { false, false };
+
+ bool eof_read = false;
+ uint64_t total_send = 0;
+ uint64_t total_read = 0;
+ int idx = 0;
+ // fill 1st buffer upfront
+ {
+ uint64_t got = _read_buffer(in, *buffers[idx]);
+ total_read += got;
+ eof_read = 0 == got || in.end_of_data() || ( in.has_content_size() && total_read >= in.content_size() );
+ eof[idx] = eof_read;
+ ++idx;
+ }
+
+ // - buffer_idx was filled
+ // - buffer_idx++
+ //
+ // - while !eof_send do
+ // - read buffer_idx if not eof_read,
+ // - set eof[buffer_idx+1]=true if zero bytes
+ // - buffer_idx++
+ // - sent buffer_idx
+ //
+ bool eof_send = false;
+ while( !eof_send ) {
+ int bidx_next = ( idx + 1 ) % 2;
+ if( !eof_read ) {
+ uint64_t got = _read_buffer(in, *buffers[idx]);
+ total_read += got;
+ eof_read = 0 == got || in.end_of_data() || ( in.has_content_size() && total_read >= in.content_size() );
+ eof[idx] = eof_read;
+ if( 0 == got ) {
+ // read-ahead eof propagation if read zero bytes,
+ // hence next consumer_fn() will send last bytes with is_final=true
+ eof[bidx_next] = true;
+ }
+ }
+ idx = bidx_next;
+
+ secure_vector<uint8_t>* buffer = buffers[idx];
+ eof_send = eof[idx];
+ total_send += buffer->size();
+ try {
+ if( !consumer_fn(*buffer, eof_send) ) {
+ return total_send; // end streaming
+ }
+ } catch (std::exception &e) {
+ ERR_PRINT("jau::io::read_stream: Caught exception: %s", e.what());
+ return total_send; // end streaming
+ }
+ }
+ return total_send;
+}
+
std::vector<std::string_view> jau::io::uri_tk::supported_protocols() noexcept {
std::vector<std::string_view> res;
#ifdef USE_LIBCURL
@@ -240,7 +310,7 @@ uint64_t jau::io::read_url_stream(const std::string& url,
if( !uri_tk::protocol_supported(url) ) {
const std::string_view scheme = uri_tk::get_scheme(url);
DBG_PRINT("Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
- std::string(scheme).c_str(), to_string(uri_tk::supported_protocols(), ",").c_str());
+ std::string(scheme).c_str(), jau::to_string(uri_tk::supported_protocols(), ",").c_str());
return 0;
}
@@ -613,7 +683,7 @@ std::unique_ptr<std::thread> jau::io::read_url_stream(const std::string& url,
result = io::async_io_result_t::FAILED;
const std::string_view scheme = uri_tk::get_scheme(url);
DBG_PRINT("Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
- std::string(scheme).c_str(), to_string(uri_tk::supported_protocols(), ",").c_str());
+ std::string(scheme).c_str(), jau::to_string(uri_tk::supported_protocols(), ",").c_str());
return nullptr;
#ifdef USE_LIBCURL
}