diff options
author | Sven Gothel <[email protected]> | 2022-05-29 13:20:41 +0200 |
---|---|---|
committer | Sven Gothel <[email protected]> | 2022-05-29 13:20:41 +0200 |
commit | 68a026914fa27a8dd3bebbb358b30ee24f4de193 (patch) | |
tree | 85b580ada046f3e44a31a28b986ecaf2a0deec24 | |
parent | 13ce66b10dedf4d26eea4ff2262649ab23687c69 (diff) |
io::read_url_stream: Fix content_length (-1 == unknown), add consume_header w/ response_code for errors >= 400 and buffer.interruptReader()
- curl replies -1 for unknown content_length
- also consume the curl header to detect response_code errors >= 400 (404 not found)
- in case of an error, we need to interrupt the ringbuffer-reader thread -> buffer.interruptReader(),
otherwise the thread would be blocked until timeout and renders application to not be responsive.
- interruptReader() has been added for the synchronous- and asynchronous read_url_stream() functions
- interruptReader() has been exposed as ByteInStream_Feed::interruptReader()
and added to ByteInStream_Feed::set_eof().
- the above is tested via test_iostream01 for synchronous- and asynchronous read_url_stream() functions
using a non-existing URL entity
- the above is tested via test_bytestream01 for ByteInStream_URL using a non-existing URL entity
and for and ByteInStream_Feed having the feeder thread prematurely end transmission.
-rw-r--r-- | include/jau/byte_stream.hpp | 18 | ||||
-rw-r--r-- | src/byte_stream.cpp | 8 | ||||
-rw-r--r-- | src/io_util.cpp | 211 | ||||
-rw-r--r-- | test/test_bytestream01.cpp | 151 | ||||
-rw-r--r-- | test/test_iostream01.cpp | 98 |
5 files changed, 428 insertions, 58 deletions
diff --git a/include/jau/byte_stream.hpp b/include/jau/byte_stream.hpp index 7f92fe8..c8d5411 100644 --- a/include/jau/byte_stream.hpp +++ b/include/jau/byte_stream.hpp @@ -556,6 +556,18 @@ namespace jau::io { uint64_t content_size() const noexcept override { return m_content_size; } /** + * Interrupt a potentially blocked reader. + * + * Call this method if intended to abort streaming and to interrupt the reader thread's potentially blocked check_available() call, + * i.e. done at set_eof() + * + * @see set_eof() + */ + void interruptReader() noexcept { + m_buffer.interruptReader(); + } + + /** * Write given bytes to the async ringbuffer. * * Wait up to timeout duration given in constructor until ringbuffer space is available, where fractions_i64::zero waits infinitely. @@ -580,9 +592,13 @@ namespace jau::io { /** * Set end-of-data (EOS), i.e. when feeder completed provisioning bytes. * + * Implementation issues interruptReader() to unblock a potentially blocked reader thread. + * * @param result should be either result_t::FAILED or result_t::SUCCESS. + * + * @see interruptReader() */ - void set_eof(const async_io_result_t result) noexcept { m_result = result; } + void set_eof(const async_io_result_t result) noexcept { m_result = result; interruptReader(); } std::string to_string() const noexcept override; diff --git a/src/byte_stream.cpp b/src/byte_stream.cpp index cab17f9..22bde0e 100644 --- a/src/byte_stream.cpp +++ b/src/byte_stream.cpp @@ -368,8 +368,8 @@ bool ByteInStream_URL::end_of_data() const NOEXCEPT_BOTAN { } std::string ByteInStream_URL::to_string_int() const noexcept { - return m_url+", Url[content_length "+std::to_string(m_has_content_length.load())+ - " "+jau::to_decstring(m_content_size.load())+ + return m_url+", Url[content_length has "+std::to_string(m_has_content_length.load())+ + ", size "+jau::to_decstring(m_content_size.load())+ ", xfered "+jau::to_decstring(m_total_xfered.load())+ ", result "+std::to_string((int8_t)m_result.load())+ "], consumed "+jau::to_decstring(m_bytes_consumed)+ @@ -441,8 +441,8 @@ void ByteInStream_Feed::write(uint8_t in[], size_t length) noexcept { } std::string ByteInStream_Feed::to_string_int() const noexcept { - return m_id+", ext[content_length "+std::to_string(m_has_content_length.load())+ - " "+jau::to_decstring(m_content_size.load())+ + return m_id+", ext[content_length has "+std::to_string(m_has_content_length.load())+ + ", size "+jau::to_decstring(m_content_size.load())+ ", xfered "+jau::to_decstring(m_total_xfered.load())+ ", result "+std::to_string((int8_t)m_result.load())+ "], consumed "+std::to_string(m_bytes_consumed)+ diff --git a/src/io_util.cpp b/src/io_util.cpp index 08aac7a..f2b0e3b 100644 --- a/src/io_util.cpp +++ b/src/io_util.cpp @@ -91,19 +91,64 @@ struct curl_glue1_t { StreamConsumerFunc consumer_fn; }; -static size_t consume_curl1(void *ptr, size_t size, size_t nmemb, void *stream) noexcept { - curl_glue1_t * cg = (curl_glue1_t*)stream; +static size_t consume_header_curl1(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept { + curl_glue1_t * cg = (curl_glue1_t*)userdata; + + { + long v; + const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v); + if( CURLE_OK == r ) { + if( 400 <= v ) { + IRQ_PRINT("response_code: %ld", v); + return 0; + } else { + DBG_PRINT("consume_header_curl1.0 response_code: %ld", v); + } + } + } + if( !cg->has_content_length ) { + curl_off_t v = 0; + const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v); + if( CURLE_OK == r ) { + if( 0 > v ) { // curl returns -1 if the size if not known + cg->content_length = 0; + cg->has_content_length = false; + } else { + cg->content_length = v; + cg->has_content_length = true; + } + } + } + const size_t realsize = size * nmemb; + + if( false ) { + DBG_PRINT("consume_header_curl1.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " )", + realsize, cg->total_read, cg->has_content_length, cg->content_length ); + std::string blob(buffer, realsize); + jau::PLAIN_PRINT(true, "%s", blob.c_str()); + } + + return realsize; +} + +static size_t consume_data_curl1(char *ptr, size_t size, size_t nmemb, void *userdata) noexcept { + curl_glue1_t * cg = (curl_glue1_t*)userdata; if( !cg->has_content_length ) { curl_off_t v = 0; CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v); - if( !r ) { - cg->content_length = v; - cg->has_content_length = true; + if( CURLE_OK == r ) { + if( 0 > v ) { // curl returns -1 if the size if not known + cg->content_length = 0; + cg->has_content_length = false; + } else { + cg->content_length = v; + cg->has_content_length = true; + } } } const size_t realsize = size * nmemb; - DBG_PRINT("consume_curl1.0 realsize %zu", realsize); + DBG_PRINT("consume_data_curl1.0 realsize %zu", realsize); cg->buffer.resize(realsize); memcpy(cg->buffer.data(), ptr, realsize); @@ -111,8 +156,8 @@ static size_t consume_curl1(void *ptr, size_t size, size_t nmemb, void *stream) const bool is_final = 0 == realsize || cg->has_content_length ? cg->total_read >= cg->content_length : false; - DBG_PRINT("consume_curl1.X realsize %zu, total %" PRIu64 " / ( content_len %" PRIu64 " ), is_final %d", - realsize, cg->total_read, cg->content_length, is_final ); + DBG_PRINT("consume_data_curl1.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d", + realsize, cg->total_read, cg->has_content_length, cg->content_length, is_final ); try { if( !cg->consumer_fn(cg->buffer, is_final) ) { @@ -173,8 +218,40 @@ uint64_t jau::io::read_url_stream(const std::string& url, goto errout; } + /* Suppress proxy CONNECT response headers from user callbacks */ + res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L); + if( CURLE_OK != res ) { + ERR_PRINT("Error setting up url %s, error %d %d", + url, (int)res, errorbuffer.data()); + goto errout; + } + + /* Don't pass headers to the data stream. */ + res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L); + if( CURLE_OK != res ) { + ERR_PRINT("Error setting up url %s, error %d %d", + url, (int)res, errorbuffer.data()); + goto errout; + } + + /* send header data to this function */ + res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, consume_header_curl1); + if( CURLE_OK != res ) { + ERR_PRINT("Error setting up url %s, error %d %d", + url, (int)res, errorbuffer.data()); + goto errout; + } + + /* set userdata for consume_header_curl2 */ + res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (void*)&cg); + if( CURLE_OK != res ) { + ERR_PRINT("Error setting up url %s, error %d %d", + url, (int)res, errorbuffer.data()); + goto errout; + } + /* send all data to this function */ - res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_curl1); + res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_data_curl1); if( CURLE_OK != res ) { ERR_PRINT("Error setting up url %s, error %d %d", url.c_str(), (int)res, errorbuffer.data()); @@ -192,7 +269,7 @@ uint64_t jau::io::read_url_stream(const std::string& url, /* performs the tast, blocking! */ res = curl_easy_perform(curl_handle); if( CURLE_OK != res ) { - ERR_PRINT("Error processing url %s, error %d %d", + IRQ_PRINT("processing url %s, error %d %d", url.c_str(), (int)res, errorbuffer.data()); goto errout; } @@ -229,26 +306,82 @@ struct curl_glue2_t { relaxed_atomic_async_io_result_t& result; }; -static size_t consume_curl2(void *ptr, size_t size, size_t nmemb, void *stream) noexcept { - curl_glue2_t * cg = (curl_glue2_t*)stream; +static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept { + curl_glue2_t * cg = (curl_glue2_t*)userdata; if( async_io_result_t::NONE!= cg->result ) { // user abort! - DBG_PRINT("consume_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s", + DBG_PRINT("consume_header_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s", cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() ); + cg->buffer.interruptReader(); return 0; } + { + long v; + const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v); + if( CURLE_OK == r ) { + if( 400 <= v ) { + IRQ_PRINT("response_code: %ld", v); + cg->result = async_io_result_t::FAILED; + cg->buffer.interruptReader(); + return 0; + } else { + DBG_PRINT("consume_header_curl2.0 response_code: %ld", v); + } + } + } if( !cg->has_content_length ) { curl_off_t v = 0; const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v); if( CURLE_OK == r ) { - cg->content_length = v; - cg->has_content_length = true; + if( 0 > v ) { // curl returns -1 if the size if not known + cg->content_length = 0; + cg->has_content_length = false; + } else { + cg->content_length = v; + cg->has_content_length = true; + } + } + } + const size_t realsize = size * nmemb; + + if( false ) { + DBG_PRINT("consume_header_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), result %d, rb %s", + realsize, cg->total_read.load(), cg->has_content_length.load(), cg->content_length.load(), cg->result.load(), cg->buffer.toString().c_str() ); + std::string blob(buffer, realsize); + jau::PLAIN_PRINT(true, "%s", blob.c_str()); + } + + return realsize; +} + +static size_t consume_data_curl2(char *ptr, size_t size, size_t nmemb, void *userdata) noexcept { + curl_glue2_t * cg = (curl_glue2_t*)userdata; + + if( async_io_result_t::NONE!= cg->result ) { + // user abort! + DBG_PRINT("consume_data_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s", + cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() ); + cg->buffer.interruptReader(); + return 0; + } + + if( !cg->has_content_length ) { + curl_off_t v = 0; + const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v); + if( CURLE_OK == r ) { + if( 0 > v ) { // curl returns -1 if the size if not known + cg->content_length = 0; + cg->has_content_length = false; + } else { + cg->content_length = v; + cg->has_content_length = true; + } } } const size_t realsize = size * nmemb; - DBG_PRINT("consume_curl2.0 realsize %zu, rb %s", realsize, cg->buffer.toString().c_str() ); + DBG_PRINT("consume_data_curl2.0 realsize %zu, rb %s", realsize, cg->buffer.toString().c_str() ); cg->buffer.putBlocking(reinterpret_cast<uint8_t*>(ptr), reinterpret_cast<uint8_t*>(ptr)+realsize, 0_s); @@ -259,8 +392,8 @@ static size_t consume_curl2(void *ptr, size_t size, size_t nmemb, void *stream) cg->result = async_io_result_t::SUCCESS; } - DBG_PRINT("consume_curl2.X realsize %zu, total %" PRIu64 " / ( content_len %" PRIu64 " ), is_final %d, result %d, rb %s", - realsize, cg->total_read.load(), cg->result.load(), cg->content_length.load(), is_final, cg->buffer.toString().c_str() ); + DBG_PRINT("consume_data_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d, result %d, rb %s", + realsize, cg->total_read.load(), cg->has_content_length.load(), cg->content_length.load(), is_final, cg->result.load(), cg->buffer.toString().c_str() ); return realsize; } @@ -309,15 +442,47 @@ static void read_url_stream_thread(const char *url, std::unique_ptr<curl_glue2_t goto errout; } - /* send all data to this function */ - res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_curl2); + /* Suppress proxy CONNECT response headers from user callbacks */ + res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L); if( CURLE_OK != res ) { ERR_PRINT("Error setting up url %s, error %d %d", url, (int)res, errorbuffer.data()); goto errout; } - /* write the page body to this file handle */ + /* Don't pass headers to the data stream. */ + res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L); + if( CURLE_OK != res ) { + ERR_PRINT("Error setting up url %s, error %d %d", + url, (int)res, errorbuffer.data()); + goto errout; + } + + /* send header data to this function */ + res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, consume_header_curl2); + if( CURLE_OK != res ) { + ERR_PRINT("Error setting up url %s, error %d %d", + url, (int)res, errorbuffer.data()); + goto errout; + } + + /* set userdata for consume_header_curl2 */ + res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (void*)cg.get()); + if( CURLE_OK != res ) { + ERR_PRINT("Error setting up url %s, error %d %d", + url, (int)res, errorbuffer.data()); + goto errout; + } + + /* send received data to this function */ + res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_data_curl2); + if( CURLE_OK != res ) { + ERR_PRINT("Error setting up url %s, error %d %d", + url, (int)res, errorbuffer.data()); + goto errout; + } + + /* set userdata for consume_data_curl2 */ res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void*)cg.get()); if( CURLE_OK != res ) { ERR_PRINT("Error setting up url %s, error %d %d", @@ -330,10 +495,10 @@ static void read_url_stream_thread(const char *url, std::unique_ptr<curl_glue2_t if( CURLE_OK != res ) { if( async_io_result_t::NONE == cg->result ) { // Error during normal processing - ERR_PRINT("Error processing url %s, error %d %d", + IRQ_PRINT("Error processing url %s, error %d %d", url, (int)res, errorbuffer.data()); } else { - // User aborted + // User aborted or response code error detected via consume_header_curl2 DBG_PRINT("Processing aborted url %s, error %d %d", url, (int)res, errorbuffer.data()); } diff --git a/test/test_bytestream01.cpp b/test/test_bytestream01.cpp index 5c4d4b8..777b57c 100644 --- a/test/test_bytestream01.cpp +++ b/test/test_bytestream01.cpp @@ -149,7 +149,7 @@ class TestByteStream01 { input.close(); if ( 0==in_bytes_total || outfile.fail() ) { - ERR_PRINT2("ByteStream copy failed: Output file write failed %s", output_fname.c_str()); + IRQ_PRINT("ByteStream copy failed: Output file write failed %s", output_fname.c_str()); return false; } @@ -197,7 +197,7 @@ class TestByteStream01 { const std::string uri_original = url_input_root + fname_payload_lst[file_idx]; - jau::io::ByteInStream_URL data_stream(uri_original, 3_s); + jau::io::ByteInStream_URL data_stream(uri_original, 500_ms); bool res = transfer(data_stream, fname_payload_copy_lst[file_idx]); REQUIRE( true == res ); @@ -213,7 +213,7 @@ class TestByteStream01 { const std::string uri_original = url_input_root + fname_payload_lst[file_idx]; - jau::io::ByteInStream_URL data_stream(uri_original, 3_s); + jau::io::ByteInStream_URL data_stream(uri_original, 500_ms); bool res = transfer(data_stream, fname_payload_copy_lst[file_idx]); REQUIRE( true == res ); @@ -226,13 +226,36 @@ class TestByteStream01 { } } - // throttled, no content size + void test12_copy_http_404() { + httpd_start(); + + { + const size_t file_idx = IDX_11kiB; + + const std::string uri_original = url_input_root + "doesnt_exists.txt"; + + jau::io::ByteInStream_URL data_stream(uri_original, 500_ms); + + bool res = transfer(data_stream, fname_payload_copy_lst[file_idx]); + REQUIRE( false == res ); + + jau::fs::file_stats out_stats(fname_payload_copy_lst[file_idx]); + REQUIRE( true == out_stats.exists() ); + REQUIRE( true == out_stats.is_file() ); + REQUIRE( data_stream.error() == true ); + REQUIRE( data_stream.has_content_size() == false ); + REQUIRE( data_stream.content_size() == 0 ); + REQUIRE( 0 == out_stats.size() ); + } + } + + // throttled, no content size, interruptReader() via set_eof() will avoid timeout static void feed_source_00(jau::io::ByteInStream_Feed * data_feed) { uint64_t xfer_total = 0; - jau::io::ByteInStream_File enc_stream(data_feed->id(), true /* use_binary */); - while( !enc_stream.end_of_data() ) { + jau::io::ByteInStream_File data_stream(data_feed->id(), true /* use_binary */); + while( !data_stream.end_of_data() ) { uint8_t buffer[1024]; // 1k - size_t count = enc_stream.read(buffer, sizeof(buffer)); + size_t count = data_stream.read(buffer, sizeof(buffer)); if( 0 < count ) { xfer_total += count; data_feed->write(buffer, count); @@ -250,10 +273,10 @@ class TestByteStream01 { data_feed->set_content_size( file_size ); uint64_t xfer_total = 0; - jau::io::ByteInStream_File enc_stream(data_feed->id(), true /* use_binary */); - while( !enc_stream.end_of_data() && xfer_total < file_size ) { + jau::io::ByteInStream_File data_stream(data_feed->id(), true /* use_binary */); + while( !data_stream.end_of_data() && xfer_total < file_size ) { uint8_t buffer[1024]; // 1k - size_t count = enc_stream.read(buffer, sizeof(buffer)); + size_t count = data_stream.read(buffer, sizeof(buffer)); if( 0 < count ) { xfer_total += count; data_feed->write(buffer, count); @@ -271,16 +294,63 @@ class TestByteStream01 { data_feed->set_content_size( file_size ); uint64_t xfer_total = 0; + jau::io::ByteInStream_File data_stream(data_feed->id(), true /* use_binary */); + while( !data_stream.end_of_data() && xfer_total < file_size ) { + uint8_t buffer[1024]; // 1k + size_t count = data_stream.read(buffer, sizeof(buffer)); + if( 0 < count ) { + xfer_total += count; + data_feed->write(buffer, count); + } + } + data_feed->set_eof( xfer_total == file_size ? jau::io::async_io_result_t::SUCCESS : jau::io::async_io_result_t::FAILED ); + } + + // full speed, no content size, interrupting 1/4 way + static void feed_source_20(jau::io::ByteInStream_Feed * data_feed) { + jau::fs::file_stats fs_feed(data_feed->id()); + const uint64_t file_size = fs_feed.size(); + + uint64_t xfer_total = 0; jau::io::ByteInStream_File enc_stream(data_feed->id(), true /* use_binary */); - while( !enc_stream.end_of_data() && xfer_total < file_size ) { + while( !enc_stream.end_of_data() ) { uint8_t buffer[1024]; // 1k size_t count = enc_stream.read(buffer, sizeof(buffer)); if( 0 < count ) { xfer_total += count; data_feed->write(buffer, count); + if( xfer_total >= file_size/4 ) { + data_feed->set_eof( jau::io::async_io_result_t::FAILED ); // calls data_feed->interruptReader(); + return; + } } } - data_feed->set_eof( xfer_total == file_size ? jau::io::async_io_result_t::SUCCESS : jau::io::async_io_result_t::FAILED ); + // probably set after transfering due to above sleep, which also ends when total size has been reached. + data_feed->set_eof( jau::io::async_io_result_t::SUCCESS ); + } + + // full speed, with content size, interrupting 1/4 way + static void feed_source_21(jau::io::ByteInStream_Feed * data_feed) { + jau::fs::file_stats fs_feed(data_feed->id()); + const uint64_t file_size = fs_feed.size(); + data_feed->set_content_size( file_size ); + + uint64_t xfer_total = 0; + jau::io::ByteInStream_File enc_stream(data_feed->id(), true /* use_binary */); + while( !enc_stream.end_of_data() ) { + uint8_t buffer[1024]; // 1k + size_t count = enc_stream.read(buffer, sizeof(buffer)); + if( 0 < count ) { + xfer_total += count; + data_feed->write(buffer, count); + if( xfer_total >= file_size/4 ) { + data_feed->set_eof( jau::io::async_io_result_t::FAILED ); // calls data_feed->interruptReader(); + return; + } + } + } + // probably set after transfering due to above sleep, which also ends when total size has been reached. + data_feed->set_eof( jau::io::async_io_result_t::SUCCESS ); } void test21_copy_fed_ok() { @@ -288,7 +358,7 @@ class TestByteStream01 { const size_t file_idx = IDX_11kiB; { // full speed, with content size - jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 3_s); + jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms); std::thread feeder_thread= std::thread(&feed_source_10, &data_feed); bool res = transfer(data_feed, fname_payload_copy_lst[file_idx]); @@ -305,7 +375,7 @@ class TestByteStream01 { } { // throttled, with content size - jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 3_s); + jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms); std::thread feeder_thread= std::thread(&feed_source_01, &data_feed); bool res = transfer(data_feed, fname_payload_copy_lst[file_idx]); @@ -321,7 +391,7 @@ class TestByteStream01 { REQUIRE( fname_payload_size_lst[file_idx] == out_stats.size() ); } { - // throttled, no content size, will timeout after 500_ms + // throttled, no content size, interruptReader() via set_eof() will avoid timeout jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms); std::thread feeder_thread= std::thread(&feed_source_00, &data_feed); @@ -342,7 +412,7 @@ class TestByteStream01 { const size_t file_idx = IDX_65MiB; { // full speed, with content size - jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 3_s); + jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms); std::thread feeder_thread= std::thread(&feed_source_10, &data_feed); bool res = transfer(data_feed, fname_payload_copy_lst[file_idx]); @@ -359,16 +429,61 @@ class TestByteStream01 { } } } + + void test22_copy_fed_irq() { + { + const size_t file_idx = IDX_65MiB; + { + // full speed, no content size, interrupting 1/4 way + jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms); + std::thread feeder_thread= std::thread(&feed_source_20, &data_feed); + + bool res = transfer(data_feed, fname_payload_copy_lst[file_idx]); + if( feeder_thread.joinable() ) { + feeder_thread.join(); + } + REQUIRE( true == res ); + + jau::fs::file_stats out_stats(fname_payload_copy_lst[file_idx]); + REQUIRE( true == out_stats.exists() ); + REQUIRE( true == out_stats.is_file() ); + REQUIRE( false == data_feed.has_content_size() ); + REQUIRE( 0 == data_feed.content_size() ); + REQUIRE( fname_payload_size_lst[file_idx] > out_stats.size() ); // interrupted... + } + { + // full speed, with content size, interrupting 1/4 way + jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms); + std::thread feeder_thread= std::thread(&feed_source_21, &data_feed); + + bool res = transfer(data_feed, fname_payload_copy_lst[file_idx]); + if( feeder_thread.joinable() ) { + feeder_thread.join(); + } + REQUIRE( true == res ); + + jau::fs::file_stats out_stats(fname_payload_copy_lst[file_idx]); + REQUIRE( true == out_stats.exists() ); + REQUIRE( true == out_stats.is_file() ); + REQUIRE( true == data_feed.has_content_size() ); + REQUIRE( fname_payload_size_lst[file_idx] == data_feed.content_size() ); + REQUIRE( data_feed.content_size() > out_stats.size() ); // interrupted... + } + } + } + }; std::vector<std::string> TestByteStream01::fname_payload_lst; std::vector<std::string> TestByteStream01::fname_payload_copy_lst; std::vector<uint64_t> TestByteStream01::fname_payload_size_lst; -// METHOD_AS_TEST_CASE( TestByteStream01::test01_copy_file_ok, "TestByteStream01 test01_copy_file_ok"); +METHOD_AS_TEST_CASE( TestByteStream01::test01_copy_file_ok, "TestByteStream01 test01_copy_file_ok"); -// METHOD_AS_TEST_CASE( TestByteStream01::test11_copy_http_ok, "TestByteStream01 test11_copy_http_ok"); +METHOD_AS_TEST_CASE( TestByteStream01::test11_copy_http_ok, "TestByteStream01 test11_copy_http_ok"); +METHOD_AS_TEST_CASE( TestByteStream01::test12_copy_http_404, "TestByteStream01 test12_copy_http_404"); METHOD_AS_TEST_CASE( TestByteStream01::test21_copy_fed_ok, "TestByteStream01 test21_copy_fed_ok"); +METHOD_AS_TEST_CASE( TestByteStream01::test22_copy_fed_irq, "TestByteStream01 test22_copy_fed_irq"); diff --git a/test/test_iostream01.cpp b/test/test_iostream01.cpp index 266e81d..d82f7d2 100644 --- a/test/test_iostream01.cpp +++ b/test/test_iostream01.cpp @@ -80,12 +80,12 @@ class TestIOStream01 { std::system("killall mini_httpd"); } - void test01() { + void test01_sync_ok() { const jau::fs::file_stats in_stats(basename_10kiB); const size_t file_size = in_stats.size(); const std::string url_input = url_input_root + basename_10kiB; - std::ofstream outfile("test02_01_out.bin", std::ios::out | std::ios::binary); + std::ofstream outfile("test01_01_out.bin", std::ios::out | std::ios::binary); REQUIRE( outfile.good() ); REQUIRE( outfile.is_open() ); @@ -96,25 +96,52 @@ class TestIOStream01 { consumed_calls++; consumed_total_bytes += data.size(); outfile.write(reinterpret_cast<char*>(data.data()), data.size()); - jau::PLAIN_PRINT(true, "test02io01 #%zu: consumed size %zu, total %" PRIu64 ", capacity %zu, final %d", + jau::PLAIN_PRINT(true, "test01_sync_ok #%zu: consumed size %zu, total %" PRIu64 ", capacity %zu, final %d", consumed_calls, data.size(), consumed_total_bytes, data.capacity(), is_final ); return true; }; uint64_t http_total_bytes = jau::io::read_url_stream(url_input, buffer, consume); const uint64_t out_bytes_total = outfile.tellp(); - jau::PLAIN_PRINT(true, "test02io01 Done: total %" PRIu64 ", capacity %zu", consumed_total_bytes, buffer.capacity()); + jau::PLAIN_PRINT(true, "test01_sync_ok Done: total %" PRIu64 ", capacity %zu", consumed_total_bytes, buffer.capacity()); REQUIRE( file_size == http_total_bytes ); REQUIRE( consumed_total_bytes == http_total_bytes ); REQUIRE( consumed_total_bytes == out_bytes_total ); } - void test02() { + void test02_sync_404() { + const std::string url_input = url_input_root + "doesnt_exists.txt"; + + std::ofstream outfile("test02_01_out.bin", std::ios::out | std::ios::binary); + REQUIRE( outfile.good() ); + REQUIRE( outfile.is_open() ); + + jau::io::secure_vector<uint8_t> buffer(4096); + size_t consumed_calls = 0; + uint64_t consumed_total_bytes = 0; + jau::io::StreamConsumerFunc consume = [&](jau::io::secure_vector<uint8_t>& data, bool is_final) noexcept -> bool { + consumed_calls++; + consumed_total_bytes += data.size(); + outfile.write(reinterpret_cast<char*>(data.data()), data.size()); + jau::PLAIN_PRINT(true, "test02_sync_404 #%zu: consumed size %zu, total %" PRIu64 ", capacity %zu, final %d", + consumed_calls, data.size(), consumed_total_bytes, data.capacity(), is_final ); + return true; + }; + uint64_t http_total_bytes = jau::io::read_url_stream(url_input, buffer, consume); + const uint64_t out_bytes_total = outfile.tellp(); + jau::PLAIN_PRINT(true, "test02_sync_404 Done: total %" PRIu64 ", capacity %zu", consumed_total_bytes, buffer.capacity()); + + REQUIRE( 0 == http_total_bytes ); + REQUIRE( consumed_total_bytes == http_total_bytes ); + REQUIRE( consumed_total_bytes == out_bytes_total ); + } + + void test11_async_ok() { const jau::fs::file_stats in_stats(basename_10kiB); const size_t file_size = in_stats.size(); const std::string url_input = url_input_root + basename_10kiB; - std::ofstream outfile("test02_02_out.bin", std::ios::out | std::ios::binary); + std::ofstream outfile("test11_01_out.bin", std::ios::out | std::ios::binary); REQUIRE( outfile.good() ); REQUIRE( outfile.is_open() ); @@ -134,15 +161,15 @@ class TestIOStream01 { while( jau::io::async_io_result_t::NONE == result || !rb.isEmpty() ) { consumed_loops++; // const size_t consumed_bytes = content_length >= 0 ? std::min(buffer_size, content_length - consumed_total_bytes) : rb.getSize(); - const size_t consumed_bytes = rb.getBlocking(buffer.data(), buffer_size, 1, 0_s); + const size_t consumed_bytes = rb.getBlocking(buffer.data(), buffer_size, 1, 500_ms); consumed_total_bytes += consumed_bytes; - jau::PLAIN_PRINT(true, "test02io02.0 #%zu: consumed[this %zu, total %" PRIu64 ", result %d, rb %s", + jau::PLAIN_PRINT(true, "test11_async_ok.0 #%zu: consumed[this %zu, total %" PRIu64 ", result %d, rb %s", consumed_loops, consumed_bytes, consumed_total_bytes, result.load(), rb.toString().c_str() ); outfile.write(reinterpret_cast<char*>(buffer.data()), consumed_bytes); } const uint64_t out_bytes_total = outfile.tellp(); - jau::PLAIN_PRINT(true, "test02io02.X Done: total %" PRIu64 ", result %d, rb %s", - consumed_total_bytes, result.load(), rb.toString().c_str() ); + jau::PLAIN_PRINT(true, "test11_async_ok.X Done: total %" PRIu64 ", result %d, rb %s", + consumed_total_bytes, (int)result.load(), rb.toString().c_str() ); http_thread.join(); @@ -153,7 +180,54 @@ class TestIOStream01 { REQUIRE( url_content_length == out_bytes_total ); REQUIRE( jau::io::async_io_result_t::SUCCESS == result ); } + + void test12_async_404() { + const std::string url_input = url_input_root + "doesnt_exists.txt"; + + std::ofstream outfile("test12_01_out.bin", std::ios::out | std::ios::binary); + REQUIRE( outfile.good() ); + REQUIRE( outfile.is_open() ); + + constexpr const size_t buffer_size = 4096; + jau::io::ByteRingbuffer rb(0x00, jau::io::BEST_URLSTREAM_RINGBUFFER_SIZE); + jau::relaxed_atomic_bool url_has_content_length; + jau::relaxed_atomic_uint64 url_content_length; + jau::relaxed_atomic_uint64 url_total_read; + jau::io::relaxed_atomic_async_io_result_t result; + + std::thread http_thread = jau::io::read_url_stream(url_input, rb, url_has_content_length, url_content_length, url_total_read, result); + + jau::io::secure_vector<uint8_t> buffer(buffer_size); + size_t consumed_loops = 0; + uint64_t consumed_total_bytes = 0; + + while( jau::io::async_io_result_t::NONE == result || !rb.isEmpty() ) { + consumed_loops++; + // const size_t consumed_bytes = content_length >= 0 ? std::min(buffer_size, content_length - consumed_total_bytes) : rb.getSize(); + const size_t consumed_bytes = rb.getBlocking(buffer.data(), buffer_size, 1, 500_ms); + consumed_total_bytes += consumed_bytes; + jau::PLAIN_PRINT(true, "test12_async_404.0 #%zu: consumed[this %zu, total %" PRIu64 ", result %d, rb %s", + consumed_loops, consumed_bytes, consumed_total_bytes, result.load(), rb.toString().c_str() ); + outfile.write(reinterpret_cast<char*>(buffer.data()), consumed_bytes); + } + const uint64_t out_bytes_total = outfile.tellp(); + jau::PLAIN_PRINT(true, "test12_async_404.X Done: total %" PRIu64 ", result %d, rb %s", + consumed_total_bytes, (int)result.load(), rb.toString().c_str() ); + + http_thread.join(); + + REQUIRE( url_has_content_length == false ); + REQUIRE( url_content_length == 0 ); + REQUIRE( url_content_length == consumed_total_bytes ); + REQUIRE( url_content_length == url_total_read ); + REQUIRE( url_content_length == out_bytes_total ); + REQUIRE( jau::io::async_io_result_t::FAILED == result ); + } + }; -METHOD_AS_TEST_CASE( TestIOStream01::test01, "TestIOStream01 - 01"); -METHOD_AS_TEST_CASE( TestIOStream01::test02, "TestIOStream01 - 02"); +METHOD_AS_TEST_CASE( TestIOStream01::test01_sync_ok, "TestIOStream01 - test01_sync_ok"); +METHOD_AS_TEST_CASE( TestIOStream01::test02_sync_404, "TestIOStream01 - test02_sync_404"); +METHOD_AS_TEST_CASE( TestIOStream01::test11_async_ok, "TestIOStream01 - test11_async_ok"); +METHOD_AS_TEST_CASE( TestIOStream01::test12_async_404, "TestIOStream01 - test12_async_404"); + |