/*
 * Author: Sven Gothel
 * Copyright (c) 2021-2023 Gothel Software e.K.
 * Copyright (c) 2021 ZAFENA AB
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

#ifndef JAU_IO_UTIL_HPP_
#define JAU_IO_UTIL_HPP_

// NOTE(review): the include targets below, as well as several template argument lists
// further down (e.g. on secure_vector, secure_string, ByteRingbuffer, StreamConsumerFunc),
// are absent in this copy of the header - confirm against version control before building.
#include
#include
#include
#include
#include
#include
#include
#include

namespace jau::io {
    /** @defgroup IOUtils IO Utilities
     *  Input and Output (IO) types and functionality.
     *
     *  @{
     */

    template using secure_vector = std::vector>;

    typedef std::basic_string, jau::callocator_sec> secure_string;

    typedef jau::ringbuffer ByteRingbuffer;

    extern const size_t BEST_URLSTREAM_RINGBUFFER_SIZE;

    /**
     * I/O direction, read or write
     */
    enum class io_dir_t : int8_t {
        /** Read Operation */
        READ = 0,

        /** Write Operation */
        WRITE = 1
    };

    /**
     * Asynchronous I/O operation result value
     */
    enum class async_io_result_t : int8_t {
        /** Operation failed. */
        FAILED = -1,

        /** Operation still in progress. */
        NONE = 0,

        /** Operation succeeded. */
        SUCCESS = 1
    };
    typedef jau::ordered_atomic relaxed_atomic_async_io_result_t;

    /**
     * Stream consumer function
     * - `bool consumer(secure_vector& data, bool is_final)`
     *
     * Returns true to signal continuation, false to end streaming.
     */
    typedef jau::function& /* data */, bool /* is_final */)> StreamConsumerFunc;

    /**
     * Synchronous byte input stream reader from given file path using the given StreamConsumerFunc consumer_fn.
     *
     * To abort streaming, user may return `false` from the given `consumer_fn`.
     *
     * It is guaranteed that consumer_fn() is called with `is_final=true` once at the end,
     * even if `input_file` stream has zero size.
     *
     * @param input_file input file name path, `-` denotes std::stdin.
     * @param buffer secure std::vector buffer, passed down to consumer_fn
     * @param consumer_fn StreamConsumerFunc consumer for each received heap of bytes, returning true to continue stream or false to abort.
     * @return total bytes read or 0 if error
     */
    uint64_t read_file(const std::string& input_file, secure_vector& buffer, const StreamConsumerFunc& consumer_fn) noexcept;

    class ByteInStream; // fwd

    /**
     * Synchronous byte input stream reader using the given StreamConsumerFunc consumer_fn.
     *
     * To abort streaming, user may return `false` from the given `consumer_fn`.
     *
     * It is guaranteed that consumer_fn() is called with `is_final=true` once at the end,
     * even if the input stream has zero size.
     *
     * @param in the input byte stream to read from
     * @param buffer secure std::vector buffer, passed down to consumer_fn
     * @param consumer_fn StreamConsumerFunc consumer for each received heap of bytes, returning true to continue stream or false to abort.
     * @return total bytes read or 0 if error
     */
    uint64_t read_stream(ByteInStream& in, secure_vector& buffer, const StreamConsumerFunc& consumer_fn) noexcept;

    /**
     * Synchronous double-buffered byte input stream reader using the given StreamConsumerFunc consumer_fn.
     *
     * To abort streaming, user may return `false` from the given `consumer_fn`.
     *
     * It is guaranteed that consumer_fn() is called with `is_final=true` once at the end,
     * even if input stream has zero size.
     *
     * Implementation reads one buffer ahead in respect to consumer_fn().
     * If reading zero bytes on the next buffer,
     * it propagates the end-of-file (EOF) to the previous buffer which will be sent via consumer_fn() next.
     *
     * This way, the consumer_fn() will always receive its `is_final` flag on the last sent bytes (size > 0)
     * even if the content-size is unknown (pipe).
     * Hence it allows e.g. decryption to work where the final data chunk must be processed as such.
     *
     * @param in the input byte stream to read from
     * @param buffer1 secure std::vector buffer, passed down to consumer_fn
     * @param buffer2 secure std::vector buffer, passed down to consumer_fn
     * @param consumer_fn StreamConsumerFunc consumer for each received heap of bytes, returning true to continue stream or false to abort.
     * @return total bytes read or 0 if error
     */
    uint64_t read_stream(ByteInStream& in, secure_vector& buffer1, secure_vector& buffer2, const StreamConsumerFunc& consumer_fn) noexcept;

    /**
     * Synchronous URL stream reader using the given StreamConsumerFunc consumer_fn.
     *
     * To abort streaming, user may return `false` from the given `consumer_fn`.
     *
     * Standard implementation uses [curl](https://curl.se/),
     * hence all [*libcurl* network protocols](https://curl.se/docs/url-syntax.html) are supported,
     * see jau::io::uri_tk::supported_protocols().
     *
     * If the uri-scheme doesn't match a supported protocol, see jau::io::uri_tk::protocol_supported(),
     * function returns immediately with zero bytes.
     *
     * @param url the URL to open a connection to and stream bytes from
     * @param buffer secure std::vector buffer, passed down to consumer_fn
     * @param consumer_fn StreamConsumerFunc consumer for each received heap of bytes, returning true to continue stream or false to abort.
     * @return total bytes read or 0 if transmission error or protocol of given url is not supported
     */
    uint64_t read_url_stream(const std::string& url, secure_vector& buffer, const StreamConsumerFunc& consumer_fn) noexcept;

    /**
     * Synchronization for URL header completion
     * as used by asynchronous read_url_stream().
     *
     * @see url_header_sync::completed()
     */
    class url_header_sync {
        private:
            // NOTE(review): m_sync and m_cv are presumably used by notify_complete() and
            // wait_until_completion() to block and wake waiters; their implementation is not visible here.
            std::mutex m_sync;
            std::condition_variable m_cv;
            // Completion flag as returned by completed().
            jau::relaxed_atomic_bool m_completed;

        public:
            /** Constructs the instance in the not-completed state, i.e. completed() returns false. */
            url_header_sync() noexcept
            : m_completed(false)
            { }

            /**
             * Returns whether URL header is completed.
             *
             * Completion is reached in any of the following cases
             * - Final (http) CRLF message received
             * - Any http header error response received
             * - First data package received
             * - End of operation
             */
            bool completed() const noexcept { return m_completed; }

            /**
             * Notify completion, see completed()
             */
            void notify_complete() noexcept;

            /**
             * Wait until completed() has been reached.
             * @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
             * @return true if completed within timeout, otherwise false
             */
            bool wait_until_completion(const jau::fraction_i64& timeout) noexcept;
    };

    /**
     * Asynchronous URL read content using the given byte jau::ringbuffer, allowing parallel reading.
     *
     * To abort streaming, user may set given reference `result` to a value other than async_io_result_t::NONE.
     *
     * Standard implementation uses [curl](https://curl.se/),
     * hence all [*libcurl* network protocols](https://curl.se/docs/url-syntax.html) are supported,
     * see jau::io::uri_tk::supported_protocols().
     *
     * If the uri-scheme doesn't match a supported protocol, see jau::io::uri_tk::protocol_supported(),
     * function returns with nullptr.
     *
     * @param url the URL to open a connection to and stream bytes from
     * @param buffer the ringbuffer destination to write into
     * @param header_sync synchronization object for URL header completion
     * @param has_content_length indicating whether content_length is known from server
     * @param content_length tracking the content_length
     * @param total_read tracking the total_read
     * @param result reference to tracking async_io_result_t. If set to other than async_io_result_t::NONE while streaming, streaming is aborted.
     * @return the url background reading thread unique-pointer or nullptr if protocol of given url is not supported
     */
    std::unique_ptr read_url_stream(const std::string& url,
                                    ByteRingbuffer& buffer,
                                    jau::io::url_header_sync& header_sync,
                                    jau::relaxed_atomic_bool& has_content_length,
                                    jau::relaxed_atomic_uint64& content_length,
                                    jau::relaxed_atomic_uint64& total_read,
                                    relaxed_atomic_async_io_result_t& result) noexcept;

    /**
     * Prints statistics for a completed transfer.
     *
     * NOTE(review): output format and destination are defined by the implementation, which is not visible in this header.
     *
     * @param prefix text prefix, presumably prepended to the printed output - confirm against implementation
     * @param out_bytes_total presumably the total number of bytes transferred - confirm against implementation
     * @param td presumably the elapsed duration of the transfer - confirm against implementation
     */
    void print_stats(const std::string& prefix, const uint64_t& out_bytes_total, const jau::fraction_i64& td) noexcept;

    /**@}*/

    /**
     * Limited URI toolkit to query handled protocols by the IO implementation.
     *
     * The URI scheme functionality exposed here is limited and only provided to decide whether the used implementation
     * is able to handle the protocol. This is not a replacement for a proper URI class.
     */
    namespace uri_tk {
        /** \addtogroup IOUtils
         *
         *  @{
         */

        /**
         * Returns the list of protocols supported by [*libcurl* network protocols](https://curl.se/docs/url-syntax.html),
         * queried at runtime.
         * @see protocol_supported()
         */
        std::vector supported_protocols() noexcept;

        /**
         * Returns the valid uri-scheme from given uri,
         * which is empty if no valid scheme is included.
         *
         * The given uri must include at least a colon after the uri-scheme part.
         *
         * @param uri an uri
         * @return valid uri-scheme, empty if none found
         */
        std::string_view get_scheme(const std::string_view& uri) noexcept;

        /**
         * Returns true if the uri-scheme of given uri matches a protocol supported by [*libcurl* network protocols](https://curl.se/docs/url-syntax.html), otherwise false.
         *
         * The uri-scheme is retrieved via get_scheme() passing given uri, hence must include at least a colon after the uri-scheme part.
         *
         * The list of *libcurl* supported protocols is queried at runtime, see supported_protocols().
         *
         * @param uri an uri to test
         * @return true if the uri-scheme of given uri is supported, otherwise false.
         * @see supported_protocols()
         * @see get_scheme()
         */
        bool protocol_supported(const std::string_view& uri) noexcept;

        /**
         * Returns true if the uri-scheme of given uri matches the local `file` protocol, i.e. starts with `file://`.
         * @param uri an uri to test
         */
        bool is_local_file_protocol(const std::string_view& uri) noexcept;

        /**
         * Returns true if the uri-scheme of given uri matches the `http` or `https` protocol, i.e. starts with `http:` or `https:`.
         * @param uri an uri to test
         */
        bool is_httpx_protocol(const std::string_view& uri) noexcept;

        /**@}*/
    } // namespace uri_tk

} // namespace jau::io

#endif /* JAU_IO_UTIL_HPP_ */