diff options
Diffstat (limited to 'src/byte_stream.cpp')
-rw-r--r-- | src/byte_stream.cpp | 114 |
1 files changed, 114 insertions, 0 deletions
diff --git a/src/byte_stream.cpp b/src/byte_stream.cpp index b834003..ac90ab8 100644 --- a/src/byte_stream.cpp +++ b/src/byte_stream.cpp @@ -375,6 +375,11 @@ bool ByteInStream_URL::available(size_t n) noexcept { return m_buffer.waitForElements(n, m_timeout) >= n; } +bool ByteInStream_URL::is_open() const noexcept { + // url thread ended, only remaining bytes in buffer available left + return async_io_result_t::NONE != m_result && m_buffer.size() > 0; +} + size_t ByteInStream_URL::read(void* out, size_t length) noexcept { if( 0 == length || end_of_data() ) { return 0; @@ -464,6 +469,11 @@ bool ByteInStream_Feed::available(size_t n) noexcept { return m_buffer.waitForElements(n, m_timeout) >= n; } +bool ByteInStream_Feed::is_open() const noexcept { + // url thread ended, only remaining bytes in buffer available left + return async_io_result_t::NONE != m_result && m_buffer.size() > 0; +} + size_t ByteInStream_Feed::read(void* out, size_t length) noexcept { if( 0 == length || end_of_data() ) { return 0; @@ -558,3 +568,107 @@ std::string ByteInStream_Recorder::to_string() const noexcept { "], consumed "+jau::to_decstring(m_bytes_consumed)+ ", iostate["+jau::io::to_string(rdstate())+"]]"; } + +size_t ByteOutStream_File::write(const void* out, size_t length) noexcept { + if( 0 == length || fail() ) { + return 0; + } + const uint8_t* out_u8 = static_cast<const uint8_t*>(out); + size_t total = 0; + while( total < length ) { + ssize_t len; + while ( ( len = ::write(m_fd, out_u8+total, length-total) ) < 0 ) { + if ( errno == EAGAIN || errno == EINTR ) { + // cont temp unavail or interruption + // unlikely for regular files and we open w/o O_NONBLOCK + // - EAGAIN (file) and EWOULDBLOCK (socket) if blocking + // - EINTR (signal) + continue; + } + // Check errno == ETIMEDOUT ?? + setstate_impl( iostate::failbit ); + DBG_PRINT("ByteOutStream_File::write: Error occurred in %s, errno %d %s", to_string().c_str(), errno, strerror(errno)); + return 0; + } + total += static_cast<size_t>(len); + if( 0 == len ) { + setstate_impl( iostate::failbit); + break; + } + } + m_bytes_consumed += total; + return total; +} + +ByteOutStream_File::ByteOutStream_File(const int fd) noexcept +: stats(fd), m_fd(-1), + m_bytes_consumed(0) +{ + if( !stats.exists() || !stats.has_access() ) { + setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open + DBG_PRINT("ByteOutStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(), to_string().c_str()); + } else { + m_fd = ::dup(fd); + if ( 0 > m_fd ) { + setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open + DBG_PRINT("ByteOutStream_File::ctor: Error occurred in %s, %s", stats.to_string().c_str(), to_string().c_str()); + } + // open file-descriptor is appending anyways + } +} + +ByteOutStream_File::ByteOutStream_File(const int dirfd, const std::string& path, const jau::fs::fmode_t mode) noexcept +: stats(), m_fd(-1), + m_bytes_consumed(0) +{ + if( jau::io::uri_tk::is_local_file_protocol(path) ) { + // cut of leading `file://` + std::string path2 = path.substr(7); + stats = jau::fs::file_stats(dirfd, path2); + } else { + stats = jau::fs::file_stats(dirfd, path); + } + if( ( stats.exists() && !stats.is_file() && !stats.has_fd() ) || !stats.has_access() ) { + setstate_impl( iostate::failbit ); // Note: conforming with std::ofstream open (?) + DBG_PRINT("ByteOutStream_File::ctor: Error, an existing non[file, fd] or not accessible element in %s, %s", stats.to_string().c_str(), to_string().c_str()); + } else { + // O_NONBLOCK, is useless on files and counter to this class logic + if( stats.has_fd() ) { + m_fd = ::dup( stats.fd() ); + } else { + const int dst_flags = ( stats.exists() ? 0 : O_CREAT|O_EXCL ) | O_WRONLY|O_BINARY|O_NOCTTY; + m_fd = __posix_openat64(dirfd, stats.path().c_str(), dst_flags, jau::fs::posix_protection_bits( mode )); + } + if ( 0 > m_fd ) { + setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open + DBG_PRINT("ByteOutStream_File::ctor: Error while opening %s, %s", stats.to_string().c_str(), to_string().c_str()); + } + if( stats.is_file() ) { + off64_t abs_pos = __posix_lseek64(m_fd, 0, SEEK_END); + if( 0 > abs_pos ) { + setstate_impl( iostate::failbit ); + ERR_PRINT("Failed to position existing file to end %s, errno %d %s", + to_string().c_str(), errno, strerror(errno)); + } + } + } +} + +ByteOutStream_File::ByteOutStream_File(const std::string& path, const jau::fs::fmode_t mode) noexcept +: ByteOutStream_File(AT_FDCWD, path, mode) {} + +void ByteOutStream_File::close() noexcept { + if( 0 <= m_fd ) { + ::close(m_fd); + m_fd = -1; + } +} + +std::string ByteOutStream_File::to_string() const noexcept { + return "ByteOutStream_File[consumed "+jau::to_decstring(m_bytes_consumed)+ + ", fd "+std::to_string(m_fd)+ + ", iostate["+jau::io::to_string(rdstate())+ + "], "+stats.to_string()+ + "]"; +} + |