Original commit message from CVS: * testsuite/test_ghostpad.py: Add while loop in teardown to wait for the pipeline state to hit NULL. Hopefully this will ensure the refcount has always hit 1.
186 lines
6.3 KiB
Python
186 lines
6.3 KiB
Python
# -*- Mode: Python -*-
|
|
# vi:si:et:sw=4:sts=4:ts=4
|
|
#
|
|
# gst-python - Python bindings for GStreamer
|
|
# Copyright (C) 2002 David I. Lehn
|
|
# Copyright (C) 2004 Johan Dahlin
|
|
# Copyright (C) 2005 Edward Hervey
|
|
#
|
|
# This library is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 2.1 of the License, or (at your option) any later version.
|
|
#
|
|
# This library is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this library; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
|
|
from common import gst, unittest, TestCase
|
|
|
|
import sys
|
|
import gc
|
|
import gobject
|
|
|
|
class SrcBin(gst.Bin):
    """Bin holding one ``fakesrc`` whose "src" pad is exposed via a ghost pad."""

    def prepare(self):
        """Populate the bin.

        Creates a ``fakesrc`` element, adds it to this bin, and adds a ghost
        pad named "src" that targets the element's own "src" pad.
        """
        fakesrc = gst.element_factory_make('fakesrc')
        self.add(fakesrc)
        # Ghost the child's pad so the bin can be linked like a plain element.
        self.add_pad(gst.GhostPad("src", fakesrc.get_pad("src")))


gobject.type_register(SrcBin)


class SinkBin(gst.Bin):
    """Bin holding one ``fakesink`` whose "sink" pad is exposed via a ghost pad."""

    def prepare(self):
        """Populate the bin.

        Creates a ``fakesink`` element, adds it to this bin, adds a ghost pad
        named "sink" that targets the element's own "sink" pad, and keeps the
        element on ``self.sink`` for later use by ``connect_handoff``.
        """
        fakesink = gst.element_factory_make('fakesink')
        self.add(fakesink)
        self.add_pad(gst.GhostPad("sink", fakesink.get_pad("sink")))
        self.sink = fakesink

    def connect_handoff(self, cb, *args, **kwargs):
        """Enable handoff signals on the inner sink and connect *cb* to them.

        Extra positional and keyword arguments are forwarded unchanged to the
        ``connect`` call. Requires ``prepare`` to have been called first, as
        it reads ``self.sink``.
        """
        inner = self.sink
        inner.set_property('signal-handoffs', True)
        inner.connect('handoff', cb, *args, **kwargs)


gobject.type_register(SinkBin)


class PipeTest(TestCase):
    """Exercise a SrcBin -> SinkBin pipeline and check refcounts around it.

    ``setUp`` builds a pipeline and the two bins and asserts their initial
    GStreamer and Python refcounts; ``tearDown`` waits for the pipeline to be
    back in NULL, re-checks the refcounts and releases everything.
    """

    def setUp(self):
        """Create the pipeline and both prepared bins, checking refcounts."""
        gst.info("setUp")
        TestCase.setUp(self)
        self.pipeline = gst.Pipeline()
        self.assertEqual(self.pipeline.__gstrefcount__, 1)
        self.assertEqual(sys.getrefcount(self.pipeline), 3)

        self.src = SrcBin()
        self.src.prepare()
        self.sink = SinkBin()
        self.sink.prepare()
        self.assertEqual(self.src.__gstrefcount__, 1)
        self.assertEqual(sys.getrefcount(self.src), 3)
        self.assertEqual(self.sink.__gstrefcount__, 1)
        self.assertEqual(sys.getrefcount(self.sink), 3)
        gst.info("end of SetUp")

    def tearDown(self):
        """Bring the pipeline to NULL, verify refcounts and drop all objects.

        The refcount checks expect both bins to still be children of the
        pipeline (GStreamer refcount 2) until the pipeline itself is deleted.
        """
        gst.info("tearDown")
        # Request NULL explicitly: if the test aborted while the pipeline was
        # still running, merely waiting for NULL would spin forever.
        self.pipeline.set_state(gst.STATE_NULL)
        self._wait_for_state(gst.STATE_NULL)
        self.assertEqual(self.pipeline.__gstrefcount__, 1)
        self.assertEqual(sys.getrefcount(self.pipeline), 3)
        self.assertEqual(self.src.__gstrefcount__, 2)
        self.assertEqual(sys.getrefcount(self.src), 3)
        self.assertEqual(self.sink.__gstrefcount__, 2)
        self.assertEqual(sys.getrefcount(self.sink), 3)
        gst.debug('deleting pipeline')
        del self.pipeline
        self.gccollect()

        self.assertEqual(self.src.__gstrefcount__, 1)  # parent gone
        self.assertEqual(self.sink.__gstrefcount__, 1)  # parent gone
        self.assertEqual(sys.getrefcount(self.src), 3)
        self.assertEqual(sys.getrefcount(self.sink), 3)
        gst.debug('deleting src')
        del self.src
        self.gccollect()
        gst.debug('deleting sink')
        del self.sink
        self.gccollect()

        TestCase.tearDown(self)

    def _wait_for_state(self, state):
        """Poll the pipeline until it reports SUCCESS with current == *state*.

        Fails the test if the pipeline reports a failed state change, since
        in that case the target state would never be reached.
        """
        while True:
            (ret, cur, pen) = self.pipeline.get_state()
            if ret == gst.STATE_CHANGE_FAILURE:
                self.fail("pipeline state change to %r failed" % state)
            if ret == gst.STATE_CHANGE_SUCCESS and cur == state:
                break

    def testBinState(self):
        """Play the linked bins until ten handoffs arrived, then stop."""
        self.pipeline.add(self.src, self.sink)
        self.src.link(self.sink)
        self.sink.connect_handoff(self._sink_handoff_cb)
        self._handoffs = 0

        self.pipeline.set_state(gst.STATE_PLAYING)
        self._wait_for_state(gst.STATE_PLAYING)

        # Busy-wait until _sink_handoff_cb has counted enough buffers.
        while self._handoffs < 10:
            pass

        self.pipeline.set_state(gst.STATE_NULL)
        self._wait_for_state(gst.STATE_NULL)

    ## def testProbedLink(self):
    ##     self.pipeline.add(self.src)
    ##     pad = self.src.get_pad("src")

    ##     self.sink.connect_handoff(self._sink_handoff_cb)
    ##     self._handoffs = 0

    ##     # FIXME: adding a probe to the ghost pad does not work atm
    ##     # id = pad.add_buffer_probe(self._src_buffer_probe_cb)
    ##     realpad = pad.get_target()
    ##     self._probe_id = realpad.add_buffer_probe(self._src_buffer_probe_cb)

    ##     self._probed = False

    ##     while True:
    ##         (ret, cur, pen) = self.pipeline.get_state()
    ##         if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_PLAYING:
    ##             break

    ##     while not self._probed:
    ##         pass

    ##     while self._handoffs < 10:
    ##         pass

    ##     self.pipeline.set_state(gst.STATE_NULL)
    ##     while True:
    ##         (ret, cur, pen) = self.pipeline.get_state()
    ##         if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_NULL:
    ##             break

    def _src_buffer_probe_cb(self, pad, buffer):
        """Buffer probe: add and link the sink bin, then remove the probe.

        Only referenced by the currently disabled ``testProbedLink``.
        NOTE(review): returns None; confirm how the bindings interpret that
        return value for buffer probes before re-enabling the test.
        """
        gst.debug("received probe on pad %r" % pad)
        self._probed = True
        gst.debug('adding sink bin')
        self.pipeline.add(self.sink)
        # this seems to get rid of the warnings about pushing on an unactivated
        # pad
        gst.debug('setting sink state')

        # FIXME: attempt one: sync to current pending state of bin
        (res, cur, pen) = self.pipeline.get_state(timeout=0)
        target = pen
        if target == gst.STATE_VOID_PENDING:
            # No transition in flight, so follow the pipeline's current state.
            target = cur
        gst.debug("setting sink state to %r" % target)
        # FIXME: the following print can cause a lock-up; why ?
        # print target
        # if we don't set async, it will possibly end up in PAUSED
        self.sink.set_state(target)

        gst.debug('linking')
        self.src.link(self.sink)
        gst.debug('removing buffer probe id %r' % self._probe_id)
        pad.remove_buffer_probe(self._probe_id)
        self._probe_id = None
        gst.debug('done')

    def _sink_handoff_cb(self, sink, buffer, pad):
        """Handoff signal handler: count each buffer reaching the sink."""
        gst.debug('received handoff on pad %r' % pad)
        self._handoffs += 1


if __name__ == "__main__":
    # Run this module's tests when the file is executed as a script.
    unittest.main()