1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 from timeside.core import Processor, implements, interfacedoc
23 from timeside.component import implements, abstract
24 from timeside.api import IEncoder
25 from timeside.tools import *
26
27 from gst import _gst as gst
31 implements(IEncoder)
32 abstract()
33
34 - def __init__(self, output, streaming=False, overwrite=False):
35
36 super(GstEncoder, self).__init__()
37
38 if isinstance(output, basestring):
39 import os.path
40 if os.path.isdir(output):
41 raise IOError("Encoder output must be a file, not a directory")
42 elif os.path.isfile(output) and not overwrite:
43 raise IOError("Encoder output %s exists, but overwrite set to False")
44 self.filename = output
45 else:
46 self.filename = None
47 self.streaming = streaming
48
49 if not self.filename and not self.streaming:
50 raise Exception('Must give an output')
51
52 import threading
53 self.end_cond = threading.Condition(threading.Lock())
54
55 self.eod = False
56 self.metadata = None
57 self.num_samples = 0
58
59 @interfacedoc
61 if hasattr(self, 'eod') and hasattr(self, 'mainloopthread'):
62 self.end_cond.acquire()
63 while not hasattr(self, 'end_reached'):
64 self.end_cond.wait()
65 self.end_cond.release()
66 if hasattr(self, 'error_msg'):
67 raise IOError(self.error_msg)
68
71
73 self.pipeline = gst.parse_launch(self.pipe)
74
75 self.src = self.pipeline.get_by_name('src')
76
77 self.app = self.pipeline.get_by_name('app')
78
79 srccaps = gst.Caps("""audio/x-raw-float,
80 endianness=(int)1234,
81 channels=(int)%s,
82 width=(int)32,
83 rate=(int)%d""" % (int(channels), int(samplerate)))
84 self.src.set_property("caps", srccaps)
85 self.src.set_property('emit-signals', True)
86 self.src.set_property('num-buffers', -1)
87 self.src.set_property('block', True)
88 self.src.set_property('do-timestamp', True)
89
90 self.bus = self.pipeline.get_bus()
91 self.bus.add_signal_watch()
92 self.bus.connect("message", self._on_message_cb)
93
94 import threading
95 class MainloopThread(threading.Thread):
96 def __init__(self, mainloop):
97 threading.Thread.__init__(self)
98 self.mainloop = mainloop
99
100 def run(self):
101 self.mainloop.run()
102 self.mainloop = gobject.MainLoop()
103 self.mainloopthread = MainloopThread(self.mainloop)
104 self.mainloopthread.start()
105
106
107 self.pipeline.set_state(gst.STATE_PLAYING)
108
110 t = message.type
111 if t == gst.MESSAGE_EOS:
112 self.end_cond.acquire()
113 self.pipeline.set_state(gst.STATE_NULL)
114 self.mainloop.quit()
115 self.end_reached = True
116 self.end_cond.notify()
117 self.end_cond.release()
118
119 elif t == gst.MESSAGE_ERROR:
120 self.end_cond.acquire()
121 self.pipeline.set_state(gst.STATE_NULL)
122 self.mainloop.quit()
123 self.end_reached = True
124 err, debug = message.parse_error()
125 self.error_msg = "Error: %s" % err, debug
126 self.end_cond.notify()
127 self.end_cond.release()
128
129 @interfacedoc
130 - def process(self, frames, eod=False):
131 self.eod = eod
132 if eod:
133 self.num_samples += frames.shape[0]
134 else:
135 self.num_samples += self.blocksize()
136 buf = numpy_array_to_gst_buffer(frames, frames.shape[0],self.num_samples, self.samplerate())
137 self.src.emit('push-buffer', buf)
138 if self.eod:
139 self.src.emit('end-of-stream')
140 if self.streaming:
141 self.chunk = self.app.emit('pull-buffer')
142 return frames, eod
143
144
145 if __name__ == "__main__":
146
147 from tests import test_encoding, test_transcoding
148 from tests.unit_timeside import run_test_module
149 run_test_module([test_encoding, test_transcoding])
150