streamsynchronizer: Split up event handler code
No changes to behaviour, just split up the big parts into dedicated function for readability Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/6825>
This commit is contained in:
parent
a3a3264f02
commit
afc1eadfdc
@ -361,6 +361,370 @@ gst_stream_synchronizer_wait (GstStreamSynchronizer * self, GstPad * pad)
|
|||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
gst_stream_synchronizer_handle_stream_start (GstStreamSynchronizer * self,
|
||||||
|
GstPad * pad, GstEvent * event)
|
||||||
|
{
|
||||||
|
GstSyncStream *stream, *ostream;
|
||||||
|
guint32 seqnum = gst_event_get_seqnum (event);
|
||||||
|
guint group_id;
|
||||||
|
gboolean have_group_id;
|
||||||
|
GList *l;
|
||||||
|
gboolean all_wait = TRUE;
|
||||||
|
gboolean new_stream = TRUE;
|
||||||
|
|
||||||
|
have_group_id = gst_event_parse_group_id (event, &group_id);
|
||||||
|
|
||||||
|
GST_STREAM_SYNCHRONIZER_LOCK (self);
|
||||||
|
self->have_group_id &= have_group_id;
|
||||||
|
have_group_id = self->have_group_id;
|
||||||
|
self->eos = FALSE;
|
||||||
|
|
||||||
|
stream = gst_streamsync_pad_get_stream (pad);
|
||||||
|
|
||||||
|
gst_event_parse_stream_flags (event, &stream->flags);
|
||||||
|
|
||||||
|
if ((have_group_id && stream->group_id != group_id) || (!have_group_id
|
||||||
|
&& stream->stream_start_seqnum != seqnum)) {
|
||||||
|
stream->is_eos = FALSE;
|
||||||
|
stream->eos_sent = FALSE;
|
||||||
|
stream->flushing = FALSE;
|
||||||
|
stream->stream_start_seqnum = seqnum;
|
||||||
|
stream->group_id = group_id;
|
||||||
|
|
||||||
|
if (!have_group_id) {
|
||||||
|
/* Check if this belongs to a stream that is already there,
|
||||||
|
* e.g. we got the visualizations for an audio stream */
|
||||||
|
for (l = self->streams; l; l = l->next) {
|
||||||
|
ostream = l->data;
|
||||||
|
|
||||||
|
if (ostream != stream && ostream->stream_start_seqnum == seqnum
|
||||||
|
&& !ostream->wait) {
|
||||||
|
new_stream = FALSE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!new_stream) {
|
||||||
|
GST_DEBUG_OBJECT (pad,
|
||||||
|
"Stream %d belongs to running stream %d, no waiting",
|
||||||
|
stream->stream_number, ostream->stream_number);
|
||||||
|
stream->wait = FALSE;
|
||||||
|
gst_syncstream_unref (stream);
|
||||||
|
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} else if (group_id == self->group_id) {
|
||||||
|
GST_DEBUG_OBJECT (pad, "Stream %d belongs to running group %d, "
|
||||||
|
"no waiting", stream->stream_number, group_id);
|
||||||
|
gst_syncstream_unref (stream);
|
||||||
|
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
|
||||||
|
|
||||||
|
stream->wait = TRUE;
|
||||||
|
|
||||||
|
for (l = self->streams; l; l = l->next) {
|
||||||
|
GstSyncStream *ostream = l->data;
|
||||||
|
|
||||||
|
all_wait = all_wait && ((ostream->flags & GST_STREAM_FLAG_SPARSE)
|
||||||
|
|| (ostream->wait && (!have_group_id
|
||||||
|
|| ostream->group_id == group_id)));
|
||||||
|
if (!all_wait)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (all_wait) {
|
||||||
|
gint64 position = 0;
|
||||||
|
|
||||||
|
if (have_group_id)
|
||||||
|
GST_DEBUG_OBJECT (self,
|
||||||
|
"All streams have changed to group id %u -- unblocking", group_id);
|
||||||
|
else
|
||||||
|
GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
|
||||||
|
|
||||||
|
self->group_id = group_id;
|
||||||
|
|
||||||
|
for (l = self->streams; l; l = l->next) {
|
||||||
|
GstSyncStream *ostream = l->data;
|
||||||
|
gint64 stop_running_time;
|
||||||
|
gint64 position_running_time;
|
||||||
|
|
||||||
|
ostream->wait = FALSE;
|
||||||
|
|
||||||
|
if (ostream->segment.format == GST_FORMAT_TIME) {
|
||||||
|
if (ostream->segment.rate > 0)
|
||||||
|
stop_running_time =
|
||||||
|
gst_segment_to_running_time (&ostream->segment,
|
||||||
|
GST_FORMAT_TIME, ostream->segment.stop);
|
||||||
|
else
|
||||||
|
stop_running_time =
|
||||||
|
gst_segment_to_running_time (&ostream->segment,
|
||||||
|
GST_FORMAT_TIME, ostream->segment.start);
|
||||||
|
|
||||||
|
position_running_time =
|
||||||
|
gst_segment_to_running_time (&ostream->segment,
|
||||||
|
GST_FORMAT_TIME, ostream->segment.position);
|
||||||
|
|
||||||
|
position_running_time =
|
||||||
|
MAX (position_running_time, stop_running_time);
|
||||||
|
|
||||||
|
if (ostream->segment.rate > 0)
|
||||||
|
position_running_time -=
|
||||||
|
gst_segment_to_running_time (&ostream->segment,
|
||||||
|
GST_FORMAT_TIME, ostream->segment.start);
|
||||||
|
else
|
||||||
|
position_running_time -=
|
||||||
|
gst_segment_to_running_time (&ostream->segment,
|
||||||
|
GST_FORMAT_TIME, ostream->segment.stop);
|
||||||
|
|
||||||
|
position_running_time = MAX (0, position_running_time);
|
||||||
|
|
||||||
|
position = MAX (position, position_running_time);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self->group_start_time += position;
|
||||||
|
|
||||||
|
GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
|
||||||
|
GST_TIME_ARGS (self->group_start_time));
|
||||||
|
|
||||||
|
for (l = self->streams; l; l = l->next) {
|
||||||
|
GstSyncStream *ostream = l->data;
|
||||||
|
ostream->wait = FALSE;
|
||||||
|
g_cond_broadcast (&ostream->stream_finish_cond);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
gst_syncstream_unref (stream);
|
||||||
|
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Returns FALSE if the event was handled and shouldn't be propagated */
|
||||||
|
static gboolean
|
||||||
|
gst_stream_synchronizer_handle_segment (GstStreamSynchronizer * self,
|
||||||
|
GstPad * pad, GstEvent ** event)
|
||||||
|
{
|
||||||
|
GstSyncStream *stream;
|
||||||
|
GstSegment segment;
|
||||||
|
|
||||||
|
gst_event_copy_segment (*event, &segment);
|
||||||
|
|
||||||
|
GST_STREAM_SYNCHRONIZER_LOCK (self);
|
||||||
|
|
||||||
|
gst_stream_synchronizer_wait (self, pad);
|
||||||
|
|
||||||
|
if (self->shutdown) {
|
||||||
|
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
||||||
|
gst_event_unref (*event);
|
||||||
|
return FALSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
stream = gst_streamsync_pad_get_stream (pad);
|
||||||
|
if (segment.format == GST_FORMAT_TIME) {
|
||||||
|
GST_DEBUG_OBJECT (pad,
|
||||||
|
"New stream, updating base from %" GST_TIME_FORMAT " to %"
|
||||||
|
GST_TIME_FORMAT, GST_TIME_ARGS (segment.base),
|
||||||
|
GST_TIME_ARGS (segment.base + self->group_start_time));
|
||||||
|
segment.base += self->group_start_time;
|
||||||
|
|
||||||
|
GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
|
||||||
|
&stream->segment);
|
||||||
|
gst_segment_copy_into (&segment, &stream->segment);
|
||||||
|
GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
|
||||||
|
&stream->segment);
|
||||||
|
stream->segment_seqnum = gst_event_get_seqnum (*event);
|
||||||
|
|
||||||
|
GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
|
||||||
|
GST_TIME_ARGS (stream->segment.base));
|
||||||
|
{
|
||||||
|
GstEvent *tmpev;
|
||||||
|
|
||||||
|
tmpev = gst_event_new_segment (&stream->segment);
|
||||||
|
gst_event_set_seqnum (tmpev, stream->segment_seqnum);
|
||||||
|
gst_event_unref (*event);
|
||||||
|
*event = tmpev;
|
||||||
|
}
|
||||||
|
} else if (stream) {
|
||||||
|
GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
|
||||||
|
gst_format_get_name (segment.format));
|
||||||
|
gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
|
||||||
|
}
|
||||||
|
gst_syncstream_unref (stream);
|
||||||
|
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
||||||
|
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
gst_stream_synchronizer_handle_flush_stop (GstStreamSynchronizer * self,
|
||||||
|
GstPad * pad, GstEvent * event)
|
||||||
|
{
|
||||||
|
GstSyncStream *stream;
|
||||||
|
GList *l;
|
||||||
|
GstClockTime new_group_start_time = 0;
|
||||||
|
gboolean reset_time;
|
||||||
|
|
||||||
|
gst_event_parse_flush_stop (event, &reset_time);
|
||||||
|
|
||||||
|
GST_STREAM_SYNCHRONIZER_LOCK (self);
|
||||||
|
|
||||||
|
stream = gst_streamsync_pad_get_stream (pad);
|
||||||
|
|
||||||
|
if (reset_time) {
|
||||||
|
GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
|
||||||
|
stream->stream_number);
|
||||||
|
gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
|
||||||
|
}
|
||||||
|
|
||||||
|
stream->is_eos = FALSE;
|
||||||
|
stream->eos_sent = FALSE;
|
||||||
|
stream->flushing = FALSE;
|
||||||
|
stream->wait = FALSE;
|
||||||
|
g_cond_broadcast (&stream->stream_finish_cond);
|
||||||
|
|
||||||
|
if (reset_time) {
|
||||||
|
for (l = self->streams; l; l = l->next) {
|
||||||
|
GstSyncStream *ostream = l->data;
|
||||||
|
GstClockTime start_running_time;
|
||||||
|
|
||||||
|
if (ostream == stream || ostream->flushing)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (ostream->segment.format == GST_FORMAT_TIME) {
|
||||||
|
if (ostream->segment.rate > 0)
|
||||||
|
start_running_time =
|
||||||
|
gst_segment_to_running_time (&ostream->segment,
|
||||||
|
GST_FORMAT_TIME, ostream->segment.start);
|
||||||
|
else
|
||||||
|
start_running_time =
|
||||||
|
gst_segment_to_running_time (&ostream->segment,
|
||||||
|
GST_FORMAT_TIME, ostream->segment.stop);
|
||||||
|
|
||||||
|
new_group_start_time = MAX (new_group_start_time, start_running_time);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
GST_DEBUG_OBJECT (pad,
|
||||||
|
"Updating group start time from %" GST_TIME_FORMAT " to %"
|
||||||
|
GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
|
||||||
|
GST_TIME_ARGS (new_group_start_time));
|
||||||
|
self->group_start_time = new_group_start_time;
|
||||||
|
}
|
||||||
|
|
||||||
|
gst_syncstream_unref (stream);
|
||||||
|
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static gboolean
|
||||||
|
gst_stream_synchronizer_handle_eos (GstStreamSynchronizer * self, GstPad * pad,
|
||||||
|
GstEvent * event)
|
||||||
|
{
|
||||||
|
gboolean ret = FALSE;
|
||||||
|
GstSyncStream *stream;
|
||||||
|
GList *l;
|
||||||
|
gboolean all_eos = TRUE;
|
||||||
|
gboolean seen_data;
|
||||||
|
GSList *pads = NULL;
|
||||||
|
GstPad *srcpad;
|
||||||
|
GstClockTime timestamp;
|
||||||
|
guint32 seqnum;
|
||||||
|
|
||||||
|
GST_STREAM_SYNCHRONIZER_LOCK (self);
|
||||||
|
stream = gst_streamsync_pad_get_stream (pad);
|
||||||
|
|
||||||
|
GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
|
||||||
|
stream->is_eos = TRUE;
|
||||||
|
|
||||||
|
seen_data = stream->seen_data;
|
||||||
|
srcpad = gst_object_ref (stream->srcpad);
|
||||||
|
seqnum = stream->segment_seqnum;
|
||||||
|
|
||||||
|
if (seen_data && stream->segment.position != -1)
|
||||||
|
timestamp = stream->segment.position;
|
||||||
|
else if (stream->segment.rate < 0.0 || stream->segment.stop == -1)
|
||||||
|
timestamp = stream->segment.start;
|
||||||
|
else
|
||||||
|
timestamp = stream->segment.stop;
|
||||||
|
|
||||||
|
stream->segment.position = timestamp;
|
||||||
|
|
||||||
|
for (l = self->streams; l; l = l->next) {
|
||||||
|
GstSyncStream *ostream = l->data;
|
||||||
|
|
||||||
|
all_eos = all_eos && ostream->is_eos;
|
||||||
|
if (!all_eos)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (all_eos) {
|
||||||
|
GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
|
||||||
|
self->eos = TRUE;
|
||||||
|
for (l = self->streams; l; l = l->next) {
|
||||||
|
GstSyncStream *ostream = l->data;
|
||||||
|
/* local snapshot of current pads */
|
||||||
|
gst_object_ref (ostream->srcpad);
|
||||||
|
pads = g_slist_prepend (pads, ostream->srcpad);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pads) {
|
||||||
|
GstPad *pad;
|
||||||
|
GSList *epad;
|
||||||
|
GstSyncStream *ostream;
|
||||||
|
|
||||||
|
ret = TRUE;
|
||||||
|
epad = pads;
|
||||||
|
while (epad) {
|
||||||
|
pad = epad->data;
|
||||||
|
ostream = gst_streamsync_pad_get_stream (pad);
|
||||||
|
g_cond_broadcast (&ostream->stream_finish_cond);
|
||||||
|
gst_syncstream_unref (ostream);
|
||||||
|
gst_object_unref (pad);
|
||||||
|
epad = g_slist_next (epad);
|
||||||
|
}
|
||||||
|
g_slist_free (pads);
|
||||||
|
} else {
|
||||||
|
if (seen_data) {
|
||||||
|
stream->send_gap_event = TRUE;
|
||||||
|
stream->gap_duration = GST_CLOCK_TIME_NONE;
|
||||||
|
stream->wait = TRUE;
|
||||||
|
ret = gst_stream_synchronizer_wait (self, srcpad);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* send eos if haven't seen data. seen_data will be true if data buffer
|
||||||
|
* of the track have received in anytime. sink is ready if seen_data is
|
||||||
|
* true, so can send GAP event. Will send EOS if sink isn't ready. The
|
||||||
|
* scenario for the case is one track haven't any media data and then
|
||||||
|
* send EOS. Or no any valid media data in one track, so decoder can't
|
||||||
|
* get valid CAPS for the track. sink can't ready without received CAPS.*/
|
||||||
|
if (!seen_data || self->eos) {
|
||||||
|
GstEvent *topush;
|
||||||
|
GST_DEBUG_OBJECT (pad, "send EOS event");
|
||||||
|
/* drop lock when sending eos, which may block in e.g. preroll */
|
||||||
|
topush = gst_event_new_eos ();
|
||||||
|
gst_event_set_seqnum (topush, seqnum);
|
||||||
|
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
||||||
|
ret = gst_pad_push_event (srcpad, topush);
|
||||||
|
GST_STREAM_SYNCHRONIZER_LOCK (self);
|
||||||
|
stream = gst_streamsync_pad_get_stream (pad);
|
||||||
|
stream->eos_sent = TRUE;
|
||||||
|
gst_syncstream_unref (stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
gst_object_unref (srcpad);
|
||||||
|
gst_event_unref (event);
|
||||||
|
gst_syncstream_unref (stream);
|
||||||
|
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
/* sinkpad functions */
|
/* sinkpad functions */
|
||||||
static gboolean
|
static gboolean
|
||||||
gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
|
gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
|
||||||
@ -375,193 +739,12 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
|
|||||||
switch (GST_EVENT_TYPE (event)) {
|
switch (GST_EVENT_TYPE (event)) {
|
||||||
case GST_EVENT_STREAM_START:
|
case GST_EVENT_STREAM_START:
|
||||||
{
|
{
|
||||||
GstSyncStream *stream, *ostream;
|
gst_stream_synchronizer_handle_stream_start (self, pad, event);
|
||||||
guint32 seqnum = gst_event_get_seqnum (event);
|
|
||||||
guint group_id;
|
|
||||||
gboolean have_group_id;
|
|
||||||
GList *l;
|
|
||||||
gboolean all_wait = TRUE;
|
|
||||||
gboolean new_stream = TRUE;
|
|
||||||
|
|
||||||
have_group_id = gst_event_parse_group_id (event, &group_id);
|
|
||||||
|
|
||||||
GST_STREAM_SYNCHRONIZER_LOCK (self);
|
|
||||||
self->have_group_id &= have_group_id;
|
|
||||||
have_group_id = self->have_group_id;
|
|
||||||
self->eos = FALSE;
|
|
||||||
|
|
||||||
stream = gst_streamsync_pad_get_stream (pad);
|
|
||||||
|
|
||||||
gst_event_parse_stream_flags (event, &stream->flags);
|
|
||||||
|
|
||||||
if ((have_group_id && stream->group_id != group_id) || (!have_group_id
|
|
||||||
&& stream->stream_start_seqnum != seqnum)) {
|
|
||||||
stream->is_eos = FALSE;
|
|
||||||
stream->eos_sent = FALSE;
|
|
||||||
stream->flushing = FALSE;
|
|
||||||
stream->stream_start_seqnum = seqnum;
|
|
||||||
stream->group_id = group_id;
|
|
||||||
|
|
||||||
if (!have_group_id) {
|
|
||||||
/* Check if this belongs to a stream that is already there,
|
|
||||||
* e.g. we got the visualizations for an audio stream */
|
|
||||||
for (l = self->streams; l; l = l->next) {
|
|
||||||
ostream = l->data;
|
|
||||||
|
|
||||||
if (ostream != stream && ostream->stream_start_seqnum == seqnum
|
|
||||||
&& !ostream->wait) {
|
|
||||||
new_stream = FALSE;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!new_stream) {
|
|
||||||
GST_DEBUG_OBJECT (pad,
|
|
||||||
"Stream %d belongs to running stream %d, no waiting",
|
|
||||||
stream->stream_number, ostream->stream_number);
|
|
||||||
stream->wait = FALSE;
|
|
||||||
gst_syncstream_unref (stream);
|
|
||||||
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else if (group_id == self->group_id) {
|
|
||||||
GST_DEBUG_OBJECT (pad, "Stream %d belongs to running group %d, "
|
|
||||||
"no waiting", stream->stream_number, group_id);
|
|
||||||
gst_syncstream_unref (stream);
|
|
||||||
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
|
|
||||||
|
|
||||||
stream->wait = TRUE;
|
|
||||||
|
|
||||||
for (l = self->streams; l; l = l->next) {
|
|
||||||
GstSyncStream *ostream = l->data;
|
|
||||||
|
|
||||||
all_wait = all_wait && ((ostream->flags & GST_STREAM_FLAG_SPARSE)
|
|
||||||
|| (ostream->wait && (!have_group_id
|
|
||||||
|| ostream->group_id == group_id)));
|
|
||||||
if (!all_wait)
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (all_wait) {
|
|
||||||
gint64 position = 0;
|
|
||||||
|
|
||||||
if (have_group_id)
|
|
||||||
GST_DEBUG_OBJECT (self,
|
|
||||||
"All streams have changed to group id %u -- unblocking",
|
|
||||||
group_id);
|
|
||||||
else
|
|
||||||
GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
|
|
||||||
|
|
||||||
self->group_id = group_id;
|
|
||||||
|
|
||||||
for (l = self->streams; l; l = l->next) {
|
|
||||||
GstSyncStream *ostream = l->data;
|
|
||||||
gint64 stop_running_time;
|
|
||||||
gint64 position_running_time;
|
|
||||||
|
|
||||||
ostream->wait = FALSE;
|
|
||||||
|
|
||||||
if (ostream->segment.format == GST_FORMAT_TIME) {
|
|
||||||
if (ostream->segment.rate > 0)
|
|
||||||
stop_running_time =
|
|
||||||
gst_segment_to_running_time (&ostream->segment,
|
|
||||||
GST_FORMAT_TIME, ostream->segment.stop);
|
|
||||||
else
|
|
||||||
stop_running_time =
|
|
||||||
gst_segment_to_running_time (&ostream->segment,
|
|
||||||
GST_FORMAT_TIME, ostream->segment.start);
|
|
||||||
|
|
||||||
position_running_time =
|
|
||||||
gst_segment_to_running_time (&ostream->segment,
|
|
||||||
GST_FORMAT_TIME, ostream->segment.position);
|
|
||||||
|
|
||||||
position_running_time =
|
|
||||||
MAX (position_running_time, stop_running_time);
|
|
||||||
|
|
||||||
if (ostream->segment.rate > 0)
|
|
||||||
position_running_time -=
|
|
||||||
gst_segment_to_running_time (&ostream->segment,
|
|
||||||
GST_FORMAT_TIME, ostream->segment.start);
|
|
||||||
else
|
|
||||||
position_running_time -=
|
|
||||||
gst_segment_to_running_time (&ostream->segment,
|
|
||||||
GST_FORMAT_TIME, ostream->segment.stop);
|
|
||||||
|
|
||||||
position_running_time = MAX (0, position_running_time);
|
|
||||||
|
|
||||||
position = MAX (position, position_running_time);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self->group_start_time += position;
|
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
|
|
||||||
GST_TIME_ARGS (self->group_start_time));
|
|
||||||
|
|
||||||
for (l = self->streams; l; l = l->next) {
|
|
||||||
GstSyncStream *ostream = l->data;
|
|
||||||
ostream->wait = FALSE;
|
|
||||||
g_cond_broadcast (&ostream->stream_finish_cond);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
gst_syncstream_unref (stream);
|
|
||||||
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case GST_EVENT_SEGMENT:{
|
case GST_EVENT_SEGMENT:{
|
||||||
GstSyncStream *stream;
|
if (!gst_stream_synchronizer_handle_segment (self, pad, &event))
|
||||||
GstSegment segment;
|
|
||||||
|
|
||||||
gst_event_copy_segment (event, &segment);
|
|
||||||
|
|
||||||
GST_STREAM_SYNCHRONIZER_LOCK (self);
|
|
||||||
|
|
||||||
gst_stream_synchronizer_wait (self, pad);
|
|
||||||
|
|
||||||
if (self->shutdown) {
|
|
||||||
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
|
||||||
gst_event_unref (event);
|
|
||||||
goto done;
|
goto done;
|
||||||
}
|
|
||||||
|
|
||||||
stream = gst_streamsync_pad_get_stream (pad);
|
|
||||||
if (segment.format == GST_FORMAT_TIME) {
|
|
||||||
GST_DEBUG_OBJECT (pad,
|
|
||||||
"New stream, updating base from %" GST_TIME_FORMAT " to %"
|
|
||||||
GST_TIME_FORMAT, GST_TIME_ARGS (segment.base),
|
|
||||||
GST_TIME_ARGS (segment.base + self->group_start_time));
|
|
||||||
segment.base += self->group_start_time;
|
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
|
|
||||||
&stream->segment);
|
|
||||||
gst_segment_copy_into (&segment, &stream->segment);
|
|
||||||
GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
|
|
||||||
&stream->segment);
|
|
||||||
stream->segment_seqnum = gst_event_get_seqnum (event);
|
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
|
|
||||||
GST_TIME_ARGS (stream->segment.base));
|
|
||||||
{
|
|
||||||
GstEvent *tmpev;
|
|
||||||
|
|
||||||
tmpev = gst_event_new_segment (&stream->segment);
|
|
||||||
gst_event_set_seqnum (tmpev, stream->segment_seqnum);
|
|
||||||
gst_event_unref (event);
|
|
||||||
event = tmpev;
|
|
||||||
}
|
|
||||||
} else if (stream) {
|
|
||||||
GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
|
|
||||||
gst_format_get_name (segment.format));
|
|
||||||
gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
|
|
||||||
}
|
|
||||||
gst_syncstream_unref (stream);
|
|
||||||
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case GST_EVENT_FLUSH_START:{
|
case GST_EVENT_FLUSH_START:{
|
||||||
@ -578,61 +761,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case GST_EVENT_FLUSH_STOP:{
|
case GST_EVENT_FLUSH_STOP:{
|
||||||
GstSyncStream *stream;
|
gst_stream_synchronizer_handle_flush_stop (self, pad, event);
|
||||||
GList *l;
|
|
||||||
GstClockTime new_group_start_time = 0;
|
|
||||||
gboolean reset_time;
|
|
||||||
|
|
||||||
gst_event_parse_flush_stop (event, &reset_time);
|
|
||||||
|
|
||||||
GST_STREAM_SYNCHRONIZER_LOCK (self);
|
|
||||||
|
|
||||||
stream = gst_streamsync_pad_get_stream (pad);
|
|
||||||
|
|
||||||
if (reset_time) {
|
|
||||||
GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
|
|
||||||
stream->stream_number);
|
|
||||||
gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
|
|
||||||
}
|
|
||||||
|
|
||||||
stream->is_eos = FALSE;
|
|
||||||
stream->eos_sent = FALSE;
|
|
||||||
stream->flushing = FALSE;
|
|
||||||
stream->wait = FALSE;
|
|
||||||
g_cond_broadcast (&stream->stream_finish_cond);
|
|
||||||
|
|
||||||
if (reset_time) {
|
|
||||||
for (l = self->streams; l; l = l->next) {
|
|
||||||
GstSyncStream *ostream = l->data;
|
|
||||||
GstClockTime start_running_time;
|
|
||||||
|
|
||||||
if (ostream == stream || ostream->flushing)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
if (ostream->segment.format == GST_FORMAT_TIME) {
|
|
||||||
if (ostream->segment.rate > 0)
|
|
||||||
start_running_time =
|
|
||||||
gst_segment_to_running_time (&ostream->segment,
|
|
||||||
GST_FORMAT_TIME, ostream->segment.start);
|
|
||||||
else
|
|
||||||
start_running_time =
|
|
||||||
gst_segment_to_running_time (&ostream->segment,
|
|
||||||
GST_FORMAT_TIME, ostream->segment.stop);
|
|
||||||
|
|
||||||
new_group_start_time =
|
|
||||||
MAX (new_group_start_time, start_running_time);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (pad,
|
|
||||||
"Updating group start time from %" GST_TIME_FORMAT " to %"
|
|
||||||
GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
|
|
||||||
GST_TIME_ARGS (new_group_start_time));
|
|
||||||
self->group_start_time = new_group_start_time;
|
|
||||||
}
|
|
||||||
|
|
||||||
gst_syncstream_unref (stream);
|
|
||||||
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
/* unblocking EOS wait when track switch. */
|
/* unblocking EOS wait when track switch. */
|
||||||
@ -654,101 +783,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case GST_EVENT_EOS:{
|
case GST_EVENT_EOS:{
|
||||||
GstSyncStream *stream;
|
ret = gst_stream_synchronizer_handle_eos (self, pad, event);
|
||||||
GList *l;
|
|
||||||
gboolean all_eos = TRUE;
|
|
||||||
gboolean seen_data;
|
|
||||||
GSList *pads = NULL;
|
|
||||||
GstPad *srcpad;
|
|
||||||
GstClockTime timestamp;
|
|
||||||
guint32 seqnum;
|
|
||||||
|
|
||||||
GST_STREAM_SYNCHRONIZER_LOCK (self);
|
|
||||||
stream = gst_streamsync_pad_get_stream (pad);
|
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
|
|
||||||
stream->is_eos = TRUE;
|
|
||||||
|
|
||||||
seen_data = stream->seen_data;
|
|
||||||
srcpad = gst_object_ref (stream->srcpad);
|
|
||||||
seqnum = stream->segment_seqnum;
|
|
||||||
|
|
||||||
if (seen_data && stream->segment.position != -1)
|
|
||||||
timestamp = stream->segment.position;
|
|
||||||
else if (stream->segment.rate < 0.0 || stream->segment.stop == -1)
|
|
||||||
timestamp = stream->segment.start;
|
|
||||||
else
|
|
||||||
timestamp = stream->segment.stop;
|
|
||||||
|
|
||||||
stream->segment.position = timestamp;
|
|
||||||
|
|
||||||
for (l = self->streams; l; l = l->next) {
|
|
||||||
GstSyncStream *ostream = l->data;
|
|
||||||
|
|
||||||
all_eos = all_eos && ostream->is_eos;
|
|
||||||
if (!all_eos)
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (all_eos) {
|
|
||||||
GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
|
|
||||||
self->eos = TRUE;
|
|
||||||
for (l = self->streams; l; l = l->next) {
|
|
||||||
GstSyncStream *ostream = l->data;
|
|
||||||
/* local snapshot of current pads */
|
|
||||||
gst_object_ref (ostream->srcpad);
|
|
||||||
pads = g_slist_prepend (pads, ostream->srcpad);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (pads) {
|
|
||||||
GstPad *pad;
|
|
||||||
GSList *epad;
|
|
||||||
GstSyncStream *ostream;
|
|
||||||
|
|
||||||
ret = TRUE;
|
|
||||||
epad = pads;
|
|
||||||
while (epad) {
|
|
||||||
pad = epad->data;
|
|
||||||
ostream = gst_streamsync_pad_get_stream (pad);
|
|
||||||
g_cond_broadcast (&ostream->stream_finish_cond);
|
|
||||||
gst_syncstream_unref (ostream);
|
|
||||||
gst_object_unref (pad);
|
|
||||||
epad = g_slist_next (epad);
|
|
||||||
}
|
|
||||||
g_slist_free (pads);
|
|
||||||
} else {
|
|
||||||
if (seen_data) {
|
|
||||||
stream->send_gap_event = TRUE;
|
|
||||||
stream->gap_duration = GST_CLOCK_TIME_NONE;
|
|
||||||
stream->wait = TRUE;
|
|
||||||
ret = gst_stream_synchronizer_wait (self, srcpad);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* send eos if haven't seen data. seen_data will be true if data buffer
|
|
||||||
* of the track have received in anytime. sink is ready if seen_data is
|
|
||||||
* true, so can send GAP event. Will send EOS if sink isn't ready. The
|
|
||||||
* scenario for the case is one track haven't any media data and then
|
|
||||||
* send EOS. Or no any valid media data in one track, so decoder can't
|
|
||||||
* get valid CAPS for the track. sink can't ready without received CAPS.*/
|
|
||||||
if (!seen_data || self->eos) {
|
|
||||||
GstEvent *topush;
|
|
||||||
GST_DEBUG_OBJECT (pad, "send EOS event");
|
|
||||||
/* drop lock when sending eos, which may block in e.g. preroll */
|
|
||||||
topush = gst_event_new_eos ();
|
|
||||||
gst_event_set_seqnum (topush, seqnum);
|
|
||||||
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
|
||||||
ret = gst_pad_push_event (srcpad, topush);
|
|
||||||
GST_STREAM_SYNCHRONIZER_LOCK (self);
|
|
||||||
stream = gst_streamsync_pad_get_stream (pad);
|
|
||||||
stream->eos_sent = TRUE;
|
|
||||||
gst_syncstream_unref (stream);
|
|
||||||
}
|
|
||||||
|
|
||||||
gst_object_unref (srcpad);
|
|
||||||
gst_event_unref (event);
|
|
||||||
gst_syncstream_unref (stream);
|
|
||||||
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
|
|
||||||
goto done;
|
goto done;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user