Package midi :: Package sequencer_portmidi
[hide private]
[frames] | no frames]

Source Code for Package midi.sequencer_portmidi

  1  """ 
  2  Interface to the PyGame midi module, which uses Portmidi to access  
  3  system midi devices. 
  4  This interface allows data to be sent using the Midi library's  
  5  own representation. 
  6   
  7  Note that PyGame needs to be installed. An error will be raised if  
  8  you try importing this module without PyGame installed. 
  9   
 10  @todo: This isn't working yet. It's a nice idea, but it's proving  
 11  very difficult to get the timing to work out right. It seems that  
 12  writing to the midi output stream takes a long time and holds up the  
 13  playback, making a mess of the timings of events. 
 14   
 15  """ 
 16   
 17  try: 
 18      import pygame 
 19  except ImportError: 
 20      raise ImportError, "PyGame needs to be installed before you can load the sequencer" 
 21   
 22  from pygame import midi as pgmidi 
 23  # Initialize the PyGame midi module 
 24  pygame.init() 
 25  pgmidi.init() 
 26   
 27  from pygame import event as pgevent 
 28  from threading import Thread 
 29  from datetime import datetime, timedelta 
 30  from time import sleep 
 31   
 32  from midi import MetaEvent, SysExEvent, NoteOffEvent, NoteOnEvent 
33 34 -class Sequencer(object):
35 """ 36 Midi sequencer that sends midi events to system midi devices using 37 PyGame's interface to Portmidi. 38 39 """
40 - def __init__(self, output_device=None, latency=None):
41 """ 42 @type output_device: int 43 @param output_device: Portmidi device number to use for output. Will 44 use the reported default if not given. 45 @type latency: int 46 @param latency: latency value to use PortMidi output with. 0 is 47 not permitted, as it prevents us doing timestamped events. 48 49 """ 50 if output_device is None: 51 output_device = pgmidi.get_default_output_id() 52 self.output_device = output_device 53 54 latency = 100 55 56 # Check the output device exists 57 devs = Sequencer.get_devices() 58 if output_device >= len(devs): 59 raise SequencerInitializationError, "sequencer tried to use "\ 60 "non-existent output device: %d. Only %d devices exist." % \ 61 (output_device, len(devs)) 62 # Check it can accept output 63 if devs[output_device][1][3] == 0: 64 raise SequencerInitializationError, "cannot use %s as an "\ 65 "output device: it doesn't accept output." % \ 66 devs[output_device][1][1] 67 68 # Initialize the output device 69 self.output = pgmidi.Output(output_device, latency=latency, buffer_size=1024*50) 70 self._sequencer = None 71 self._queue = {}
72 73 @staticmethod
74 - def get_devices(inputs=True, outputs=True):
75 """ 76 Queries available devices. Returns list of pairs 77 C{(index,device_info)}. C{index} is the device number by which 78 it can be accessed. C{device_info} is a tuple in the same 79 format as C{pygame.midi.get_device_info()}: 80 (interf, name, input, output, opened) 81 82 """ 83 devices = [(num,pgmidi.get_device_info(num)) for num in range(pgmidi.get_count())] 84 if inputs and not outputs: 85 devices = [d for d in devices if d[1][2] == 1] 86 elif outputs and not inputs: 87 devices = [d for d in devices if d[1][3] == 1] 88 elif not inputs and not outputs: 89 return [] 90 return devices
91
92 - def write(self, events, time_offset=0):
93 """ 94 Plays a list of events through the output device. Delta times 95 between the events are preserved, but the first event is played 96 immediately, or after the given offset (in miliseconds) 97 98 """ 99 if len(events) == 0: 100 return 101 102 # Encode all the events as PyGame needs them 103 event_list = [] 104 raw_events = [] 105 for ev in events: 106 # Ignore meta events, as they only make sense in a file 107 if isinstance(ev, (MetaEvent,SysExEvent)): 108 # TODO: do something special with sysexs 109 continue 110 # Set all the deltas to 0 for now 111 data = ev.encode(last_tick=ev.tick) 112 # Get each byte as an int in a list (ignore the tick) 113 data = [ord(b) for b in data[1:]] + [0]*(5-len(data)) 114 event_list.append((data,ev.msdelay+time_offset)) 115 raw_events.append(ev) 116 117 # Ask for everything to be played this offset from now 118 now = pgmidi.time()+4000 119 120 # Make all times relative to now 121 event_list = [[bytes,time+now] for (bytes,time) in event_list] 122 123 #### 124 #for bytes,time in event_list: 125 # self.queue_event(bytes, time) 126 # 127 #self.play() 128 #return 129 130 for event in event_list[:2000]: 131 self.output.write([event]) 132 return
133 #### 134 135 ## Split up into chunks not longer than 1024 events 136 #ev_chunks = [] 137 #cursor = 0 138 #chunk_size = 100 139 #while cursor < len(event_list): 140 # ev_chunks.append(event_list[cursor:cursor+chunk_size]) 141 # cursor += chunk_size 142 # 143 #for chunk in ev_chunks: 144 # self.output.write(chunk) 145 # print "Chunk written" 146 #print "Finished writing" 147
148 - def queue_event(self, midi_event, time):
149 self._queue.setdefault(time, []).append(midi_event)
150
151 - def play_stream(self, stream):
152 """ 153 Sends the whole of an L{EventStream<midi.EventStream>} to the 154 sequencer output. 155 156 """ 157 events = list(sorted(stream.trackpool)) 158 self.write(events)
159
160 - def stop(self):
161 if self.playing: 162 self._sequencer.stop() 163 self._sequencer = None
164
165 - def play(self, latency=0):
166 if self.playing: 167 self.stop() 168 sequencer = SequencerThread(self.output, self._queue) 169 sequencer.start() 170 self._sequencer = sequencer 171 print "playing"
172 173 @property
174 - def playing(self):
175 return self._sequencer is not None
176
177 -class SequencerThread(Thread):
178 - def __init__(self, output, queue, buffer_time=10000, buffer_advance=2000):
179 super(SequencerThread, self).__init__() 180 self.output = output 181 self._queue = queue 182 self._stopped = False 183 self.buffer_time = buffer_time 184 self.buffer_advance = buffer_advance
185
186 - def stop(self):
187 self._stopped = True
188
189 - def run(self):
190 start_time = pgmidi.time() + 3000 191 192 times = iter(sorted(self._queue.keys())) 193 while True: 194 if self._stopped: 195 return 196 # Get all the events that should be within the buffer 197 event_times = [] 198 now = pgmidi.time() 199 while len(event_times) == 0 or event_times[-1]+start_time < now + self.buffer_time: 200 event_times.append(times.next()) 201 202 events = sum([[[bytes,ms_time+start_time] for bytes in self._queue[ms_time]] for ms_time in event_times], []) 203 print len(events), now, (now+self.buffer_time) 204 print event_times[-1]+start_time 205 self.output.write(events) 206 finished_time = pgmidi.time() 207 print finished_time 208 sleep_for = float(event_times[-1]+start_time-finished_time-self.buffer_advance)/1000 209 if sleep_for > 0.0: 210 sleep(sleep_for) 211 print "Continuing"
212
213 -class SequencerInitializationError(Exception):
214 pass
215