diff options
author | Sven Gothel <[email protected]> | 2022-05-29 13:20:41 +0200 |
---|---|---|
committer | Sven Gothel <[email protected]> | 2022-05-29 13:20:41 +0200 |
commit | 68a026914fa27a8dd3bebbb358b30ee24f4de193 (patch) | |
tree | 85b580ada046f3e44a31a28b986ecaf2a0deec24 /test/test_bytestream01.cpp | |
parent | 13ce66b10dedf4d26eea4ff2262649ab23687c69 (diff) |
io::read_url_stream: Fix content_length (-1 == unknown), add consume_header w/ response_code for errors >= 400 and buffer.interruptReader()
- curl replies -1 for unknown content_length
- also consume the curl header to detect response_code errors >= 400 (404 not found)
- in case of an error, we need to interrupt the ringbuffer-reader thread -> buffer.interruptReader(),
otherwise the thread would be blocked until timeout and renders application to not be responsive.
- interruptReader() has been added for the synchronous- and asynchronous read_url_stream() functions
- interruptReader() has been exposed as ByteInStream_Feed::interruptReader()
and added to ByteInStream_Feed::set_eof().
- the above is tested via test_iostream01 for synchronous- and asynchronous read_url_stream() functions
using a non-existing URL entity
- the above is tested via test_bytestream01 for ByteInStream_URL using a non-existing URL entity
and for and ByteInStream_Feed having the feeder thread prematurely end transmission.
Diffstat (limited to 'test/test_bytestream01.cpp')
-rw-r--r-- | test/test_bytestream01.cpp | 151 |
1 files changed, 133 insertions, 18 deletions
diff --git a/test/test_bytestream01.cpp b/test/test_bytestream01.cpp index 5c4d4b8..777b57c 100644 --- a/test/test_bytestream01.cpp +++ b/test/test_bytestream01.cpp @@ -149,7 +149,7 @@ class TestByteStream01 { input.close(); if ( 0==in_bytes_total || outfile.fail() ) { - ERR_PRINT2("ByteStream copy failed: Output file write failed %s", output_fname.c_str()); + IRQ_PRINT("ByteStream copy failed: Output file write failed %s", output_fname.c_str()); return false; } @@ -197,7 +197,7 @@ class TestByteStream01 { const std::string uri_original = url_input_root + fname_payload_lst[file_idx]; - jau::io::ByteInStream_URL data_stream(uri_original, 3_s); + jau::io::ByteInStream_URL data_stream(uri_original, 500_ms); bool res = transfer(data_stream, fname_payload_copy_lst[file_idx]); REQUIRE( true == res ); @@ -213,7 +213,7 @@ class TestByteStream01 { const std::string uri_original = url_input_root + fname_payload_lst[file_idx]; - jau::io::ByteInStream_URL data_stream(uri_original, 3_s); + jau::io::ByteInStream_URL data_stream(uri_original, 500_ms); bool res = transfer(data_stream, fname_payload_copy_lst[file_idx]); REQUIRE( true == res ); @@ -226,13 +226,36 @@ class TestByteStream01 { } } - // throttled, no content size + void test12_copy_http_404() { + httpd_start(); + + { + const size_t file_idx = IDX_11kiB; + + const std::string uri_original = url_input_root + "doesnt_exists.txt"; + + jau::io::ByteInStream_URL data_stream(uri_original, 500_ms); + + bool res = transfer(data_stream, fname_payload_copy_lst[file_idx]); + REQUIRE( false == res ); + + jau::fs::file_stats out_stats(fname_payload_copy_lst[file_idx]); + REQUIRE( true == out_stats.exists() ); + REQUIRE( true == out_stats.is_file() ); + REQUIRE( data_stream.error() == true ); + REQUIRE( data_stream.has_content_size() == false ); + REQUIRE( data_stream.content_size() == 0 ); + REQUIRE( 0 == out_stats.size() ); + } + } + + // throttled, no content size, interruptReader() via set_eof() will avoid timeout static void feed_source_00(jau::io::ByteInStream_Feed * data_feed) { uint64_t xfer_total = 0; - jau::io::ByteInStream_File enc_stream(data_feed->id(), true /* use_binary */); - while( !enc_stream.end_of_data() ) { + jau::io::ByteInStream_File data_stream(data_feed->id(), true /* use_binary */); + while( !data_stream.end_of_data() ) { uint8_t buffer[1024]; // 1k - size_t count = enc_stream.read(buffer, sizeof(buffer)); + size_t count = data_stream.read(buffer, sizeof(buffer)); if( 0 < count ) { xfer_total += count; data_feed->write(buffer, count); @@ -250,10 +273,10 @@ class TestByteStream01 { data_feed->set_content_size( file_size ); uint64_t xfer_total = 0; - jau::io::ByteInStream_File enc_stream(data_feed->id(), true /* use_binary */); - while( !enc_stream.end_of_data() && xfer_total < file_size ) { + jau::io::ByteInStream_File data_stream(data_feed->id(), true /* use_binary */); + while( !data_stream.end_of_data() && xfer_total < file_size ) { uint8_t buffer[1024]; // 1k - size_t count = enc_stream.read(buffer, sizeof(buffer)); + size_t count = data_stream.read(buffer, sizeof(buffer)); if( 0 < count ) { xfer_total += count; data_feed->write(buffer, count); @@ -271,16 +294,63 @@ class TestByteStream01 { data_feed->set_content_size( file_size ); uint64_t xfer_total = 0; + jau::io::ByteInStream_File data_stream(data_feed->id(), true /* use_binary */); + while( !data_stream.end_of_data() && xfer_total < file_size ) { + uint8_t buffer[1024]; // 1k + size_t count = data_stream.read(buffer, sizeof(buffer)); + if( 0 < count ) { + xfer_total += count; + data_feed->write(buffer, count); + } + } + data_feed->set_eof( xfer_total == file_size ? jau::io::async_io_result_t::SUCCESS : jau::io::async_io_result_t::FAILED ); + } + + // full speed, no content size, interrupting 1/4 way + static void feed_source_20(jau::io::ByteInStream_Feed * data_feed) { + jau::fs::file_stats fs_feed(data_feed->id()); + const uint64_t file_size = fs_feed.size(); + + uint64_t xfer_total = 0; jau::io::ByteInStream_File enc_stream(data_feed->id(), true /* use_binary */); - while( !enc_stream.end_of_data() && xfer_total < file_size ) { + while( !enc_stream.end_of_data() ) { uint8_t buffer[1024]; // 1k size_t count = enc_stream.read(buffer, sizeof(buffer)); if( 0 < count ) { xfer_total += count; data_feed->write(buffer, count); + if( xfer_total >= file_size/4 ) { + data_feed->set_eof( jau::io::async_io_result_t::FAILED ); // calls data_feed->interruptReader(); + return; + } } } - data_feed->set_eof( xfer_total == file_size ? jau::io::async_io_result_t::SUCCESS : jau::io::async_io_result_t::FAILED ); + // probably set after transfering due to above sleep, which also ends when total size has been reached. + data_feed->set_eof( jau::io::async_io_result_t::SUCCESS ); + } + + // full speed, with content size, interrupting 1/4 way + static void feed_source_21(jau::io::ByteInStream_Feed * data_feed) { + jau::fs::file_stats fs_feed(data_feed->id()); + const uint64_t file_size = fs_feed.size(); + data_feed->set_content_size( file_size ); + + uint64_t xfer_total = 0; + jau::io::ByteInStream_File enc_stream(data_feed->id(), true /* use_binary */); + while( !enc_stream.end_of_data() ) { + uint8_t buffer[1024]; // 1k + size_t count = enc_stream.read(buffer, sizeof(buffer)); + if( 0 < count ) { + xfer_total += count; + data_feed->write(buffer, count); + if( xfer_total >= file_size/4 ) { + data_feed->set_eof( jau::io::async_io_result_t::FAILED ); // calls data_feed->interruptReader(); + return; + } + } + } + // probably set after transfering due to above sleep, which also ends when total size has been reached. + data_feed->set_eof( jau::io::async_io_result_t::SUCCESS ); } void test21_copy_fed_ok() { @@ -288,7 +358,7 @@ class TestByteStream01 { const size_t file_idx = IDX_11kiB; { // full speed, with content size - jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 3_s); + jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms); std::thread feeder_thread= std::thread(&feed_source_10, &data_feed); bool res = transfer(data_feed, fname_payload_copy_lst[file_idx]); @@ -305,7 +375,7 @@ class TestByteStream01 { } { // throttled, with content size - jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 3_s); + jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms); std::thread feeder_thread= std::thread(&feed_source_01, &data_feed); bool res = transfer(data_feed, fname_payload_copy_lst[file_idx]); @@ -321,7 +391,7 @@ class TestByteStream01 { REQUIRE( fname_payload_size_lst[file_idx] == out_stats.size() ); } { - // throttled, no content size, will timeout after 500_ms + // throttled, no content size, interruptReader() via set_eof() will avoid timeout jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms); std::thread feeder_thread= std::thread(&feed_source_00, &data_feed); @@ -342,7 +412,7 @@ class TestByteStream01 { const size_t file_idx = IDX_65MiB; { // full speed, with content size - jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 3_s); + jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms); std::thread feeder_thread= std::thread(&feed_source_10, &data_feed); bool res = transfer(data_feed, fname_payload_copy_lst[file_idx]); @@ -359,16 +429,61 @@ class TestByteStream01 { } } } + + void test22_copy_fed_irq() { + { + const size_t file_idx = IDX_65MiB; + { + // full speed, no content size, interrupting 1/4 way + jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms); + std::thread feeder_thread= std::thread(&feed_source_20, &data_feed); + + bool res = transfer(data_feed, fname_payload_copy_lst[file_idx]); + if( feeder_thread.joinable() ) { + feeder_thread.join(); + } + REQUIRE( true == res ); + + jau::fs::file_stats out_stats(fname_payload_copy_lst[file_idx]); + REQUIRE( true == out_stats.exists() ); + REQUIRE( true == out_stats.is_file() ); + REQUIRE( false == data_feed.has_content_size() ); + REQUIRE( 0 == data_feed.content_size() ); + REQUIRE( fname_payload_size_lst[file_idx] > out_stats.size() ); // interrupted... + } + { + // full speed, with content size, interrupting 1/4 way + jau::io::ByteInStream_Feed data_feed(fname_payload_lst[file_idx], 500_ms); + std::thread feeder_thread= std::thread(&feed_source_21, &data_feed); + + bool res = transfer(data_feed, fname_payload_copy_lst[file_idx]); + if( feeder_thread.joinable() ) { + feeder_thread.join(); + } + REQUIRE( true == res ); + + jau::fs::file_stats out_stats(fname_payload_copy_lst[file_idx]); + REQUIRE( true == out_stats.exists() ); + REQUIRE( true == out_stats.is_file() ); + REQUIRE( true == data_feed.has_content_size() ); + REQUIRE( fname_payload_size_lst[file_idx] == data_feed.content_size() ); + REQUIRE( data_feed.content_size() > out_stats.size() ); // interrupted... + } + } + } + }; std::vector<std::string> TestByteStream01::fname_payload_lst; std::vector<std::string> TestByteStream01::fname_payload_copy_lst; std::vector<uint64_t> TestByteStream01::fname_payload_size_lst; -// METHOD_AS_TEST_CASE( TestByteStream01::test01_copy_file_ok, "TestByteStream01 test01_copy_file_ok"); +METHOD_AS_TEST_CASE( TestByteStream01::test01_copy_file_ok, "TestByteStream01 test01_copy_file_ok"); -// METHOD_AS_TEST_CASE( TestByteStream01::test11_copy_http_ok, "TestByteStream01 test11_copy_http_ok"); +METHOD_AS_TEST_CASE( TestByteStream01::test11_copy_http_ok, "TestByteStream01 test11_copy_http_ok"); +METHOD_AS_TEST_CASE( TestByteStream01::test12_copy_http_404, "TestByteStream01 test12_copy_http_404"); METHOD_AS_TEST_CASE( TestByteStream01::test21_copy_fed_ok, "TestByteStream01 test21_copy_fed_ok"); +METHOD_AS_TEST_CASE( TestByteStream01::test22_copy_fed_irq, "TestByteStream01 test22_copy_fed_irq"); |