diff options
author | Chris Robinson <[email protected]> | 2019-07-08 13:18:10 -0700 |
---|---|---|
committer | Chris Robinson <[email protected]> | 2019-07-08 13:18:10 -0700 |
commit | 3ffb6867a3bac856bacebf25dfbcf48b4482d50f (patch) | |
tree | d4d86883fc891b34ef8fd9ecb32a88340a643381 /examples | |
parent | 2783b4c04b92d949178e9acd4d3407b2315efe16 (diff) |
Rework packet handling in alffplay
Turns out avcodec_send_packet is what can invoke the decode for serialized
codecs, so don't call that in the parse handler thread. The packet queue is
used to get the compressed data from the parse handler to the audio/video
threads.
Additionally, don't serialize the video frame preparation with the decode
thread.
Diffstat (limited to 'examples')
-rw-r--r-- | examples/alffplay.cpp | 398 |
1 files changed, 170 insertions, 228 deletions
diff --git a/examples/alffplay.cpp b/examples/alffplay.cpp index 21b3f9c8..f6bf6863 100644 --- a/examples/alffplay.cpp +++ b/examples/alffplay.cpp @@ -119,7 +119,7 @@ LPALEVENTCALLBACKSOFT alEventCallbackSOFT; const seconds AVNoSyncThreshold{10}; -const milliseconds VideoSyncThreshold(10); +const milliseconds VideoSyncThreshold{10}; #define VIDEO_PICTURE_QUEUE_SIZE 16 const seconds_d64 AudioSyncThreshold{0.03}; @@ -132,8 +132,6 @@ const milliseconds AudioBufferTime{20}; /* Buffer total size, in time (should be divisible by the buffer time) */ const milliseconds AudioBufferTotalTime{800}; -#define MAX_QUEUE_SIZE (15 * 1024 * 1024) /* Bytes of compressed data to keep queued */ - enum { FF_UPDATE_EVENT = SDL_USEREVENT, FF_REFRESH_EVENT, @@ -184,27 +182,58 @@ struct SwsContextDeleter { using SwsContextPtr = std::unique_ptr<SwsContext,SwsContextDeleter>; +template<size_t SizeLimit> class PacketQueue { + std::mutex mMutex; + std::condition_variable mCondVar; std::deque<AVPacket> mPackets; size_t mTotalSize{0}; + bool mFinished{false}; public: - ~PacketQueue() { clear(); } + ~PacketQueue() + { + for(AVPacket &pkt : mPackets) + av_packet_unref(&pkt); + mPackets.clear(); + mTotalSize = 0; + } - bool empty() const noexcept { return mPackets.empty(); } - size_t totalSize() const noexcept { return mTotalSize; } + void setFinished() + { + { + std::lock_guard<std::mutex> _{mMutex}; + mFinished = true; + } + mCondVar.notify_one(); + } - void put(const AVPacket *pkt) + AVPacket *getPacket(std::unique_lock<std::mutex> &lock) { - mPackets.push_back(AVPacket{}); - if(av_packet_ref(&mPackets.back(), pkt) != 0) - mPackets.pop_back(); - else - mTotalSize += mPackets.back().size; + while(mPackets.empty() && !mFinished) + mCondVar.wait(lock); + return mPackets.empty() ? nullptr : &mPackets.front(); } - AVPacket *front() noexcept - { return &mPackets.front(); } + bool put(const AVPacket *pkt) + { + { + std::unique_lock<std::mutex> lock{mMutex}; + if(mTotalSize >= SizeLimit) + return false; + + mPackets.push_back(AVPacket{}); + if(av_packet_ref(&mPackets.back(), pkt) != 0) + { + mPackets.pop_back(); + return true; + } + + mTotalSize += mPackets.back().size; + } + mCondVar.notify_one(); + return true; + } void pop() { @@ -214,13 +243,7 @@ public: mPackets.pop_front(); } - void clear() - { - for(AVPacket &pkt : mPackets) - av_packet_unref(&pkt); - mPackets.clear(); - mTotalSize = 0; - } + std::mutex &getMutex() noexcept { return mMutex; } }; @@ -232,8 +255,7 @@ struct AudioState { AVStream *mStream{nullptr}; AVCodecCtxPtr mCodecCtx; - std::mutex mQueueMtx; - std::condition_variable mQueueCond; + PacketQueue<2*1024*1024> mPackets; /* Used for clock difference average computation */ seconds_d64 mClockDiffAvg{0}; @@ -309,8 +331,7 @@ struct VideoState { AVStream *mStream{nullptr}; AVCodecCtxPtr mCodecCtx; - std::mutex mQueueMtx; - std::condition_variable mQueueCond; + PacketQueue<14*1024*1024> mPackets; nanoseconds mClock{0}; nanoseconds mFrameTimer{0}; @@ -320,14 +341,13 @@ struct VideoState { /* time (av_gettime) at which we updated mCurrentPts - used to have running video pts */ microseconds mCurrentPtsTime{0}; - /* Decompressed video frame, and swscale context for conversion */ - AVFramePtr mDecodedFrame; + /* Swscale context for format conversion */ SwsContextPtr mSwscaleCtx; struct Picture { SDL_Texture *mImage{nullptr}; int mWidth{0}, mHeight{0}; /* Logical image size (actual size may be larger) */ - std::atomic<bool> mUpdated{false}; + AVFramePtr mFrame{av_frame_alloc()}; nanoseconds mPts{0}; ~Picture() @@ -339,6 +359,7 @@ struct VideoState { }; std::array<Picture,VIDEO_PICTURE_QUEUE_SIZE> mPictQ; size_t mPictQSize{0}, mPictQRead{0}, mPictQWrite{0}; + size_t mPictQPrepSize{0}, mPictQPrep{0}; std::mutex mPictQMutex; std::condition_variable mPictQCond; bool mFirstUpdate{true}; @@ -354,7 +375,7 @@ struct VideoState { void display(SDL_Window *screen, SDL_Renderer *renderer); void refreshTimer(SDL_Window *screen, SDL_Renderer *renderer); void updatePicture(SDL_Window *screen, SDL_Renderer *renderer); - int queuePicture(nanoseconds pts); + bool queuePicture(nanoseconds pts, AVFrame *frame); int handler(); }; @@ -366,11 +387,6 @@ struct MovieState { microseconds mClockBase{0}; - std::mutex mSendMtx; - std::condition_variable mSendCond; - /* NOTE: false/clear = need data, true/set = no data needed */ - std::atomic_flag mSendDataGood; - std::atomic<bool> mQuit{false}; AudioState mAudio; @@ -536,22 +552,20 @@ int AudioState::decodeFrame() { while(!mMovie.mQuit.load(std::memory_order_relaxed)) { - std::unique_lock<std::mutex> lock(mQueueMtx); - int ret = avcodec_receive_frame(mCodecCtx.get(), mDecodedFrame.get()); - if(ret == AVERROR(EAGAIN)) { - mMovie.mSendDataGood.clear(std::memory_order_relaxed); - std::unique_lock<std::mutex>(mMovie.mSendMtx).unlock(); - mMovie.mSendCond.notify_one(); - do { - mQueueCond.wait(lock); - ret = avcodec_receive_frame(mCodecCtx.get(), mDecodedFrame.get()); - } while(ret == AVERROR(EAGAIN)); + std::unique_lock<std::mutex> lock{mPackets.getMutex()}; + AVPacket *lastpkt{}; + while((lastpkt=mPackets.getPacket(lock)) != nullptr) + { + int ret{avcodec_send_packet(mCodecCtx.get(), lastpkt)}; + if(ret == AVERROR(EAGAIN)) break; + mPackets.pop(); + } + if(!lastpkt) + avcodec_send_packet(mCodecCtx.get(), nullptr); } - lock.unlock(); + int ret{avcodec_receive_frame(mCodecCtx.get(), mDecodedFrame.get())}; if(ret == AVERROR_EOF) break; - mMovie.mSendDataGood.clear(std::memory_order_relaxed); - mMovie.mSendCond.notify_one(); if(ret < 0) { std::cerr<< "Failed to decode frame: "<<ret <<std::endl; @@ -567,7 +581,7 @@ int AudioState::decodeFrame() /* If provided, update w/ pts */ if(mDecodedFrame->best_effort_timestamp != AV_NOPTS_VALUE) mCurrentPts = std::chrono::duration_cast<nanoseconds>( - seconds_d64(av_q2d(mStream->time_base)*mDecodedFrame->best_effort_timestamp) + seconds_d64{av_q2d(mStream->time_base)*mDecodedFrame->best_effort_timestamp} ); if(mDecodedFrame->nb_samples > mSamplesMax) @@ -639,7 +653,7 @@ bool AudioState::readAudio(uint8_t *samples, int length) // Adjust the device start time and current pts by the amount we're // skipping/duplicating, so that the clock remains correct for the // current stream position. - auto skip = nanoseconds(seconds(mSamplesPos)) / mCodecCtx->sample_rate; + auto skip = nanoseconds{seconds{mSamplesPos}} / mCodecCtx->sample_rate; mDeviceStartTime -= skip; mCurrentPts += skip; continue; @@ -692,7 +706,7 @@ void AL_APIENTRY AudioState::EventCallback(ALenum eventType, ALuint object, ALui ALsizei length, const ALchar *message, void *userParam) { - AudioState *self = reinterpret_cast<AudioState*>(userParam); + AudioState *self = static_cast<AudioState*>(userParam); if(eventType == AL_EVENT_TYPE_BUFFER_COMPLETED_SOFT) { @@ -719,7 +733,7 @@ void AL_APIENTRY AudioState::EventCallback(ALenum eventType, ALuint object, ALui std::cout<< "\n" "Object ID: "<<object<<"\n" "Parameter: "<<param<<"\n" - "Message: "<<std::string(message, length)<<"\n----"<< + "Message: "<<std::string{message, static_cast<ALuint>(length)}<<"\n----"<< std::endl; if(eventType == AL_EVENT_TYPE_DISCONNECTED_SOFT) @@ -727,7 +741,6 @@ void AL_APIENTRY AudioState::EventCallback(ALenum eventType, ALuint object, ALui { std::lock_guard<std::mutex> lock(self->mSrcMutex); self->mConnected.clear(std::memory_order_release); } - std::unique_lock<std::mutex>(self->mSrcMutex).unlock(); self->mSrcCond.notify_one(); } } @@ -903,7 +916,7 @@ int AudioState::handler() mFormat = AL_FORMAT_STEREO16; } } - void *samples = nullptr; + void *samples{nullptr}; ALsizei buffer_len = std::chrono::duration_cast<std::chrono::duration<int>>( mCodecCtx->sample_rate * AudioBufferTime).count() * mFrameSize; @@ -1012,29 +1025,26 @@ int AudioState::handler() alGetSourcei(mSource, AL_BUFFERS_QUEUED, &queued); while(static_cast<ALuint>(queued) < mBuffers.size()) { - ALuint bufid = mBuffers[mBufferIdx]; - uint8_t *ptr = static_cast<uint8_t*>(samples); + const ALuint bufid{mBuffers[mBufferIdx]}; + /* Read the next chunk of data, filling the buffer, and queue it on + * the source. + */ #ifdef AL_SOFT_map_buffer - bool mapped{false}; - if(!ptr) + if(!samples) { - ptr = static_cast<uint8_t*>(alMapBufferSOFT(bufid, 0, buffer_len, + auto ptr = static_cast<uint8_t*>(alMapBufferSOFT(bufid, 0, buffer_len, AL_MAP_WRITE_BIT_SOFT)); - if(!ptr) break; - mapped = true; + bool got_audio{readAudio(ptr, buffer_len)}; + alUnmapBufferSOFT(bufid); + if(!got_audio) break; } + else #endif - - /* Read the next chunk of data, filling the buffer, and queue it on - * the source */ - bool got_audio = readAudio(ptr, buffer_len); -#ifdef AL_SOFT_map_buffer - if(mapped) alUnmapBufferSOFT(bufid); -#endif - if(!got_audio) break; - - if(samples) + { + if(!readAudio(static_cast<uint8_t*>(samples), buffer_len)) + break; alBufferData(bufid, mFormat, samples, buffer_len, mCodecCtx->sample_rate); + } alSourceQueueBuffers(mSource, 1, &bufid); mBufferIdx = (mBufferIdx+1) % mBuffers.size(); @@ -1169,18 +1179,18 @@ void VideoState::refreshTimer(SDL_Window *screen, SDL_Renderer *renderer) mPictQCond.notify_all(); return; } - schedRefresh(milliseconds(100)); + schedRefresh(milliseconds{100}); return; } - std::unique_lock<std::mutex> lock(mPictQMutex); + std::unique_lock<std::mutex> lock{mPictQMutex}; retry: - if(mPictQSize == 0) + if(mPictQPrepSize == 0 || mMovie.mQuit.load(std::memory_order_relaxed)) { if(mEOS) mFinalUpdate = true; else - schedRefresh(milliseconds(1)); + schedRefresh(milliseconds{10}); lock.unlock(); mPictQCond.notify_all(); return; @@ -1192,7 +1202,7 @@ retry: /* Get delay using the frame pts and the pts from last frame. */ auto delay = vp->mPts - mFrameLastPts; - if(delay <= seconds::zero() || delay >= seconds(1)) + if(delay <= seconds::zero() || delay >= seconds{1}) { /* If incorrect delay, use previous one. */ delay = mFrameLastDelay; @@ -1200,6 +1210,7 @@ retry: /* Save for next frame. */ mFrameLastDelay = delay; mFrameLastPts = vp->mPts; + lock.unlock(); /* Update delay to sync to clock if not master source. */ if(mMovie.mAVSyncType != SyncMaster::Video) @@ -1224,8 +1235,10 @@ retry: if(!(actual_delay >= VideoSyncThreshold)) { /* We don't have time to handle this picture, just skip to the next one. */ + lock.lock(); mPictQRead = (mPictQRead+1)%mPictQ.size(); - mPictQSize--; + --mPictQSize; --mPictQPrepSize; + mPictQCond.notify_all(); goto retry; } schedRefresh(std::chrono::duration_cast<milliseconds>(actual_delay)); @@ -1234,8 +1247,9 @@ retry: display(screen, renderer); /* Update queue for next picture. */ + lock.lock(); mPictQRead = (mPictQRead+1)%mPictQ.size(); - mPictQSize--; + --mPictQSize; --mPictQPrepSize; lock.unlock(); mPictQCond.notify_all(); } @@ -1245,7 +1259,10 @@ retry: */ void VideoState::updatePicture(SDL_Window *screen, SDL_Renderer *renderer) { - Picture *vp = &mPictQ[mPictQWrite]; + if(mMovie.mQuit.load(std::memory_order_relaxed)) + return; + + Picture *vp = &mPictQ[mPictQPrep]; bool fmt_updated = false; /* allocate or resize the buffer! */ @@ -1282,11 +1299,11 @@ void VideoState::updatePicture(SDL_Window *screen, SDL_Renderer *renderer) } } + AVFrame *frame{vp->mFrame.get()}; if(vp->mImage) { - AVFrame *frame = mDecodedFrame.get(); - void *pixels = nullptr; - int pitch = 0; + void *pixels{nullptr}; + int pitch{0}; if(mCodecCtx->pix_fmt == AV_PIX_FMT_YUV420P) SDL_UpdateYUVTexture(vp->mImage, nullptr, @@ -1299,10 +1316,10 @@ void VideoState::updatePicture(SDL_Window *screen, SDL_Renderer *renderer) else { // Convert the image into YUV format that SDL uses - int coded_w = mCodecCtx->coded_width; - int coded_h = mCodecCtx->coded_height; - int w = mCodecCtx->width; - int h = mCodecCtx->height; + int coded_w{mCodecCtx->coded_width}; + int coded_h{mCodecCtx->coded_height}; + int w{mCodecCtx->width}; + int h{mCodecCtx->height}; if(!mSwscaleCtx || fmt_updated) { mSwscaleCtx.reset(sws_getContext( @@ -1323,78 +1340,64 @@ void VideoState::updatePicture(SDL_Window *screen, SDL_Renderer *renderer) pict_linesize[1] = pitch / 2; pict_linesize[2] = pitch / 2; - sws_scale(mSwscaleCtx.get(), reinterpret_cast<uint8_t**>(frame->data), - frame->linesize, 0, h, pict_data, pict_linesize); + sws_scale(mSwscaleCtx.get(), reinterpret_cast<uint8_t**>(frame->data), frame->linesize, + 0, h, pict_data, pict_linesize); SDL_UnlockTexture(vp->mImage); } } + av_frame_unref(frame); - vp->mUpdated.store(true, std::memory_order_release); - std::unique_lock<std::mutex>(mPictQMutex).unlock(); - mPictQCond.notify_one(); + mPictQPrep = (mPictQPrep+1)%mPictQ.size(); + ++mPictQPrepSize; } -int VideoState::queuePicture(nanoseconds pts) +bool VideoState::queuePicture(nanoseconds pts, AVFrame *frame) { /* Wait until we have space for a new pic */ - std::unique_lock<std::mutex> lock(mPictQMutex); + std::unique_lock<std::mutex> lock{mPictQMutex}; while(mPictQSize >= mPictQ.size() && !mMovie.mQuit.load(std::memory_order_relaxed)) mPictQCond.wait(lock); - lock.unlock(); if(mMovie.mQuit.load(std::memory_order_relaxed)) - return -1; + return false; - Picture *vp = &mPictQ[mPictQWrite]; + Picture *vp{&mPictQ[mPictQWrite]}; + av_frame_move_ref(vp->mFrame.get(), frame); + vp->mPts = pts; + + mPictQWrite = (mPictQWrite+1)%mPictQ.size(); + ++mPictQSize; + lock.unlock(); /* We have to create/update the picture in the main thread */ - vp->mUpdated.store(false, std::memory_order_relaxed); SDL_Event evt{}; evt.user.type = FF_UPDATE_EVENT; evt.user.data1 = this; SDL_PushEvent(&evt); - /* Wait until the picture is updated. */ - lock.lock(); - while(!vp->mUpdated.load(std::memory_order_relaxed)) - { - if(mMovie.mQuit.load(std::memory_order_relaxed)) - return -1; - mPictQCond.wait(lock); - } - if(mMovie.mQuit.load(std::memory_order_relaxed)) - return -1; - vp->mPts = pts; - - mPictQWrite = (mPictQWrite+1)%mPictQ.size(); - mPictQSize++; - lock.unlock(); - - return 0; + return true; } int VideoState::handler() { - mDecodedFrame.reset(av_frame_alloc()); + AVFramePtr decoded_frame{av_frame_alloc()}; while(!mMovie.mQuit.load(std::memory_order_relaxed)) { - std::unique_lock<std::mutex> lock(mQueueMtx); - /* Decode video frame */ - int ret = avcodec_receive_frame(mCodecCtx.get(), mDecodedFrame.get()); - if(ret == AVERROR(EAGAIN)) { - mMovie.mSendDataGood.clear(std::memory_order_relaxed); - std::unique_lock<std::mutex>(mMovie.mSendMtx).unlock(); - mMovie.mSendCond.notify_one(); - do { - mQueueCond.wait(lock); - ret = avcodec_receive_frame(mCodecCtx.get(), mDecodedFrame.get()); - } while(ret == AVERROR(EAGAIN)); + std::unique_lock<std::mutex> lock{mPackets.getMutex()}; + AVPacket *lastpkt{}; + while((lastpkt=mPackets.getPacket(lock)) != nullptr) + { + int ret{avcodec_send_packet(mCodecCtx.get(), lastpkt)}; + if(ret == AVERROR(EAGAIN)) break; + mPackets.pop(); + } + if(!lastpkt) + avcodec_send_packet(mCodecCtx.get(), nullptr); } - lock.unlock(); + /* Decode video frame */ + int ret = avcodec_receive_frame(mCodecCtx.get(), decoded_frame.get()); if(ret == AVERROR_EOF) break; - mMovie.mSendDataGood.clear(std::memory_order_relaxed); - mMovie.mSendCond.notify_one(); if(ret < 0) { std::cerr<< "Failed to decode frame: "<<ret <<std::endl; @@ -1402,31 +1405,22 @@ int VideoState::handler() } /* Get the PTS for this frame. */ - nanoseconds pts; - if(mDecodedFrame->best_effort_timestamp != AV_NOPTS_VALUE) + if(decoded_frame->best_effort_timestamp != AV_NOPTS_VALUE) mClock = std::chrono::duration_cast<nanoseconds>( - seconds_d64(av_q2d(mStream->time_base)*mDecodedFrame->best_effort_timestamp) - ); - pts = mClock; + seconds_d64{av_q2d(mStream->time_base)*decoded_frame->best_effort_timestamp}); + nanoseconds pts{mClock}; /* Update the video clock to the next expected PTS. */ auto frame_delay = av_q2d(mCodecCtx->time_base); - frame_delay += mDecodedFrame->repeat_pict * (frame_delay * 0.5); - mClock += std::chrono::duration_cast<nanoseconds>(seconds_d64(frame_delay)); + frame_delay += decoded_frame->repeat_pict * (frame_delay * 0.5); + mClock += std::chrono::duration_cast<nanoseconds>(seconds_d64{frame_delay}); - if(queuePicture(pts) < 0) + if(!queuePicture(pts, decoded_frame.get())) break; - av_frame_unref(mDecodedFrame.get()); } mEOS = true; std::unique_lock<std::mutex> lock(mPictQMutex); - if(mMovie.mQuit.load(std::memory_order_relaxed)) - { - mPictQRead = 0; - mPictQWrite = 0; - mPictQSize = 0; - } while(!mFinalUpdate) mPictQCond.wait(lock); @@ -1436,7 +1430,7 @@ int VideoState::handler() int MovieState::decode_interrupt_cb(void *ctx) { - return reinterpret_cast<MovieState*>(ctx)->mQuit.load(std::memory_order_relaxed); + return static_cast<MovieState*>(ctx)->mQuit.load(std::memory_order_relaxed); } bool MovieState::prepare() @@ -1531,15 +1525,11 @@ int MovieState::streamComponentOpen(int stream_index) case AVMEDIA_TYPE_AUDIO: mAudio.mStream = mFormatCtx->streams[stream_index]; mAudio.mCodecCtx = std::move(avctx); - - mAudioThread = std::thread(std::mem_fn(&AudioState::handler), &mAudio); break; case AVMEDIA_TYPE_VIDEO: mVideo.mStream = mFormatCtx->streams[stream_index]; mVideo.mCodecCtx = std::move(avctx); - - mVideoThread = std::thread(std::mem_fn(&VideoState::handler), &mVideo); break; default: @@ -1551,17 +1541,15 @@ int MovieState::streamComponentOpen(int stream_index) int MovieState::parse_handler() { - int video_index = -1; - int audio_index = -1; + auto &audio_queue = mAudio.mPackets; + auto &video_queue = mVideo.mPackets; + + int video_index{-1}; + int audio_index{-1}; /* Dump information about file onto standard error */ av_dump_format(mFormatCtx.get(), 0, mFilename.c_str(), 0); - /* Set the base time 500ms ahead of the current av time. */ - mClockBase = get_avtime() + milliseconds{500}; - mVideo.mCurrentPtsTime = mClockBase; - mVideo.mFrameTimer = mVideo.mCurrentPtsTime; - /* Find the first video and audio streams */ for(unsigned int i = 0;i < mFormatCtx->nb_streams;i++) { @@ -1578,82 +1566,40 @@ int MovieState::parse_handler() mQuit = true; } - PacketQueue audio_queue, video_queue; - bool input_finished = false; + /* Set the base time 500ms ahead of the current av time. */ + mClockBase = get_avtime() + milliseconds{500}; + mVideo.mCurrentPtsTime = mClockBase; + mVideo.mFrameTimer = mVideo.mCurrentPtsTime; + + if(audio_index >= 0) + mAudioThread = std::thread{std::mem_fn(&AudioState::handler), &mAudio}; + if(video_index >= 0) + mVideoThread = std::thread{std::mem_fn(&VideoState::handler), &mVideo}; /* Main packet reading/dispatching loop */ - while(!mQuit.load(std::memory_order_relaxed) && !input_finished) + while(!mQuit.load(std::memory_order_relaxed)) { AVPacket packet; if(av_read_frame(mFormatCtx.get(), &packet) < 0) - input_finished = true; - else - { - /* Copy the packet into the queue it's meant for. */ - if(packet.stream_index == video_index) - video_queue.put(&packet); - else if(packet.stream_index == audio_index) - audio_queue.put(&packet); - av_packet_unref(&packet); - } - - do { - /* Send whatever queued packets we have. */ - if(!audio_queue.empty()) - { - std::unique_lock<std::mutex> lock(mAudio.mQueueMtx); - int ret; - do { - ret = avcodec_send_packet(mAudio.mCodecCtx.get(), audio_queue.front()); - if(ret != AVERROR(EAGAIN)) audio_queue.pop(); - } while(ret != AVERROR(EAGAIN) && !audio_queue.empty()); - lock.unlock(); - mAudio.mQueueCond.notify_one(); - } - if(!video_queue.empty()) - { - std::unique_lock<std::mutex> lock(mVideo.mQueueMtx); - int ret; - do { - ret = avcodec_send_packet(mVideo.mCodecCtx.get(), video_queue.front()); - if(ret != AVERROR(EAGAIN)) video_queue.pop(); - } while(ret != AVERROR(EAGAIN) && !video_queue.empty()); - lock.unlock(); - mVideo.mQueueCond.notify_one(); - } - /* If the queues are completely empty, or it's not full and there's - * more input to read, go get more. - */ - size_t queue_size = audio_queue.totalSize() + video_queue.totalSize(); - if(queue_size == 0 || (queue_size < MAX_QUEUE_SIZE && !input_finished)) - break; + break; - /* Nothing to send or get for now, wait a bit and try again. */ - { std::unique_lock<std::mutex> lock(mSendMtx); - if(mSendDataGood.test_and_set(std::memory_order_relaxed)) - mSendCond.wait_for(lock, milliseconds{10}); - } - } while(!mQuit.load(std::memory_order_relaxed)); - } - /* Pass a null packet to finish the send buffers (the receive functions - * will get AVERROR_EOF when emptied). - */ - if(mVideo.mCodecCtx) - { - { std::lock_guard<std::mutex> lock(mVideo.mQueueMtx); - avcodec_send_packet(mVideo.mCodecCtx.get(), nullptr); + /* Copy the packet into the queue it's meant for. */ + if(packet.stream_index == video_index) + { + while(!mQuit.load(std::memory_order_acquire) && !video_queue.put(&packet)) + std::this_thread::sleep_for(milliseconds{50}); } - mVideo.mQueueCond.notify_one(); - } - if(mAudio.mCodecCtx) - { - { std::lock_guard<std::mutex> lock(mAudio.mQueueMtx); - avcodec_send_packet(mAudio.mCodecCtx.get(), nullptr); + else if(packet.stream_index == audio_index) + { + while(!mQuit.load(std::memory_order_acquire) && !audio_queue.put(&packet)) + std::this_thread::sleep_for(milliseconds{50}); } - mAudio.mQueueCond.notify_one(); + + av_packet_unref(&packet); } - video_queue.clear(); - audio_queue.clear(); + /* Finish the queues so the receivers know nothing more is coming. */ + if(mVideo.mCodecCtx) video_queue.setFinished(); + if(mAudio.mCodecCtx) audio_queue.setFinished(); /* all done - wait for it */ if(mVideoThread.joinable()) @@ -1916,15 +1862,11 @@ int main(int argc, char *argv[]) break; case FF_UPDATE_EVENT: - reinterpret_cast<VideoState*>(event.user.data1)->updatePicture( - screen, renderer - ); + static_cast<VideoState*>(event.user.data1)->updatePicture(screen, renderer); break; case FF_REFRESH_EVENT: - reinterpret_cast<VideoState*>(event.user.data1)->refreshTimer( - screen, renderer - ); + static_cast<VideoState*>(event.user.data1)->refreshTimer(screen, renderer); break; case FF_MOVIE_DONE_EVENT: |