validate:scenario: Add a 'create-sub-pipeline' action type

And allow running parallel pipelines, scenarios, and pass data from
one to the other using appsrc/appsink

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5803>
This commit is contained in:
Thibault Saunier 2023-12-07 13:07:44 -03:00 committed by GStreamer Marge Bot
parent 8c898b3b39
commit fd4cd3d85f
6 changed files with 595 additions and 46 deletions

View File

@ -83,6 +83,8 @@ gst_dep = dependency('gstreamer-' + apiversion, version : gst_req,
fallback : ['gstreamer', 'gst_dep'])
gstbase_dep = dependency('gstreamer-base-' + apiversion, version : gst_req,
fallback : ['gstreamer', 'gst_base_dep'])
gstapp_dep = dependency('gstreamer-app-' + apiversion, version : gst_req,
fallback : ['gst-plugins-base', 'app_dep'])
gst_pbutils_dep = dependency('gstreamer-pbutils-' + apiversion, version : gst_req,
fallback : ['gst-plugins-base', 'pbutils_dep'])
gst_video_dep = dependency('gstreamer-video-' + apiversion, version : gst_req,

View File

@ -25,6 +25,7 @@
#include <gst/gst.h>
#include "gst-validate-scenario.h"
#include "gst-validate-monitor.h"
#include "gst-validate-pipeline-monitor.h"
#include <json-glib/json-glib.h>
extern G_GNUC_INTERNAL GstDebugCategory *gstvalidate_debug;
@ -65,9 +66,14 @@ G_GNUC_INTERNAL gboolean gst_validate_send (JsonNode * root);
G_GNUC_INTERNAL void gst_validate_set_test_file_globals (GstStructure* meta, const gchar* testfile, gboolean use_fakesinks);
G_GNUC_INTERNAL gboolean gst_validate_get_test_file_scenario (GList** structs, const gchar** scenario_name, gchar** original_name);
G_GNUC_INTERNAL GstValidateScenario* gst_validate_scenario_from_structs (GstValidateRunner* runner, GstElement* pipeline, GList* structures,
gchar* origin_file);
const gchar* origin_file);
G_GNUC_INTERNAL GList* gst_validate_get_config (const gchar *structname);
G_GNUC_INTERNAL GList * gst_validate_get_test_file_expected_issues (void);
G_GNUC_INTERNAL GstValidatePipelineMonitor *
gst_validate_pipeline_monitor_new_full (GstPipeline * pipeline,
GstValidateRunner * runner, GstValidateMonitor * parent,
const gchar *scenario_name, GList *actions,
gboolean find_scenario);
G_GNUC_INTERNAL gboolean gst_validate_extra_checks_init (void);
G_GNUC_INTERNAL gboolean gst_validate_flow_init (void);

View File

@ -759,10 +759,11 @@ _bus_handler (GstBus * bus, GstMessage * message,
}
static void
gst_validate_pipeline_monitor_create_scenarios (GstValidateBinMonitor * monitor)
gst_validate_pipeline_monitor_create_scenarios (GstValidateBinMonitor * monitor,
const gchar * scenario_name, GList * actions, gboolean find_scenario)
{
/* scenarios currently only make sense for pipelines */
const gchar *scenarios_names, *scenario_name = NULL;
const gchar *scenarios_names, *testfile_scenario_name = NULL;
gchar **scenarios = NULL, *testfile = NULL;
GstObject *target =
gst_validate_monitor_get_target (GST_VALIDATE_MONITOR (monitor));
@ -770,18 +771,24 @@ gst_validate_pipeline_monitor_create_scenarios (GstValidateBinMonitor * monitor)
gst_validate_reporter_get_runner (GST_VALIDATE_REPORTER (monitor));
GList *scenario_structs = NULL;
if (gst_validate_get_test_file_scenario (&scenario_structs, &scenario_name,
&testfile)) {
if (scenario_name) {
if (!find_scenario
|| gst_validate_get_test_file_scenario (&scenario_structs,
&testfile_scenario_name, &testfile)) {
if (testfile_scenario_name || scenario_name) {
monitor->scenario =
gst_validate_scenario_factory_create (runner,
GST_ELEMENT_CAST (target), scenario_name);
GST_ELEMENT_CAST (target),
scenario_name ? scenario_name : testfile_scenario_name);
goto done;
}
if (actions || scenario_structs) {
monitor->scenario =
gst_validate_scenario_from_structs (runner,
GST_ELEMENT_CAST (target), scenario_structs, testfile);
GST_ELEMENT_CAST (target),
scenario_structs ? scenario_structs : actions,
find_scenario ? testfile : "subscenario");
}
goto done;
}
@ -820,17 +827,10 @@ done:
gst_object_unref (runner);
}
/**
* gst_validate_pipeline_monitor_new:
* @pipeline: (transfer none): a #GstPipeline to run Validate on
* @runner: (transfer none): a #GstValidateRunner
* @parent: (nullable): The parent of the new monitor
*
* Returns: (transfer full): A #GstValidatePipelineMonitor
*/
GstValidatePipelineMonitor *
gst_validate_pipeline_monitor_new (GstPipeline * pipeline,
GstValidateRunner * runner, GstValidateMonitor * parent)
gst_validate_pipeline_monitor_new_full (GstPipeline * pipeline,
GstValidateRunner * runner, GstValidateMonitor * parent,
const gchar * scenario_name, GList * actions, gboolean find_scenario)
{
GstBus *bus;
GstValidatePipelineMonitor *monitor;
@ -844,7 +844,7 @@ gst_validate_pipeline_monitor_new (GstPipeline * pipeline,
"pipeline", pipeline, NULL);
gst_validate_pipeline_monitor_create_scenarios (GST_VALIDATE_BIN_MONITOR
(monitor));
(monitor), scenario_name, actions, find_scenario);
bus = gst_element_get_bus (GST_ELEMENT (pipeline));
gst_bus_enable_sync_message_emission (bus);
@ -866,3 +866,20 @@ gst_validate_pipeline_monitor_new (GstPipeline * pipeline,
return monitor;
}
/**
* gst_validate_pipeline_monitor_new:
* @pipeline: (transfer none): a #GstPipeline to run Validate on
* @runner: (transfer none): a #GstValidateRunner
* @parent: (nullable): The parent of the new monitor
*
* Returns: (transfer full): A #GstValidatePipelineMonitor
*/
GstValidatePipelineMonitor *
gst_validate_pipeline_monitor_new (GstPipeline * pipeline,
GstValidateRunner * runner, GstValidateMonitor * parent)
{
return gst_validate_pipeline_monitor_new_full (pipeline,
runner, parent, NULL, NULL, TRUE);
}

View File

@ -1005,11 +1005,14 @@ gst_validate_print_action (GstValidateAction * action, const gchar * message)
if (message == NULL) {
gint indent = (gst_validate_action_get_level (action) * 2);
PrintActionFieldData d = { NULL, indent, 0 };
GstValidateScenario *scenario = gst_validate_action_get_scenario (action);
d.str = string = g_string_new (NULL);
g_string_append_printf (string, "`%s` at %s:%d", action->type,
g_string_append_printf (string, "`%s` at %s:%d(%s)", action->type,
GST_VALIDATE_ACTION_FILENAME (action),
GST_VALIDATE_ACTION_LINENO (action));
GST_VALIDATE_ACTION_LINENO (action),
scenario ? GST_OBJECT_NAME (scenario) : "no scenario");
gst_object_unref (scenario);
if (GST_VALIDATE_ACTION_N_REPEATS (action))
g_string_append_printf (string, " [%s=%d/%d]",

View File

@ -379,6 +379,7 @@ struct _GstValidateActionPrivate
GMainContext *context;
GValue it_value;
GWeakRef sub_pipeline;
};
static JsonNode *
@ -497,6 +498,7 @@ _action_free (GstValidateAction * action)
if (action->priv->it_value.g_type != 0)
g_value_reset (&action->priv->it_value);
g_weak_ref_clear (&action->priv->scenario);
g_weak_ref_clear (&action->priv->sub_pipeline);
g_free (GST_VALIDATE_ACTION_FILENAME (action));
g_free (GST_VALIDATE_ACTION_DEBUG (action));
@ -589,6 +591,40 @@ _action_check_and_set_printed (GstValidateAction * action)
return TRUE;
}
/**
* gst_validate_action_get_sub_pipeline:
* @action: The action to retrieve sub pipeline from
*
* Returns: (transfer full): The sub pipeline of @action if the action is running a sub pipeline
*/
static GstPipeline *
gst_validate_action_get_sub_pipeline (GstValidateAction * action)
{
return g_weak_ref_get (&action->priv->sub_pipeline);
}
static GstElement *
gst_validate_scenario_get_sub_pipeline (GstValidateScenario * scenario,
const gchar * pipeline_name)
{
GstPipeline *pipeline = NULL;
SCENARIO_LOCK (scenario);
for (GList * tmp = scenario->priv->non_blocking_running_actions; tmp;
tmp = tmp->next) {
pipeline = gst_validate_action_get_sub_pipeline (tmp->data);
if (pipeline && !g_strcmp0 (GST_OBJECT_NAME (pipeline), pipeline_name)) {
break;
}
gst_clear_object (&pipeline);
}
SCENARIO_UNLOCK (scenario);
return (GstElement *) pipeline;
}
gint
gst_validate_action_get_level (GstValidateAction * action)
{
@ -1128,10 +1164,10 @@ gst_validate_scenario_execute_seek (GstValidateScenario * scenario,
GST_VALIDATE_REPORT_ACTION (scenario, action, EVENT_SEEK_NOT_HANDLED,
"Could not execute seek: '(position %" GST_TIME_FORMAT
"), %s (num %u, missing repeat: %i), seeking to: %" GST_TIME_FORMAT
" stop: %" GST_TIME_FORMAT " Rate %lf'",
" stop: %" GST_TIME_FORMAT " Rate %lf' on %" GST_PTR_FORMAT,
GST_TIME_ARGS (action->playback_time), action->name,
action->action_number, action->repeat, GST_TIME_ARGS (start),
GST_TIME_ARGS (stop), rate);
GST_TIME_ARGS (stop), rate, pipeline);
break;
default:
{
@ -2659,6 +2695,160 @@ _foreach_find_iterator (GQuark field_id, GValue * value,
}
static GstValidateExecuteActionReturn
gst_validate_action_get_execution_scenario (GstValidateAction * action,
GstValidateScenario ** target_scenario)
{
GstValidateExecuteActionReturn res = GST_VALIDATE_EXECUTE_ACTION_OK;
GstElement *sub_pipeline = NULL;
GstValidateScenario *scenario = gst_validate_action_get_scenario (action);
g_assert (scenario);
const gchar *sub_scenario_name =
gst_structure_get_string (action->structure, "pipeline-name");
g_assert (sub_scenario_name);
sub_pipeline =
gst_validate_scenario_get_sub_pipeline (scenario, sub_scenario_name);
REPORT_UNLESS (sub_pipeline, err, "Could not find sub-pipeline %s",
sub_scenario_name);
GstValidateBinMonitor *subbin_monitor = NULL;
subbin_monitor =
GST_VALIDATE_BIN_MONITOR (gst_validate_get_monitor (G_OBJECT
(sub_pipeline)));
REPORT_UNLESS (subbin_monitor->scenario, err,
"Sub pipeline doesn't have a scenario");
GST_DEBUG_OBJECT (scenario, "Action %s running on scenario %" GST_PTR_FORMAT,
action->type, subbin_monitor->scenario);
gst_object_unref (scenario);
*target_scenario = gst_object_ref (subbin_monitor->scenario);
done:
g_clear_object (&sub_pipeline);
return res;
err:
g_clear_object (&scenario);
g_clear_object (target_scenario);
res = GST_VALIDATE_EXECUTE_ACTION_ERROR_REPORTED;
goto done;
}
typedef struct
{
gulong subaction_done_sigid;
GstValidateAction *action;
GMutex sigid_lock;
} ValidateActionForeignScenarioData;
static void
validate_action_foreign_scenario_data_clear (ValidateActionForeignScenarioData *
data)
{
gst_validate_action_unref (data->action);
}
static void
validate_action_foreign_scenario_data_unref (ValidateActionForeignScenarioData *
data)
{
g_atomic_rc_box_release_full (data,
(GDestroyNotify) validate_action_foreign_scenario_data_clear);
}
static void
gst_validate_foreign_subaction_done_cb (GstValidateScenario *
execution_scenario, GstValidateAction * sub_action,
ValidateActionForeignScenarioData * data)
{
gst_validate_action_set_done (data->action);
g_mutex_lock (&data->sigid_lock);
if (data->subaction_done_sigid) {
g_signal_handler_disconnect (execution_scenario,
data->subaction_done_sigid);
data->subaction_done_sigid = 0;
}
g_mutex_unlock (&data->sigid_lock);
}
static GstValidateExecuteActionReturn
_execute_on_sub_scenario (GstValidateScenario * scenario,
GstValidateAction * action)
{
GstValidateExecuteActionReturn res;
GstValidateScenario *execution_scenario = NULL;
res =
gst_validate_action_get_execution_scenario (action, &execution_scenario);
if (res != GST_VALIDATE_EXECUTE_ACTION_OK) {
GST_ERROR ("Action %" GST_PTR_FORMAT " could not get execution scenario",
action->structure);
gst_object_unref (execution_scenario);
return res;
}
GstStructure *structure;
GstValidateActionType *action_type;
REPORT_UNLESS (gst_structure_get (action->structure, "action",
GST_TYPE_STRUCTURE, &structure, NULL), err,
"Could not get `action` structure");
action_type = _find_action_type (gst_structure_get_name (structure));
REPORT_UNLESS (action_type, err, "Action type %s no found",
gst_structure_get_name (structure));
GstValidateAction *subaction =
gst_validate_create_subaction (execution_scenario, NULL,
action, structure, 0, 0);
/* FIXME Cleanly inject action type instead of stepping on the other
* scenario's feet */
SCENARIO_LOCK (execution_scenario);
execution_scenario->priv->actions =
g_list_prepend (execution_scenario->priv->actions, subaction);
SCENARIO_UNLOCK (execution_scenario);
ValidateActionForeignScenarioData *data =
g_atomic_rc_box_new0 (ValidateActionForeignScenarioData);
g_mutex_lock (&data->sigid_lock);
data->action = gst_validate_action_ref (action);
data->subaction_done_sigid =
g_signal_connect_data (execution_scenario, "action-done",
G_CALLBACK (gst_validate_foreign_subaction_done_cb),
g_atomic_rc_box_acquire (data),
(GClosureNotify) validate_action_foreign_scenario_data_unref, 0);
gst_validate_print_action (action, NULL);
res = subaction->priv->state =
gst_validate_execute_action (action_type, subaction);
if (res != GST_VALIDATE_EXECUTE_ACTION_NON_BLOCKING
&& res != GST_VALIDATE_EXECUTE_ACTION_ASYNC) {
if (data->subaction_done_sigid) {
g_signal_handler_disconnect (execution_scenario,
data->subaction_done_sigid);
data->subaction_done_sigid = 0;
}
}
g_mutex_unlock (&data->sigid_lock);
validate_action_foreign_scenario_data_unref (data);
done:
return res;
err:
res = GST_VALIDATE_EXECUTE_ACTION_ERROR_REPORTED;
goto done;
}
/**
* gst_validate_execute_action:
* @action_type: The #GstValidateActionType to execute
@ -3245,6 +3435,52 @@ _execute_wait_for_message (GstValidateScenario * scenario,
return GST_VALIDATE_EXECUTE_ACTION_ASYNC;
}
static void
sub_pipeline_done_cb (GstBus * bus, GstMessage * message, gpointer data)
{
GstState state;
gst_message_parse_request_state (message, &state);
if (!GST_IS_VALIDATE_SCENARIO (GST_MESSAGE_SRC (message))
|| state != GST_STATE_NULL) {
return;
}
gst_validate_action_set_done (data);
}
static gboolean
_execute_wait_for_sub_pipeline (GstValidateScenario * scenario,
GstValidateAction * action)
{
const gchar *subpipeline_name =
gst_structure_get_string (action->structure, "subpipeline-done");
GstElement *pipeline = gst_validate_scenario_get_sub_pipeline (scenario,
subpipeline_name);
if (!pipeline) {
/* FIXME - ensure it actually ran at some point */
GST_INFO_OBJECT (scenario, "Could not find %s - considering done",
subpipeline_name);
return GST_VALIDATE_EXECUTE_ACTION_OK;
}
GstBus *bus = gst_element_get_bus (GST_ELEMENT (pipeline));
gst_bus_enable_sync_message_emission (bus);
g_signal_connect_data (bus, "sync-message::request-state",
(GCallback) sub_pipeline_done_cb, gst_validate_action_ref (action),
(GClosureNotify) gst_validate_action_unref, G_CONNECT_AFTER);
gst_clear_object (&bus);
return GST_VALIDATE_EXECUTE_ACTION_ASYNC;
}
static GstValidateExecuteActionReturn
_execute_wait (GstValidateScenario * scenario, GstValidateAction * action)
{
@ -3261,6 +3497,9 @@ _execute_wait (GstValidateScenario * scenario, GstValidateAction * action)
gst_test_clock_wait_for_next_pending_id (scenario->priv->clock, NULL);
return GST_VALIDATE_EXECUTE_ACTION_OK;
} else if (gst_structure_has_field_typed (action->structure,
"subpipeline-done", G_TYPE_STRING)) {
return _execute_wait_for_sub_pipeline (scenario, action);
} else {
return _execute_timed_wait (scenario, action);
}
@ -3951,6 +4190,7 @@ _execute_appsrc_push (GstValidateScenario * scenario,
GstSegment segment;
GstCaps *caps = NULL;
GstSample *sample;
GstElement *pipeline = NULL;
static guint64 counter = 0;
/* We will only wait for the the buffer to be pushed if we are in a state
@ -3961,6 +4201,31 @@ _execute_appsrc_push (GstValidateScenario * scenario,
target = _get_target_element (scenario, action);
REPORT_UNLESS (target, err, "No element found.");
const gchar *from_appsink =
gst_structure_get_string (action->structure, "from-appsink");
if (from_appsink) {
gchar **pipeline_elements = g_strsplit (from_appsink, "/", 2);
if (pipeline_elements[1]) {
pipeline =
gst_validate_scenario_get_sub_pipeline (scenario,
pipeline_elements[0]);
REPORT_UNLESS (pipeline, err, "Could not find subpipeline `%s`",
pipeline_elements[0]);
} else {
pipeline = gst_validate_scenario_get_pipeline (scenario);
}
GstElement *appsink =
gst_bin_get_by_name (GST_BIN (pipeline), pipeline_elements[1]);
REPORT_UNLESS (appsink, err, "Couldn't find appsink %s in %" GST_PTR_FORMAT,
pipeline_elements[1], pipeline);
g_signal_emit_by_name (appsink, "pull-sample", &sample, NULL);
goto push_sample;
}
structure_get_uint64_permissive (action->structure, "offset", &offset);
structure_get_uint64_permissive (action->structure, "size", &size);
@ -4056,17 +4321,6 @@ _execute_appsrc_push (GstValidateScenario * scenario,
}
}
/* We temporarily override the peer pad chain function to finish the action
* once the buffer chain actually ends. */
appsrc_pad = gst_element_get_static_pad (target, "src");
peer_pad = gst_pad_get_peer (appsrc_pad);
REPORT_UNLESS (peer_pad, err, "Action failed, pad not linked");
wrap_pad_chain_function (peer_pad, appsrc_push_chain_wrapper, action);
/* Keep the action alive until set done is called. */
gst_validate_action_ref (action);
sample = gst_sample_new (buffer, caps, NULL, NULL);
gst_clear_caps (&caps);
gst_buffer_unref (buffer);
@ -4105,6 +4359,17 @@ _execute_appsrc_push (GstValidateScenario * scenario,
gst_sample_set_segment (sample, &segment);
}
push_sample:
/* We temporarily override the peer pad chain function to finish the action
* once the buffer chain actually ends. */
appsrc_pad = gst_element_get_static_pad (target, "src");
peer_pad = gst_pad_get_peer (appsrc_pad);
REPORT_UNLESS (peer_pad, err, "Action failed, pad not linked");
wrap_pad_chain_function (peer_pad, appsrc_push_chain_wrapper, action);
/* Keep the action alive until set done is called. */
gst_validate_action_ref (action);
g_signal_emit_by_name (target, "push-sample", sample, &push_sample_ret);
gst_sample_unref (sample);
REPORT_UNLESS (push_sample_ret == GST_FLOW_OK, err,
@ -4127,6 +4392,7 @@ done:
g_clear_object (&f);
g_clear_object (&finfo);
g_clear_object (&stream);
g_clear_object (&pipeline);
return res;
@ -4697,8 +4963,8 @@ handle_bus_message (MessageData * d)
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_ASYNC_DONE:
if (!gst_validate_scenario_is_flush_seeking (scenario) &&
priv->needs_async_done) {
if (!gst_validate_scenario_is_flush_seeking (scenario)
&& priv->needs_async_done) {
priv->needs_async_done = FALSE;
if (priv->actions && _action_sets_state (priv->actions->data)
&& !priv->changing_state)
@ -4998,7 +5264,7 @@ _action_type_has_parameter (GstValidateActionType * atype,
static gboolean
gst_validate_scenario_load_structures (GstValidateScenario * scenario,
GList * structures, gboolean * is_config, gchar * origin_file)
GList * structures, gboolean * is_config, const gchar * origin_file)
{
gboolean ret = TRUE;
GList *tmp;
@ -5614,12 +5880,15 @@ _element_added_cb (GstBin * bin, GstElement * element,
static GstValidateScenario *
gst_validate_scenario_new (GstValidateRunner *
runner, GstElement * pipeline, gchar * scenario_name, GList * structures)
runner, GstElement * pipeline, const gchar * scenario_name,
GList * structures)
{
GList *config;
gchar *name = g_strdup_printf ("%s-scenario", GST_OBJECT_NAME (pipeline));
GstValidateScenario *scenario =
g_object_new (GST_TYPE_VALIDATE_SCENARIO, "validate-runner",
runner, NULL);
runner, "name", name, NULL);
g_free (name);
g_object_ref_sink (scenario);
@ -5719,7 +5988,7 @@ gst_validate_scenario_new (GstValidateRunner *
GstValidateScenario *
gst_validate_scenario_from_structs (GstValidateRunner * runner,
GstElement * pipeline, GList * structures, gchar * origin_file)
GstElement * pipeline, GList * structures, const gchar * origin_file)
{
g_return_val_if_fail (structures, NULL);
@ -5738,8 +6007,7 @@ GstValidateScenario *
gst_validate_scenario_factory_create (GstValidateRunner *
runner, GstElement * pipeline, const gchar * scenario_name)
{
return gst_validate_scenario_new (runner, pipeline, (gchar *) scenario_name,
NULL);
return gst_validate_scenario_new (runner, pipeline, scenario_name, NULL);
}
static gboolean
@ -6730,6 +6998,125 @@ gst_validate_action_set_done (GstValidateAction * action)
g_main_context_unref (context);
}
typedef struct
{
GstValidateMonitor *monitor;
GstValidateAction *action;
} SubPipelineData;
static void
sub_pipeline_data_free (gpointer data)
{
SubPipelineData *sub_data = (SubPipelineData *) data;
g_clear_object (&sub_data->monitor);
gst_validate_action_unref (sub_data->action);
}
static void
sub_pipeline_data_unref (gpointer data)
{
g_atomic_rc_box_release_full (data, (GDestroyNotify) sub_pipeline_data_free);
}
static void
subscenario_done_cb (GstBus * bus, GstMessage * message, gpointer data)
{
SubPipelineData *sub_data = (SubPipelineData *) data;
GstElement *pipeline =
GST_ELEMENT (gst_validate_monitor_get_target (sub_data->monitor));
g_assert (pipeline);
GstState state;
gst_message_parse_request_state (message, &state);
if (!GST_IS_VALIDATE_SCENARIO (GST_MESSAGE_SRC (message))
|| state != GST_STATE_NULL) {
return;
}
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_validate_action_set_done (sub_data->action);
g_signal_handlers_disconnect_by_func (bus, subscenario_done_cb, data);
}
static GstValidateExecuteActionReturn
_create_sub_pipeline (GstValidateScenario * scenario, GstValidateAction * action)
{
const gchar *pipeline_desc = NULL, *scenario_name = NULL, *name = NULL;
GError *error = NULL;
GstPipeline *pipeline;
GList *scenario_structures = NULL;
GstValidateRunner *runner = NULL;
GstValidateExecuteActionReturn res =
GST_VALIDATE_EXECUTE_ACTION_ERROR_REPORTED;
REPORT_UNLESS ((pipeline_desc = gst_structure_get_string (action->structure,
"desc")), done,
"Couldn't find `pipeline` as string in %" GST_PTR_FORMAT,
action->structure);
REPORT_UNLESS ((pipeline =
GST_PIPELINE (gst_parse_launch (pipeline_desc, &error))), done,
"Couldn't create pipeline: %s", error->message);
if ((name = gst_structure_get_string (action->structure, "name"))) {
gst_object_set_name (GST_OBJECT (pipeline), name);
}
scenario_structures =
gst_validate_utils_get_structures (action, action->structure, "scenario");
if (!scenario_structures) {
scenario_name = gst_structure_get_string (action->structure, "scenario");
}
runner = gst_validate_reporter_get_runner (GST_VALIDATE_REPORTER (scenario));
SubPipelineData *data = g_atomic_rc_box_alloc0 (sizeof (SubPipelineData));
data->monitor =
GST_VALIDATE_MONITOR (gst_validate_pipeline_monitor_new_full (pipeline,
runner, NULL, scenario_name ? scenario_name : NULL,
scenario_structures, FALSE));
data->action = gst_validate_action_ref (action);
gboolean monitor_handles_state;
g_object_get (data->monitor, "handles-states", &monitor_handles_state, NULL);
if (monitor_handles_state == FALSE) {
if (gst_element_set_state (GST_ELEMENT (pipeline),
GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
GST_VALIDATE_REPORT_ACTION (scenario, action,
SCENARIO_ACTION_EXECUTION_ERROR, "Could not set pipeline to PLAYING");
sub_pipeline_data_unref (data);
goto done;
}
}
GstBus *bus = gst_element_get_bus (GST_ELEMENT (pipeline));
gst_bus_enable_sync_message_emission (bus);
g_signal_connect_data (bus, "sync-message::request-state",
(GCallback) subscenario_done_cb, data,
(GClosureNotify) sub_pipeline_data_unref, 0);
gst_clear_object (&bus);
g_weak_ref_set (&action->priv->sub_pipeline, pipeline);
res = GST_VALIDATE_EXECUTE_ACTION_NON_BLOCKING;
done:
g_clear_error (&error);
g_clear_object (&runner);
return res;
}
/**
* gst_validate_action_get_scenario:
* @action: The action for which to retrieve the scenario
@ -7400,6 +7787,13 @@ register_action_types (void)
.types = "boolean",
NULL
},
{
.name = "subpipeline-done",
.description = "Waits that the subpipeline with that name is done",
.mandatory = FALSE,
.types = "string",
NULL
},
{
.name = "check",
.description = "The check action to execute when non blocking signal is received",
@ -7771,6 +8165,14 @@ register_action_types (void)
"[postion=(GstClockTime)]"
"[duration=(GstClockTime)]"
},
{
.name = "from-appsink",
.description = "Pull sample from another appsink, "
"if appsink is in another pipeline, "
"use the `other-pipeline-name/target-element-name` synthax",
.mandatory = FALSE,
.types = "string"
},
{NULL}
}),
"Queues a sample in an appsrc. If the pipeline state allows flow of buffers, "
@ -7782,13 +8184,13 @@ register_action_types (void)
{
{
.name = "target-element-name",
.description = "The name of the appsrc to emit EOS on",
.description = "the name of the appsrc to emit eos on",
.mandatory = TRUE,
.types = "string"
},
{NULL}
}),
"Queues a EOS event in an appsrc.",
"queues a eos event in an appsrc.",
GST_VALIDATE_ACTION_TYPE_NONE);
REGISTER_ACTION_TYPE ("flush", _execute_flush,
@ -8082,6 +8484,58 @@ register_action_types (void)
GST_VALIDATE_ACTION_TYPE_NONE);
type->prepare = gst_validate_foreach_prepare;
REGISTER_ACTION_TYPE("run-on-sub-pipeline", _execute_on_sub_scenario,
((GstValidateActionParameter[]) {
{
.name = "pipeline-name",
.description = "The name of the sub scenario pipeline",
.mandatory = TRUE,
.types = "(string)",
NULL
},
{
.name = "action",
.description = "The action to execute on @pipeline-name",
.mandatory = FALSE,
.types = "[structures]",
NULL
},
{NULL}
}),
"Execute @action on a sub scenario/pipeline.\n",
GST_VALIDATE_ACTION_TYPE_NONE);
REGISTER_ACTION_TYPE("create-sub-pipeline", _create_sub_pipeline,
((GstValidateActionParameter[]) {
{
.name = "name",
.description = "The name of the new pipeline",
.mandatory = FALSE,
.types = "(string)",
NULL
},
{
.name = "desc",
.description = "Pipeline description as passed to gst_parse_launch()",
.mandatory = TRUE,
.types = "string",
NULL
},
{
.name = "scenario",
.description = "Array of action and metadatas to run on the new pipeline",
.mandatory = FALSE,
.types = "{array of [structures]}",
NULL
},
{NULL}
}),
"Start another pipeline potentially running a scenario on it. \n"
"When a scenario is specified, and while the sub pipeline is running\n"
" it will be possible to execute actions from the main scenario on that pipeline\n"
" using the `run-on-sub-pipeline` action type.",
GST_VALIDATE_ACTION_TYPE_NONE);
/* Internal actions types to test the validate scenario implementation */
REGISTER_ACTION_TYPE("priv_check-action-type-calls",
_execute_check_action_type_calls, NULL, NULL, 0);

View File

@ -0,0 +1,67 @@
set-globals,
appsrc_props="handle-segment-change=true \
automatic-eos=false \
min-latency=0 \
max-latency=-1 \
format=time \
is-live=true \
max-buffers=0 \
max-bytes=0 \
max-time=5000000000 \
leaky-type=downstream"
meta,
args = {
"appsrc name=src $(appsrc_props) ! videoconvert ! $(videosink) name=sink",
},
handles-states=true
# Start a new pipeline with an `appsink`
create-sub-pipeline,
name=subpipeline,
desc="videotestsrc ! timeoverlay ! appsink name=appsink sync=true max-buffers=10 drop=true",
scenario={
[meta, ignore-eos=true],
}
# Pull sample from the sub pipeline and push it into the main pipeline appsrc
# as soon as the pipeline will start
appsrc-push, target-element-name=src, from-appsink="subpipeline/appsink"
# And preroll the pipeline
play
# Seek on the sub pipeline
run-on-sub-pipeline,
pipeline-name=subpipeline,
action=[seek, flags=accurate+flush, start=5.0, stop=15.0]
# Then forward 10 buffers from sub pipeline to main pipeline
foreach,
i=[0, 10],
actions = {
[
appsrc-push, target-element-name=src, from-appsink="subpipeline/appsink",
],
}
# And seek the sub pipeline again
run-on-sub-pipeline,
pipeline-name=subpipeline,
action=[seek, flags=accurate+flush, start=1.0, stop=5.0]
# And forward 10 more buffers from sub pipeline to main pipeline
foreach,
i=[0, 10],
actions = {
[
appsrc-push, target-element-name=src, from-appsink="subpipeline/appsink",
],
}
# Stop the sub pipeline
run-on-sub-pipeline,
pipeline-name=subpipeline,
action=[stop]
# And finish everything by marking main pipeline as EOS
appsrc-eos, target-element-name=src