1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
|
/*
* Author: Sven Gothel <sgothel@jausoft.com>
* Copyright (c) 2021-2023 Gothel Software e.K.
* Copyright (c) 2021 ZAFENA AB
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef JAU_IO_UTIL_HPP_
#define JAU_IO_UTIL_HPP_
#include <string>
#include <cstdint>
#include <thread>
#include <vector>
#include <jau/basic_types.hpp>
#include <jau/callocator_sec.hpp>
#include <jau/ringbuffer.hpp>
#include <jau/functional.hpp>
namespace jau::io {
/** @defgroup IOUtils IO Utilities
* Input and Output (IO) types and functionality.
*
* @{
*/
template<typename T> using secure_vector = std::vector<T, jau::callocator_sec<T>>;
typedef std::basic_string<char, std::char_traits<char>, jau::callocator_sec<char>> secure_string;
typedef jau::ringbuffer<uint8_t, size_t> ByteRingbuffer;
extern const size_t BEST_URLSTREAM_RINGBUFFER_SIZE;
/**
* I/O direction, read or write
*/
enum class io_dir_t : int8_t {
/** Read Operation */
READ = 0,
/** Write Operation */
WRITE = 1
};
/**
* Asynchronous I/O operation result value
*/
enum class async_io_result_t : int8_t {
/** Operation failed. */
FAILED = -1,
/** Operation still in progress. */
NONE = 0,
/** Operation succeeded. */
SUCCESS = 1
};
typedef jau::ordered_atomic<async_io_result_t, std::memory_order_relaxed> relaxed_atomic_async_io_result_t;
/**
* Stream consumer function
* - `bool consumer(secure_vector<uint8_t>& data, bool is_final)`
*
* Returns true to signal continuation, false to end streaming.
*/
typedef jau::function<bool(secure_vector<uint8_t>& /* data */, bool /* is_final */)> StreamConsumerFunc;
/**
* Synchronous byte input stream reader from given file path using the given StreamConsumerFunc consumer_fn.
*
* To abort streaming, user may return `false` from the given `consumer_func`.
*
* It is guaranteed that consumer_fn() is called with `is_final=true` once at the end,
* even if `input_file` stream has zero size.
*
* @param input_file input file name path, `-` denotes std::stdin.
* @param buffer secure std::vector buffer, passed down to consumer_fn
* @param consumer_fn StreamConsumerFunc consumer for each received heap of bytes, returning true to continue stream of false to abort.
* @return total bytes read or 0 if error
*/
uint64_t read_file(const std::string& input_file,
secure_vector<uint8_t>& buffer,
const StreamConsumerFunc& consumer_fn) noexcept;
class ByteInStream; // fwd
/**
* Synchronous byte input stream reader using the given StreamConsumerFunc consumer_fn.
*
* To abort streaming, user may return `false` from the given `consumer_func`.
*
* It is guaranteed that consumer_fn() is called with `is_final=true` once at the end,
* even input stream has zero size.
*
* @param in the input byte stream to read from
* @param buffer secure std::vector buffer, passed down to consumer_fn
* @param consumer_fn StreamConsumerFunc consumer for each received heap of bytes, returning true to continue stream of false to abort.
* @return total bytes read or 0 if error
*/
uint64_t read_stream(ByteInStream& in,
secure_vector<uint8_t>& buffer,
const StreamConsumerFunc& consumer_fn) noexcept;
/**
* Synchronous double-buffered byte input stream reader using the given StreamConsumerFunc consumer_fn.
*
* To abort streaming, user may return `false` from the given `consumer_func`.
*
* It is guaranteed that consumer_fn() is called with `is_final=true` once at the end,
* even if input stream has zero size.
*
* Implementation reads one buffer ahead in respect to consumer_fn(). <br/>
* If reading zero bytes on the next buffer,
* it propagates the end-of-file (EOF) to the previous buffer which will be send via consumer_fn() next.<br/>
*
* This way, the consumer_fn() will always receive its `is_final` flag on the last sent bytes (size > 0)
* even if the content-size is unknown (pipe). <br/>
* Hence it allows e.g. decryption to work where the final data chunck must be processed as such.
*
* @param in the input byte stream to read from
* @param buffer1 secure std::vector buffer, passed down to consumer_fn
* @param buffer2 secure std::vector buffer, passed down to consumer_fn
* @param consumer_fn StreamConsumerFunc consumer for each received heap of bytes, returning true to continue stream of false to abort.
* @return total bytes read or 0 if error
*/
uint64_t read_stream(ByteInStream& in,
secure_vector<uint8_t>& buffer1, secure_vector<uint8_t>& buffer2,
const StreamConsumerFunc& consumer_fn) noexcept;
/**
* Synchronous URL stream reader using the given StreamConsumerFunc consumer_fn.
*
* To abort streaming, user may return `false` from the given `consumer_func`.
*
* Standard implementation uses [curl](https://curl.se/),
* hence all [*libcurl* network protocols](https://curl.se/docs/url-syntax.html) are supported,
* see jau::io::uri::supported_protocols().
*
* If the uri-sheme doesn't match a supported protocol, see jau::io::uri::protocol_supported(),
* function returns immediately with zero bytes.
*
* @param url the URL to open a connection to and stream bytes from
* @param buffer secure std::vector buffer, passed down to consumer_fn
* @param consumer_fn StreamConsumerFunc consumer for each received heap of bytes, returning true to continue stream of false to abort.
* @return total bytes read or 0 if transmission error or protocol of given url is not supported
*/
uint64_t read_url_stream(const std::string& url,
secure_vector<uint8_t>& buffer,
const StreamConsumerFunc& consumer_fn) noexcept;
/**
* Synchronization for URL header completion
* as used by asynchronous read_url_stream().
*
* @see url_header_sync::completed()
*/
class url_header_sync {
private:
std::mutex m_sync;
std::condition_variable m_cv;
jau::relaxed_atomic_bool m_completed;
public:
url_header_sync() noexcept
: m_completed(false)
{ }
/**
* Returns whether URL header is completed.
*
* Completion is reached in any of the following cases
* - Final (http) CRLF message received
* - Any http header error response received
* - First data package received
* - End of operation
*/
bool completed() const noexcept { return m_completed; }
/**
* Notify completion, see completed()
*/
void notify_complete() noexcept;
/**
* Wait until completed() has been reached.
* @param timeout maximum duration in fractions of seconds to wait, where fractions_i64::zero waits infinitely
* @return true if completed within timeout, otherwise false
*/
bool wait_until_completion(const jau::fraction_i64& timeout) noexcept;
};
/**
* Asynchronous URL read content using the given byte jau::ringbuffer, allowing parallel reading.
*
* To abort streaming, user may set given reference `results` to a value other than async_io_result_t::NONE.
*
* Standard implementation uses [curl](https://curl.se/),
* hence all [*libcurl* network protocols](https://curl.se/docs/url-syntax.html) are supported,
* see jau::io::uri::supported_protocols().
*
* If the uri-sheme doesn't match a supported protocol, see jau::io::uri::protocol_supported(),
* function returns with nullptr.
*
* @param url the URL to open a connection to and stream bytes from
* @param buffer the ringbuffer destination to write into
* @param header_sync synchronization object for URL header completion
* @param has_content_length indicating whether content_length is known from server
* @param content_length tracking the content_length
* @param total_read tracking the total_read
* @param result reference to tracking async_io_result_t. If set to other than async_io_result_t::NONE while streaming, streaming is aborted.
* @return the url background reading thread unique-pointer or nullptr if protocol of given url is not supported
*/
std::unique_ptr<std::thread> read_url_stream(const std::string& url,
ByteRingbuffer& buffer,
jau::io::url_header_sync& header_sync,
jau::relaxed_atomic_bool& has_content_length,
jau::relaxed_atomic_uint64& content_length,
jau::relaxed_atomic_uint64& total_read,
relaxed_atomic_async_io_result_t& result) noexcept;
void print_stats(const std::string& prefix, const uint64_t& out_bytes_total, const jau::fraction_i64& td) noexcept;
/**@}*/
/**
* Limited URI toolkit to query handled protocols by the IO implementation.
*
* The URI scheme functionality exposed here is limited and only provided to decide whether the used implementation
* is able to handle the protocol. This is not a replacement for a proper URI class.
*/
namespace uri_tk {
/** \addtogroup IOUtils
*
* @{
*/
/**
* Returns a list of supported protocol supported by [*libcurl* network protocols](https://curl.se/docs/url-syntax.html),
* queried at runtime.
* @see protocol_supported()
*/
std::vector<std::string_view> supported_protocols() noexcept;
/**
* Returns the valid uri-scheme from given uri,
* which is empty if no valid scheme is included.
*
* The given uri must include at least a colon after the uri-scheme part.
*
* @param uri an uri
* @return valid uri-scheme, empty if non found
*/
std::string_view get_scheme(const std::string_view& uri) noexcept;
/**
* Returns true if the uri-scheme of given uri matches a supported by [*libcurl* network protocols](https://curl.se/docs/url-syntax.html) otherwise false.
*
* The uri-scheme is retrieved via get_scheme() passing given uri, hence must include at least a colon after the uri-scheme part.
*
* The *libcurl* supported protocols is queried at runtime, see supported_protocols().
*
* @param uri an uri to test
* @return true if the uri-scheme of given uri is supported, otherwise false.
* @see supported_protocols()
* @see get_scheme()
*/
bool protocol_supported(const std::string_view& uri) noexcept;
/**
* Returns true if the uri-scheme of given uri matches the local `file` protocol, i.e. starts with `file://`.
* @param uri an uri to test
*/
bool is_local_file_protocol(const std::string_view& uri) noexcept;
/**
* Returns true if the uri-scheme of given uri matches the `http` or `https` protocol, i.e. starts with `http:` or `https:`.
* @param uri an uri to test
*/
bool is_httpx_protocol(const std::string_view& uri) noexcept;
/**@}*/
}
} // namespace elevator::io
#endif /* JAU_IO_UTIL_HPP_ */
|