dots-viewer: Add "Dump Pipelines" button

Add a button in the web interface to trigger pipeline dumps via websocket,
replacing the need to manually send SIGUSR1 to the process. Also set up
the pipeline-snapshot tracer with the proper websocket URL by default.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7999>
This commit is contained in:
Thibault Saunier 2024-11-29 16:49:47 -03:00 committed by GStreamer Marge Bot
parent 0bfc9a8350
commit d95417621d
7 changed files with 162 additions and 48 deletions

View File

@ -41,11 +41,7 @@ so pipelines that are 'manually' dumped by the application are also dumped.
## Demo
Demo of the `gstdump`, gst-dots-viewer used in combination with the [tracer-pipeline-snapshot](tracer-pipeline-snapshot)
### Video:
[![](static/images/gst-dots-viewer-video.jpeg){width=70%}](https://youtu.be/-cHME_eNKbc "GStreamer dot files viewer")
How to use `gstdump`, gst-dots-viewer used in combination with the [tracer-pipeline-snapshot](tracer-pipeline-snapshot)
### Start gst-dots
@ -62,6 +58,9 @@ $ gst-dots-viewer
# This runs the pipeline with `gstdump` which sets up:
#
# - the `pipeline-snapshot` tracer with the following parameters:
# - `dots-viewer-ws-url=ws://127.0.0.1:3000/snapshot/`: Sets the URL
# of the default websocket server used by `gst-dots-viewer` so that the
# pipelines can be dumped from the web interface.
# - xdg-cache=true: Use the default 'cache' directory to store `.dot` files,
# the same as what `gst-dots-viewer` uses by default
# - folder-mode=numbered: Use folders to store the `.dot` files, with
@ -72,11 +71,7 @@ $ gst-dots-viewer
gstdump gst-launch-1.0 videotestsrc ! webrtcsink run-signalling-server=true0
```
### Dump pipelines manually thanks to the `pipeline-snapshot` tracer
### Dump pipelines from the web interface
``` sh
kill -SIGUSR1 $(pgrep gst-launch-1.0)
```
Each time the pipeline is dumped, the `gst-dots-viewer` server will refresh
the page to display the new pipelines.
You can clock the "Dump Pipelines" button in the `gst-dots-viewer` web interface
to force

View File

@ -50,17 +50,18 @@ fn main() {
// Set the environment variable to use the determined directory
env::set_var("GST_DEBUG_DUMP_DOT_DIR", &gstdot_path);
let default_pipeline_snapshot = "pipeline-snapshot(dots-viewer-ws-url=ws://127.0.0.1:3000/snapshot/,xdg-cache=true,folder-mode=numbered)";
env::set_var(
"GST_TRACERS",
env::var("GST_TRACERS").map_or_else(
|_| "pipeline-snapshot(xdg-cache=true,folder-mode=numbered)".to_string(),
|_| default_pipeline_snapshot.to_string(),
|tracers| {
if !tracers.contains("pipeline-snapshot") {
println!("pipeline-snapshot already enabled");
tracers
} else {
format!("{tracers},pipeline-snapshot(xdg-cache=true,folder-mode=numbered)")
format!("{tracers},{default_pipeline_snapshot}")
}
},
),

View File

@ -1,3 +1,4 @@
mod protocol;
use actix::Addr;
use actix::AsyncContext;
use actix::Message;
@ -63,7 +64,8 @@ struct Args {
struct GstDots {
gstdot_path: std::path::PathBuf,
clients: Arc<Mutex<Vec<Addr<WebSocket>>>>,
viewer_clients: Arc<Mutex<Vec<Addr<Client>>>>,
snapshot_clients: Arc<Mutex<Vec<Addr<Client>>>>,
dot_watcher: Mutex<Option<notify::RecommendedWatcher>>,
args: Args,
id: String,
@ -76,7 +78,8 @@ impl std::fmt::Debug for GstDots {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GstDots")
.field("gstdot_path", &self.gstdot_path)
.field("clients", &self.clients)
.field("viewer_clients", &self.viewer_clients)
.field("snapshot_clients", &self.snapshot_clients)
.field("dot_watcher", &self.dot_watcher)
.field("args", &self.args)
.field("id", &self.id)
@ -109,7 +112,8 @@ impl GstDots {
id,
http_address: format!("http://{}:{}", args.address, args.port),
args,
clients: Arc::new(Mutex::new(Vec::new())),
viewer_clients: Arc::new(Mutex::new(Vec::new())),
snapshot_clients: Arc::new(Mutex::new(Vec::new())),
dot_watcher: Default::default(),
exit_on_socket_close,
instance,
@ -164,8 +168,8 @@ impl GstDots {
}
}
fn list_dots(&self, client: Addr<WebSocket>) {
event!(Level::DEBUG, "Listing dot files in {:?}", self.gstdot_path);
fn list_dots(&self, client: Addr<Client>) {
debug!("Listing dot files in {:?}", self.gstdot_path);
let mut entries: Vec<(PathBuf, SystemTime)> = Vec::new();
let start_path = PathBuf::from(&self.gstdot_path);
@ -177,12 +181,12 @@ impl GstDots {
let content = match std::fs::read_to_string(&dot_path) {
Ok(c) => c,
Err(e) => {
event!(Level::ERROR, "===>Error reading file: {dot_path:?}: {e:?}");
error!("===>Error reading file: {dot_path:?}: {e:?}");
continue;
}
};
if content.is_empty() {
event!(Level::ERROR, "===>Empty file: {:?}", dot_path);
error!("===>Empty file: {:?}", dot_path);
continue;
}
@ -200,6 +204,18 @@ impl GstDots {
}
}
fn send(&self, msg: String, client_type: ClientType) {
let clients = if matches!(client_type, ClientType::Snapshot) {
self.snapshot_clients.lock().unwrap()
} else {
self.viewer_clients.lock().unwrap()
};
let clients = clients.clone();
for client in clients.iter() {
client.do_send(TextMessage(msg.clone()));
}
}
fn watch_dot_files(self: &Arc<Self>) {
let app_clone = self.clone();
let mut dot_watcher =
@ -215,14 +231,12 @@ impl GstDots {
debug!("File created: {:?}", path);
if path.extension().map(|e| e == "dot").unwrap_or(false) {
let path = path.to_path_buf();
let clients = app_clone.clients.lock().unwrap();
let clients = clients.clone();
let name = app_clone.relative_dot_path(&path);
for client in clients.iter() {
let name = app_clone.relative_dot_path(&path);
event!(Level::DEBUG, "Sending {name} to client: {client:?}");
match std::fs::read_to_string(&path) {
Ok(content) => client.do_send(TextMessage(
debug!("Sending {name}");
match std::fs::read_to_string(&path) {
Ok(content) => {
app_clone.send(
json!({
"type": "NewDot",
"name": name,
@ -230,10 +244,13 @@ impl GstDots {
"creation_time": app_clone.modify_time(&event.paths[0]),
})
.to_string(),
)),
Err(err) => error!("Could not read file {path:?}: {err:?}"),
}
ClientType::Viewer
);
},
Err(err) => error!("Could not read file {path:?}: {err:?}"),
}
}
}
}
@ -243,7 +260,7 @@ impl GstDots {
debug!("File removed: {:?}", path);
if path.extension().map(|e| e == "dot").unwrap_or(false) {
let path = path.to_path_buf();
let clients = app_clone.clients.lock().unwrap();
let clients = app_clone.viewer_clients.lock().unwrap();
let clients = clients.clone();
for client in clients.iter() {
@ -264,7 +281,7 @@ impl GstDots {
}
}
}
Err(err) => event!(Level::ERROR, "watch error: {:?}", err),
Err(err) => error!("watch error: {:?}", err),
}
})
.expect("Could not create dot_watcher");
@ -277,22 +294,29 @@ impl GstDots {
}
#[instrument(level = "trace")]
fn add_client(&self, client: Addr<WebSocket>) {
let mut clients = self.clients.lock().unwrap();
fn add_client(&self, client: Addr<Client>, client_type: ClientType) {
let mut clients = if matches!(client_type, ClientType::Snapshot) {
self.snapshot_clients.lock().unwrap()
} else {
self.viewer_clients.lock().unwrap()
};
info!("Client added: {:?}", client);
info!("{client_type:?} Client added: {:?}", client);
clients.push(client.clone());
drop(clients);
self.list_dots(client);
if matches!(client_type, ClientType::Viewer) {
self.list_dots(client);
}
}
#[instrument(level = "trace")]
fn remove_client(&self, addr: &Addr<WebSocket>) {
fn remove_client(&self, addr: &Addr<Client>) {
info!("Client removed: {:?}", addr);
let mut clients = self.clients.lock().unwrap();
let mut clients = self.snapshot_clients.lock().unwrap();
clients.retain(|a| a != addr);
let mut clients = self.viewer_clients.lock().unwrap();
clients.retain(|a| a != addr);
if self.exit_on_socket_close && clients.is_empty() {
info!("No more clients, exiting");
std::process::exit(0);
@ -383,6 +407,8 @@ impl GstDots {
App::new()
.app_data(app_data.clone())
.route("/ws/", web::get().to(ws_index))
.route("/snapshot/", web::get().to(snapshot))
.service(ResourceFiles::new("/", generated))
})
.bind(&address)
.context("Couldn't bind adresss")?
@ -392,20 +418,27 @@ impl GstDots {
}
}
#[derive(Debug, Clone)]
enum ClientType {
Viewer,
Snapshot,
}
#[derive(Debug)]
struct WebSocket {
struct Client {
app: Arc<GstDots>,
type_: ClientType,
}
#[derive(Message)]
#[rtype(result = "()")] // Indicates that no response is expected
pub struct TextMessage(pub String);
impl Actor for WebSocket {
impl Actor for Client {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.app.add_client(ctx.address());
self.app.add_client(ctx.address(), self.type_.clone());
}
fn stopping(&mut self, ctx: &mut Self::Context) -> actix::Running {
@ -414,7 +447,7 @@ impl Actor for WebSocket {
}
}
impl Handler<TextMessage> for WebSocket {
impl Handler<TextMessage> for Client {
type Result = ();
fn handle(&mut self, msg: TextMessage, ctx: &mut Self::Context) {
@ -423,14 +456,52 @@ impl Handler<TextMessage> for WebSocket {
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocket {
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Client {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, _ctx: &mut Self::Context) {
if let Ok(ws::Message::Text(text)) = msg {
debug!("Message received: {:?}", text);
let msg: protocol::IncomingMessage = match serde_json::from_str(&text) {
Ok(s) => s,
Err(e) => {
error!("Failed to parse message `{text}`: {}", e);
return;
}
};
match msg.type_ {
protocol::IncomingMessageType::Hello => {
debug!("Got Hello message");
}
protocol::IncomingMessageType::Snapshot => {
self.app.send(
json!({
"type": "Snapshot"
})
.to_string(),
ClientType::Snapshot,
);
}
}
}
}
}
async fn snapshot(
req: HttpRequest,
stream: web::Payload,
data: web::Data<Arc<GstDots>>,
) -> Result<HttpResponse, Error> {
let app = data.get_ref().clone();
ws::start(
Client {
app,
type_: ClientType::Snapshot,
},
&req,
stream,
)
}
async fn ws_index(
req: HttpRequest,
stream: web::Payload,
@ -438,7 +509,14 @@ async fn ws_index(
) -> Result<HttpResponse, Error> {
let app = data.get_ref().clone();
ws::start(WebSocket { app }, &req, stream)
ws::start(
Client {
app,
type_: ClientType::Viewer,
},
&req,
stream,
)
}
#[actix_web::main]

View File

@ -0,0 +1,13 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub enum IncomingMessageType {
Hello,
Snapshot,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct IncomingMessage {
#[serde(rename = "type")]
pub type_: IncomingMessageType,
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 419 KiB

After

Width:  |  Height:  |  Size: 432 KiB

View File

@ -109,6 +109,25 @@
border-color: #0069ff;
}
button {
background-color: #2a2a2a;
color: #e0e0e0;
border: 1px solid #404040;
padding: 8px 16px;
border-radius: 4px;
margin: 10px 0;
cursor: pointer;
}
button:hover {
background-color: #3a3a3a;
border-color: #505050;
}
button:active {
background-color: #404040;
}
@media screen and (max-height: 250000px) {
.overlay a {font-size: 20px}
.overlay .closebtn {
@ -167,7 +186,7 @@
<script src="/js/gstdots.js" type="module"> </script>
<script type="module">
import {updateFromUrl, connectWs, connectSearch, removePipelineOverlay} from '/js/gstdots.js';
import {updateFromUrl, connectWs, connectSearch, removePipelineOverlay, dumpPipelines } from '/js/gstdots.js';
window.addEventListener('popstate', function(event) {
updateFromUrl(true);
@ -177,6 +196,8 @@
connectWs();
connectSearch();
updateFromUrl(true);
document.getElementById('dump-button').addEventListener('click', dumpPipelines);
});
document.addEventListener('keyup', function(e) {
@ -205,6 +226,7 @@
<div>
<input type="text" id="search" placeholder="Search for pipeline">
<button id="dump-button">Dump Pipelines</button>
</div>
<div id="pipelines"></div>

View File

@ -336,4 +336,9 @@ export function removePipelineOverlay(noHistoryUpdate) {
updateSearch();
}
export function dumpPipelines() {
if (ws) {
ws.send(JSON.stringify({ type: "Snapshot" }));
}
}