From fecd38c8f6157d463fa83ffc94da8aaccc806fd4 Mon Sep 17 00:00:00 2001 From: Vivia Nikolaidou Date: Fri, 10 Apr 2020 19:54:31 +0300 Subject: [PATCH] tsmux: Ability for streams to disappear and reappear Until now, any streams in tsmux had to be present when the element started its first buffer. Now they can appear at any point during the stream, or even disappear and reappear later using the same PID. --- gst/mpegtsmux/gstbasetsmux.c | 217 ++++++++++++++++++------------ gst/mpegtsmux/tsmux/tsmux.c | 50 +++++++ gst/mpegtsmux/tsmux/tsmux.h | 2 + gst/mpegtsmux/tsmux/tsmuxstream.c | 1 + gst/mpegtsmux/tsmux/tsmuxstream.h | 2 + tests/check/elements/mpegtsmux.c | 74 ++++++++-- 6 files changed, 246 insertions(+), 100 deletions(-) diff --git a/gst/mpegtsmux/gstbasetsmux.c b/gst/mpegtsmux/gstbasetsmux.c index ca551fb822..887b251305 100644 --- a/gst/mpegtsmux/gstbasetsmux.c +++ b/gst/mpegtsmux/gstbasetsmux.c @@ -688,89 +688,78 @@ not_negotiated: } static GstFlowReturn -gst_base_ts_mux_create_streams (GstBaseTsMux * mux) +gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad) { + GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad); + gchar *name = NULL; + gchar *pcr_name; GstFlowReturn ret = GST_FLOW_OK; - GList *walk = GST_ELEMENT (mux)->sinkpads; - /* Create the streams */ - while (walk) { - GstPad *pad = GST_PAD (walk->data); - GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (walk->data); - gchar *name = NULL; - gchar *pcr_name; - - walk = g_list_next (walk); - - if (ts_pad->prog_id == -1) { - name = GST_PAD_NAME (pad); - if (mux->prog_map != NULL && gst_structure_has_field (mux->prog_map, - name)) { - gint idx; - gboolean ret = gst_structure_get_int (mux->prog_map, name, &idx); - if (!ret) { - GST_ELEMENT_ERROR (mux, STREAM, MUX, - ("Reading program map failed. Assuming default"), (NULL)); - idx = DEFAULT_PROG_ID; - } - if (idx < 0) { - GST_DEBUG_OBJECT (mux, "Program number %d associate with pad %s less " - "than zero; DEFAULT_PROGRAM = %d is used instead", - idx, name, DEFAULT_PROG_ID); - idx = DEFAULT_PROG_ID; - } - ts_pad->prog_id = idx; - } else { - ts_pad->prog_id = DEFAULT_PROG_ID; + if (ts_pad->prog_id == -1) { + name = GST_PAD_NAME (pad); + if (mux->prog_map != NULL && gst_structure_has_field (mux->prog_map, name)) { + gint idx; + gboolean ret = gst_structure_get_int (mux->prog_map, name, &idx); + if (!ret) { + GST_ELEMENT_ERROR (mux, STREAM, MUX, + ("Reading program map failed. Assuming default"), (NULL)); + idx = DEFAULT_PROG_ID; } - } - - ts_pad->prog = - (TsMuxProgram *) g_hash_table_lookup (mux->programs, - GINT_TO_POINTER (ts_pad->prog_id)); - if (ts_pad->prog == NULL) { - ts_pad->prog = tsmux_program_new (mux->tsmux, ts_pad->prog_id); - if (ts_pad->prog == NULL) - goto no_program; - tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval); - tsmux_program_set_scte35_pid (ts_pad->prog, mux->scte35_pid); - tsmux_program_set_scte35_interval (ts_pad->prog, - mux->scte35_null_interval); - g_hash_table_insert (mux->programs, GINT_TO_POINTER (ts_pad->prog_id), - ts_pad->prog); - } - - if (ts_pad->stream == NULL) { - ret = gst_base_ts_mux_create_stream (mux, ts_pad); - if (ret != GST_FLOW_OK) - goto no_stream; - } - - if (ts_pad->prog->pcr_stream == NULL) { - /* Take the first stream of the program for the PCR */ - GST_DEBUG_OBJECT (ts_pad, - "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)", - ts_pad->pid, ts_pad->prog_id); - - tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream); - } - - /* Check for user-specified PCR PID */ - pcr_name = g_strdup_printf ("PCR_%d", ts_pad->prog->pgm_number); - if (mux->prog_map && gst_structure_has_field (mux->prog_map, pcr_name)) { - const gchar *sink_name = - gst_structure_get_string (mux->prog_map, pcr_name); - - if (!g_strcmp0 (name, sink_name)) { - GST_DEBUG_OBJECT (mux, "User specified stream (pid=%d) as PCR for " - "program (prog_id = %d)", ts_pad->pid, ts_pad->prog->pgm_number); - tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream); + if (idx < 0) { + GST_DEBUG_OBJECT (mux, "Program number %d associate with pad %s less " + "than zero; DEFAULT_PROGRAM = %d is used instead", + idx, name, DEFAULT_PROG_ID); + idx = DEFAULT_PROG_ID; } + ts_pad->prog_id = idx; + } else { + ts_pad->prog_id = DEFAULT_PROG_ID; } - g_free (pcr_name); } - return GST_FLOW_OK; + ts_pad->prog = + (TsMuxProgram *) g_hash_table_lookup (mux->programs, + GINT_TO_POINTER (ts_pad->prog_id)); + if (ts_pad->prog == NULL) { + ts_pad->prog = tsmux_program_new (mux->tsmux, ts_pad->prog_id); + if (ts_pad->prog == NULL) + goto no_program; + tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval); + tsmux_program_set_scte35_pid (ts_pad->prog, mux->scte35_pid); + tsmux_program_set_scte35_interval (ts_pad->prog, mux->scte35_null_interval); + g_hash_table_insert (mux->programs, GINT_TO_POINTER (ts_pad->prog_id), + ts_pad->prog); + } + + if (ts_pad->stream == NULL) { + ret = gst_base_ts_mux_create_stream (mux, ts_pad); + if (ret != GST_FLOW_OK) + goto no_stream; + } + + if (ts_pad->prog->pcr_stream == NULL) { + /* Take the first stream of the program for the PCR */ + GST_DEBUG_OBJECT (ts_pad, + "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)", + ts_pad->pid, ts_pad->prog_id); + + tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream); + } + + /* Check for user-specified PCR PID */ + pcr_name = g_strdup_printf ("PCR_%d", ts_pad->prog->pgm_number); + if (mux->prog_map && gst_structure_has_field (mux->prog_map, pcr_name)) { + const gchar *sink_name = gst_structure_get_string (mux->prog_map, pcr_name); + + if (!g_strcmp0 (name, sink_name)) { + GST_DEBUG_OBJECT (mux, "User specified stream (pid=%d) as PCR for " + "program (prog_id = %d)", ts_pad->pid, ts_pad->prog->pgm_number); + tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream); + } + } + g_free (pcr_name); + + return ret; /* ERRORS */ no_program: @@ -787,6 +776,25 @@ no_stream: } } +static GstFlowReturn +gst_base_ts_mux_create_streams (GstBaseTsMux * mux) +{ + GstFlowReturn ret = GST_FLOW_OK; + GList *walk = GST_ELEMENT (mux)->sinkpads; + + /* Create the streams */ + while (walk) { + GstPad *pad = GST_PAD (walk->data); + + ret = gst_base_ts_mux_create_pad_stream (mux, pad); + if (ret != GST_FLOW_OK) + return ret; + walk = g_list_next (walk); + } + + return GST_FLOW_OK; +} + static void new_packet_common_init (GstBaseTsMux * mux, GstBuffer * buf, guint8 * data, guint len) @@ -1078,8 +1086,22 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux, } prog = best->prog; - if (prog == NULL) - goto no_program; + if (prog == NULL) { + GList *cur; + + gst_base_ts_mux_create_pad_stream (mux, GST_PAD (best)); + tsmux_resend_pat (mux->tsmux); + tsmux_resend_si (mux->tsmux); + prog = best->prog; + g_assert_nonnull (prog); + + /* output PMT for each program */ + for (cur = mux->tsmux->programs; cur; cur = cur->next) { + TsMuxProgram *program = (TsMuxProgram *) cur->data; + + tsmux_resend_pmt (program); + } + } g_assert (buf != NULL); @@ -1213,15 +1235,6 @@ write_fail: { return mux->last_flow_ret; } -no_program: - { - if (buf) - gst_buffer_unref (buf); - GST_ELEMENT_ERROR (mux, STREAM, MUX, - ("Stream on pad %" GST_PTR_FORMAT - " is not associated with any program", best), (NULL)); - return GST_FLOW_ERROR; - } } /* GstElement implementation */ @@ -1259,6 +1272,37 @@ stream_exists: } } +static void +gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad) +{ + GstBaseTsMux *mux = GST_BASE_TS_MUX (element); + + if (mux->tsmux) { + GList *cur; + GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad); + gint pid = ts_pad->pid; + + if (ts_pad->prog->pcr_stream == ts_pad->stream) { + tsmux_stream_pcr_unref (ts_pad->prog->pcr_stream); + ts_pad->prog->pcr_stream = NULL; + } + if (tsmux_remove_stream (mux->tsmux, pid, ts_pad->prog)) { + g_hash_table_remove (mux->programs, GINT_TO_POINTER (ts_pad->prog_id)); + } + tsmux_resend_pat (mux->tsmux); + tsmux_resend_si (mux->tsmux); + + /* output PMT for each program */ + for (cur = mux->tsmux->programs; cur; cur = cur->next) { + TsMuxProgram *program = (TsMuxProgram *) cur->data; + + tsmux_resend_pmt (program); + } + } + + gst_element_remove_pad (element, pad); +} + static gboolean gst_base_ts_mux_send_event (GstElement * element, GstEvent * event) { @@ -1863,6 +1907,7 @@ gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass) gobject_class->constructed = gst_base_ts_mux_constructed; gstelement_class->request_new_pad = gst_base_ts_mux_request_new_pad; + gstelement_class->release_pad = gst_base_ts_mux_release_pad; gstelement_class->send_event = gst_base_ts_mux_send_event; gstagg_class->update_src_caps = gst_base_ts_mux_update_src_caps; diff --git a/gst/mpegtsmux/tsmux/tsmux.c b/gst/mpegtsmux/tsmux/tsmux.c index 92625b9728..4c1ace92a1 100644 --- a/gst/mpegtsmux/tsmux/tsmux.c +++ b/gst/mpegtsmux/tsmux/tsmux.c @@ -467,6 +467,25 @@ tsmux_program_new (TsMux * mux, gint prog_id) return program; } +gboolean +tsmux_program_delete (TsMux * mux, TsMuxProgram * program) +{ + g_return_val_if_fail (mux != NULL, FALSE); + + if (mux->nb_programs == 0) + return FALSE; + + if (!program) + return FALSE; + + mux->programs = g_list_remove (mux->programs, program); + mux->nb_programs--; + mux->pat_changed = TRUE; + tsmux_program_free ((TsMuxProgram *) program); + + return TRUE; +} + /** * tsmux_set_pmt_interval: * @program: a #TsMuxProgram @@ -594,6 +613,8 @@ tsmux_program_add_stream (TsMuxProgram * program, TsMuxStream * stream) g_return_if_fail (program != NULL); g_return_if_fail (stream != NULL); + stream->program_array_index = program->streams->len; + g_array_append_val (program->streams, stream); program->pmt_changed = TRUE; } @@ -720,6 +741,35 @@ tsmux_find_stream (TsMux * mux, guint16 pid) return found; } +gboolean +tsmux_remove_stream (TsMux * mux, guint16 pid, TsMuxProgram * program) +{ + GList *cur; + gboolean ret = FALSE; + + g_return_val_if_fail (mux != NULL, FALSE); + + for (cur = mux->streams; cur; cur = cur->next) { + TsMuxStream *stream = (TsMuxStream *) cur->data; + + if (tsmux_stream_get_pid (stream) == pid) { + if (program->streams->len == 1) { + tsmux_program_delete (mux, program); + ret = TRUE; + } else { + program->streams = + g_array_remove_index (program->streams, + stream->program_array_index); + } + + mux->streams = g_list_remove (mux->streams, stream); + tsmux_stream_free (stream); + return ret; + } + } + return ret; +} + static gboolean tsmux_get_buffer (TsMux * mux, GstBuffer ** buf) { diff --git a/gst/mpegtsmux/tsmux/tsmux.h b/gst/mpegtsmux/tsmux/tsmux.h index a7e62a5785..8dbb9dae62 100644 --- a/gst/mpegtsmux/tsmux/tsmux.h +++ b/gst/mpegtsmux/tsmux/tsmux.h @@ -225,6 +225,7 @@ void tsmux_resend_pmt (TsMuxProgram *program); void tsmux_program_set_scte35_pid (TsMuxProgram *program, guint16 pid); guint16 tsmux_program_get_scte35_pid (TsMuxProgram *program); void tsmux_program_set_scte35_interval (TsMuxProgram *mux, guint interval); +gboolean tsmux_program_delete (TsMux *mux, TsMuxProgram *program); /* SI table management */ @@ -239,6 +240,7 @@ gboolean tsmux_send_section (TsMux *mux, GstMpegtsSection *s /* stream management */ TsMuxStream * tsmux_create_stream (TsMux *mux, guint stream_type, guint16 pid, gchar *language); TsMuxStream * tsmux_find_stream (TsMux *mux, guint16 pid); +gboolean tsmux_remove_stream (TsMux *mux, guint16 pid, TsMuxProgram *program); void tsmux_program_add_stream (TsMuxProgram *program, TsMuxStream *stream); void tsmux_program_set_pcr_stream (TsMuxProgram *program, TsMuxStream *stream); diff --git a/gst/mpegtsmux/tsmux/tsmuxstream.c b/gst/mpegtsmux/tsmux/tsmuxstream.c index 5eb6abff48..beb93bd8b4 100644 --- a/gst/mpegtsmux/tsmux/tsmuxstream.c +++ b/gst/mpegtsmux/tsmux/tsmuxstream.c @@ -133,6 +133,7 @@ tsmux_stream_new (guint16 pid, guint stream_type) stream->pes_payload_size = 0; stream->cur_pes_payload_size = 0; stream->pes_bytes_written = 0; + stream->program_array_index = -1; switch (stream_type) { case TSMUX_ST_VIDEO_MPEG1: diff --git a/gst/mpegtsmux/tsmux/tsmuxstream.h b/gst/mpegtsmux/tsmux/tsmuxstream.h index ce8c512c5e..93f04671b2 100644 --- a/gst/mpegtsmux/tsmux/tsmuxstream.h +++ b/gst/mpegtsmux/tsmux/tsmuxstream.h @@ -166,6 +166,8 @@ struct TsMuxStream { guint8 id; /* extended stream id (13818-1 Amdt 2) */ guint8 id_extended; + /* array index in program array */ + gint program_array_index; gboolean is_video_stream; diff --git a/tests/check/elements/mpegtsmux.c b/tests/check/elements/mpegtsmux.c index 2c8a4951c1..3695434ce1 100644 --- a/tests/check/elements/mpegtsmux.c +++ b/tests/check/elements/mpegtsmux.c @@ -148,30 +148,18 @@ cleanup_tsmux (GstElement * mux, const gchar * sinkname) } static void -check_tsmux_pad (GstStaticPadTemplate * srctemplate, +check_tsmux_pad_given_muxer (GstElement * mux, const gchar * src_caps_string, gint pes_id, gint pmt_id, - const gchar * sinkname, CheckOutputBuffersFunc check_func, guint n_bufs, - gssize input_buf_size, guint alignment) + CheckOutputBuffersFunc check_func, guint n_bufs, gssize input_buf_size) { GstClockTime ts; - GstElement *mux; GstBuffer *inbuffer, *outbuffer; GstCaps *caps; gint num_buffers; gint i; gint pmt_pid = -1, el_pid = -1, pcr_pid = -1, packets = 0; - gchar *padname; GstQuery *drain; - mux = setup_tsmux (srctemplate, sinkname, &padname); - - if (alignment != 0) - g_object_set (mux, "alignment", alignment, NULL); - - fail_unless (gst_element_set_state (mux, - GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS, - "could not set to playing"); - caps = gst_caps_from_string (src_caps_string); gst_check_setup_events (mysrcpad, mux, caps, GST_FORMAT_TIME); gst_caps_unref (caps); @@ -348,11 +336,68 @@ check_tsmux_pad (GstStaticPadTemplate * srctemplate, g_list_free (buffers); buffers = NULL; +} + +static void +check_tsmux_pad (GstStaticPadTemplate * srctemplate, + const gchar * src_caps_string, gint pes_id, gint pmt_id, + const gchar * sinkname, CheckOutputBuffersFunc check_func, guint n_bufs, + gssize input_buf_size, guint alignment) +{ + gchar *padname; + GstElement *mux; + + mux = setup_tsmux (srctemplate, sinkname, &padname); + + if (alignment != 0) + g_object_set (mux, "alignment", alignment, NULL); + + fail_unless (gst_element_set_state (mux, + GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS, + "could not set to playing"); + + check_tsmux_pad_given_muxer (mux, src_caps_string, pes_id, pmt_id, + check_func, n_bufs, input_buf_size); cleanup_tsmux (mux, padname); g_free (padname); } +GST_START_TEST (test_reappearing_pad) +{ + gchar *padname; + GstElement *mux; + GstPad *pad; + + mux = gst_check_setup_element ("mpegtsmux"); + mysrcpad = setup_src_pad (mux, &video_src_template, "sink_%d", &padname); + mysinkpad = gst_check_setup_sink_pad (mux, &sink_template); + gst_pad_set_active (mysrcpad, TRUE); + gst_pad_set_active (mysinkpad, TRUE); + + fail_unless (gst_element_set_state (mux, + GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS, + "could not set to playing"); + + check_tsmux_pad_given_muxer (mux, VIDEO_CAPS_STRING, 0xE0, 0x1b, NULL, 1, 1); + + pad = gst_element_get_static_pad (mux, padname); + gst_pad_set_active (mysrcpad, FALSE); + gst_object_unref (pad); + teardown_src_pad (mux, padname); + gst_element_release_request_pad (mux, pad); + g_free (padname); + + mysrcpad = setup_src_pad (mux, &video_src_template, "sink_%d", &padname); + gst_pad_set_active (mysrcpad, TRUE); + + check_tsmux_pad_given_muxer (mux, VIDEO_CAPS_STRING, 0xE0, 0x1b, NULL, 1, 1); + + cleanup_tsmux (mux, padname); + g_free (padname); +} + +GST_END_TEST; GST_START_TEST (test_video) { @@ -491,6 +536,7 @@ mpegtsmux_suite (void) tcase_add_test (tc_chain, test_multiple_state_change); tcase_add_test (tc_chain, test_align); tcase_add_test (tc_chain, test_keyframe_flag_propagation); + tcase_add_test (tc_chain, test_reappearing_pad); return s; }