Package timeside :: Package decoder :: Module core
[hide private]
[frames] | no frames]

Source Code for Module timeside.decoder.core

  1  #!/usr/bin/python 
  2  # -*- coding: utf-8 -*- 
  3   
  4  # Copyright (c) 2007-2013 Parisson 
  5  # Copyright (c) 2007 Olivier Guilyardi <olivier@samalyse.com> 
  6  # Copyright (c) 2007-2013 Guillaume Pellerin <pellerin@parisson.com> 
  7  # Copyright (c) 2010-2013 Paul Brossier <piem@piem.org> 
  8  # 
  9  # This file is part of TimeSide. 
 10   
 11  # TimeSide is free software: you can redistribute it and/or modify 
 12  # it under the terms of the GNU General Public License as published by 
 13  # the Free Software Foundation, either version 2 of the License, or 
 14  # (at your option) any later version. 
 15   
 16  # TimeSide is distributed in the hope that it will be useful, 
 17  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 18  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 19  # GNU General Public License for more details. 
 20   
 21  # You should have received a copy of the GNU General Public License 
 22  # along with TimeSide.  If not, see <http://www.gnu.org/licenses/>. 
 23   
 24  # Authors: 
 25  # Paul Brossier <piem@piem.org> 
 26  # Guillaume Pellerin <yomguy@parisson.com> 
 27  # Thomas Fillon <thomas@parisson.com> 
 28   
 29  from __future__ import division 
 30   
 31  from timeside.core import Processor, implements, interfacedoc 
 32  from timeside.api import IDecoder 
 33  from timeside.tools import * 
 34   
 35  from utils import get_uri, get_media_uri_info 
 36   
 37  import Queue 
 38  from gst import _gst as gst 
 39  import numpy as np 
 40   
 41   
 42  GST_APPSINK_MAX_BUFFERS = 10 
 43  QUEUE_SIZE = 10 
class FileDecoder(Processor):
    """Gstreamer-based decoder.

    Builds a gstreamer pipeline ending in an ``appsink`` for the media at
    ``uri`` (optionally restricted to a segment of it), re-blocks the pulled
    buffers into blocks of ``output_blocksize`` frames and hands them out
    one at a time through :meth:`process`.
    """
    implements(IDecoder)

    mimetype = ''
    output_blocksize = 8*1024
    output_samplerate = None
    output_channels = None

    pipeline = None
    mainloopthread = None

    # Nothing in this class fills the tags in yet (MESSAGE_TAG handling is
    # still a TODO in _on_message_cb).  The attribute is defined here so
    # that metadata() returns None instead of raising AttributeError.
    tags = None

    # IProcessor methods

    @staticmethod
    @interfacedoc
    def id():
        return "gst_dec"

    def __init__(self, uri, start=0, duration=None):
        """
        Construct a new FileDecoder

        Parameters
        ----------
        uri : str
            uri of the media
        start : float
            start time of the segment in seconds
        duration : float
            duration of the segment in seconds
        """

        super(FileDecoder, self).__init__()

        self.uri = get_uri(uri)

        self.uri_start = float(start)
        if duration:
            self.uri_duration = float(duration)
        else:
            # None (or any other falsy value) is stored unchanged; a None
            # duration is resolved from the media itself in setup()
            self.uri_duration = duration

        # The whole media is decoded only when neither a start offset nor
        # a duration has been requested
        self.is_segment = not (start == 0 and duration is None)

    def set_uri_default_duration(self):
        """Set ``uri_duration`` to what remains of the media after
        ``uri_start``, using the duration reported by
        ``get_media_uri_info``."""
        uri_total_duration = get_media_uri_info(self.uri)['duration']
        self.uri_duration = uri_total_duration - self.uri_start

    def setup(self, channels=None, samplerate=None, blocksize=None):
        """
        Build and start the gstreamer pipeline, then block until the
        gstreamer thread has reported either the stream format or an error.

        Parameters
        ----------
        channels : int, optional
            number of output channels requested from the pipeline
        samplerate : int, optional
            output samplerate requested from the pipeline
        blocksize : int, optional
            number of frames per block returned by process()

        Raises
        ------
        IOError
            if the pipeline reported an error, or if no input samplerate
            was discovered
        """
        import threading

        if self.uri_duration is None:
            self.set_uri_default_duration()

        # a condition to wait for the gstreamer thread to be ready
        self.discovered_cond = threading.Condition(threading.Lock())
        self.discovered = False

        # the output data format we want
        if blocksize:
            self.output_blocksize = blocksize
        if samplerate:
            self.output_samplerate = int(samplerate)
        if channels:
            self.output_channels = int(channels)

        if self.is_segment:
            # Create the pipe with Gnonlin gnlurisource;
            # uri_start and uri_duration are converted to nanoseconds
            self.pipe = ''' gnlurisource uri={uri}
                            start=0
                            duration={uri_duration}
                            media-start={uri_start}
                            media-duration={uri_duration}
                            ! audioconvert name=audioconvert
                            ! audioresample
                            ! appsink name=sink sync=False async=True
                            '''.format(
                uri=self.uri,
                uri_start=np.uint64(round(self.uri_start * gst.SECOND)),
                uri_duration=np.int64(round(self.uri_duration * gst.SECOND)))
        else:
            # Create the pipe with standard Gstreamer uridecodebin
            self.pipe = ''' uridecodebin name=uridecodebin uri={uri}
                            ! audioconvert name=audioconvert
                            ! audioresample
                            ! appsink name=sink sync=False async=True
                            '''.format(uri=self.uri)

        self.pipeline = gst.parse_launch(self.pipe)

        # When no explicit value was requested, offer a range / a list of
        # values in the caps instead of a single one
        if self.output_channels:
            caps_channels = int(self.output_channels)
        else:
            caps_channels = "[ 1, 2 ]"
        if self.output_samplerate:
            caps_samplerate = int(self.output_samplerate)
        else:
            caps_samplerate = "{ 8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000 }"
        sink_caps = gst.Caps("""audio/x-raw-float,
                    endianness=(int)1234,
                    channels=(int)%s,
                    width=(int)32,
                    rate=(int)%s""" % (caps_channels, caps_samplerate))

        self.conv = self.pipeline.get_by_name('audioconvert')
        self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb)

        self.sink = self.pipeline.get_by_name('sink')
        self.sink.set_property("caps", sink_caps)
        self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS)
        self.sink.set_property("drop", False)
        self.sink.set_property('emit-signals', True)
        self.sink.connect("new-buffer", self._on_new_buffer_cb)

        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect('message', self._on_message_cb)

        self.queue = Queue.Queue(QUEUE_SIZE)

        class MainloopThread(threading.Thread):
            """Thread whose only job is to run the given main loop."""

            def __init__(self, mainloop):
                threading.Thread.__init__(self)
                self.mainloop = mainloop

            def run(self):
                self.mainloop.run()

        self.mainloop = gobject.MainLoop()
        self.mainloopthread = MainloopThread(self.mainloop)
        self.mainloopthread.start()
        #self.mainloopthread = get_loop_thread()
        ##self.mainloop = self.mainloopthread.mainloop

        self.eod = False

        self.last_buffer = None

        # start pipeline
        self.pipeline.set_state(gst.STATE_PLAYING)

        # wait until one of the callbacks has set self.discovered
        with self.discovered_cond:
            while not self.discovered:
                self.discovered_cond.wait()

        if not hasattr(self, 'input_samplerate'):
            if hasattr(self, 'error_msg'):
                raise IOError(self.error_msg)
            else:
                raise IOError('no known audio stream found')

    def _notify_caps_cb(self, pad, args):
        """Callback for "notify::caps" on the audioconvert sink pad.

        Stores the input stream description (samplerate, channels,
        duration, total frames, sample width) and wakes up setup().
        """
        with self.discovered_cond:
            try:
                self._store_stream_info(pad)
            finally:
                # Always wake setup() up, even if reading the caps failed:
                # otherwise it would wait on the condition forever.
                self.discovered = True
                self.discovered_cond.notify()

    def _store_stream_info(self, pad):
        """Read the negotiated caps of `pad` and the stream duration, and
        store them in the ``input_*`` attributes."""
        caps = pad.get_negotiated_caps()
        if not caps:
            pad.info("no negotiated caps available")
            return
        # the caps are fixed
        # We now get the total length of that stream
        q = gst.query_new_duration(gst.FORMAT_TIME)
        pad.info("sending duration query")
        if pad.get_peer().query(q):
            fmt, length = q.parse_duration()
            if fmt == gst.FORMAT_TIME:
                pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),))
            else:
                pad.info("got duration : %d [format:%d]" % (length, fmt))
        else:
            length = -1
            gst.warning("duration query failed")

        # We store the caps and length in the proper location
        caps_string = caps.to_string()
        if "audio" in caps_string:
            self.input_samplerate = caps[0]["rate"]
            if not self.output_samplerate:
                self.output_samplerate = self.input_samplerate
            self.input_channels = caps[0]["channels"]
            if not self.output_channels:
                self.output_channels = self.input_channels
            # length is divided by 1e9 to get seconds
            self.input_duration = length / 1.e9

            self.input_totalframes = int(self.input_duration
                                         * self.input_samplerate)
            if "x-raw-float" in caps_string:
                self.input_width = caps[0]["width"]
            else:
                self.input_width = caps[0]["depth"]

    def _on_message_cb(self, bus, message):
        """Callback for messages posted on the pipeline bus.

        On end of stream, queues the EOS marker for process() and stops the
        pipeline and the main loop. On error, stops them too, records
        ``error_msg`` and wakes up setup().
        """
        t = message.type
        if t == gst.MESSAGE_EOS:
            self.queue.put(gst.MESSAGE_EOS)
            self.pipeline.set_state(gst.STATE_NULL)
            self.mainloop.quit()
        elif t == gst.MESSAGE_ERROR:
            self.pipeline.set_state(gst.STATE_NULL)
            err, debug = message.parse_error()
            with self.discovered_cond:
                self.discovered = True
                self.mainloop.quit()
                # Both values go through the format operator: the former
                # '"Error: %s" % err, debug' stored a tuple, not a message.
                self.error_msg = "Error: %s (%s)" % (err, debug)
                self.discovered_cond.notify()
        elif t == gst.MESSAGE_TAG:
            # TODO
            # msg.parse_tags()
            pass

    def _on_new_buffer_cb(self, sink):
        """Callback for "new-buffer" on the appsink.

        Pulls the buffer, appends it to the samples not yet delivered and
        queues every complete block of ``output_blocksize`` frames.
        """
        buf = sink.emit('pull-buffer')
        new_array = gst_buffer_to_numpy_array(buf, self.output_channels)
        #print 'processing new buffer', new_array.shape
        if self.last_buffer is None:
            self.last_buffer = new_array
        else:
            self.last_buffer = np.concatenate((self.last_buffer, new_array),
                                              axis=0)
        while self.last_buffer.shape[0] >= self.output_blocksize:
            new_block = self.last_buffer[:self.output_blocksize]
            self.last_buffer = self.last_buffer[self.output_blocksize:]
            #print 'queueing', new_block.shape, 'remaining', self.last_buffer.shape
            self.queue.put([new_block, False])

    @interfacedoc
    def process(self, frames=None, eod=False):
        buf = self.queue.get()
        if buf == gst.MESSAGE_EOS:
            # end of stream: deliver whatever did not fill a whole block
            return self.last_buffer, True
        frames, eod = buf
        return frames, eod

    @interfacedoc
    def channels(self):
        return self.output_channels

    @interfacedoc
    def samplerate(self):
        return self.output_samplerate

    @interfacedoc
    def blocksize(self):
        return self.output_blocksize

    @interfacedoc
    def totalframes(self):
        if self.input_samplerate == self.output_samplerate:
            return self.input_totalframes
        else:
            # scale the frame count by the resampling ratio
            ratio = self.output_samplerate / self.input_samplerate
            return int(self.input_totalframes * ratio)

    @interfacedoc
    def release(self):
        pass

    @interfacedoc
    def mediainfo(self):
        return dict(uri=self.uri,
                    duration=self.uri_duration,
                    start=self.uri_start,
                    is_segment=self.is_segment,
                    samplerate=self.input_samplerate)

    def __del__(self):
        self.release()

    ## IDecoder methods

    @interfacedoc
    def format(self):
        # TODO check
        if self.mimetype == 'application/x-id3':
            self.mimetype = 'audio/mpeg'
        return self.mimetype

    @interfacedoc
    def encoding(self):
        # TODO check
        return self.mimetype.split('/')[-1]

    @interfacedoc
    def resolution(self):
        # TODO check: width or depth?
        return self.input_width

    @interfacedoc
    def metadata(self):
        # TODO check
        return self.tags

class ArrayDecoder(Processor):
    """Decoder taking a Numpy array as input.

    The array is delivered through :meth:`process` in blocks of
    ``output_blocksize`` frames; the last block is flagged with eod=True.
    """
    implements(IDecoder)

    mimetype = ''
    output_blocksize = 8*1024
    output_samplerate = None
    output_channels = None

    # IProcessor methods

    @staticmethod
    @interfacedoc
    def id():
        return "array_dec"

    def __init__(self, samples, samplerate=44100, start=0, duration=None):
        '''
        Construct a new ArrayDecoder from a numpy array

        Parameters
        ----------
        samples : numpy array of dimension 1 (mono) or 2 (multichannel)
            if shape = (n) or (n,1) : n samples, mono
            if shape = (n,m) : n samples with m channels
        samplerate : int
            samplerate of the samples
        start : float
            start time of the segment in seconds
        duration : float
            duration of the segment in seconds

        Raises
        ------
        TypeError
            if samples is neither a 1D nor a 2D array
        '''
        super(ArrayDecoder, self).__init__()

        # Check array dimension (0-d arrays used to fail later with an
        # IndexError on shape[1])
        if samples.ndim == 0 or samples.ndim > 2:
            raise TypeError('Wrong number of dimensions for argument samples')
        if samples.ndim == 1:
            samples = samples[:, np.newaxis]  # reshape to 2D array

        self.samples = samples  # always a 2 dimensions array from here on
        self.input_samplerate = samplerate
        self.input_channels = self.samples.shape[1]

        self.uri = '_'.join(['raw_audio_array',
                             'x'.join([str(dim) for dim in samples.shape]),
                             samples.dtype.type.__name__])

        self.uri_start = float(start)
        if duration:
            self.uri_duration = float(duration)
        else:
            # None (or any other falsy value) is stored unchanged; a None
            # duration is resolved from the array length in setup()
            self.uri_duration = duration

        self.is_segment = not (start == 0 and duration is None)

        # get_frames() is a generator: nothing in it runs before the first
        # call to process(), i.e. after setup() has set input_totalframes
        self.frames = self.get_frames()

    def setup(self, channels=None, samplerate=None, blocksize=None):
        """
        Set the output format and, for a segment, cut ``self.samples`` down
        to the requested segment.

        Parameters
        ----------
        channels : int, optional
            number of output channels (defaults to the input channels)
        samplerate : int, optional
            output samplerate (defaults to the input samplerate)
        blocksize : int, optional
            number of frames per block returned by process()
        """
        # the output data format we want
        if blocksize:
            self.output_blocksize = blocksize
        if samplerate:
            self.output_samplerate = int(samplerate)
        if channels:
            self.output_channels = int(channels)

        if self.uri_duration is None:
            self.uri_duration = (len(self.samples) / self.input_samplerate
                                 - self.uri_start)

        if self.is_segment:
            # Slice bounds must be integers: numpy rejects float indices
            start_index = int(round(self.uri_start * self.input_samplerate))
            stop_index = start_index + int(np.ceil(self.uri_duration
                                                   * self.input_samplerate))
            stop_index = min(stop_index, len(self.samples))
            self.samples = self.samples[start_index:stop_index]

        if not self.output_samplerate:
            self.output_samplerate = self.input_samplerate

        if not self.output_channels:
            self.output_channels = self.input_channels

        self.input_totalframes = len(self.samples)
        self.input_duration = self.input_totalframes / self.input_samplerate
        self.input_width = self.samples.itemsize * 8

    def get_frames(self):
        "Define an iterator that will return frames at the given blocksize"
        blocksize = self.output_blocksize
        nb_frames = self.input_totalframes // blocksize

        if self.input_totalframes % blocksize == 0:
            nb_frames -= 1  # Last frame must send eod=True

        last_start = nb_frames * blocksize
        index = 0
        while index < last_start:
            yield (self.samples[index:index + blocksize], False)
            index += blocksize

        yield (self.samples[last_start:], True)

    @interfacedoc
    def process(self, frames=None, eod=False):
        # next() builtin rather than the Python 2 only .next() method
        return next(self.frames)

    @interfacedoc
    def channels(self):
        return self.output_channels

    @interfacedoc
    def samplerate(self):
        return self.output_samplerate

    @interfacedoc
    def blocksize(self):
        return self.output_blocksize

    @interfacedoc
    def totalframes(self):
        if self.input_samplerate == self.output_samplerate:
            return self.input_totalframes
        else:
            # scale the frame count by the resampling ratio
            ratio = self.output_samplerate / self.input_samplerate
            return int(self.input_totalframes * ratio)

    @interfacedoc
    def release(self):
        pass

    @interfacedoc
    def mediainfo(self):
        return dict(uri=self.uri,
                    duration=self.uri_duration,
                    start=self.uri_start,
                    is_segment=self.is_segment,
                    samplerate=self.input_samplerate)

    def __del__(self):
        self.release()

    ## IDecoder methods
    @interfacedoc
    def format(self):
        import re
        # keep only the leading letters of the dtype name
        # (e.g. 'float' for 'float32')
        base_type = re.search('^[a-z]*', self.samples.dtype.name).group(0)
        return 'audio/x-raw-'+base_type

    @interfacedoc
    def encoding(self):
        return self.format().split('/')[-1]

    @interfacedoc
    def resolution(self):
        return self.input_width

    @interfacedoc
    def metadata(self):
        return None


if __name__ == "__main__":
    # Run doctest from __main__ and unittest from tests
    from tests.unit_timeside import run_test_module
    # load corresponding tests
    from tests import test_decoding, test_array_decoding

    run_test_module([test_decoding, test_array_decoding])