1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 from timeside.core import Processor, implements, interfacedoc
23 from timeside.encoder.core import GstEncoder
24 from timeside.api import IEncoder
25 from timeside.tools import *
29 """ gstreamer-based webm encoder and muxer """
30 implements(IEncoder)
31
32 - def __init__(self, output, streaming=False, overwrite=False, video=False):
35
@interfacedoc
def setup(self, channels=None, samplerate=None, blocksize=None,
          totalframes=None):
    # Build the WebM pipeline description in ``self.pipe`` and pass
    # ``channels`` and ``samplerate`` on to ``start_pipeline``.
    #
    # Documented with comments rather than a docstring on purpose:
    # @interfacedoc presumably supplies the docstring from the interface
    # when the method has none -- TODO confirm before adding one.
    super(WebMEncoder, self).setup(channels, samplerate, blocksize,
                                   totalframes)
    from numpy import ceil

    # Number of black video buffers to generate: the media duration
    # multiplied by a fixed frame rate, rounded up.
    framerate = 30
    num_buffers = ceil(self.mediainfo()['duration'] *
                       framerate).astype(int)

    self.pipe = ''
    if self.video:
        # Optional video branch: a black test source encoded as VP8 and
        # linked to the shared muxer declared below.
        self.pipe += '''videotestsrc pattern=black num_buffers=%d ! ffmpegcolorspace ! queue ! vp8enc speed=2 threads=4 quality=9.0 ! queue ! mux.
        ''' % num_buffers

    # Audio branch (Vorbis) and the WebM muxer both branches link to.
    self.pipe += '''
    appsrc name=src ! queue ! audioconvert ! vorbisenc quality=0.9 ! queue ! mux.
    webmmux streamable=true name=mux
    '''

    # Quote the output path so that a file name containing whitespace
    # stays a single property value instead of being split into several
    # tokens of the pipeline description; embedded double quotes are
    # backslash-escaped. NOTE(review): assumes self.pipe is parsed with
    # the gst-launch syntax by start_pipeline -- confirm.
    location = None
    if self.filename:
        location = '"%s"' % ('%s' % self.filename).replace('"', '\\"')

    if location and self.streaming:
        # Write to the file and expose the same stream through an appsink.
        self.pipe += ''' ! tee name=t
        ! queue ! filesink location=%s
        t. ! queue ! appsink name=app sync=False
        ''' % location
    elif location:
        # File output only.
        self.pipe += '! filesink location=%s async=False sync=False ' % location
    else:
        # No file name: deliver the encoded stream through the appsink only.
        self.pipe += '! queue ! appsink name=app sync=False '

    self.start_pipeline(channels, samplerate)
65
66 @staticmethod
67 @interfacedoc
70
71 @staticmethod
72 @interfacedoc
74 return "WebM GStreamer based encoder and muxer"
75
76 @staticmethod
77 @interfacedoc
80
81 @staticmethod
82 @interfacedoc
85
86 @staticmethod
87 @interfacedoc
90
91 @interfacedoc
94
if __name__ == "__main__":
    # Script entry point: run only the test cases whose names start with
    # 'testWebM', taken from the encoding and transcoding test modules.
    from tests import test_encoding, test_transcoding
    from tests.unit_timeside import run_test_module

    webm_test_modules = [test_encoding, test_transcoding]
    run_test_module(webm_test_modules, test_prefix='testWebM')
100