summaryrefslogtreecommitdiffstats
path: root/include/jau
diff options
context:
space:
mode:
authorSven Gothel <[email protected]>2022-05-28 06:34:20 +0200
committerSven Gothel <[email protected]>2022-05-28 06:34:20 +0200
commitaf35a513b0d60301d38e3ab5cb3d119d81398998 (patch)
tree6117607323fbb3a3e66ac3ca3c934a7f9ddb0095 /include/jau
parent0874b1dde918f2441a0af1ecec07e10a9bd2e181 (diff)
Integrating byte_stream.* and io_util.* incl. unit test from Elevator project for better encapsulation, test and generic use
Fixed: ByteStream_File::check_available(): 'm_bytes_consumed - m_content_size >= n', missed subtracting m_content_size (duh) test_iostream01 uses the fragile std::system() call to start/stop mini_httpd as the http server for testing streaming with curl.
Diffstat (limited to 'include/jau')
-rw-r--r--include/jau/byte_stream.hpp637
-rw-r--r--include/jau/io_util.hpp110
2 files changed, 747 insertions, 0 deletions
diff --git a/include/jau/byte_stream.hpp b/include/jau/byte_stream.hpp
new file mode 100644
index 0000000..eb690c4
--- /dev/null
+++ b/include/jau/byte_stream.hpp
@@ -0,0 +1,637 @@
+/*
+ * Author: Sven Gothel <[email protected]>
+ * Copyright (c) 2021 Gothel Software e.K.
+ *
+ * ByteStream, ByteStream_SecMemory and ByteStream_istream are derived from Botan under same license:
+ * Copyright (c) 1999-2007 Jack Lloyd
+ * Copyright (c) 2005 Matthew Gregan
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining
+ * a copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#ifndef JAU_BYTE_STREAM_HPP_
+#define JAU_BYTE_STREAM_HPP_
+
+#include <fstream>
+#include <string>
+#include <cstdint>
+#include <functional>
+#include <thread>
+
+#include <jau/basic_types.hpp>
+#include <jau/callocator_sec.hpp>
+#include <jau/ringbuffer.hpp>
+
+// Include Botan header files before this one to be integrated w/ Botan!
+// #include <botan_all.h>
+
+namespace jau::io {
+
+ /**
+ * Operation result value
+ */
+ enum class result_t : int8_t {
+ /** Operation failed. */
+ FAILED = -1,
+
+ /** Operation still in progress. */
+ NONE = 0,
+
+ /** Operation succeeded. */
+ SUCCESS = 1
+ };
+ typedef jau::ordered_atomic<result_t, std::memory_order::memory_order_relaxed> relaxed_atomic_result_t;
+
+ typedef jau::ringbuffer<uint8_t, size_t> ByteRingbuffer;
+
+ extern const size_t BEST_URLSTREAM_RINGBUFFER_SIZE;
+
+#ifdef BOTAN_VERSION_MAJOR
+ #define VIRTUAL_BOTAN
+ #define OVERRIDE_BOTAN override
+ template<typename T> using secure_vector = std::vector<T, Botan::secure_allocator<T>>;
+#else
+ #define VIRTUAL_BOTAN virtual
+ #define OVERRIDE_BOTAN
+ template<typename T> using secure_vector = std::vector<T, jau::callocator_sec<T>>;
+#endif
+
+ /**
+ * This class represents an abstract byte stream object.
+ *
+ * @anchor byte_stream_properties
+ * ### ByteStream Properties
+ * The byte stream can be originated from a local source w/o delay,
+ * remote like http streaming or even from another thread.<br />
+ * Both latter asynchronous resources may expose blocking properties
+ * in check_available().
+ *
+ * Asynchronous resources benefit from knowing their content size,
+ * their check_available() implementation may avoid
+ * blocking and waiting for requested bytes available
+ * if the stream is already beyond its scope.
+ *
+ * @see @ref byte_stream_properties "ByteStream Properties"
+ */
+ class ByteStream
+#ifdef BOTAN_VERSION_MAJOR
+ : public Botan::DataSource
+#endif
+ {
+ public:
+ ByteStream() = default;
+ virtual ~ByteStream() = default;
+ ByteStream& operator=(const ByteStream&) = delete;
+ ByteStream(const ByteStream&) = delete;
+
+ /**
+ * Close the stream if supported by the underlying mechanism.
+ */
+ virtual void close() noexcept = 0;
+
+ /**
+ * Check whether n bytes are available in the input stream.
+ *
+ * This method may be blocking when using an asynchronous source
+ * up until the requested bytes are actually available.
+ *
+ * A subsequent call to read() shall return immediately with at least
+ * the requested numbers of bytes available.
+ *
+ * @param n byte count to wait for
+ * @return true if n bytes are available, otherwise false
+ *
+ * @see read()
+ * @see @ref byte_stream_properties "ByteStream Properties"
+ */
+ VIRTUAL_BOTAN bool check_available(size_t n) OVERRIDE_BOTAN = 0;
+
+ /**
+ * Read from the source. Moves the internal offset so that every
+ * call to read will return a new portion of the source.
+ *
+ * Use check_available() to wait and ensure a certain amount of bytes are available.
+ *
+ * This method is not blocking.
+ *
+ * @param out the byte array to write the result to
+ * @param length the length of the byte array out
+ * @return length in bytes that was actually read and put into out
+ *
+ * @see check_available()
+ * @see @ref byte_stream_properties "ByteStream Properties"
+ */
+ [[nodiscard]] VIRTUAL_BOTAN size_t read(uint8_t out[], size_t length) OVERRIDE_BOTAN = 0;
+
+ /**
+ * Read from the source but do not modify the internal
+ * offset. Consecutive calls to peek() will return portions of
+ * the source starting at the same position.
+ *
+ * @param out the byte array to write the output to
+ * @param length the length of the byte array out
+ * @param peek_offset the offset into the stream to read at
+ * @return length in bytes that was actually read and put into out
+ */
+ [[nodiscard]] VIRTUAL_BOTAN size_t peek(uint8_t out[], size_t length, size_t peek_offset) const OVERRIDE_BOTAN = 0;
+
+ /**
+ * Test whether the source still has data that can be read.
+ * @return true if there is no more data to read, false otherwise
+ */
+ VIRTUAL_BOTAN bool end_of_data() const OVERRIDE_BOTAN = 0;
+
+#ifndef BOTAN_VERSION_MAJOR
+
+ /**
+ * return the id of this data source
+ * @return std::string representing the id of this data source
+ */
+ virtual std::string id() const OVERRIDE_BOTAN { return ""; }
+
+ /**
+ * Read one byte.
+ * @param out the byte to read to
+ * @return length in bytes that was actually read and put
+ * into out
+ */
+ size_t read_byte(uint8_t& out);
+
+ /**
+ * Peek at one byte.
+ * @param out an output byte
+ * @return length in bytes that was actually read and put
+ * into out
+ */
+ size_t peek_byte(uint8_t& out) const;
+
+ /**
+ * Discard the next N bytes of the data
+ * @param N the number of bytes to discard
+ * @return number of bytes actually discarded
+ */
+ size_t discard_next(size_t N);
+
+#endif /* BOTAN_VERSION_MAJOR */
+
+ /**
+ * @return number of bytes read so far.
+ */
+ VIRTUAL_BOTAN size_t get_bytes_read() const OVERRIDE_BOTAN = 0;
+
+ virtual std::string to_string() const = 0;
+ };
+
+ /**
+ * This class represents a secure Memory-Based byte stream
+ */
+ class ByteStream_SecMemory final : public ByteStream {
+ public:
+ /**
+ * Read from a memory buffer
+ */
+ size_t read(uint8_t[], size_t) override;
+
+ /**
+ * Peek into a memory buffer
+ */
+ size_t peek(uint8_t[], size_t, size_t) const override;
+
+ bool check_available(size_t n) override;
+
+ /**
+ * Check if the memory buffer is empty
+ */
+ bool end_of_data() const override;
+
+ /**
+ * Construct a memory source that reads from a string
+ * @param in the string to read from
+ */
+ explicit ByteStream_SecMemory(const std::string& in);
+
+ /**
+ * Construct a memory source that reads from a byte array
+ * @param in the byte array to read from
+ * @param length the length of the byte array
+ */
+ ByteStream_SecMemory(const uint8_t in[], size_t length)
+ : m_source(in, in + length), m_offset(0) {}
+
+ /**
+ * Construct a memory source that reads from a secure_vector
+ * @param in the MemoryRegion to read from
+ */
+ explicit ByteStream_SecMemory(const io::secure_vector<uint8_t>& in)
+ : m_source(in), m_offset(0) {}
+
+ /**
+ * Construct a memory source that reads from a std::vector
+ * @param in the MemoryRegion to read from
+ */
+ explicit ByteStream_SecMemory(const std::vector<uint8_t>& in)
+ : m_source(in.begin(), in.end()), m_offset(0) {}
+
+ void close() noexcept override;
+
+ ~ByteStream_SecMemory() override { close(); }
+
+ size_t get_bytes_read() const override { return m_offset; }
+
+ std::string to_string() const override;
+
+ private:
+ io::secure_vector<uint8_t> m_source;
+ size_t m_offset;
+ };
+
+ /**
+ * This class represents an std::istream based byte stream.
+ */
+ class ByteStream_istream final : public ByteStream {
+ public:
+ size_t read(uint8_t[], size_t) override;
+ size_t peek(uint8_t[], size_t, size_t) const override;
+ bool check_available(size_t n) override;
+ bool end_of_data() const override;
+ std::string id() const override;
+
+ ByteStream_istream(std::istream&, const std::string& id = "<std::istream>");
+
+ ByteStream_istream(const ByteStream_istream&) = delete;
+
+ ByteStream_istream& operator=(const ByteStream_istream&) = delete;
+
+ void close() noexcept override;
+
+ ~ByteStream_istream() override { close(); }
+
+ size_t get_bytes_read() const override { return m_bytes_consumed; }
+
+ std::string to_string() const override;
+
+ private:
+ const std::string m_identifier;
+
+ std::istream& m_source;
+ size_t m_bytes_consumed;
+ };
+
+
+ /**
+ * This class represents a file based byte stream.
+ */
+ class ByteStream_File final : public ByteStream {
+ public:
+ size_t read(uint8_t[], size_t) override;
+ size_t peek(uint8_t[], size_t, size_t) const override;
+ bool check_available(size_t n) override;
+ bool end_of_data() const override;
+ std::string id() const override;
+
+ /**
+ * Construct a Stream-Based byte stream from filesystem path
+ * @param file the path to the file
+ * @param use_binary whether to treat the file as binary or not
+ */
+ ByteStream_File(const std::string& file, bool use_binary);
+
+ ByteStream_File(const ByteStream_File&) = delete;
+
+ ByteStream_File& operator=(const ByteStream_File&) = delete;
+
+ void close() noexcept override;
+
+ ~ByteStream_File() override { close(); }
+
+ size_t get_bytes_read() const override { return (size_t)m_bytes_consumed; }
+
+ /**
+ * Botan's get_bytes_read() API uses `size_t`,
+ * which only covers 32bit or 4GB on 32bit systems.
+ * @return uint64_t bytes read
+ */
+ uint64_t get_bytes_read_u64() const { return m_bytes_consumed; }
+
+ std::string to_string() const override;
+
+ private:
+ const std::string m_identifier;
+ std::unique_ptr<std::ifstream> m_source;
+ uint64_t m_content_size;
+ uint64_t m_bytes_consumed;
+ };
+
+ /**
+ * This class represents a Ringbuffer-Based URL byte stream
+ */
+ class ByteStream_URL final : public ByteStream {
+ public:
+ /**
+ * Check whether n bytes are available in the input stream.
+ *
+ * Wait up to timeout duration given in constructor until n bytes become available, where fractions_i64::zero waits infinitely.
+ *
+ * This method is blocking.
+ *
+ * @param n byte count to wait for
+ * @return true if n bytes are available, otherwise false
+ *
+ * @see read()
+ * @see @ref byte_stream_properties "ByteStream Properties"
+ */
+ bool check_available(size_t n) override;
+
+ /**
+ * Read from the source. Moves the internal offset so that every
+ * call to read will return a new portion of the source.
+ *
+ * Use check_available() to wait and ensure a certain amount of bytes are available.
+ *
+ * This method is not blocking.
+ *
+ * @param out the byte array to write the result to
+ * @param length the length of the byte array out
+ * @return length in bytes that was actually read and put into out
+ *
+ * @see check_available()
+ * @see @ref byte_stream_properties "ByteStream Properties"
+ */
+ size_t read(uint8_t out[], size_t length) override;
+
+ size_t peek(uint8_t out[], size_t length, size_t peek_offset) const override;
+
+ bool end_of_data() const override;
+
+ std::string id() const override { return m_url; }
+
+ /**
+ * Construct a ringbuffer backed Http byte stream
+ * @param url the URL of the data to read
+ * @param timeout maximum duration in fractions of seconds to wait @ check_available(), where fractions_i64::zero waits infinitely
+ * @param exp_size if > 0 it is additionally used to determine EOF, otherwise the underlying EOF mechanism is being used only (default).
+ */
+ ByteStream_URL(const std::string& url, jau::fraction_i64 timeout, const uint64_t exp_size=0);
+
+ ByteStream_URL(const ByteStream_URL&) = delete;
+
+ ByteStream_URL& operator=(const ByteStream_URL&) = delete;
+
+ void close() noexcept override;
+
+ ~ByteStream_URL() override { close(); }
+
+ size_t get_bytes_read() const override { return (size_t)m_bytes_consumed; }
+
+ /**
+ * Botan's get_bytes_read() API uses `size_t`,
+ * which only covers 32bit or 4GB on 32bit systems.
+ * @return uint64_t bytes read
+ */
+ uint64_t get_bytes_read_u64() const { return m_bytes_consumed; }
+
+ std::string to_string() const override;
+
+ private:
+ uint64_t get_available() const noexcept { return m_has_content_length ? m_content_size - m_bytes_consumed : 0; }
+ std::string to_string_int() const;
+
+ const std::string m_url;
+ const uint64_t m_exp_size;
+ jau::fraction_i64 m_timeout;
+ ByteRingbuffer m_buffer;
+ jau::relaxed_atomic_bool m_has_content_length;
+ jau::relaxed_atomic_uint64 m_content_size;
+ jau::relaxed_atomic_uint64 m_total_xfered;
+ relaxed_atomic_result_t m_result;
+ std::thread m_url_thread;
+ uint64_t m_bytes_consumed;
+ };
+
+ /**
+ * This class represents a Ringbuffer-Based externally provisioned data feed.
+ */
+ class ByteStream_Feed final : public ByteStream {
+ public:
+ /**
+ * Check whether n bytes are available in the input stream.
+ *
+ * Wait up to timeout duration given in constructor until n bytes become available, where fractions_i64::zero waits infinitely.
+ *
+ * This method is blocking.
+ *
+ * @param n byte count to wait for
+ * @return true if n bytes are available, otherwise false
+ *
+ * @see read()
+ * @see @ref byte_stream_properties "ByteStream Properties"
+ */
+ bool check_available(size_t n) override;
+
+ /**
+ * Read from the source. Moves the internal offset so that every
+ * call to read will return a new portion of the source.
+ *
+ * Use check_available() to wait and ensure a certain amount of bytes are available.
+ *
+ * This method is not blocking.
+ *
+ * @param out the byte array to write the result to
+ * @param length the length of the byte array out
+ * @return length in bytes that was actually read and put into out
+ *
+ * @see check_available()
+ * @see @ref byte_stream_properties "ByteStream Properties"
+ */
+ size_t read(uint8_t out[], size_t length) override;
+
+ size_t peek(uint8_t out[], size_t length, size_t peek_offset) const override;
+
+ bool end_of_data() const override;
+
+ std::string id() const override { return m_id; }
+
+ /**
+ * Construct a ringbuffer backed externally provisioned byte stream
+ * @param id_name arbitrary identifier for this instance
+ * @param timeout maximum duration in fractions of seconds to wait @ check_available() and write(), where fractions_i64::zero waits infinitely
+ * @param exp_size if > 0 it is additionally used to determine EOF, otherwise the underlying EOF mechanism is being used only (default).
+ */
+ ByteStream_Feed(const std::string& id_name, jau::fraction_i64 timeout, const uint64_t exp_size=0);
+
+ ByteStream_Feed(const ByteStream_URL&) = delete;
+
+ ByteStream_Feed& operator=(const ByteStream_URL&) = delete;
+
+ void close() noexcept override;
+
+ ~ByteStream_Feed() override { close(); }
+
+ size_t get_bytes_read() const override { return m_bytes_consumed; }
+
+ /**
+ * Botan's get_bytes_read() API uses `size_t`,
+ * which only covers 32bit or 4GB on 32bit systems.
+ * @return uint64_t bytes read
+ */
+ uint64_t get_bytes_read_u64() const { return m_bytes_consumed; }
+
+ /**
+ * Write given bytes to the async ringbuffer.
+ *
+ * Wait up to timeout duration given in constructor until ringbuffer space is available, where fractions_i64::zero waits infinitely.
+ *
+ * This method is blocking.
+ *
+ * @param n byte count to wait for
+ * @param in the byte array to transfer to the async ringbuffer
+ * @param length the length of the byte array in
+ */
+ void write(uint8_t in[], size_t length);
+
+ /**
+ * Set known content size, informal only.
+ * @param content_length the content size in bytes
+ */
+ void set_content_size(const uint64_t size) noexcept {
+ m_content_size = size;
+ m_has_content_length = true;
+ }
+
+ /**
+ * Set end-of-data (EOS), i.e. when feeder completed provisioning bytes.
+ *
+ * @param result should be either result_t::FAILED or result_t::SUCCESS.
+ */
+ void set_eof(const result_t result) noexcept { m_result = result; }
+
+ std::string to_string() const override;
+
+ private:
+ uint64_t get_available() const noexcept { return m_has_content_length ? m_content_size - m_bytes_consumed : 0; }
+ std::string to_string_int() const;
+
+ const std::string m_id;
+ const uint64_t m_exp_size;
+ jau::fraction_i64 m_timeout;
+ ByteRingbuffer m_buffer;
+ jau::relaxed_atomic_bool m_has_content_length;
+ jau::relaxed_atomic_uint64 m_content_size;
+ jau::relaxed_atomic_uint64 m_total_xfered;
+ relaxed_atomic_result_t m_result;
+ uint64_t m_bytes_consumed;
+ };
+
+ /**
+ * This class represents a wrapped byte stream with the capability
+ * to record the byte stream read out at will.
+ *
+ * Peek'ed bytes won't be recorded, only read bytes.
+ */
+ class ByteStream_Recorder final : public ByteStream {
+ public:
+ size_t read(uint8_t[], size_t) override;
+
+ size_t peek(uint8_t out[], size_t length, size_t peek_offset) const override {
+ return m_parent.peek(out, length, peek_offset);
+ }
+
+ bool check_available(size_t n) override {
+ return m_parent.check_available(n);
+ }
+
+ bool end_of_data() const override {
+ return m_parent.end_of_data();
+ }
+
+ std::string id() const override { return m_parent.id(); }
+
+ /**
+ * Construct a byte stream wrapper using the given parent ByteStream.
+ * @param parent the parent ByteStream
+ * @param buffer a user defined buffer for the recording
+ */
+ ByteStream_Recorder(ByteStream& parent, io::secure_vector<uint8_t>& buffer)
+ : m_parent(parent), m_bytes_consumed(0), m_buffer(buffer), m_rec_offset(0), m_is_recording(false) {};
+
+ ByteStream_Recorder(const ByteStream_Recorder&) = delete;
+
+ ByteStream_Recorder& operator=(const ByteStream_Recorder&) = delete;
+
+ void close() noexcept override;
+
+ ~ByteStream_Recorder() override { close(); }
+
+ size_t get_bytes_read() const override { return m_parent.get_bytes_read(); }
+
+ /**
+ * Botan's get_bytes_read() API uses `size_t`,
+ * which only covers 32bit or 4GB on 32bit systems.
+ * @return uint64_t bytes read
+ */
+ uint64_t get_bytes_read_u64() const { return m_bytes_consumed; }
+
+ /**
+ * Starts the recording.
+ * <p>
+ * A potential previous recording will be cleared.
+ * </p>
+ */
+ void start_recording() noexcept;
+
+ /**
+ * Stops the recording.
+ * <p>
+ * The recording persists.
+ * </p>
+ */
+ void stop_recording() noexcept;
+
+ /**
+ * Clears the recording.
+ * <p>
+ * If the recording was ongoing, also stops the recording.
+ * </p>
+ */
+ void clear_recording() noexcept;
+
+ /** Returns the reference of the recording buffer given by user. */
+ io::secure_vector<uint8_t>& get_recording() noexcept { return m_buffer; }
+
+ size_t get_bytes_recorded() noexcept { return m_buffer.size(); }
+
+ /** Returns the recording start position. */
+ uint64_t get_recording_start_pos() noexcept { return m_rec_offset; }
+
+ bool is_recording() noexcept { return m_is_recording; }
+
+ std::string to_string() const override;
+
+ private:
+ ByteStream& m_parent;
+ uint64_t m_bytes_consumed;
+ io::secure_vector<uint8_t>& m_buffer;
+ uint64_t m_rec_offset;
+ bool m_is_recording;
+ };
+
+} // namespace elevator::io
+
+#endif /* JAU_BYTE_STREAM_HPP_ */
diff --git a/include/jau/io_util.hpp b/include/jau/io_util.hpp
new file mode 100644
index 0000000..81bf430
--- /dev/null
+++ b/include/jau/io_util.hpp
@@ -0,0 +1,110 @@
+/*
+ * Author: Sven Gothel <[email protected]>
+ * Copyright (c) 2021 Gothel Software e.K.
+ * Copyright (c) 2021 ZAFENA AB
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining
+ * a copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#ifndef JAU_IO_UTIL_HPP_
+#define JAU_IO_UTIL_HPP_
+
+#include <string>
+#include <cstdint>
+#include <functional>
+#include <thread>
+#include <vector>
+
+#include <jau/basic_types.hpp>
+#include <jau/ringbuffer.hpp>
+
+// Include Botan header files before this one to be integrated w/ Botan!
+// #include <botan_all.h>
+
+#include <jau/byte_stream.hpp>
+
+namespace jau::io {
+ /**
+ * Stream consumer function
+ * - `bool consumer(secure_vector<uint8_t>& data, bool is_final)`
+ *
+ * Returns true to signal continuation, false to end streaming.
+ */
+ typedef std::function<bool (secure_vector<uint8_t>& /* data */, bool /* is_final */)> StreamConsumerFunc;
+
+ /**
+ *
+ * @param input_file
+ * @param exp_size if > 0 it is additionally used to determine EOF, otherwise the underlying EOF mechanism is being used only.
+ * @param buf_size
+ * @param consumer_fn
+ * @return total bytes read or 0 if error
+ */
+ uint64_t read_file(const std::string& input_file, const uint64_t exp_size,
+ secure_vector<uint8_t>& buffer,
+ StreamConsumerFunc consumer_fn);
+
+ /**
+ * @param in
+ * @param exp_size if > 0 it is additionally used to determine EOF, otherwise the underlying EOF mechanism is being used only.
+ * @param buf_size
+ * @param consumer_fn
+ * @return total bytes read
+ */
+ uint64_t read_stream(ByteStream& in, const uint64_t exp_size,
+ secure_vector<uint8_t>& buffer,
+ StreamConsumerFunc consumer_fn);
+
+ /**
+ *
+ * @param url
+ * @param exp_size if > 0 it is additionally used to determine EOF, otherwise the underlying EOF mechanism is being used only.
+ * @param buffer
+ * @param consumer_fn
+ * @return total bytes read or 0 if error
+ */
+ uint64_t read_url_stream(const std::string& url, const uint64_t exp_size,
+ secure_vector<uint8_t>& buffer,
+ StreamConsumerFunc consumer_fn);
+
+ /**
+ * Asynchronous URL read content using a byte jau::ringbuffer,
+ * allowing parallel reading.
+ *
+ * @param url the URL of the content to read
+ * @param exp_size if > 0 it is additionally used to determine EOF, otherwise the underlying EOF mechanism is being used only.
+ * @param buffer the ringbuffer destination to write into
+ * @param has_content_length indicating whether content_length is known from server
+ * @param content_length tracking the content_length
+ * @param total_read tracking the total_read
+ * @param result tracking the result_t
+ * @return the url background reading thread
+ */
+ std::thread read_url_stream(const std::string& url, const uint64_t& exp_size,
+ ByteRingbuffer& buffer,
+ jau::relaxed_atomic_bool& has_content_length,
+ jau::relaxed_atomic_uint64& content_length,
+ jau::relaxed_atomic_uint64& total_read,
+ relaxed_atomic_result_t& result) noexcept;
+
+ void print_stats(const std::string& prefix, const uint64_t& out_bytes_total, const jau::fraction_i64& td) noexcept;
+} // namespace elevator::io
+
+#endif /* JAU_IO_UTIL_HPP_ */