aboutsummaryrefslogtreecommitdiffstats
path: root/src/byte_stream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/byte_stream.cpp')
-rw-r--r--src/byte_stream.cpp114
1 files changed, 114 insertions, 0 deletions
diff --git a/src/byte_stream.cpp b/src/byte_stream.cpp
index b834003..ac90ab8 100644
--- a/src/byte_stream.cpp
+++ b/src/byte_stream.cpp
@@ -375,6 +375,11 @@ bool ByteInStream_URL::available(size_t n) noexcept {
return m_buffer.waitForElements(n, m_timeout) >= n;
}
+bool ByteInStream_URL::is_open() const noexcept {
+ // url thread ended, only remaining bytes in buffer available left
+ return async_io_result_t::NONE != m_result && m_buffer.size() > 0;
+}
+
size_t ByteInStream_URL::read(void* out, size_t length) noexcept {
if( 0 == length || end_of_data() ) {
return 0;
@@ -464,6 +469,11 @@ bool ByteInStream_Feed::available(size_t n) noexcept {
return m_buffer.waitForElements(n, m_timeout) >= n;
}
+bool ByteInStream_Feed::is_open() const noexcept {
+ // url thread ended, only remaining bytes in buffer available left
+ return async_io_result_t::NONE != m_result && m_buffer.size() > 0;
+}
+
size_t ByteInStream_Feed::read(void* out, size_t length) noexcept {
if( 0 == length || end_of_data() ) {
return 0;
@@ -558,3 +568,107 @@ std::string ByteInStream_Recorder::to_string() const noexcept {
"], consumed "+jau::to_decstring(m_bytes_consumed)+
", iostate["+jau::io::to_string(rdstate())+"]]";
}
+
+size_t ByteOutStream_File::write(const void* out, size_t length) noexcept {
+ if( 0 == length || fail() ) {
+ return 0;
+ }
+ const uint8_t* out_u8 = static_cast<const uint8_t*>(out);
+ size_t total = 0;
+ while( total < length ) {
+ ssize_t len;
+ while ( ( len = ::write(m_fd, out_u8+total, length-total) ) < 0 ) {
+ if ( errno == EAGAIN || errno == EINTR ) {
+ // cont temp unavail or interruption
+ // unlikely for regular files and we open w/o O_NONBLOCK
+ // - EAGAIN (file) and EWOULDBLOCK (socket) if blocking
+ // - EINTR (signal)
+ continue;
+ }
+ // Check errno == ETIMEDOUT ??
+ setstate_impl( iostate::failbit );
+ DBG_PRINT("ByteOutStream_File::write: Error occurred in %s, errno %d %s", to_string().c_str(), errno, strerror(errno));
+ return 0;
+ }
+ total += static_cast<size_t>(len);
+ if( 0 == len ) {
+ setstate_impl( iostate::failbit);
+ break;
+ }
+ }
+ m_bytes_consumed += total;
+ return total;
+}
+
+ByteOutStream_File::ByteOutStream_File(const int fd) noexcept
+: stats(fd), m_fd(-1),
+ m_bytes_consumed(0)
+{
+ if( !stats.exists() || !stats.has_access() ) {
+ setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
+ DBG_PRINT("ByteOutStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(), to_string().c_str());
+ } else {
+ m_fd = ::dup(fd);
+ if ( 0 > m_fd ) {
+ setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
+ DBG_PRINT("ByteOutStream_File::ctor: Error occurred in %s, %s", stats.to_string().c_str(), to_string().c_str());
+ }
+ // open file-descriptor is appending anyways
+ }
+}
+
+ByteOutStream_File::ByteOutStream_File(const int dirfd, const std::string& path, const jau::fs::fmode_t mode) noexcept
+: stats(), m_fd(-1),
+ m_bytes_consumed(0)
+{
+ if( jau::io::uri_tk::is_local_file_protocol(path) ) {
+ // cut of leading `file://`
+ std::string path2 = path.substr(7);
+ stats = jau::fs::file_stats(dirfd, path2);
+ } else {
+ stats = jau::fs::file_stats(dirfd, path);
+ }
+ if( ( stats.exists() && !stats.is_file() && !stats.has_fd() ) || !stats.has_access() ) {
+ setstate_impl( iostate::failbit ); // Note: conforming with std::ofstream open (?)
+ DBG_PRINT("ByteOutStream_File::ctor: Error, an existing non[file, fd] or not accessible element in %s, %s", stats.to_string().c_str(), to_string().c_str());
+ } else {
+ // O_NONBLOCK, is useless on files and counter to this class logic
+ if( stats.has_fd() ) {
+ m_fd = ::dup( stats.fd() );
+ } else {
+ const int dst_flags = ( stats.exists() ? 0 : O_CREAT|O_EXCL ) | O_WRONLY|O_BINARY|O_NOCTTY;
+ m_fd = __posix_openat64(dirfd, stats.path().c_str(), dst_flags, jau::fs::posix_protection_bits( mode ));
+ }
+ if ( 0 > m_fd ) {
+ setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
+ DBG_PRINT("ByteOutStream_File::ctor: Error while opening %s, %s", stats.to_string().c_str(), to_string().c_str());
+ }
+ if( stats.is_file() ) {
+ off64_t abs_pos = __posix_lseek64(m_fd, 0, SEEK_END);
+ if( 0 > abs_pos ) {
+ setstate_impl( iostate::failbit );
+ ERR_PRINT("Failed to position existing file to end %s, errno %d %s",
+ to_string().c_str(), errno, strerror(errno));
+ }
+ }
+ }
+}
+
+ByteOutStream_File::ByteOutStream_File(const std::string& path, const jau::fs::fmode_t mode) noexcept
+: ByteOutStream_File(AT_FDCWD, path, mode) {}
+
+void ByteOutStream_File::close() noexcept {
+ if( 0 <= m_fd ) {
+ ::close(m_fd);
+ m_fd = -1;
+ }
+}
+
+std::string ByteOutStream_File::to_string() const noexcept {
+ return "ByteOutStream_File[consumed "+jau::to_decstring(m_bytes_consumed)+
+ ", fd "+std::to_string(m_fd)+
+ ", iostate["+jau::io::to_string(rdstate())+
+ "], "+stats.to_string()+
+ "]";
+}
+