Package timeside :: Package decoder :: Module utils
[hide private]
[frames] | no frames]

Source Code for Module timeside.decoder.utils

  1  # -*- coding: utf-8 -*- 
  2   
  3  # Copyright (c) 2007-2013 Parisson 
  4  # Copyright (c) 2007-2013 Guillaume Pellerin <pellerin@parisson.com> 
  5  # Copyright (c) 2010-2013 Paul Brossier <piem@piem.org> 
  6  # 
  7  # This file is part of TimeSide. 
  8   
  9  # TimeSide is free software: you can redistribute it and/or modify 
 10  # it under the terms of the GNU General Public License as published by 
 11  # the Free Software Foundation, either version 2 of the License, or 
 12  # (at your option) any later version. 
 13   
 14  # TimeSide is distributed in the hope that it will be useful, 
 15  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 16  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 17  # GNU General Public License for more details. 
 18   
 19  # You should have received a copy of the GNU General Public License 
 20  # along with TimeSide.  If not, see <http://www.gnu.org/licenses/>. 
 21   
 22  # Authors: 
 23  # Paul Brossier <piem@piem.org> 
 24  # Guillaume Pellerin <yomguy@parisson.com> 
 25  # Thomas Fillon <thomas@parisson.com> 
 26   
 27  from __future__ import division 
 28   
 29  import numpy 
 30   
31 -class Noise(object):
32 """A class that mimics audiolab.sndfile but generates noise instead of reading 33 a wave file. Additionally it can be told to have a "broken" header and thus crashing 34 in the middle of the file. Also useful for testing ultra-short files of 20 samples.""" 35
36 - def __init__(self, num_frames, has_broken_header=False):
37 self.seekpoint = 0 38 self.num_frames = num_frames 39 self.has_broken_header = has_broken_header
40
41 - def seek(self, seekpoint):
42 self.seekpoint = seekpoint
43
44 - def get_nframes(self):
45 return self.num_frames
46
47 - def get_samplerate(self):
48 return 44100
49
50 - def get_channels(self):
51 return 1
52
53 - def read_frames(self, frames_to_read):
54 if self.has_broken_header and self.seekpoint + frames_to_read > self.num_frames // 2: 55 raise IOError() 56 57 num_frames_left = self.num_frames - self.seekpoint 58 if num_frames_left < frames_to_read: 59 will_read = num_frames_left 60 else: 61 will_read = frames_to_read 62 self.seekpoint += will_read 63 return numpy.random.random(will_read)*2 - 1
64 65
66 -def path2uri(path):
67 """ 68 Return a valid uri (file scheme) from absolute path name of a file 69 70 >>> path2uri('/home/john/my_file.wav') 71 'file:///home/john/my_file.wav' 72 73 >>> path2uri('C:\Windows\my_file.wav') 74 'file:///C%3A%5CWindows%5Cmy_file.wav' 75 """ 76 import urlparse, urllib 77 78 return urlparse.urljoin('file:', urllib.pathname2url(path))
79 80
81 -def get_uri(source):
82 """ 83 Check a media source as a valid file or uri and return the proper uri 84 """ 85 86 import gst 87 # Is this an valid URI source 88 if gst.uri_is_valid(source): 89 uri_protocol = gst.uri_get_protocol(source) 90 if gst.uri_protocol_is_supported(gst.URI_SRC, uri_protocol): 91 return source 92 else: 93 raise IOError('Invalid URI source for Gstreamer') 94 95 # is this a file? 96 import os.path 97 if os.path.exists(source): 98 # get the absolute path 99 pathname = os.path.abspath(source) 100 # and make a uri of it 101 uri = path2uri(pathname) 102 103 return get_uri(uri) 104 else: 105 raise IOError('Failed getting uri for path %s: not such file or directoy' % source) 106 107 return uri
108
109 -def get_media_uri_info(uri):
110 111 from gst.pbutils import Discoverer 112 from gst import SECOND as GST_SECOND 113 from glib import GError 114 #import gobject 115 GST_DISCOVER_TIMEOUT = 5000000000L 116 uri_discoverer = Discoverer(GST_DISCOVER_TIMEOUT) 117 try: 118 uri_info = uri_discoverer.discover_uri(uri) 119 except GError as e: 120 raise IOError(e) 121 info = dict() 122 123 # Duration in seconds 124 info['duration'] = uri_info.get_duration() / GST_SECOND 125 126 audio_streams = uri_info.get_audio_streams() 127 info['streams'] = [] 128 for stream in audio_streams: 129 stream_info = {'bitrate': stream.get_bitrate (), 130 'channels': stream.get_channels (), 131 'depth': stream.get_depth (), 132 'max_bitrate': stream.get_max_bitrate(), 133 'samplerate': stream.get_sample_rate() 134 } 135 info['streams'].append(stream_info) 136 137 return info
138 139 140 141 if __name__ == "__main__": 142 # Run doctest from __main__ and unittest from tests 143 from tests.unit_timeside import run_test_module 144 # load corresponding tests 145 from tests import test_decoder_utils 146 147 run_test_module(test_decoder_utils) 148