1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 from timeside.component import *
22 from timeside.api import IProcessor
23 from timeside.exceptions import Error, ApiError
24
25
26 import re
27 import time
28 import numpy
29 import uuid
30
# Names exported by a star-import of this module.
__all__ = [
    'Processor',
    'MetaProcessor',
    'implements',
    'abstract',
    'interfacedoc',
    'processors',
    'get_processor',
    'ProcessPipe',
    'FixedSizeInputAdapter',
]

# Module-level registry of processors, keyed by processor id
# (this is the mapping get_processor() looks ids up in).
_processors = {}
65
68 """Base component class of all processors
69
70
71 Attributes:
72 parents : List of parent Processors that must be processed
73 before the current Processor
74 pipe : The current ProcessPipe in which the Processor will run
75 """
76 __metaclass__ = MetaProcessor
77
78 abstract()
79 implements(IProcessor)
80
82 super(Processor, self).__init__()
83
84 self.parents = []
85 self.source_mediainfo = None
86 self.pipe = None
87 self.UUID = uuid.uuid4()
88
89 @interfacedoc
90 - def setup(self, channels=None, samplerate=None, blocksize=None,
91 totalframes=None):
92 self.source_channels = channels
93 self.source_samplerate = samplerate
94 self.source_blocksize = blocksize
95 self.source_totalframes = totalframes
96
97
98
99 if not hasattr(self, 'input_channels'):
100 self.input_channels = self.source_channels
101 if not hasattr(self, 'input_samplerate'):
102 self.input_samplerate = self.source_samplerate
103 if not hasattr(self, 'input_blocksize'):
104 self.input_blocksize = self.source_blocksize
105 if not hasattr(self, 'input_stepsize'):
106 self.input_stepsize = self.source_blocksize
107
108
109
110
111
112 @interfacedoc
114 return self.source_channels
115
116 @interfacedoc
118 return self.source_samplerate
119
120 @interfacedoc
122 return self.source_blocksize
123
124 @interfacedoc
126 return self.source_totalframes
127
128 @interfacedoc
131
132 @interfacedoc
133 - def post_process(self):
135
136 @interfacedoc
139
140 @interfacedoc
143
144 @interfacedoc
146 return str(self.UUID)
147
150
153
217
220 """Returns the processors implementing a given interface and, if recurse,
221 any of the descendants of this interface."""
222 return implementations(interface, recurse)
223
226 """Return a processor by its id"""
227 if not _processors.has_key(processor_id):
228 raise Error("No processor registered with id: '%s'"
229 % processor_id)
230
231 return _processors[processor_id]
232
235 """Handle a pipe of processors
236
237 Attributes:
238 processor: List of all processors in the Process pipe
239 results : Results Container for all the analyzers of the Pipe process
240 """
241
248
251
253 if isinstance(other, Processor):
254 for parent in other.parents:
255 self |= parent
256 self.processors.append(other)
257 other.process_pipe = self
258 elif isinstance(other, ProcessPipe):
259 self.processors.extend(other.processors)
260 else:
261 try:
262 iter(other)
263 except TypeError:
264 raise Error("Can not add this type of object to a pipe: %s", str(other))
265
266 for item in other:
267 self |= item
268
269 return self
270
272 pipe = ''
273 for item in self.processors:
274 pipe += item.id()
275 if item != self.processors[-1]:
276 pipe += ' | '
277 return pipe
278
def run(self, channels=None, samplerate=None, blocksize=None, stack=None):
    """Set up all processors in cascade and stream audio data along the pipe.

    The first processor of the pipe is the source; every following
    processor is configured from the output properties of the processor
    that precedes it.

    Args:
        channels: forwarded to the ``setup()`` of the source.
        samplerate: forwarded to the ``setup()`` of the source.
        blocksize: forwarded to the ``setup()`` of the source.
        stack: when true, every block of frames delivered by the source
            is kept and, once the run is over, the source is replaced in
            the pipe by an ``ArrayDecoder`` built from those frames.
            ``None`` is treated as ``False``.

    Returns:
        The pipe itself.
    """
    source = self.processors[0]
    # A slice is a copy, so removing processors from the pipe at the end
    # does not disturb the iteration over ``items``.
    items = self.processors[1:]
    source.setup(channels=channels, samplerate=samplerate,
                 blocksize=blocksize)

    self.stack = False if stack is None else stack
    if self.stack:
        self.frames_stack = []

    # Set up / reset the processors, propagating the stream properties
    # from each processor to the next one along the pipe.
    last = source
    for item in items:
        item.source_mediainfo = source.mediainfo()
        item.setup(channels=last.channels(),
                   samplerate=last.samplerate(),
                   blocksize=last.blocksize(),
                   totalframes=last.totalframes())
        last = item

    # Stream the audio data along the pipe until end-of-data is reported.
    eod = False
    while not eod:
        frames, eod = source.process()
        if self.stack:
            self.frames_stack.append(frames)
        for item in items:
            frames, eod = item.process(frames, eod)

    # Post-processing.
    for item in items:
        item.post_process()

    if self.stack:
        # Replace the source by a decoder that replays the stacked frames.
        if not isinstance(self.frames_stack, numpy.ndarray):
            self.frames_stack = numpy.vstack(self.frames_stack)
        # Imported here rather than at module level, as in the original.
        from timeside.decoder.core import ArrayDecoder
        new_source = ArrayDecoder(samples=self.frames_stack,
                                  samplerate=source.samplerate())
        new_source.setup(channels=source.channels(),
                         samplerate=source.samplerate(),
                         blocksize=source.blocksize())
        self.processors[0] = new_source

    # Release the downstream processors and drop them from the pipe.
    for item in items:
        item.release()
        self.processors.remove(item)

    # The docstring has always promised the pipe as the return value.
    return self
335