Improve consistency of the tasks shutdown code

This commit is contained in:
David Corvoysier 2012-10-16 13:43:39 +02:00 committed by Thiago Santos
parent 28aaff747c
commit 7f2207732b
2 changed files with 53 additions and 26 deletions

View File

@ -543,7 +543,7 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
g_object_unref (fragment); g_object_unref (fragment);
j++; j++;
} }
g_list_free(listfragment); g_list_free (listfragment);
} }
g_queue_clear (demux->queue); g_queue_clear (demux->queue);
@ -780,13 +780,14 @@ gst_dash_demux_stop (GstDashDemux * demux)
gst_uri_downloader_cancel (demux->downloader); gst_uri_downloader_cancel (demux->downloader);
if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) { if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
demux->stop_stream_task = TRUE;
gst_task_stop (demux->download_task); gst_task_stop (demux->download_task);
GST_TASK_SIGNAL (demux->download_task); GST_TASK_SIGNAL (demux->download_task);
} }
if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
gst_task_stop (demux->stream_task); gst_task_stop (demux->stream_task);
GST_TASK_SIGNAL (demux->stream_task);
}
} }
/* switch_pads: /* switch_pads:
@ -916,7 +917,7 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
/* Wait until the next scheduled push downstream */ /* Wait until the next scheduled push downstream */
if (g_cond_timed_wait (GST_TASK_GET_COND (demux->stream_task), if (g_cond_timed_wait (GST_TASK_GET_COND (demux->stream_task),
demux->stream_timed_lock, &demux->next_push)) { demux->stream_timed_lock, &demux->next_push)) {
goto pause_task; goto pause_streaming;
} }
if (g_queue_is_empty (demux->queue)) { if (g_queue_is_empty (demux->queue)) {
@ -968,7 +969,7 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
if ((ret != GST_FLOW_OK) && (stream->mimeType == GST_STREAM_VIDEO)) if ((ret != GST_FLOW_OK) && (stream->mimeType == GST_STREAM_VIDEO))
goto error_pushing; goto error_pushing;
} }
g_list_free(listfragment); g_list_free (listfragment);
if (GST_STATE (demux) == GST_STATE_PLAYING) { if (GST_STATE (demux) == GST_STATE_PLAYING) {
/* Wait for the duration of a fragment before resuming this task */ /* Wait for the duration of a fragment before resuming this task */
g_get_current_time (&demux->next_push); g_get_current_time (&demux->next_push);
@ -979,7 +980,7 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
g_time_val_to_iso8601 (&demux->next_push)); g_time_val_to_iso8601 (&demux->next_push));
} else { } else {
/* The pipeline is now set up, wait until playback begins */ /* The pipeline is now set up, wait until playback begins */
goto pause_task; goto pause_streaming;
} }
return; return;
@ -991,21 +992,24 @@ end_of_manifest:
for (i = 0; i < gst_mpdparser_get_nb_active_stream (demux->client); i++) { for (i = 0; i < gst_mpdparser_get_nb_active_stream (demux->client); i++) {
gst_pad_push_event (demux->srcpad[i], gst_event_new_eos ()); gst_pad_push_event (demux->srcpad[i], gst_event_new_eos ());
} }
gst_dash_demux_stop (demux); GST_INFO_OBJECT (demux, "Stopped streaming task");
gst_task_stop (demux->stream_task);
return; return;
} }
error_pushing: error_pushing:
{ {
/* FIXME: handle error */ /* FIXME: handle error */
GST_ERROR_OBJECT (demux, "Error pushing buffer: %s... stopping task", GST_ERROR_OBJECT (demux,
"Error pushing buffer: %s... terminating the demux",
gst_flow_get_name (ret)); gst_flow_get_name (ret));
gst_dash_demux_stop (demux); gst_dash_demux_stop (demux);
return; return;
} }
pause_task: pause_streaming:
{ {
GST_INFO_OBJECT (demux, "Pausing streaming task");
gst_task_pause (demux->stream_task); gst_task_pause (demux->stream_task);
return; return;
} }
@ -1044,7 +1048,7 @@ gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose)
g_object_unref (fragment); g_object_unref (fragment);
j++; j++;
} }
g_list_free(listfragment); g_list_free (listfragment);
} }
g_queue_clear (demux->queue); g_queue_clear (demux->queue);
@ -1094,9 +1098,8 @@ gst_dash_demux_get_buffering_ratio (GstDashDemux * demux)
* *
* Teardown: * Teardown:
* *
* The task will exit when it encounters an error or will be terminated * The task will exit when it encounters an error or when the end of the
* by the stream task (FIXME: shouldn't it stop by itself as soon as it * manifest has been reached.
* has reached the end of the manifest ?)
* *
*/ */
void void
@ -1105,7 +1108,7 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
/* Wait until the next scheduled download */ /* Wait until the next scheduled download */
if (g_cond_timed_wait (GST_TASK_GET_COND (demux->download_task), if (g_cond_timed_wait (GST_TASK_GET_COND (demux->download_task),
demux->download_timed_lock, &demux->next_download)) { demux->download_timed_lock, &demux->next_download)) {
goto quit; goto pause_download;
} }
/* Target buffering time MUST at least exceeds mimimum buffering time /* Target buffering time MUST at least exceeds mimimum buffering time
@ -1133,7 +1136,10 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
/* fetch the next fragment */ /* fetch the next fragment */
if (!gst_dash_demux_get_next_fragment_set (demux)) { if (!gst_dash_demux_get_next_fragment_set (demux)) {
if (!demux->end_of_manifest && !demux->cancelled) { if (demux->end_of_manifest) {
GST_INFO_OBJECT (demux, "Reached the end of the manifest file");
goto end_of_manifest;
} else if (!demux->cancelled) {
demux->client->update_failed_count++; demux->client->update_failed_count++;
if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) { if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
GST_WARNING_OBJECT (demux, "Could not fetch the next fragment"); GST_WARNING_OBJECT (demux, "Could not fetch the next fragment");
@ -1141,7 +1147,7 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
} else { } else {
GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND, GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
("Could not fetch the next fragment"), (NULL)); ("Could not fetch the next fragment"), (NULL));
goto quit; goto error_downloading;
} }
} }
} else { } else {
@ -1155,10 +1161,23 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
g_time_val_add (&demux->next_download, 100000); g_time_val_add (&demux->next_download, 100000);
} }
return; return;
quit: end_of_manifest:
{ {
GST_INFO_OBJECT (demux, "Stopped download task"); GST_INFO_OBJECT (demux, "Stopped download task");
gst_task_stop (demux->download_task);
return;
}
error_downloading:
{
GST_INFO_OBJECT (demux, "Terminating the demux");
gst_dash_demux_stop (demux); gst_dash_demux_stop (demux);
return;
}
pause_download:
{
GST_INFO_OBJECT (demux, "Pausing download task");
gst_task_pause (demux->download_task);
return;
} }
} }
@ -1178,6 +1197,15 @@ gst_dash_demux_resume_stream_task (GstDashDemux * demux)
gst_task_start (demux->stream_task); gst_task_start (demux->stream_task);
} }
static void
gst_dash_demux_pause_download_task (GstDashDemux * demux)
{
/* Send a signal to the download task so that it pauses itself */
GST_TASK_SIGNAL (demux->download_task);
/* Pause it explicitly (if it was not in the COND) */
gst_task_pause (demux->download_task);
}
static void static void
gst_dash_demux_resume_download_task (GstDashDemux * demux) gst_dash_demux_resume_download_task (GstDashDemux * demux)
{ {

View File

@ -60,9 +60,9 @@ struct _GstDashDemux
{ {
GstElement parent; GstElement parent;
GstPad *sinkpad; GstPad *sinkpad;
GstPad *srcpad[MAX_LANGUAGES]; /*Video/Audio/Application src pad*/ GstPad *srcpad[MAX_LANGUAGES]; /*Video/Audio/Application src pad */
GstCaps *output_caps[MAX_LANGUAGES]; /*Video/Audio/Application output buf caps*/ GstCaps *output_caps[MAX_LANGUAGES]; /*Video/Audio/Application output buf caps */
GstCaps *input_caps[MAX_LANGUAGES]; /*Video/Audio/Application input caps*/ GstCaps *input_caps[MAX_LANGUAGES]; /*Video/Audio/Application input caps */
GstBuffer *manifest; GstBuffer *manifest;
GstUriDownloader *downloader; GstUriDownloader *downloader;
@ -71,17 +71,16 @@ struct _GstDashDemux
gboolean end_of_manifest; gboolean end_of_manifest;
/* Properties */ /* Properties */
GstClockTime min_buffering_time; /* Minimum buffering time accumulated before playback */ GstClockTime min_buffering_time; /* Minimum buffering time accumulated before playback */
GstClockTime max_buffering_time; /* Maximum buffering time accumulated during playback */ GstClockTime max_buffering_time; /* Maximum buffering time accumulated during playback */
gfloat bandwidth_usage; /* Percentage of the available bandwidth to use */ gfloat bandwidth_usage; /* Percentage of the available bandwidth to use */
guint64 max_bitrate; /* max of bitrate supported by target decoder */ guint64 max_bitrate; /* max of bitrate supported by target decoder */
/* Streaming task */ /* Streaming task */
GstTask *stream_task; GstTask *stream_task;
GStaticRecMutex stream_lock; GStaticRecMutex stream_lock;
gboolean stop_stream_task;
GMutex *stream_timed_lock; GMutex *stream_timed_lock;
GTimeVal next_push; /* Time of the next push */ GTimeVal next_push; /* Time of the next push */
/* Download task */ /* Download task */
GstTask *download_task; GstTask *download_task;