aboutsummaryrefslogtreecommitdiffstats
path: root/src/io_util.cpp
diff options
context:
space:
mode:
authorSven Gothel <[email protected]>2022-08-30 05:31:40 +0200
committerSven Gothel <[email protected]>2022-08-30 05:31:40 +0200
commit162849026394c85b85c989336bdbf2c629e2f853 (patch)
tree2457022bc5c3d5338e64cf0831523ab5802ecb24 /src/io_util.cpp
parentf021e3d649ce9bed419e76921efa793b02897572 (diff)
async read_url_stream(): Add `url_header_sync` object to notify user whether header is complete (or an error occured, i.e. past header streaming stage)
`url_header_sync` allows user to wait until the url streamed header is completed, either - Final (http) CRLF message received - Any http header error response received - First data package received - End of operation This way content_size can be awaited and acted accordingly at read, e.g. a blocking read w/ content_size.
Diffstat (limited to 'src/io_util.cpp')
-rw-r--r--src/io_util.cpp83
1 files changed, 67 insertions, 16 deletions
diff --git a/src/io_util.cpp b/src/io_util.cpp
index 9c3543e..6c31035 100644
--- a/src/io_util.cpp
+++ b/src/io_util.cpp
@@ -30,6 +30,7 @@
#include <jau/debug.hpp>
#include <jau/io_util.hpp>
#include <jau/byte_stream.hpp>
+#include <jau/string_util.hpp>
#ifdef USE_LIBCURL
#include <curl/curl.h>
@@ -204,6 +205,14 @@ bool jau::io::uri_tk::is_local_file_protocol(const std::string_view& uri) noexce
return 0 == uri.find("file://");
}
+bool jau::io::uri_tk::is_httpx_protocol(const std::string_view& uri) noexcept {
+ const std::string_view scheme = get_scheme(uri);
+ if( scheme.empty() ) {
+ return false;
+ }
+ return "https" == scheme || "http" == scheme;
+}
+
#ifdef USE_LIBCURL
struct curl_glue1_t {
@@ -422,16 +431,42 @@ errout:
return 0;
}
+void jau::io::url_header_sync::notify_complete() noexcept {
+ {
+ std::unique_lock<std::mutex> lockWrite(m_sync);
+ m_completed = true;
+ }
+ m_cv.notify_all(); // have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
+}
+
+bool jau::io::url_header_sync::wait_until_completion(const jau::fraction_i64& timeout) noexcept {
+ std::unique_lock<std::mutex> lock(m_sync);
+ const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout);
+ while( !m_completed ) {
+ if( fractions_i64::zero == timeout ) {
+ m_cv.wait(lock);
+ } else {
+ std::cv_status s = wait_until(m_cv, lock, timeout_time );
+ if( std::cv_status::timeout == s && !m_completed ) {
+ return false;
+ }
+ }
+ }
+ return m_completed;
+}
+
#ifdef USE_LIBCURL
struct curl_glue2_t {
curl_glue2_t(CURL *_curl_handle,
+ jau::io::url_header_sync& _header_sync,
jau::relaxed_atomic_bool& _has_content_length,
jau::relaxed_atomic_uint64& _content_length,
jau::relaxed_atomic_uint64& _total_read,
ByteRingbuffer& _buffer,
relaxed_atomic_async_io_result_t& _result)
: curl_handle(_curl_handle),
+ header_sync(_header_sync),
has_content_length(_has_content_length),
content_length(_content_length),
total_read(_total_read),
@@ -440,11 +475,17 @@ struct curl_glue2_t {
{}
CURL *curl_handle;
+ jau::io::url_header_sync& header_sync;
jau::relaxed_atomic_bool& has_content_length;
jau::relaxed_atomic_uint64& content_length;
jau::relaxed_atomic_uint64& total_read;
ByteRingbuffer& buffer;
relaxed_atomic_async_io_result_t& result;
+
+ void interrupt_all() noexcept {
+ buffer.interruptReader();
+ header_sync.notify_complete();
+ }
};
static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept {
@@ -454,7 +495,7 @@ static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void
// user abort!
DBG_PRINT("consume_header_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s",
cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() );
- cg->buffer.interruptReader();
+ cg->interrupt_all();
return 0;
}
@@ -465,7 +506,7 @@ static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void
if( 400 <= v ) {
IRQ_PRINT("response_code: %ld", v);
cg->result = async_io_result_t::FAILED;
- cg->buffer.interruptReader();
+ cg->interrupt_all();
return 0;
} else {
DBG_PRINT("consume_header_curl2.0 response_code: %ld", v);
@@ -476,10 +517,7 @@ static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void
curl_off_t v = 0;
const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
if( CURLE_OK == r ) {
- if( 0 > v ) { // curl returns -1 if the size if not known
- cg->content_length = 0;
- cg->has_content_length = false;
- } else {
+ if( 0 <= v ) { // curl returns -1 if the size is not known
cg->content_length = v;
cg->has_content_length = true;
}
@@ -487,10 +525,16 @@ static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void
}
const size_t realsize = size * nmemb;
+ if( 2 == realsize && 0x0d == buffer[0] && 0x0a == buffer[1] ) {
+ cg->header_sync.notify_complete();
+ DBG_PRINT("consume_header_curl2.0 header_completed");
+ }
+
if( false ) {
DBG_PRINT("consume_header_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), result %d, rb %s",
realsize, cg->total_read.load(), cg->has_content_length.load(), cg->content_length.load(), cg->result.load(), cg->buffer.toString().c_str() );
std::string blob(buffer, realsize);
+ jau::PLAIN_PRINT(true, "%s", jau::bytesHexString((uint8_t*)buffer, 0, realsize, true /* lsbFirst */).c_str());
jau::PLAIN_PRINT(true, "%s", blob.c_str());
}
@@ -504,18 +548,20 @@ static size_t consume_data_curl2(char *ptr, size_t size, size_t nmemb, void *use
// user abort!
DBG_PRINT("consume_data_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s",
cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() );
- cg->buffer.interruptReader();
+ cg->interrupt_all();
return 0;
}
+ // Ensure header completion is being sent
+ if( !cg->header_sync.completed() ) {
+ cg->header_sync.notify_complete();
+ }
+
if( !cg->has_content_length ) {
curl_off_t v = 0;
const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
if( CURLE_OK == r ) {
- if( 0 > v ) { // curl returns -1 if the size if not known
- cg->content_length = 0;
- cg->has_content_length = false;
- } else {
+ if( 0 <= v ) { // curl returns -1 if the size if not known
cg->content_length = v;
cg->has_content_length = true;
}
@@ -533,8 +579,10 @@ static size_t consume_data_curl2(char *ptr, size_t size, size_t nmemb, void *use
cg->result = async_io_result_t::SUCCESS;
}
- DBG_PRINT("consume_data_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d, result %d, rb %s",
- realsize, cg->total_read.load(), cg->has_content_length.load(), cg->content_length.load(), is_final, cg->result.load(), cg->buffer.toString().c_str() );
+ if( false ) {
+ DBG_PRINT("consume_data_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d, result %d, rb %s",
+ realsize, cg->total_read.load(), cg->has_content_length.load(), cg->content_length.load(), is_final, cg->result.load(), cg->buffer.toString().c_str() );
+ }
return realsize;
}
@@ -648,10 +696,12 @@ static void read_url_stream_thread(const char *url, std::unique_ptr<curl_glue2_t
/* cleanup curl stuff */
cg->result = async_io_result_t::SUCCESS;
+ cg->header_sync.notify_complete();
goto cleanup;
errout:
cg->result = async_io_result_t::FAILED;
+ cg->interrupt_all();
cleanup:
if( nullptr != curl_handle ) {
@@ -664,6 +714,7 @@ cleanup:
std::unique_ptr<std::thread> jau::io::read_url_stream(const std::string& url,
ByteRingbuffer& buffer,
+ jau::io::url_header_sync& header_sync,
jau::relaxed_atomic_bool& has_content_length,
jau::relaxed_atomic_uint64& content_length,
jau::relaxed_atomic_uint64& total_read,
@@ -675,10 +726,10 @@ std::unique_ptr<std::thread> jau::io::read_url_stream(const std::string& url,
#ifdef USE_LIBCURL
if( !uri_tk::protocol_supported(url) ) {
-#else // USE_LIBCURL
- (void) buffer;
#endif // USE_LIBCURL
result = io::async_io_result_t::FAILED;
+ header_sync.notify_complete();
+ buffer.interruptReader();
const std::string_view scheme = uri_tk::get_scheme(url);
DBG_PRINT("Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
std::string(scheme).c_str(), jau::to_string(uri_tk::supported_protocols(), ",").c_str());
@@ -691,7 +742,7 @@ std::unique_ptr<std::thread> jau::io::read_url_stream(const std::string& url,
buffer.recapacity( BEST_URLSTREAM_RINGBUFFER_SIZE );
}
- std::unique_ptr<curl_glue2_t> cg ( std::make_unique<curl_glue2_t>(nullptr, has_content_length, content_length, total_read, buffer, result ) );
+ std::unique_ptr<curl_glue2_t> cg ( std::make_unique<curl_glue2_t>(nullptr, header_sync, has_content_length, content_length, total_read, buffer, result ) );
return std::make_unique<std::thread>(&::read_url_stream_thread, url.c_str(), std::move(cg)); // @suppress("Invalid arguments")
#endif // USE_LIBCURL