aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSven Gothel <[email protected]>2022-05-29 13:20:41 +0200
committerSven Gothel <[email protected]>2022-05-29 13:20:41 +0200
commit68a026914fa27a8dd3bebbb358b30ee24f4de193 (patch)
tree85b580ada046f3e44a31a28b986ecaf2a0deec24
parent13ce66b10dedf4d26eea4ff2262649ab23687c69 (diff)
io::read_url_stream: Fix content_length (-1 == unknown), add consume_header w/ response_code for errors >= 400 and buffer.interruptReader()
- curl replies -1 for unknown content_length - also consume the curl header to detect response_code errors >= 400 (404 not found) - in case of an error, we need to interrupt the ringbuffer-reader thread -> buffer.interruptReader(), otherwise the thread would be blocked until timeout and renders application to not be responsive. - interruptReader() has been added for the synchronous- and asynchronous read_url_stream() functions - interruptReader() has been exposed as ByteInStream_Feed::interruptReader() and added to ByteInStream_Feed::set_eof(). - the above is tested via test_iostream01 for synchronous- and asynchronous read_url_stream() functions using a non-existing URL entity - the above is tested via test_bytestream01 for ByteInStream_URL using a non-existing URL entity and for and ByteInStream_Feed having the feeder thread prematurely end transmission.
-rw-r--r--include/jau/byte_stream.hpp18
-rw-r--r--src/byte_stream.cpp8
-rw-r--r--src/io_util.cpp211
-rw-r--r--test/test_bytestream01.cpp151
-rw-r--r--test/test_iostream01.cpp98
5 files changed, 428 insertions, 58 deletions
diff --git a/include/jau/byte_stream.hpp b/include/jau/byte_stream.hpp
index 7f92fe8..c8d5411 100644
--- a/include/jau/byte_stream.hpp
+++ b/include/jau/byte_stream.hpp
@@ -556,6 +556,18 @@ namespace jau::io {
uint64_t content_size() const noexcept override { return m_content_size; }
/**
+ * Interrupt a potentially blocked reader.
+ *
+ * Call this method if intended to abort streaming and to interrupt the reader thread's potentially blocked check_available() call,
+ * i.e. done at set_eof()
+ *
+ * @see set_eof()
+ */
+ void interruptReader() noexcept {
+ m_buffer.interruptReader();
+ }
+
+ /**
* Write given bytes to the async ringbuffer.
*
* Wait up to timeout duration given in constructor until ringbuffer space is available, where fractions_i64::zero waits infinitely.
@@ -580,9 +592,13 @@ namespace jau::io {
/**
* Set end-of-data (EOS), i.e. when feeder completed provisioning bytes.
*
+ * Implementation issues interruptReader() to unblock a potentially blocked reader thread.
+ *
* @param result should be either result_t::FAILED or result_t::SUCCESS.
+ *
+ * @see interruptReader()
*/
- void set_eof(const async_io_result_t result) noexcept { m_result = result; }
+ void set_eof(const async_io_result_t result) noexcept { m_result = result; interruptReader(); }
std::string to_string() const noexcept override;
diff --git a/src/byte_stream.cpp b/src/byte_stream.cpp
index cab17f9..22bde0e 100644
--- a/src/byte_stream.cpp
+++ b/src/byte_stream.cpp
@@ -368,8 +368,8 @@ bool ByteInStream_URL::end_of_data() const NOEXCEPT_BOTAN {
}
std::string ByteInStream_URL::to_string_int() const noexcept {
- return m_url+", Url[content_length "+std::to_string(m_has_content_length.load())+
- " "+jau::to_decstring(m_content_size.load())+
+ return m_url+", Url[content_length has "+std::to_string(m_has_content_length.load())+
+ ", size "+jau::to_decstring(m_content_size.load())+
", xfered "+jau::to_decstring(m_total_xfered.load())+
", result "+std::to_string((int8_t)m_result.load())+
"], consumed "+jau::to_decstring(m_bytes_consumed)+
@@ -441,8 +441,8 @@ void ByteInStream_Feed::write(uint8_t in[], size_t length) noexcept {
}
std::string ByteInStream_Feed::to_string_int() const noexcept {
- return m_id+", ext[content_length "+std::to_string(m_has_content_length.load())+
- " "+jau::to_decstring(m_content_size.load())+
+ return m_id+", ext[content_length has "+std::to_string(m_has_content_length.load())+
+ ", size "+jau::to_decstring(m_content_size.load())+
", xfered "+jau::to_decstring(m_total_xfered.load())+
", result "+std::to_string((int8_t)m_result.load())+
"], consumed "+std::to_string(m_bytes_consumed)+
diff --git a/src/io_util.cpp b/src/io_util.cpp
index 08aac7a..f2b0e3b 100644
--- a/src/io_util.cpp
+++ b/src/io_util.cpp
@@ -91,19 +91,64 @@ struct curl_glue1_t {
StreamConsumerFunc consumer_fn;
};
-static size_t consume_curl1(void *ptr, size_t size, size_t nmemb, void *stream) noexcept {
- curl_glue1_t * cg = (curl_glue1_t*)stream;
+static size_t consume_header_curl1(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept {
+ curl_glue1_t * cg = (curl_glue1_t*)userdata;
+
+ {
+ long v;
+ const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
+ if( CURLE_OK == r ) {
+ if( 400 <= v ) {
+ IRQ_PRINT("response_code: %ld", v);
+ return 0;
+ } else {
+ DBG_PRINT("consume_header_curl1.0 response_code: %ld", v);
+ }
+ }
+ }
+ if( !cg->has_content_length ) {
+ curl_off_t v = 0;
+ const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
+ if( CURLE_OK == r ) {
+ if( 0 > v ) { // curl returns -1 if the size if not known
+ cg->content_length = 0;
+ cg->has_content_length = false;
+ } else {
+ cg->content_length = v;
+ cg->has_content_length = true;
+ }
+ }
+ }
+ const size_t realsize = size * nmemb;
+
+ if( false ) {
+ DBG_PRINT("consume_header_curl1.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " )",
+ realsize, cg->total_read, cg->has_content_length, cg->content_length );
+ std::string blob(buffer, realsize);
+ jau::PLAIN_PRINT(true, "%s", blob.c_str());
+ }
+
+ return realsize;
+}
+
+static size_t consume_data_curl1(char *ptr, size_t size, size_t nmemb, void *userdata) noexcept {
+ curl_glue1_t * cg = (curl_glue1_t*)userdata;
if( !cg->has_content_length ) {
curl_off_t v = 0;
CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
- if( !r ) {
- cg->content_length = v;
- cg->has_content_length = true;
+ if( CURLE_OK == r ) {
+ if( 0 > v ) { // curl returns -1 if the size if not known
+ cg->content_length = 0;
+ cg->has_content_length = false;
+ } else {
+ cg->content_length = v;
+ cg->has_content_length = true;
+ }
}
}
const size_t realsize = size * nmemb;
- DBG_PRINT("consume_curl1.0 realsize %zu", realsize);
+ DBG_PRINT("consume_data_curl1.0 realsize %zu", realsize);
cg->buffer.resize(realsize);
memcpy(cg->buffer.data(), ptr, realsize);
@@ -111,8 +156,8 @@ static size_t consume_curl1(void *ptr, size_t size, size_t nmemb, void *stream)
const bool is_final = 0 == realsize ||
cg->has_content_length ? cg->total_read >= cg->content_length : false;
- DBG_PRINT("consume_curl1.X realsize %zu, total %" PRIu64 " / ( content_len %" PRIu64 " ), is_final %d",
- realsize, cg->total_read, cg->content_length, is_final );
+ DBG_PRINT("consume_data_curl1.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d",
+ realsize, cg->total_read, cg->has_content_length, cg->content_length, is_final );
try {
if( !cg->consumer_fn(cg->buffer, is_final) ) {
@@ -173,8 +218,40 @@ uint64_t jau::io::read_url_stream(const std::string& url,
goto errout;
}
+ /* Suppress proxy CONNECT response headers from user callbacks */
+ res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up url %s, error %d %d",
+ url, (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* Don't pass headers to the data stream. */
+ res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up url %s, error %d %d",
+ url, (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* send header data to this function */
+ res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, consume_header_curl1);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up url %s, error %d %d",
+ url, (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* set userdata for consume_header_curl2 */
+ res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (void*)&cg);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up url %s, error %d %d",
+ url, (int)res, errorbuffer.data());
+ goto errout;
+ }
+
/* send all data to this function */
- res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_curl1);
+ res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_data_curl1);
if( CURLE_OK != res ) {
ERR_PRINT("Error setting up url %s, error %d %d",
url.c_str(), (int)res, errorbuffer.data());
@@ -192,7 +269,7 @@ uint64_t jau::io::read_url_stream(const std::string& url,
/* performs the tast, blocking! */
res = curl_easy_perform(curl_handle);
if( CURLE_OK != res ) {
- ERR_PRINT("Error processing url %s, error %d %d",
+ IRQ_PRINT("processing url %s, error %d %d",
url.c_str(), (int)res, errorbuffer.data());
goto errout;
}
@@ -229,26 +306,82 @@ struct curl_glue2_t {
relaxed_atomic_async_io_result_t& result;
};
-static size_t consume_curl2(void *ptr, size_t size, size_t nmemb, void *stream) noexcept {
- curl_glue2_t * cg = (curl_glue2_t*)stream;
+static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept {
+ curl_glue2_t * cg = (curl_glue2_t*)userdata;
if( async_io_result_t::NONE!= cg->result ) {
// user abort!
- DBG_PRINT("consume_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s",
+ DBG_PRINT("consume_header_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s",
cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() );
+ cg->buffer.interruptReader();
return 0;
}
+ {
+ long v;
+ const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
+ if( CURLE_OK == r ) {
+ if( 400 <= v ) {
+ IRQ_PRINT("response_code: %ld", v);
+ cg->result = async_io_result_t::FAILED;
+ cg->buffer.interruptReader();
+ return 0;
+ } else {
+ DBG_PRINT("consume_header_curl2.0 response_code: %ld", v);
+ }
+ }
+ }
if( !cg->has_content_length ) {
curl_off_t v = 0;
const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
if( CURLE_OK == r ) {
- cg->content_length = v;
- cg->has_content_length = true;
+ if( 0 > v ) { // curl returns -1 if the size if not known
+ cg->content_length = 0;
+ cg->has_content_length = false;
+ } else {
+ cg->content_length = v;
+ cg->has_content_length = true;
+ }
+ }
+ }
+ const size_t realsize = size * nmemb;
+
+ if( false ) {
+ DBG_PRINT("consume_header_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), result %d, rb %s",
+ realsize, cg->total_read.load(), cg->has_content_length.load(), cg->content_length.load(), cg->result.load(), cg->buffer.toString().c_str() );
+ std::string blob(buffer, realsize);
+ jau::PLAIN_PRINT(true, "%s", blob.c_str());
+ }
+
+ return realsize;
+}
+
+static size_t consume_data_curl2(char *ptr, size_t size, size_t nmemb, void *userdata) noexcept {
+ curl_glue2_t * cg = (curl_glue2_t*)userdata;
+
+ if( async_io_result_t::NONE!= cg->result ) {
+ // user abort!
+ DBG_PRINT("consume_data_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s",
+ cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() );
+ cg->buffer.interruptReader();
+ return 0;
+ }
+
+ if( !cg->has_content_length ) {
+ curl_off_t v = 0;
+ const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
+ if( CURLE_OK == r ) {
+ if( 0 > v ) { // curl returns -1 if the size if not known
+ cg->content_length = 0;
+ cg->has_content_length = false;
+ } else {
+ cg->content_length = v;
+ cg->has_content_length = true;
+ }
}
}
const size_t realsize = size * nmemb;
- DBG_PRINT("consume_curl2.0 realsize %zu, rb %s", realsize, cg->buffer.toString().c_str() );
+ DBG_PRINT("consume_data_curl2.0 realsize %zu, rb %s", realsize, cg->buffer.toString().c_str() );
cg->buffer.putBlocking(reinterpret_cast<uint8_t*>(ptr),
reinterpret_cast<uint8_t*>(ptr)+realsize, 0_s);
@@ -259,8 +392,8 @@ static size_t consume_curl2(void *ptr, size_t size, size_t nmemb, void *stream)
cg->result = async_io_result_t::SUCCESS;
}
- DBG_PRINT("consume_curl2.X realsize %zu, total %" PRIu64 " / ( content_len %" PRIu64 " ), is_final %d, result %d, rb %s",
- realsize, cg->total_read.load(), cg->result.load(), cg->content_length.load(), is_final, cg->buffer.toString().c_str() );
+ DBG_PRINT("consume_data_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d, result %d, rb %s",
+ realsize, cg->total_read.load(), cg->has_content_length.load(), cg->content_length.load(), is_final, cg->result.load(), cg->buffer.toString().c_str() );
return realsize;
}
@@ -309,15 +442,47 @@ static void read_url_stream_thread(const char *url, std::unique_ptr<curl_glue2_t
goto errout;
}
- /* send all data to this function */
- res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_curl2);
+ /* Suppress proxy CONNECT response headers from user callbacks */
+ res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L);
if( CURLE_OK != res ) {
ERR_PRINT("Error setting up url %s, error %d %d",
url, (int)res, errorbuffer.data());
goto errout;
}
- /* write the page body to this file handle */
+ /* Don't pass headers to the data stream. */
+ res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up url %s, error %d %d",
+ url, (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* send header data to this function */
+ res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, consume_header_curl2);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up url %s, error %d %d",
+ url, (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* set userdata for consume_header_curl2 */
+ res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (void*)cg.get());
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up url %s, error %d %d",
+ url, (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* send received data to this function */
+ res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_data_curl2);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up url %s, error %d %d",
+ url, (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* set userdata for consume_data_curl2 */
res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void*)cg.get());
if( CURLE_OK != res ) {
ERR_PRINT("Error setting up url %s, error %d %d",
@@ -330,10 +495,10 @@ static void read_url_stream_thread(const char *url, std::unique_ptr<curl_glue2_t
if( CURLE_OK != res ) {
if( async_io_result_t::NONE == cg->result ) {
// Error during normal processing
- ERR_PRINT("Error processing url %s, error %d %d",
+ IRQ_PRINT("Error processing url %s, error %d %d",
url, (int)res, errorbuffer.data());
} else {
- // User aborted
+ // User aborted or response code error detected via consume_header_curl2
DBG_PRINT("Processing aborted url %s, error %d %d",
url, (int)res, errorbuffer.data());
}
diff --git a/test/test_bytestream01.cpp b/test/test_bytestream01.cpp
index 5c4d4b8..777b57c 100644
--- a/test/test_bytestream01.cpp
+++ b/test/test_bytestream01.cpp
@@ -149,7 +149,7 @@ class TestByteStream01 {
input.close();
if ( 0==in_bytes_total || outfile.fail() ) {
- ERR_PRINT2("ByteStream copy failed: Output file write failed %s", output_fname.c_str());
+ IRQ_PRINT("ByteStream copy failed: Output file write failed %s", output_fname.c_str());
return false;
}
@@ -197,7 +197,7 @@ class TestByteStream01 {
const std::string uri_original = url_input_root + fname_payload_lst[file_idx];
- jau::io::ByteInStream_URL data_stream(uri_original, 3_s);
+ jau::io::ByteInStream_URL data_stream(uri_original, 500_ms);
bool res = transfer(data_stream, fname_payload_copy_lst[file_idx]);
REQUIRE( true == res );
@@ -213,7 +213,7 @@ class TestByteStream01 {
const std::string uri_original = url_input_root + fname_payload_lst[file_idx];
- jau::io::ByteInStream_URL data_stream(uri_original, 3_s);
+ jau::io::ByteInStream_URL data_stream(uri_original, 500_ms);
bool res = transfer(data_stream, fname_payload_copy_lst[file_idx]);
REQUIRE( true == res );
@@ -226,13 +226,36 @@ class TestByteStream01 {
}
}
- // throttled, no content size
+ void test12_copy_http_404() {
+ httpd_start();
+
+ {
+ const size_t file_idx = IDX_11kiB;
+
+ const std::string uri_original = url_input_root + "doesnt_exists.txt";
+
+ jau::io::ByteInStream_URL data_stream(uri_original, 500_ms);
+
+ bool res = transfer(data_stream, fname_payload_copy_lst[file_idx]);
+ REQUIRE( false == res );
+
+ jau::fs::file_stats out_stats(fname_payload_copy_lst[file_idx]);
+ REQUIRE( true == out_stats.exists() );
+ REQUIRE( true == out_stats.is_file() );
+ REQUIRE( data_stream.error() == true );
+ REQUIRE( data_stream.has_content_size() == false );
+ REQUIRE( data_stream.content_size() == 0 );
+ REQUIRE( 0 == out_stats.size() );
+ }
+ }
+
+ // throttled, no content size, interruptReader() via set_eof() will avoid timeout
static void feed_source_00(jau::io::ByteInStream_Feed * data_feed) {
uint64_t xfer_total = 0;
- jau::io::ByteInStream_File enc_stream(data_feed->id(), true /* use_binary */);
- while( !enc_stream.end_of_data() ) {
+ jau::io::ByteInStream_File data_stream(data_feed->id(), true /* use_binary */);
+ while( !data_stream.end_of_data() ) {
uint8_t buffer[1024]; // 1k
- size_t count = enc_stream.read(buffer, sizeof(buffer));
+ size_t count = data_stream.read(buffer, sizeof(buffer));
if( 0 < count ) {
xfer_total += count;
data_feed->write(buffer, count);
@@ -250,10 +273,10 @@ class TestByteStream01 {
data_feed->set_content_size( file_size );
uint64_t xfer_total = 0;
- jau::io::ByteInStream_File enc_stream(data_feed->id(), true /* use_binary */);
- while( !enc_stream.end_of_data() && xfer_total < file_size ) {
+ jau::io::ByteInStream_File data_stream(data_feed->id(), true /* use_binary */);
+ while( !data_stream.end_of_data() && xfer_total < file_size ) {
uint8_t buffer[1024]; // 1k
- size_t count = enc_stream.read(buffer, sizeof(buffer));
+ size_t count = data_stream.read(buffer, sizeof(buffer));
if( 0 < count ) {
xfer_total += count;
data_feed->write(buffer, count);
@@ -271,16 +294,63 @@ class TestByteStream01 {
data_feed->set_content_size( file_size );
uint64_t xfer_total = 0;
+ jau::io::ByteInStream_File data_stream(data_feed->id(), true /* use_binary */);
+ while( !data_stream.end_of_data() && xfer_total < file_size ) {
+ uint8_t buffer[1024]; // 1k
+ size_t count = data_stream.read(buffer, sizeof(buffer));
+ if( 0 < count ) {
+ xfer_total += count;
+ data_feed->write(buffer, count);
+ }
+ }
+ data_feed->set_eof( xfer_total == file_size ? jau::io::async_io_result_t::SUCCESS : jau::io::async_io_result_t::FAILED );
+ }
+
+ // full speed, no content size, interrupting 1/4 way
+ static void feed_source_20(jau::io::ByteInStream_Feed * data_feed) {
+ jau::fs::file_stats fs_feed(data_feed->id());
+ const uint64_t file_size = fs_feed.size();
+
+ uint64_t xfer_total = 0;
jau::io::ByteInStream_File enc_stream(data_feed->id(), true /* use_binary */);
- while( !enc_stream.end_of_data() && xfer_total < file_size ) {
+ while( !enc_stream.end_of_data() ) {
uint8_t buffer[1024]; // 1k
size_t count = enc_stream.read(buffer, sizeof(buffer));
if( 0 < count ) {
xfer_total += count;
data_feed->write(buffer, count);
+ if( xfer_total >= file_size/4 ) {
+ data_feed->set_eof( jau::io::async_io_result_t::FAILED ); // calls data_feed->interruptReader();
+ return;
+ }
}
}
- data_feed->set_eof( xfer_total == file_size ? jau::io::async_io_result_t::SUCCESS : jau::io::async_io_result_t::FAILED );
+ // probably set after transfering due to above sleep, which also ends when total size has been reached.
+ data_feed->set_eof( jau::io::async_io_result_t::SUCCESS );
+ }
+
+ // full speed, with content size, interrupting 1/4 way
+ static void feed_source_21(jau::io::ByteInStream_Feed * data_feed) {
+ jau::fs::file_stats fs_feed(data_feed->id());
+ const uint64_t file_size = fs_feed.size();
+ data_feed->set_content_size( file_size );
+
+ uint64_t xfer_total = 0;
+ jau::io::ByteInStream_File enc_stream(data_feed->id(), true /* use_binary */);
+ while( !enc_stream.end_of_data() ) {
+ uint8_t buffer[1024]; // 1k
+ size_t count = enc_stream.read(buffer, sizeof(buffer));
+ if( 0 < count ) {
+ xfer_total += count;
+ data_feed->write(buffer, count);
+ if( xfer_total >= file_size/4 ) {
+ data_feed->set_eof( jau::io::async_io_result_t::FAILED ); // calls data_feed->interruptReader();
+ return;
+ }
+ }
+ }
+ // probably set after transfering due to above sleep, which also ends when total size has been reached.
+ data_feed->set_eof( jau::io::async_io_result_t::SUCCESS );
}
void test21_copy_fed_ok() {
@@ -288,7 +358,7 @@ class TestByteStream01 {
const size_t file_idx = IDX_11kiB;
{
// full speed, with content size
- jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 3_s);
+ jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms);
std::thread feeder_thread= std::thread(&feed_source_10, &data_feed);
bool res = transfer(data_feed, fname_payload_copy_lst[file_idx]);
@@ -305,7 +375,7 @@ class TestByteStream01 {
}
{
// throttled, with content size
- jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 3_s);
+ jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms);
std::thread feeder_thread= std::thread(&feed_source_01, &data_feed);
bool res = transfer(data_feed, fname_payload_copy_lst[file_idx]);
@@ -321,7 +391,7 @@ class TestByteStream01 {
REQUIRE( fname_payload_size_lst[file_idx] == out_stats.size() );
}
{
- // throttled, no content size, will timeout after 500_ms
+ // throttled, no content size, interruptReader() via set_eof() will avoid timeout
jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms);
std::thread feeder_thread= std::thread(&feed_source_00, &data_feed);
@@ -342,7 +412,7 @@ class TestByteStream01 {
const size_t file_idx = IDX_65MiB;
{
// full speed, with content size
- jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 3_s);
+ jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms);
std::thread feeder_thread= std::thread(&feed_source_10, &data_feed);
bool res = transfer(data_feed, fname_payload_copy_lst[file_idx]);
@@ -359,16 +429,61 @@ class TestByteStream01 {
}
}
}
+
+ void test22_copy_fed_irq() {
+ {
+ const size_t file_idx = IDX_65MiB;
+ {
+ // full speed, no content size, interrupting 1/4 way
+ jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms);
+ std::thread feeder_thread= std::thread(&feed_source_20, &data_feed);
+
+ bool res = transfer(data_feed, fname_payload_copy_lst[file_idx]);
+ if( feeder_thread.joinable() ) {
+ feeder_thread.join();
+ }
+ REQUIRE( true == res );
+
+ jau::fs::file_stats out_stats(fname_payload_copy_lst[file_idx]);
+ REQUIRE( true == out_stats.exists() );
+ REQUIRE( true == out_stats.is_file() );
+ REQUIRE( false == data_feed.has_content_size() );
+ REQUIRE( 0 == data_feed.content_size() );
+ REQUIRE( fname_payload_size_lst[file_idx] > out_stats.size() ); // interrupted...
+ }
+ {
+ // full speed, with content size, interrupting 1/4 way
+ jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms);
+ std::thread feeder_thread= std::thread(&feed_source_21, &data_feed);
+
+ bool res = transfer(data_feed, fname_payload_copy_lst[file_idx]);
+ if( feeder_thread.joinable() ) {
+ feeder_thread.join();
+ }
+ REQUIRE( true == res );
+
+ jau::fs::file_stats out_stats(fname_payload_copy_lst[file_idx]);
+ REQUIRE( true == out_stats.exists() );
+ REQUIRE( true == out_stats.is_file() );
+ REQUIRE( true == data_feed.has_content_size() );
+ REQUIRE( fname_payload_size_lst[file_idx] == data_feed.content_size() );
+ REQUIRE( data_feed.content_size() > out_stats.size() ); // interrupted...
+ }
+ }
+ }
+
};
std::vector<std::string> TestByteStream01::fname_payload_lst;
std::vector<std::string> TestByteStream01::fname_payload_copy_lst;
std::vector<uint64_t> TestByteStream01::fname_payload_size_lst;
-// METHOD_AS_TEST_CASE( TestByteStream01::test01_copy_file_ok, "TestByteStream01 test01_copy_file_ok");
+METHOD_AS_TEST_CASE( TestByteStream01::test01_copy_file_ok, "TestByteStream01 test01_copy_file_ok");
-// METHOD_AS_TEST_CASE( TestByteStream01::test11_copy_http_ok, "TestByteStream01 test11_copy_http_ok");
+METHOD_AS_TEST_CASE( TestByteStream01::test11_copy_http_ok, "TestByteStream01 test11_copy_http_ok");
+METHOD_AS_TEST_CASE( TestByteStream01::test12_copy_http_404, "TestByteStream01 test12_copy_http_404");
METHOD_AS_TEST_CASE( TestByteStream01::test21_copy_fed_ok, "TestByteStream01 test21_copy_fed_ok");
+METHOD_AS_TEST_CASE( TestByteStream01::test22_copy_fed_irq, "TestByteStream01 test22_copy_fed_irq");
diff --git a/test/test_iostream01.cpp b/test/test_iostream01.cpp
index 266e81d..d82f7d2 100644
--- a/test/test_iostream01.cpp
+++ b/test/test_iostream01.cpp
@@ -80,12 +80,12 @@ class TestIOStream01 {
std::system("killall mini_httpd");
}
- void test01() {
+ void test01_sync_ok() {
const jau::fs::file_stats in_stats(basename_10kiB);
const size_t file_size = in_stats.size();
const std::string url_input = url_input_root + basename_10kiB;
- std::ofstream outfile("test02_01_out.bin", std::ios::out | std::ios::binary);
+ std::ofstream outfile("test01_01_out.bin", std::ios::out | std::ios::binary);
REQUIRE( outfile.good() );
REQUIRE( outfile.is_open() );
@@ -96,25 +96,52 @@ class TestIOStream01 {
consumed_calls++;
consumed_total_bytes += data.size();
outfile.write(reinterpret_cast<char*>(data.data()), data.size());
- jau::PLAIN_PRINT(true, "test02io01 #%zu: consumed size %zu, total %" PRIu64 ", capacity %zu, final %d",
+ jau::PLAIN_PRINT(true, "test01_sync_ok #%zu: consumed size %zu, total %" PRIu64 ", capacity %zu, final %d",
consumed_calls, data.size(), consumed_total_bytes, data.capacity(), is_final );
return true;
};
uint64_t http_total_bytes = jau::io::read_url_stream(url_input, buffer, consume);
const uint64_t out_bytes_total = outfile.tellp();
- jau::PLAIN_PRINT(true, "test02io01 Done: total %" PRIu64 ", capacity %zu", consumed_total_bytes, buffer.capacity());
+ jau::PLAIN_PRINT(true, "test01_sync_ok Done: total %" PRIu64 ", capacity %zu", consumed_total_bytes, buffer.capacity());
REQUIRE( file_size == http_total_bytes );
REQUIRE( consumed_total_bytes == http_total_bytes );
REQUIRE( consumed_total_bytes == out_bytes_total );
}
- void test02() {
+ void test02_sync_404() {
+ const std::string url_input = url_input_root + "doesnt_exists.txt";
+
+ std::ofstream outfile("test02_01_out.bin", std::ios::out | std::ios::binary);
+ REQUIRE( outfile.good() );
+ REQUIRE( outfile.is_open() );
+
+ jau::io::secure_vector<uint8_t> buffer(4096);
+ size_t consumed_calls = 0;
+ uint64_t consumed_total_bytes = 0;
+ jau::io::StreamConsumerFunc consume = [&](jau::io::secure_vector<uint8_t>& data, bool is_final) noexcept -> bool {
+ consumed_calls++;
+ consumed_total_bytes += data.size();
+ outfile.write(reinterpret_cast<char*>(data.data()), data.size());
+ jau::PLAIN_PRINT(true, "test02_sync_404 #%zu: consumed size %zu, total %" PRIu64 ", capacity %zu, final %d",
+ consumed_calls, data.size(), consumed_total_bytes, data.capacity(), is_final );
+ return true;
+ };
+ uint64_t http_total_bytes = jau::io::read_url_stream(url_input, buffer, consume);
+ const uint64_t out_bytes_total = outfile.tellp();
+ jau::PLAIN_PRINT(true, "test02_sync_404 Done: total %" PRIu64 ", capacity %zu", consumed_total_bytes, buffer.capacity());
+
+ REQUIRE( 0 == http_total_bytes );
+ REQUIRE( consumed_total_bytes == http_total_bytes );
+ REQUIRE( consumed_total_bytes == out_bytes_total );
+ }
+
+ void test11_async_ok() {
const jau::fs::file_stats in_stats(basename_10kiB);
const size_t file_size = in_stats.size();
const std::string url_input = url_input_root + basename_10kiB;
- std::ofstream outfile("test02_02_out.bin", std::ios::out | std::ios::binary);
+ std::ofstream outfile("test11_01_out.bin", std::ios::out | std::ios::binary);
REQUIRE( outfile.good() );
REQUIRE( outfile.is_open() );
@@ -134,15 +161,15 @@ class TestIOStream01 {
while( jau::io::async_io_result_t::NONE == result || !rb.isEmpty() ) {
consumed_loops++;
// const size_t consumed_bytes = content_length >= 0 ? std::min(buffer_size, content_length - consumed_total_bytes) : rb.getSize();
- const size_t consumed_bytes = rb.getBlocking(buffer.data(), buffer_size, 1, 0_s);
+ const size_t consumed_bytes = rb.getBlocking(buffer.data(), buffer_size, 1, 500_ms);
consumed_total_bytes += consumed_bytes;
- jau::PLAIN_PRINT(true, "test02io02.0 #%zu: consumed[this %zu, total %" PRIu64 ", result %d, rb %s",
+ jau::PLAIN_PRINT(true, "test11_async_ok.0 #%zu: consumed[this %zu, total %" PRIu64 ", result %d, rb %s",
consumed_loops, consumed_bytes, consumed_total_bytes, result.load(), rb.toString().c_str() );
outfile.write(reinterpret_cast<char*>(buffer.data()), consumed_bytes);
}
const uint64_t out_bytes_total = outfile.tellp();
- jau::PLAIN_PRINT(true, "test02io02.X Done: total %" PRIu64 ", result %d, rb %s",
- consumed_total_bytes, result.load(), rb.toString().c_str() );
+ jau::PLAIN_PRINT(true, "test11_async_ok.X Done: total %" PRIu64 ", result %d, rb %s",
+ consumed_total_bytes, (int)result.load(), rb.toString().c_str() );
http_thread.join();
@@ -153,7 +180,54 @@ class TestIOStream01 {
REQUIRE( url_content_length == out_bytes_total );
REQUIRE( jau::io::async_io_result_t::SUCCESS == result );
}
+
+ void test12_async_404() {
+ const std::string url_input = url_input_root + "doesnt_exists.txt";
+
+ std::ofstream outfile("test12_01_out.bin", std::ios::out | std::ios::binary);
+ REQUIRE( outfile.good() );
+ REQUIRE( outfile.is_open() );
+
+ constexpr const size_t buffer_size = 4096;
+ jau::io::ByteRingbuffer rb(0x00, jau::io::BEST_URLSTREAM_RINGBUFFER_SIZE);
+ jau::relaxed_atomic_bool url_has_content_length;
+ jau::relaxed_atomic_uint64 url_content_length;
+ jau::relaxed_atomic_uint64 url_total_read;
+ jau::io::relaxed_atomic_async_io_result_t result;
+
+ std::thread http_thread = jau::io::read_url_stream(url_input, rb, url_has_content_length, url_content_length, url_total_read, result);
+
+ jau::io::secure_vector<uint8_t> buffer(buffer_size);
+ size_t consumed_loops = 0;
+ uint64_t consumed_total_bytes = 0;
+
+ while( jau::io::async_io_result_t::NONE == result || !rb.isEmpty() ) {
+ consumed_loops++;
+ // const size_t consumed_bytes = content_length >= 0 ? std::min(buffer_size, content_length - consumed_total_bytes) : rb.getSize();
+ const size_t consumed_bytes = rb.getBlocking(buffer.data(), buffer_size, 1, 500_ms);
+ consumed_total_bytes += consumed_bytes;
+ jau::PLAIN_PRINT(true, "test12_async_404.0 #%zu: consumed[this %zu, total %" PRIu64 ", result %d, rb %s",
+ consumed_loops, consumed_bytes, consumed_total_bytes, result.load(), rb.toString().c_str() );
+ outfile.write(reinterpret_cast<char*>(buffer.data()), consumed_bytes);
+ }
+ const uint64_t out_bytes_total = outfile.tellp();
+ jau::PLAIN_PRINT(true, "test12_async_404.X Done: total %" PRIu64 ", result %d, rb %s",
+ consumed_total_bytes, (int)result.load(), rb.toString().c_str() );
+
+ http_thread.join();
+
+ REQUIRE( url_has_content_length == false );
+ REQUIRE( url_content_length == 0 );
+ REQUIRE( url_content_length == consumed_total_bytes );
+ REQUIRE( url_content_length == url_total_read );
+ REQUIRE( url_content_length == out_bytes_total );
+ REQUIRE( jau::io::async_io_result_t::FAILED == result );
+ }
+
};
-METHOD_AS_TEST_CASE( TestIOStream01::test01, "TestIOStream01 - 01");
-METHOD_AS_TEST_CASE( TestIOStream01::test02, "TestIOStream01 - 02");
+METHOD_AS_TEST_CASE( TestIOStream01::test01_sync_ok, "TestIOStream01 - test01_sync_ok");
+METHOD_AS_TEST_CASE( TestIOStream01::test02_sync_404, "TestIOStream01 - test02_sync_404");
+METHOD_AS_TEST_CASE( TestIOStream01::test11_async_ok, "TestIOStream01 - test11_async_ok");
+METHOD_AS_TEST_CASE( TestIOStream01::test12_async_404, "TestIOStream01 - test12_async_404");
+