diff --git a/sys/v4l/gstv4lsrc.h b/sys/v4l/gstv4lsrc.h index bcf3738329..070ac1b39f 100644 --- a/sys/v4l/gstv4lsrc.h +++ b/sys/v4l/gstv4lsrc.h @@ -65,6 +65,7 @@ struct _GstV4lSrc { /* num of queued frames and some GThread stuff to wait if there's not enough */ guint16 num_queued_frames; + guint64 total_queued_frames; GMutex * mutex_queued_frames; GCond * cond_queued_frames; diff --git a/sys/v4l/v4lsrc_calls.c b/sys/v4l/v4lsrc_calls.c index eea55d19b9..b96e7235d9 100644 --- a/sys/v4l/v4lsrc_calls.c +++ b/sys/v4l/v4lsrc_calls.c @@ -77,12 +77,9 @@ gst_v4lsrc_queue_frame (GstV4lSrc *v4lsrc, v4lsrc->mmap.frame = num; - g_mutex_lock(v4lsrc->mutex_queued_frames); - - if (v4lsrc->frame_queued[num] < 0) + if (v4lsrc->frame_queued[num] == -1) { //v4lsrc->frame_queued[num] = 0; - g_mutex_unlock(v4lsrc->mutex_queued_frames); return TRUE; } @@ -91,15 +88,12 @@ gst_v4lsrc_queue_frame (GstV4lSrc *v4lsrc, gst_element_error(GST_ELEMENT(v4lsrc), "Error queueing a buffer (%d): %s", num, g_strerror(errno)); - g_mutex_unlock(v4lsrc->mutex_queued_frames); return FALSE; } v4lsrc->frame_queued[num] = 1; - v4lsrc->num_queued_frames++; - g_cond_broadcast(v4lsrc->cond_queued_frames); - g_mutex_unlock(v4lsrc->mutex_queued_frames); + v4lsrc->total_queued_frames++; return TRUE; } @@ -119,29 +113,86 @@ gst_v4lsrc_soft_sync_thread (void *arg) DEBUG("starting software sync thread"); -#if 0 - /* Allow easy shutting down by other processes... */ - pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL ); - pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, NULL ); -#endif - while (1) { - /* are there queued frames left? */ - g_mutex_lock(v4lsrc->mutex_queued_frames); - if (v4lsrc->num_queued_frames < MIN_BUFFERS_QUEUED) + /* this cycle is non-onbligatory - we just queue frames + * as they become available, below, we'll wait for queues + * if we don't have enough of them */ + while (1) { - if (v4lsrc->frame_queued[frame] < 0) { + gint qframe = v4lsrc->total_queued_frames % v4lsrc->mbuf.frames; + + g_mutex_lock(v4lsrc->mutex_queued_frames); + + if (v4lsrc->frame_queued[qframe] == -2) + { + if (!gst_v4lsrc_queue_frame(v4lsrc, qframe)) + { + g_mutex_unlock(v4lsrc->mutex_queued_frames); + + g_mutex_lock(v4lsrc->mutex_soft_sync); + /* note that we use *frame* here, not *qframe* - + * reason is simple, the thread waiting for us needs + * to know that we stopped syncing on *this* frame, + * we didn't even start with qframe yet */ + v4lsrc->isready_soft_sync[frame] = -1; + g_cond_broadcast(v4lsrc->cond_soft_sync[frame]); + g_mutex_unlock(v4lsrc->mutex_soft_sync); + goto end; + } + } + else { g_mutex_unlock(v4lsrc->mutex_queued_frames); break; } - DEBUG("Waiting for new frames to be queued (%d < %d)", - v4lsrc->num_queued_frames, MIN_BUFFERS_QUEUED); - - g_cond_wait(v4lsrc->cond_queued_frames, v4lsrc->mutex_queued_frames); + g_mutex_unlock(v4lsrc->mutex_queued_frames); + } + + /* are there queued frames left? */ + while (v4lsrc->num_queued_frames < MIN_BUFFERS_QUEUED) + { + gint qframe = v4lsrc->total_queued_frames % v4lsrc->mbuf.frames; + + g_mutex_lock(v4lsrc->mutex_queued_frames); + + if (v4lsrc->frame_queued[frame] == -1) { + g_mutex_unlock(v4lsrc->mutex_queued_frames); + break; + } + + DEBUG("Waiting for new frames to be queued (%d < %d, frame=%d)", + v4lsrc->num_queued_frames, MIN_BUFFERS_QUEUED, qframe); + + /* sleep for new buffers to be completed encoding. After that, + * requeue them so we have more than MIN_QUEUES_NEEDED buffers + * free */ + while (v4lsrc->frame_queued[qframe] != -2) + { + g_cond_wait(v4lsrc->cond_queued_frames, + v4lsrc->mutex_queued_frames); + if (v4lsrc->frame_queued[qframe] == -1) { + g_mutex_unlock(v4lsrc->mutex_queued_frames); + goto end; + } + } + if (!gst_v4lsrc_queue_frame(v4lsrc, qframe)) + { + g_mutex_unlock(v4lsrc->mutex_queued_frames); + + g_mutex_lock(v4lsrc->mutex_soft_sync); + /* note that we use *frame* here, not *qframe* - + * reason is simple, the thread waiting for us needs + * to know that we stopped syncing on *this* frame, + * we didn't even start with qframe yet */ + v4lsrc->isready_soft_sync[frame] = -1; + g_cond_broadcast(v4lsrc->cond_soft_sync[frame]); + g_mutex_unlock(v4lsrc->mutex_soft_sync); + goto end; + } + + g_mutex_unlock(v4lsrc->mutex_queued_frames); } - g_mutex_unlock(v4lsrc->mutex_queued_frames); if (!v4lsrc->num_queued_frames) { @@ -363,6 +414,7 @@ gst_v4lsrc_capture_start (GstV4lSrc *v4lsrc) GST_V4L_CHECK_ACTIVE(GST_V4LELEMENT(v4lsrc)); v4lsrc->num_queued_frames = 0; + v4lsrc->total_queued_frames = 0; /* queue all buffers, this starts streaming capture */ for (n=0;nmbuf.frames;n++) @@ -441,9 +493,11 @@ gst_v4lsrc_requeue_frame (GstV4lSrc *v4lsrc, gint num) GST_V4L_CHECK_OPEN(GST_V4LELEMENT(v4lsrc)); GST_V4L_CHECK_ACTIVE(GST_V4LELEMENT(v4lsrc)); - /* and let's queue the buffer */ - if (!gst_v4lsrc_queue_frame(v4lsrc, num)) - return FALSE; + /* mark frame as 'ready to requeue' */ + g_mutex_lock(v4lsrc->mutex_queued_frames); + v4lsrc->frame_queued[num] = -2; + g_cond_broadcast(v4lsrc->cond_queued_frames); + g_mutex_unlock(v4lsrc->mutex_queued_frames); return TRUE; } @@ -464,7 +518,8 @@ gst_v4lsrc_capture_stop (GstV4lSrc *v4lsrc) GST_V4L_CHECK_OPEN(GST_V4LELEMENT(v4lsrc)); GST_V4L_CHECK_ACTIVE(GST_V4LELEMENT(v4lsrc)); - /* we actually need to sync on all queued buffers but not on the non-queued ones */ + /* we actually need to sync on all queued buffers but + * not on the non-queued ones */ g_mutex_lock(v4lsrc->mutex_queued_frames); for (n=0;nmbuf.frames;n++) v4lsrc->frame_queued[n] = -1;