Package timeside :: Package encoder :: Module core
[hide private]
[frames] | no frames]

Source Code for Module timeside.encoder.core

  1  #!/usr/bin/python 
  2  # -*- coding: utf-8 -*- 
  3  # 
  4  # Copyright (c) 2012 Paul Brossier <piem@piem.org> 
  5   
  6  # This file is part of TimeSide. 
  7   
  8  # TimeSide is free software: you can redistribute it and/or modify 
  9  # it under the terms of the GNU General Public License as published by 
 10  # the Free Software Foundation, either version 2 of the License, or 
 11  # (at your option) any later version. 
 12   
 13  # TimeSide is distributed in the hope that it will be useful, 
 14  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 15  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 16  # GNU General Public License for more details. 
 17   
 18  # You should have received a copy of the GNU General Public License 
 19  # along with TimeSide.  If not, see <http://www.gnu.org/licenses/>. 
 20   
 21   
 22  from timeside.core import Processor, implements, interfacedoc 
 23  from timeside.component import implements, abstract 
 24  from timeside.api import IEncoder 
 25  from timeside.tools import * 
 26   
 27  from gst import _gst as gst 
28 29 30 -class GstEncoder(Processor):
31 implements(IEncoder) 32 abstract() 33
34 - def __init__(self, output, streaming=False, overwrite=False):
35 36 super(GstEncoder, self).__init__() 37 38 if isinstance(output, basestring): 39 import os.path 40 if os.path.isdir(output): 41 raise IOError("Encoder output must be a file, not a directory") 42 elif os.path.isfile(output) and not overwrite: 43 raise IOError("Encoder output %s exists, but overwrite set to False") 44 self.filename = output 45 else: 46 self.filename = None 47 self.streaming = streaming 48 49 if not self.filename and not self.streaming: 50 raise Exception('Must give an output') 51 52 import threading 53 self.end_cond = threading.Condition(threading.Lock()) 54 55 self.eod = False 56 self.metadata = None 57 self.num_samples = 0
58 59 @interfacedoc
60 - def release(self):
61 if hasattr(self, 'eod') and hasattr(self, 'mainloopthread'): 62 self.end_cond.acquire() 63 while not hasattr(self, 'end_reached'): 64 self.end_cond.wait() 65 self.end_cond.release() 66 if hasattr(self, 'error_msg'): 67 raise IOError(self.error_msg)
68
69 - def __del__(self):
70 self.release()
71
72 - def start_pipeline(self, channels, samplerate):
73 self.pipeline = gst.parse_launch(self.pipe) 74 # store a pointer to appsrc in our encoder object 75 self.src = self.pipeline.get_by_name('src') 76 # store a pointer to appsink in our encoder object 77 self.app = self.pipeline.get_by_name('app') 78 79 srccaps = gst.Caps("""audio/x-raw-float, 80 endianness=(int)1234, 81 channels=(int)%s, 82 width=(int)32, 83 rate=(int)%d""" % (int(channels), int(samplerate))) 84 self.src.set_property("caps", srccaps) 85 self.src.set_property('emit-signals', True) 86 self.src.set_property('num-buffers', -1) 87 self.src.set_property('block', True) 88 self.src.set_property('do-timestamp', True) 89 90 self.bus = self.pipeline.get_bus() 91 self.bus.add_signal_watch() 92 self.bus.connect("message", self._on_message_cb) 93 94 import threading 95 class MainloopThread(threading.Thread): 96 def __init__(self, mainloop): 97 threading.Thread.__init__(self) 98 self.mainloop = mainloop
99 100 def run(self): 101 self.mainloop.run()
102 self.mainloop = gobject.MainLoop() 103 self.mainloopthread = MainloopThread(self.mainloop) 104 self.mainloopthread.start() 105 106 # start pipeline 107 self.pipeline.set_state(gst.STATE_PLAYING) 108
109 - def _on_message_cb(self, bus, message):
110 t = message.type 111 if t == gst.MESSAGE_EOS: 112 self.end_cond.acquire() 113 self.pipeline.set_state(gst.STATE_NULL) 114 self.mainloop.quit() 115 self.end_reached = True 116 self.end_cond.notify() 117 self.end_cond.release() 118 119 elif t == gst.MESSAGE_ERROR: 120 self.end_cond.acquire() 121 self.pipeline.set_state(gst.STATE_NULL) 122 self.mainloop.quit() 123 self.end_reached = True 124 err, debug = message.parse_error() 125 self.error_msg = "Error: %s" % err, debug 126 self.end_cond.notify() 127 self.end_cond.release()
128 129 @interfacedoc
130 - def process(self, frames, eod=False):
131 self.eod = eod 132 if eod: 133 self.num_samples += frames.shape[0] 134 else: 135 self.num_samples += self.blocksize() 136 buf = numpy_array_to_gst_buffer(frames, frames.shape[0],self.num_samples, self.samplerate()) 137 self.src.emit('push-buffer', buf) 138 if self.eod: 139 self.src.emit('end-of-stream') 140 if self.streaming: 141 self.chunk = self.app.emit('pull-buffer') 142 return frames, eod
143 144 145 if __name__ == "__main__": 146 # Run doctest from __main__ and unittest from test_analyzer_preprocessors 147 from tests import test_encoding, test_transcoding 148 from tests.unit_timeside import run_test_module 149 run_test_module([test_encoding, test_transcoding]) 150