1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 from __future__ import division
30
31 from timeside.core import Processor, implements, interfacedoc
32 from timeside.api import IDecoder
33 from timeside.tools import *
34
35 from utils import get_uri, get_media_uri_info
36
37 import Queue
38 from gst import _gst as gst
39 import numpy as np
40
41
42 GST_APPSINK_MAX_BUFFERS = 10
43 QUEUE_SIZE = 10
47 """ gstreamer-based decoder """
48 implements(IDecoder)
49
50 mimetype = ''
51 output_blocksize = 8*1024
52 output_samplerate = None
53 output_channels = None
54
55 pipeline = None
56 mainloopthread = None
57
58
59
60 @staticmethod
61 @interfacedoc
64
def __init__(self, uri, start=0, duration=None):
    """
    Create a FileDecoder for a media uri, optionally limited to a segment

    Parameters
    ----------
    uri : str
        uri of the media (passed through get_uri before being stored)
    start : float
        start time of the segment in seconds
    duration : float
        duration of the segment in seconds
    """
    super(FileDecoder, self).__init__()

    self.uri = get_uri(uri)
    self.uri_start = float(start)

    # A falsy duration (None, 0) is stored unchanged; any other value is
    # coerced to float.
    self.uri_duration = float(duration) if duration else duration

    # Only the default arguments (start == 0 and no duration) select the
    # whole media; every other combination is handled as a segment.
    self.is_segment = not (start == 0 and duration is None)
94
96
97 uri_total_duration = get_media_uri_info(self.uri)['duration']
98 self.uri_duration = uri_total_duration - self.uri_start
99
def setup(self, channels=None, samplerate=None, blocksize=None):
    """
    Build the GStreamer pipeline, start it and wait for stream discovery

    The call blocks until `self.discovered` has been set by one of the
    callbacks connected here (`_notify_caps_cb` / `_on_message_cb`).

    Parameters
    ----------
    channels : int, optional
        requested number of output channels; when falsy the sink caps
        accept 1 or 2 channels
    samplerate : int, optional
        requested output samplerate; when falsy the sink caps accept a
        fixed list of common rates
    blocksize : int, optional
        number of frames per output block; when falsy the current
        `output_blocksize` is kept

    Raises
    ------
    IOError
        if no `input_samplerate` attribute exists once discovery is over;
        the message is `self.error_msg` when that attribute was set,
        otherwise a generic one
    """
    import threading

    if self.uri_duration is None:
        self.set_uri_default_duration()

    # Condition used to wait until the GStreamer side reports discovery.
    self.discovered_cond = threading.Condition(threading.Lock())
    self.discovered = False

    # Falsy arguments leave the current output settings untouched.
    if blocksize:
        self.output_blocksize = blocksize
    if samplerate:
        self.output_samplerate = int(samplerate)
    if channels:
        self.output_channels = int(channels)

    if self.is_segment:
        # Segment decoding: gnlurisource is given the media start and
        # duration, both scaled by gst.SECOND.
        self.pipe = ''' gnlurisource uri={uri}
                        start=0
                        duration={uri_duration}
                        media-start={uri_start}
                        media-duration={uri_duration}
                        ! audioconvert name=audioconvert
                        ! audioresample
                        ! appsink name=sink sync=False async=True
                        '''.format(uri = self.uri,
                                   uri_start = np.uint64(round(self.uri_start * gst.SECOND)),
                                   uri_duration = np.int64(round(self.uri_duration * gst.SECOND)))
    else:
        # Whole media: plain uridecodebin source.
        self.pipe = ''' uridecodebin name=uridecodebin uri={uri}
                        ! audioconvert name=audioconvert
                        ! audioresample
                        ! appsink name=sink sync=False async=True
                        '''.format(uri = self.uri)

    self.pipeline = gst.parse_launch(self.pipe)

    # Constrain the sink caps only for the values that were requested.
    if self.output_channels:
        caps_channels = int(self.output_channels)
    else:
        caps_channels = "[ 1, 2 ]"
    if self.output_samplerate:
        caps_samplerate = int(self.output_samplerate)
    else:
        caps_samplerate = "{ 8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000 }"
    sink_caps = gst.Caps("""audio/x-raw-float,
                endianness=(int)1234,
                channels=(int)%s,
                width=(int)32,
                rate=(int)%s""" % (caps_channels, caps_samplerate))

    # The caps notification on audioconvert's sink pad drives discovery.
    self.conv = self.pipeline.get_by_name('audioconvert')
    self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb)

    self.sink = self.pipeline.get_by_name('sink')
    self.sink.set_property("caps", sink_caps)
    self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS)
    self.sink.set_property("drop", False)
    self.sink.set_property('emit-signals', True)
    self.sink.connect("new-buffer", self._on_new_buffer_cb)

    self.bus = self.pipeline.get_bus()
    self.bus.add_signal_watch()
    self.bus.connect('message', self._on_message_cb)

    # Bounded queue filled by _on_new_buffer_cb and drained by process().
    self.queue = Queue.Queue(QUEUE_SIZE)

    class MainloopThread(threading.Thread):
        """Thread whose only job is to run the given main loop."""

        def __init__(self, mainloop):
            threading.Thread.__init__(self)
            self.mainloop = mainloop

        def run(self):
            self.mainloop.run()

    # NOTE(review): `gobject` is not imported explicitly in the visible
    # imports; presumably provided by `from timeside.tools import *`.
    self.mainloop = gobject.MainLoop()
    self.mainloopthread = MainloopThread(self.mainloop)
    self.mainloopthread.start()

    self.eod = False
    self.last_buffer = None

    # Start decoding.
    self.pipeline.set_state(gst.STATE_PLAYING)

    # Wait for discovery. The `with` block guarantees the lock is released
    # even if wait() is interrupted by an exception.
    with self.discovered_cond:
        while not self.discovered:
            self.discovered_cond.wait()

    if not hasattr(self, 'input_samplerate'):
        if hasattr(self, 'error_msg'):
            raise IOError(self.error_msg)
        raise IOError('no known audio stream found')
205
207 self.discovered_cond.acquire()
208
209 caps = pad.get_negotiated_caps()
210 if not caps:
211 pad.info("no negotiated caps available")
212 self.discovered = True
213 self.discovered_cond.notify()
214 self.discovered_cond.release()
215 return
216
217
218 q = gst.query_new_duration(gst.FORMAT_TIME)
219 pad.info("sending duration query")
220 if pad.get_peer().query(q):
221 format, length = q.parse_duration()
222 if format == gst.FORMAT_TIME:
223 pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),))
224 else:
225 pad.info("got duration : %d [format:%d]" % (length, format))
226 else:
227 length = -1
228 gst.warning("duration query failed")
229
230
231 if "audio" in caps.to_string():
232 self.input_samplerate = caps[0]["rate"]
233 if not self.output_samplerate:
234 self.output_samplerate = self.input_samplerate
235 self.input_channels = caps[0]["channels"]
236 if not self.output_channels:
237 self.output_channels = self.input_channels
238 self.input_duration = length / 1.e9
239
240 self.input_totalframes = int(self.input_duration * self.input_samplerate)
241 if "x-raw-float" in caps.to_string():
242 self.input_width = caps[0]["width"]
243 else:
244 self.input_width = caps[0]["depth"]
245
246 self.discovered = True
247 self.discovered_cond.notify()
248 self.discovered_cond.release()
249
251 t = message.type
252 if t == gst.MESSAGE_EOS:
253 self.queue.put(gst.MESSAGE_EOS)
254 self.pipeline.set_state(gst.STATE_NULL)
255 self.mainloop.quit()
256 elif t == gst.MESSAGE_ERROR:
257 self.pipeline.set_state(gst.STATE_NULL)
258 err, debug = message.parse_error()
259 self.discovered_cond.acquire()
260 self.discovered = True
261 self.mainloop.quit()
262 self.error_msg = "Error: %s" % err, debug
263 self.discovered_cond.notify()
264 self.discovered_cond.release()
265 elif t == gst.MESSAGE_TAG:
266
267
268 pass
269
271 buf = sink.emit('pull-buffer')
272 new_array = gst_buffer_to_numpy_array(buf, self.output_channels)
273
274 if self.last_buffer is None:
275 self.last_buffer = new_array
276 else:
277 self.last_buffer = np.concatenate((self.last_buffer, new_array), axis=0)
278 while self.last_buffer.shape[0] >= self.output_blocksize:
279 new_block = self.last_buffer[:self.output_blocksize]
280 self.last_buffer = self.last_buffer[self.output_blocksize:]
281
282 self.queue.put([new_block, False])
283
284 @interfacedoc
def process(self, frames=None, eod=False):
    """
    Return the next decoded block as a (frames, eod) pair

    Blocks on the internal queue. When the end-of-stream marker is
    dequeued, the remaining `last_buffer` is returned with eod=True;
    otherwise the queued pair is returned as is. The `frames` and `eod`
    arguments are not read.
    """
    item = self.queue.get()
    if item == gst.MESSAGE_EOS:
        # End of stream: hand out whatever is left in the residual buffer.
        return self.last_buffer, True
    block, is_last = item
    return block, is_last
291
292 @interfacedoc
295
296 @interfacedoc
299
300 @interfacedoc
303
304 @interfacedoc
306 if self.input_samplerate == self.output_samplerate:
307 return self.input_totalframes
308 else:
309 ratio = self.output_samplerate / self.input_samplerate
310 return int(self.input_totalframes * ratio)
311
312 @interfacedoc
315
316 @interfacedoc
323
326
327
328
329 @interfacedoc
335
336 @interfacedoc
338
339 return self.mimetype.split('/')[-1]
340
341 @interfacedoc
343
344 return self.input_width
345
346 @interfacedoc
350
353 """ Decoder taking Numpy array as input"""
354 implements(IDecoder)
355
356 mimetype = ''
357 output_blocksize = 8*1024
358 output_samplerate = None
359 output_channels = None
360
361
362
363 @staticmethod
364 @interfacedoc
367
def __init__(self, samples, samplerate=44100, start=0, duration=None):
    '''
    Construct a new ArrayDecoder from a numpy array

    Parameters
    ----------
    samples : numpy array of dimension 1 (mono) or 2 (multichannel)
        if shape = (n) or (n,1) : n samples, mono
        if shape = (n,m) : n samples with m channels
    samplerate : int
        stored as the input samplerate (default 44100)
    start : float
        start time of the segment in seconds
    duration : float
        duration of the segment in seconds

    Raises
    ------
    TypeError
        if `samples` has more than 2 dimensions
    '''
    super(ArrayDecoder, self).__init__()

    n_dims = samples.ndim
    if n_dims > 2:
        raise TypeError('Wrong number of dimensions for argument samples')
    if n_dims == 1:
        # Promote a mono vector to a single-column 2D array.
        samples = samples[:, np.newaxis]

    self.samples = samples
    self.input_samplerate = samplerate
    self.input_channels = self.samples.shape[1]

    # Synthetic uri built from the array shape and dtype name.
    shape_tag = 'x'.join(str(dim) for dim in samples.shape)
    dtype_tag = samples.dtype.type.__name__
    self.uri = '_'.join(('raw_audio_array', shape_tag, dtype_tag))

    self.uri_start = float(start)
    # A falsy duration (None, 0) is stored unchanged.
    self.uri_duration = float(duration) if duration else duration

    # Only the default arguments select the whole array.
    self.is_segment = not (start == 0 and duration is None)

    self.frames = self.get_frames()
410
411 - def setup(self, channels=None, samplerate=None, blocksize=None):
412
413
414 if blocksize:
415 self.output_blocksize = blocksize
416 if samplerate:
417 self.output_samplerate = int(samplerate)
418 if channels:
419 self.output_channels = int(channels)
420
421 if self.uri_duration is None:
422 self.uri_duration = (len(self.samples) / self.input_samplerate
423 - self.uri_start)
424
425 if self.is_segment:
426 start_index = self.uri_start * self.input_samplerate
427 stop_index = start_index + int(np.ceil(self.uri_duration
428 * self.input_samplerate))
429 stop_index = min(stop_index, len(self.samples))
430 self.samples = self.samples[start_index:stop_index]
431
432 if not self.output_samplerate:
433 self.output_samplerate = self.input_samplerate
434
435 if not self.output_channels:
436 self.output_channels = self.input_channels
437
438 self.input_totalframes = len(self.samples)
439 self.input_duration = self.input_totalframes / self.input_samplerate
440 self.input_width = self.samples.itemsize * 8
441
455
456 @interfacedoc
def process(self, frames=None, eod=False):
    """
    Return the next item produced by the `self.frames` iterator

    The `frames` and `eod` arguments are not read. The builtin `next()` is
    used instead of the iterator's `.next()` method so the call works with
    any iterator on both Python 2.6+ and Python 3.
    """
    return next(self.frames)
460
461 @interfacedoc
464
465 @interfacedoc
468
469 @interfacedoc
472
473 @interfacedoc
475 if self.input_samplerate == self.output_samplerate:
476 return self.input_totalframes
477 else:
478 ratio = self.output_samplerate / self.input_samplerate
479 return int(self.input_totalframes * ratio)
480
481 @interfacedoc
484
485 @interfacedoc
492
495
496
497 @interfacedoc
502
503 @interfacedoc
505 return self.format().split('/')[-1]
506
507 @interfacedoc
509 return self.input_width
510
511 @interfacedoc
514
515
if __name__ == "__main__":
    # Executed as a script: run the decoding test modules.
    from tests.unit_timeside import run_test_module
    from tests import test_decoding, test_array_decoding

    modules_under_test = [test_decoding, test_array_decoding]
    run_test_module(modules_under_test)
523