aboutsummaryrefslogtreecommitdiffstats
path: root/src/elevator/IOUtil.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/elevator/IOUtil.cpp')
-rw-r--r--src/elevator/IOUtil.cpp289
1 files changed, 264 insertions, 25 deletions
diff --git a/src/elevator/IOUtil.cpp b/src/elevator/IOUtil.cpp
index 205cbad..6d47fd6 100644
--- a/src/elevator/IOUtil.cpp
+++ b/src/elevator/IOUtil.cpp
@@ -5,6 +5,7 @@
#include <fstream>
#include <iostream>
+#include <chrono>
#include <elevator/elevator.hpp>
@@ -14,6 +15,9 @@
#include <curl/curl.h>
+#include <thread>
+#include <pthread.h>
+
// #define USE_CXX17lib_FS 1
#if USE_CXX17lib_FS
#include <filesystem>
@@ -22,12 +26,12 @@
using namespace elevator;
-bool IOUtil::file_exists(const std::string& name) {
+bool IOUtil::file_exists(const std::string& name) noexcept {
std::ifstream f(name);
return f.good() && f.is_open();
}
-bool IOUtil::remove(const std::string& fname) {
+bool IOUtil::remove(const std::string& fname) noexcept {
#if USE_CXX17lib_FS
const fs::path fname2 = fname;
return fs::remove(fname);
@@ -87,7 +91,7 @@ ssize_t IOUtil::read_stream(Botan::DataSource& in, Botan::secure_vector<uint8_t>
return total;
}
-struct curl_glue_t {
+struct curl_glue1_t {
CURL *curl_handle;
ssize_t content_length;
ssize_t total_read;
@@ -95,8 +99,8 @@ struct curl_glue_t {
IOUtil::StreamConsumerFunc consumer_fn;
};
-static size_t consume_curl0(void *ptr, size_t size, size_t nmemb, void *stream) {
- curl_glue_t * cg = (curl_glue_t*)stream;
+static size_t consume_curl1(void *ptr, size_t size, size_t nmemb, void *stream) noexcept {
+ curl_glue1_t * cg = (curl_glue1_t*)stream;
if( 0 > cg->content_length ) {
curl_off_t v = 0;
@@ -110,7 +114,8 @@ static size_t consume_curl0(void *ptr, size_t size, size_t nmemb, void *stream)
memcpy(cg->buffer.data(), ptr, realsize);
cg->total_read += realsize;
- bool is_final = 0 < cg->content_length ? cg->total_read >= cg->content_length : false;
+ const bool is_final = 0 == realsize ||
+ ( 0 < cg->content_length ) ? cg->total_read >= cg->content_length : false;
cg->consumer_fn(cg->buffer, is_final);
return realsize;
@@ -118,36 +123,270 @@ static size_t consume_curl0(void *ptr, size_t size, size_t nmemb, void *stream)
ssize_t IOUtil::read_http_get(const std::string& url, Botan::secure_vector<uint8_t>& buffer,
StreamConsumerFunc consumer_fn) {
- /* init the curl session */
- CURL *curl_handle = curl_easy_init();
+ std::vector<char> errorbuffer;
+ errorbuffer.reserve(CURL_ERROR_SIZE);
+ CURLcode res;
+
+ /* init the curl session */
+ CURL *curl_handle = curl_easy_init();
+ if( nullptr == curl_handle ) {
+ ERR_PRINT("Error setting up http url %s, null curl handle", url.c_str());
+ return -1;
+ }
+
+ curl_glue1_t cg = { curl_handle, -1, 0, buffer, consumer_fn };
+
+ res = curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, errorbuffer.data());
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up http url %s, error %d %d",
+ url.c_str(), (int)res, curl_easy_strerror(res));
+ goto errout;
+ }
+
+ /* set URL to get here */
+ res = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up http url %s, error %d %d",
+ url.c_str(), (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* Switch on full protocol/debug output while testing */
+ res = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 0L);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up http url %s, error %d %d",
+ url.c_str(), (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* disable progress meter, set to 0L to enable it */
+ res = curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up http url %s, error %d %d",
+ url.c_str(), (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* send all data to this function */
+ res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_curl1);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up http url %s, error %d %d",
+ url.c_str(), (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* write the page body to this file handle */
+ res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void*)&cg);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up http url %s, error %d %d",
+ url.c_str(), (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* performs the tast, blocking! */
+ res = curl_easy_perform(curl_handle);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error processing http url %s, error %d %d",
+ url.c_str(), (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* cleanup curl stuff */
+ curl_easy_cleanup(curl_handle);
+ return cg.total_read;
+
+errout:
+ curl_easy_cleanup(curl_handle);
+ return -1;
+}
+
+struct curl_glue2_t {
+ curl_glue2_t(CURL *_curl_handle,
+ jau::relaxed_atomic_ssize_t* _content_length,
+ bool _content_length_mine,
+ jau::relaxed_atomic_ssize_t* _total_read,
+ bool _total_read_mine,
+ IOUtil::ByteRingbuffer& _buffer,
+ IOUtil::relaxed_atomic_result_t& _result)
+ : curl_handle(_curl_handle),
+ content_length(_content_length),
+ content_length_mine(_content_length_mine),
+ total_read(_total_read),
+ total_read_mine(_total_read_mine),
+ buffer(_buffer),
+ result(_result)
+ {}
+
+ CURL *curl_handle;
+ jau::relaxed_atomic_ssize_t* content_length;
+ bool content_length_mine;
+ jau::relaxed_atomic_ssize_t* total_read;
+ bool total_read_mine;
+ IOUtil::ByteRingbuffer& buffer;
+ IOUtil::relaxed_atomic_result_t& result;
+};
+
+static size_t consume_curl2(void *ptr, size_t size, size_t nmemb, void *stream) noexcept {
+ curl_glue2_t * cg = (curl_glue2_t*)stream;
+
+ if( 0 > *cg->content_length ) {
+ curl_off_t v = 0;
+ const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
+ if( CURLE_OK == r ) {
+ *cg->content_length = v;
+ }
+ }
+ ssize_t total_read = *cg->total_read;
+ const size_t realsize = size * nmemb;
+ DBG_PRINT("consume_curl2.0 realsize % " PRIu64 ", rb %s", realsize, cg->buffer.toString().c_str() );
+ cg->buffer.putBlocking(reinterpret_cast<uint8_t*>(ptr),
+ reinterpret_cast<uint8_t*>(ptr)+realsize, 0 /* timeoutMS */);
+
+ total_read += realsize;
+ *cg->total_read = total_read;
+ const bool is_final = 0 == realsize ||
+ ( 0 < *cg->content_length ) ? total_read >= *cg->content_length : false;
+ if( is_final ) {
+ cg->result = IOUtil::result_t::SUCCESS;
+ }
+
+ DBG_PRINT("consume_curl2.X realsize % " PRIu64 ", total %" PRIi64 ", result %d, rb %s",
+ realsize, total_read, cg->result.load(), cg->buffer.toString().c_str() );
- curl_glue_t cg = { curl_handle, -1, 0, buffer, consumer_fn };
+ return realsize;
+}
+
+static void read_http_get_thread(const char *url, std::unique_ptr<curl_glue2_t> && cg) noexcept {
+ std::vector<char> errorbuffer;
+ errorbuffer.reserve(CURL_ERROR_SIZE);
+ CURLcode res;
+
+ /* init the curl session */
+ CURL *curl_handle = curl_easy_init();
+ if( nullptr == curl_handle ) {
+ ERR_PRINT("Error setting up http url %s, null curl handle", url);
+ goto errout;
+ }
+ cg->curl_handle = curl_handle;
+
+ res = curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, errorbuffer.data());
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up http url %s, error %d %d",
+ url, (int)res, curl_easy_strerror(res));
+ goto errout;
+ }
+
+ /* set URL to get here */
+ res = curl_easy_setopt(curl_handle, CURLOPT_URL, url);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up http url %s, error %d %d",
+ url, (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* Switch on full protocol/debug output while testing */
+ res = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 0L);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up http url %s, error %d %d",
+ url, (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* disable progress meter, set to 0L to enable it */
+ res = curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up http url %s, error %d %d",
+ url, (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* send all data to this function */
+ res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_curl2);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up http url %s, error %d %d",
+ url, (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* write the page body to this file handle */
+ res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void*)cg.get());
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error setting up http url %s, error %d %d",
+ url, (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* performs the tast, blocking! */
+ res = curl_easy_perform(curl_handle);
+ if( CURLE_OK != res ) {
+ ERR_PRINT("Error processing http url %s, error %d %d",
+ url, (int)res, errorbuffer.data());
+ goto errout;
+ }
+
+ /* cleanup curl stuff */
+ cg->result = IOUtil::result_t::SUCCESS;
+ goto cleanup;
+
+errout:
+ cg->result = IOUtil::result_t::FAILED;
+
+cleanup:
+ if( nullptr != curl_handle ) {
+ curl_easy_cleanup(curl_handle);
+ }
+
+ if( cg->content_length_mine ) {
+ delete cg->content_length;
+ cg->content_length = nullptr;
+ }
+ if( cg->total_read_mine ) {
+ delete cg->total_read;
+ cg->total_read = nullptr;
+ }
+ return;
+}
+
+const size_t IOUtil::BEST_HTTP_RINGBUFFER_SIZE = 2*CURL_MAX_WRITE_SIZE;
- /* set URL to get here */
- curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
+void IOUtil::read_http_get(const std::string& url, ByteRingbuffer& buffer,
+ jau::relaxed_atomic_ssize_t& content_length,
+ jau::relaxed_atomic_ssize_t& total_read,
+ relaxed_atomic_result_t& result) noexcept {
+ /* init user referenced values */
+ content_length = -1;
+ total_read = 0;
+ result = IOUtil::result_t::NONE;
- /* Switch on full protocol/debug output while testing */
- curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 0L);
+ if( buffer.capacity() < BEST_HTTP_RINGBUFFER_SIZE ) {
+ buffer.recapacity( BEST_HTTP_RINGBUFFER_SIZE );
+ }
+
+ std::unique_ptr<curl_glue2_t> cg ( std::make_unique<curl_glue2_t>(nullptr, &content_length, false, &total_read, false, buffer, result ) );
- /* disable progress meter, set to 0L to enable it */
- curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
+ std::thread http_thread00(&::read_http_get_thread, url.c_str(), std::move(cg)); // @suppress("Invalid arguments")
+ http_thread00.detach();
+}
- /* send all data to this function */
- curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_curl0);
+void IOUtil::read_http_get(const std::string& url, ByteRingbuffer& buffer,
+ relaxed_atomic_result_t& result) noexcept {
- /* write the page body to this file handle */
- curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void*)&cg);
+ /* init user referenced values */
+ result = IOUtil::result_t::NONE;
- /* performs the tast, blocking! */
- curl_easy_perform(curl_handle);
+ if( buffer.capacity() < BEST_HTTP_RINGBUFFER_SIZE ) {
+ buffer.recapacity( BEST_HTTP_RINGBUFFER_SIZE );
+ }
- /* cleanup curl stuff */
- curl_easy_cleanup(curl_handle);
+ jau::relaxed_atomic_ssize_t* content_length = new jau::relaxed_atomic_ssize_t(-1);
+ jau::relaxed_atomic_ssize_t* total_read = new jau::relaxed_atomic_ssize_t(0);
+ std::unique_ptr<curl_glue2_t> cg ( std::make_unique<curl_glue2_t>(nullptr, content_length, true, total_read, true, buffer, result ) );
- return cg.total_read;
+ std::thread http_thread00(&::read_http_get_thread, url.c_str(), std::move(cg)); // @suppress("Invalid arguments")
+ http_thread00.detach();
}
-void IOUtil::print_stats(const std::string &prefix, const uint64_t out_bytes_total, uint64_t td_ms) {
+void IOUtil::print_stats(const std::string &prefix, const uint64_t out_bytes_total, uint64_t td_ms) noexcept {
if( jau::environment::get().verbose ) {
jau::PLAIN_PRINT(true, "%s: Duration %s s, %s ms", prefix.c_str(),