diff options
author | Sven Gothel <[email protected]> | 2022-08-30 05:31:40 +0200 |
---|---|---|
committer | Sven Gothel <[email protected]> | 2022-08-30 05:31:40 +0200 |
commit | 162849026394c85b85c989336bdbf2c629e2f853 (patch) | |
tree | 2457022bc5c3d5338e64cf0831523ab5802ecb24 /src/io_util.cpp | |
parent | f021e3d649ce9bed419e76921efa793b02897572 (diff) |
async read_url_stream(): Add `url_header_sync` object to notify user whether header is complete (or an error occured, i.e. past header streaming stage)
`url_header_sync` allows user to wait until the url streamed header is completed, either
- Final (http) CRLF message received
- Any http header error response received
- First data package received
- End of operation
This way content_size can be awaited and acted accordingly at read,
e.g. a blocking read w/ content_size.
Diffstat (limited to 'src/io_util.cpp')
-rw-r--r-- | src/io_util.cpp | 83 |
1 files changed, 67 insertions, 16 deletions
diff --git a/src/io_util.cpp b/src/io_util.cpp index 9c3543e..6c31035 100644 --- a/src/io_util.cpp +++ b/src/io_util.cpp @@ -30,6 +30,7 @@ #include <jau/debug.hpp> #include <jau/io_util.hpp> #include <jau/byte_stream.hpp> +#include <jau/string_util.hpp> #ifdef USE_LIBCURL #include <curl/curl.h> @@ -204,6 +205,14 @@ bool jau::io::uri_tk::is_local_file_protocol(const std::string_view& uri) noexce return 0 == uri.find("file://"); } +bool jau::io::uri_tk::is_httpx_protocol(const std::string_view& uri) noexcept { + const std::string_view scheme = get_scheme(uri); + if( scheme.empty() ) { + return false; + } + return "https" == scheme || "http" == scheme; +} + #ifdef USE_LIBCURL struct curl_glue1_t { @@ -422,16 +431,42 @@ errout: return 0; } +void jau::io::url_header_sync::notify_complete() noexcept { + { + std::unique_lock<std::mutex> lockWrite(m_sync); + m_completed = true; + } + m_cv.notify_all(); // have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread. +} + +bool jau::io::url_header_sync::wait_until_completion(const jau::fraction_i64& timeout) noexcept { + std::unique_lock<std::mutex> lock(m_sync); + const fraction_timespec timeout_time = getMonotonicTime() + fraction_timespec(timeout); + while( !m_completed ) { + if( fractions_i64::zero == timeout ) { + m_cv.wait(lock); + } else { + std::cv_status s = wait_until(m_cv, lock, timeout_time ); + if( std::cv_status::timeout == s && !m_completed ) { + return false; + } + } + } + return m_completed; +} + #ifdef USE_LIBCURL struct curl_glue2_t { curl_glue2_t(CURL *_curl_handle, + jau::io::url_header_sync& _header_sync, jau::relaxed_atomic_bool& _has_content_length, jau::relaxed_atomic_uint64& _content_length, jau::relaxed_atomic_uint64& _total_read, ByteRingbuffer& _buffer, relaxed_atomic_async_io_result_t& _result) : curl_handle(_curl_handle), + header_sync(_header_sync), has_content_length(_has_content_length), content_length(_content_length), total_read(_total_read), @@ -440,11 +475,17 @@ struct curl_glue2_t { {} CURL *curl_handle; + jau::io::url_header_sync& header_sync; jau::relaxed_atomic_bool& has_content_length; jau::relaxed_atomic_uint64& content_length; jau::relaxed_atomic_uint64& total_read; ByteRingbuffer& buffer; relaxed_atomic_async_io_result_t& result; + + void interrupt_all() noexcept { + buffer.interruptReader(); + header_sync.notify_complete(); + } }; static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept { @@ -454,7 +495,7 @@ static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void // user abort! DBG_PRINT("consume_header_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s", cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() ); - cg->buffer.interruptReader(); + cg->interrupt_all(); return 0; } @@ -465,7 +506,7 @@ static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void if( 400 <= v ) { IRQ_PRINT("response_code: %ld", v); cg->result = async_io_result_t::FAILED; - cg->buffer.interruptReader(); + cg->interrupt_all(); return 0; } else { DBG_PRINT("consume_header_curl2.0 response_code: %ld", v); @@ -476,10 +517,7 @@ static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void curl_off_t v = 0; const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v); if( CURLE_OK == r ) { - if( 0 > v ) { // curl returns -1 if the size if not known - cg->content_length = 0; - cg->has_content_length = false; - } else { + if( 0 <= v ) { // curl returns -1 if the size is not known cg->content_length = v; cg->has_content_length = true; } @@ -487,10 +525,16 @@ static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void } const size_t realsize = size * nmemb; + if( 2 == realsize && 0x0d == buffer[0] && 0x0a == buffer[1] ) { + cg->header_sync.notify_complete(); + DBG_PRINT("consume_header_curl2.0 header_completed"); + } + if( false ) { DBG_PRINT("consume_header_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), result %d, rb %s", realsize, cg->total_read.load(), cg->has_content_length.load(), cg->content_length.load(), cg->result.load(), cg->buffer.toString().c_str() ); std::string blob(buffer, realsize); + jau::PLAIN_PRINT(true, "%s", jau::bytesHexString((uint8_t*)buffer, 0, realsize, true /* lsbFirst */).c_str()); jau::PLAIN_PRINT(true, "%s", blob.c_str()); } @@ -504,18 +548,20 @@ static size_t consume_data_curl2(char *ptr, size_t size, size_t nmemb, void *use // user abort! DBG_PRINT("consume_data_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s", cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() ); - cg->buffer.interruptReader(); + cg->interrupt_all(); return 0; } + // Ensure header completion is being sent + if( !cg->header_sync.completed() ) { + cg->header_sync.notify_complete(); + } + if( !cg->has_content_length ) { curl_off_t v = 0; const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v); if( CURLE_OK == r ) { - if( 0 > v ) { // curl returns -1 if the size if not known - cg->content_length = 0; - cg->has_content_length = false; - } else { + if( 0 <= v ) { // curl returns -1 if the size if not known cg->content_length = v; cg->has_content_length = true; } @@ -533,8 +579,10 @@ static size_t consume_data_curl2(char *ptr, size_t size, size_t nmemb, void *use cg->result = async_io_result_t::SUCCESS; } - DBG_PRINT("consume_data_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d, result %d, rb %s", - realsize, cg->total_read.load(), cg->has_content_length.load(), cg->content_length.load(), is_final, cg->result.load(), cg->buffer.toString().c_str() ); + if( false ) { + DBG_PRINT("consume_data_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d, result %d, rb %s", + realsize, cg->total_read.load(), cg->has_content_length.load(), cg->content_length.load(), is_final, cg->result.load(), cg->buffer.toString().c_str() ); + } return realsize; } @@ -648,10 +696,12 @@ static void read_url_stream_thread(const char *url, std::unique_ptr<curl_glue2_t /* cleanup curl stuff */ cg->result = async_io_result_t::SUCCESS; + cg->header_sync.notify_complete(); goto cleanup; errout: cg->result = async_io_result_t::FAILED; + cg->interrupt_all(); cleanup: if( nullptr != curl_handle ) { @@ -664,6 +714,7 @@ cleanup: std::unique_ptr<std::thread> jau::io::read_url_stream(const std::string& url, ByteRingbuffer& buffer, + jau::io::url_header_sync& header_sync, jau::relaxed_atomic_bool& has_content_length, jau::relaxed_atomic_uint64& content_length, jau::relaxed_atomic_uint64& total_read, @@ -675,10 +726,10 @@ std::unique_ptr<std::thread> jau::io::read_url_stream(const std::string& url, #ifdef USE_LIBCURL if( !uri_tk::protocol_supported(url) ) { -#else // USE_LIBCURL - (void) buffer; #endif // USE_LIBCURL result = io::async_io_result_t::FAILED; + header_sync.notify_complete(); + buffer.interruptReader(); const std::string_view scheme = uri_tk::get_scheme(url); DBG_PRINT("Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].", std::string(scheme).c_str(), jau::to_string(uri_tk::supported_protocols(), ",").c_str()); @@ -691,7 +742,7 @@ std::unique_ptr<std::thread> jau::io::read_url_stream(const std::string& url, buffer.recapacity( BEST_URLSTREAM_RINGBUFFER_SIZE ); } - std::unique_ptr<curl_glue2_t> cg ( std::make_unique<curl_glue2_t>(nullptr, has_content_length, content_length, total_read, buffer, result ) ); + std::unique_ptr<curl_glue2_t> cg ( std::make_unique<curl_glue2_t>(nullptr, header_sync, has_content_length, content_length, total_read, buffer, result ) ); return std::make_unique<std::thread>(&::read_url_stream_thread, url.c_str(), std::move(cg)); // @suppress("Invalid arguments") #endif // USE_LIBCURL |