1 """An HMM midi chord labeler.
2
3 An HMM midi chord labeler based on the audio chord labeler of Ni, Mcvicar,
4 Santos-Rodriguez and De Bie, MIREX 2011, Harmony Progression (HP).
5
6 """
7 """
8 ============================== License ========================================
9 Copyright (C) 2008, 2010-12 University of Edinburgh, Mark Granroth-Wilding
10
11 This file is part of The Jazz Parser.
12
13 The Jazz Parser is free software: you can redistribute it and/or modify
14 it under the terms of the GNU General Public License as published by
15 the Free Software Foundation, either version 3 of the License, or
16 (at your option) any later version.
17
18 The Jazz Parser is distributed in the hope that it will be useful,
19 but WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 GNU General Public License for more details.
22
23 You should have received a copy of the GNU General Public License
24 along with The Jazz Parser. If not, see <http://www.gnu.org/licenses/>.
25
26 ============================ End license ======================================
27
28 """
29 __author__ = "Mark Granroth-Wilding <mark.granroth-wilding@ed.ac.uk>"
30
31 import numpy, os, re, math
32 from datetime import datetime
33 from numpy import ones, float64, sum as array_sum, zeros
34 import cPickle as pickle
35
36 from jazzparser.utils.nltk.ngram import DictionaryHmmModel
37 from jazzparser.utils.nltk.probability import logprob, add_logs, \
38 sum_logs, prob_dist_to_dictionary_prob_dist, \
39 cond_prob_dist_to_dictionary_cond_prob_dist, \
40 prob_dist_to_dictionary_prob_dist, mle_estimator, \
41 laplace_estimator
42 from jazzparser.utils.options import ModuleOption, choose_from_list
43 from jazzparser.utils.strings import str_to_bool
44 from jazzparser import settings
45 from jazzparser.data.input import detect_input_type, \
46 MidiTaggerTrainingBulkInput, DbBulkInput
47 from jazzparser.data.parsing import keys_for_sequence
48 from jazzparser.utils.loggers import create_dummy_logger
49
50 from nltk.probability import ConditionalProbDist, FreqDist, \
51 ConditionalFreqDist, DictionaryProbDist, \
52 DictionaryConditionalProbDist, MutableProbDist
53
54 from .chord_vocabs import CHORD_VOCABS, get_mapping
55 from .midi import midi_to_emission_stream
56 from .data import ChordLabel
57 from .baumwelch import HPBaumWelchTrainer
58 from . import ModelTrainError, ModelLoadError
59
60 FILE_EXTENSION = "mdl"
63 """
64 HMM based on that of I{Harmony Progression Analyzer for MIREX 2011}, by
65 Ni, Mcvicar, Santos-Rodriguez, De Bie. Their audio labeling model is
66 here adapted to MIDI labeling.
67
68 The state labels are triples: C{(key,root,label)}. C{key} is the ET key
69 (C=0, C#=1, etc). C{root} is the chord root: e.g. C{root=7} denotes chord
70 root G. The chord root is not relative to the key, though the probabilities
71 stored in the distributions are.
72
73 """
74 TRAINING_OPTIONS = [
75
76 ModuleOption('chordprob', filter=float,
77 help_text="Initialization of the emission distribution.",
78 usage="ccprob=P, P is a probability. Prob P is distributed "\
79 "over the pitch classes that are in the chord.",
80 required=True),
81 ModuleOption('maxnotes', filter=int,
82 help_text="Maximum number of notes that can be generated from a "\
83 "a state. Limit is required to make the distribution finite.",
84 usage="maxnotes=N, N is an integer",
85 default=100),
86 ModuleOption('vocab', filter=choose_from_list(CHORD_VOCABS.keys()),
87 help_text="Chord vocabulary for the model to use",
88 usage="vocab=V, V is one of %s" % ", ".join("'%s'" % v for v in CHORD_VOCABS.keys()),
89 required=True),
90 ModuleOption('split', filter=int,
91 help_text="Limits the length of midi inputs by splitting them into "\
92 "fragments of at most this length.",
93 usage="split=X, where X is an int"),
94 ModuleOption('truncate', filter=int,
95 help_text="Limits the length of midi inputs by truncating them to "\
96 "this number of timesteps. Truncation is applied before "\
97 "splitting.",
98 usage="truncate=X, where X is an int"),
99 ] + HPBaumWelchTrainer.OPTIONS
100 LABELING_OPTIONS = [
101 ModuleOption('n', filter=int,
102 help_text="Number of best labels to consider for each timestep",
103 usage="decoden=N, where N is an integer",
104 default=10),
105 ModuleOption('nokey', filter=str_to_bool,
106 help_text="Don't distinguish between keys: removed labels that "\
107 "are duplicate if you ignore their key and return all chords "\
108 "with a key of 0. Probabilities are not summed: only the "\
109 "highest is returned",
110 usage="nokey=B, where B is 'true' or 'false'",
111 default=False),
112 ModuleOption('viterbi', filter=str_to_bool,
113 help_text="Use Viterbi decoding, instead of forward-backward. Only "\
114 "one label will be returned for each chord, but it's likey "\
115 "to be a more coherent sequence",
116 usage="viterbi=B, where B is 'true' or 'false'",
117 default=False),
118 ]
119
120 - def __init__(self, initial_key_dist, initial_chord_dist,
121 key_transition_dist, chord_transition_dist,
122 emission_dist, note_number_dist,
123 chord_vocab, max_notes, chord_corpus_mapping,
124 history="", description="", name="default"):
125 self.order = 2
126 self.backoff_model = None
127
128 self.emission_dom = range(12)
129 self.num_emissions = 12
130 self.chord_vocab = chord_vocab
131 self.chord_types = list(sorted(chord_vocab.keys()))
132 self.chord_corpus_mapping = chord_corpus_mapping
133
134 self.chord_dom = [(root,label) for root in range(12) \
135 for label in self.chord_types]
136 self.label_dom = [(key,root,label) for key in range(12) \
137 for root in range(12) \
138 for label in self.chord_types]
139 self.num_labels = len(self.label_dom)
140
141 self.initial_key_dist = initial_key_dist
142 self.initial_chord_dist = initial_chord_dist
143 self.key_transition_dist = key_transition_dist
144 self.chord_transition_dist = chord_transition_dist
145 self.note_number_dist = note_number_dist
146 self.emission_dist = emission_dist
147 self.max_notes = max_notes
148
149
150 self.history = history
151
152 self.description = description
153
154 self.model_name = name
155
156
157
158 self.clear_cache()
159
160 - def copy(self, mutable=False):
161 """
162 Creates a complete copy of the model, optionally making the
163 distributions mutable.
164
165 """
166
167 initial_key_dist = prob_dist_to_dictionary_prob_dist(
168 self.initial_key_dist, mutable=mutable)
169 initial_chord_dist = prob_dist_to_dictionary_prob_dist(
170 self.initial_chord_dist, mutable=mutable)
171 key_transition_dist = prob_dist_to_dictionary_prob_dist(
172 self.key_transition_dist, mutable=mutable)
173 chord_transition_dist = cond_prob_dist_to_dictionary_cond_prob_dist(
174 self.chord_transition_dist, mutable=mutable)
175 emission_dist = cond_prob_dist_to_dictionary_cond_prob_dist(
176 self.emission_dist, mutable=mutable)
177 note_number_dist = prob_dist_to_dictionary_prob_dist(
178 self.note_number_dist, mutable=mutable)
179
180 return HPChordLabeler(initial_key_dist,
181 initial_chord_dist,
182 key_transition_dist,
183 chord_transition_dist,
184 emission_dist,
185 note_number_dist,
186 self.chord_vocab,
187 self.max_notes,
188 self.chord_corpus_mapping,
189 history = self.history,
190 description = self.description,
191 name = self.model_name)
192
194 """
195 Initializes or empties probability distribution caches.
196
197 Make sure to call this if you change or update the distributions.
198
199 No caches used thus far, so this does nothing for now, except call
200 the super method.
201
202 """
203 self._small_transition_matrix_cache = None
204 self._small_transition_matrix_cache_trans = None
205 return DictionaryHmmModel.clear_cache(self)
206
207 - def add_history(self, string):
208 """ Adds a line to the end of this model's history string. """
209 self.history += "%s: %s\n" % (datetime.now().isoformat(' '), string)
210
212 """
213 Returns a dict mapping the labels of this model's vocabulary to the
214 chord types of the chord corpus.
215
216 """
217 return get_mapping(self.chord_corpus_mapping, reverse=True)
218
220 """
221 Returns a dict mapping the chord types of the chord corpus to this
222 model's chord labels.
223
224 """
225 return get_mapping(self.chord_corpus_mapping)
226
228 """
229 Postprocesses the sequence, which should be a sequence in the form
230 it comes out from the labeler, to apply the corpus mapping associated
231 with this labeler. The result should be a chord sequence that uses
232 the chord corpus chord types.
233
234 """
235 mapping = self.get_mapping_to_corpus()
236 labels = [
237 [(ChordLabel(lab.root, mapping[lab.label], lab.key), prob) \
238 for (lab,prob) in chords] \
239 for chords in sequence]
240 return labels
241
242 @classmethod
243 - def initialize_chords(cls, chord_prob, max_notes, chord_vocab,
244 chord_mapping, name="default"):
245 """
246 Creates a new model with the distributions initialized naively to
247 favour simple chord-types, in a similar way to what R&S do in their
248 paper.
249
250 The transition distributions are initialized so that everything is
251 equiprobable.
252
253 @type chord_prob: float
254 @param chord_prob: prob of a note in the chord. This prob is
255 distributed over the notes of the chord. The remaining prob
256 mass is distributed over the remaining notes. You'll want this
257 to be big enough that chord notes are more probable than others.
258 @type max_notes: int
259 @param max_notes: maximum number of notes that can be generated in
260 each emission. Usually best to set to something high, like 100 -
261 it's just to make the distribution finite.
262 @type chord_vocab: dict
263 @param chord_vocab: a vocabularly of chord types. Mapping from string
264 chord labels to a list of ints representing the notes, relative
265 to the root that are contained in the chord.
266
267 """
268
269 dists = {}
270 for label,chord in chord_vocab.items():
271 probs = {}
272 for note in range(12):
273 if note in chord:
274
275 probs[note] = chord_prob / len(chord)
276 else:
277
278 probs[note] = (1.0 - chord_prob) / (12 - len(chord))
279 dists[label] = DictionaryProbDist(probs)
280 emission_dist = DictionaryConditionalProbDist(dists)
281
282
283 label_dom = [(key,root,label) for key in range(12) \
284 for root in range(12) \
285 for label in list(sorted(chord_vocab.keys()))]
286
287 initial_key_probs = {}
288 key_transition_probs = {}
289 initial_chord_probs = {}
290 chord_transition_probs = {}
291
292 for pitch in range(12):
293
294 initial_key_probs[pitch] = 1.0/12
295
296 key_transition_probs[pitch] = 1.0/12
297
298 num_chords = 12*len(chord_vocab)
299 for label in chord_vocab:
300 initial_chord_probs[(pitch,label)] = 1.0/num_chords
301
302 chord_transition_probs[(pitch,label)] = 1.0/(num_chords+1)
303
304 chord_transition_probs[None] = 1.0/(num_chords+1)
305
306 initial_key_dist = DictionaryProbDist(initial_key_probs)
307 key_transition_dist = DictionaryProbDist(key_transition_probs)
308 initial_chord_dist = DictionaryProbDist(initial_chord_probs)
309
310
311 chord_transition_dists = {}
312 for pitch in range(12):
313 for label in chord_vocab:
314 chord_transition_dists[(pitch,label)] = DictionaryProbDist(
315 chord_transition_probs.copy())
316 chord_transition_dist = DictionaryConditionalProbDist(
317 chord_transition_dists)
318
319
320 note_number_prob = 1.0 / max_notes
321 note_number_probs = {}
322 for i in range(max_notes):
323 note_number_probs[i] = note_number_prob
324 note_number_dist = DictionaryProbDist(note_number_probs)
325
326
327 model = cls(initial_key_dist,
328 initial_chord_dist,
329 key_transition_dist,
330 chord_transition_dist,
331 emission_dist,
332 note_number_dist,
333 chord_vocab,
334 max_notes,
335 chord_mapping,
336 name=name)
337 model.add_history(\
338 "Initialized model to chord type probabilities, using "\
339 "chord probability %s" % chord_prob)
340 return model
341
344 """
345 Train the transition distribution parameters in a supervised manner,
346 using chord corpus input.
347
348 This is used as an initialization step to set transition parameters
349 before running EM on unannotated data.
350
351 @type inputs: L{jazzparser.data.input.AnnotatedDbBulkInput}
352 @param inputs: annotated chord training data
353 @type input_keys: list of lists of ints
354 @param input_keys: the key associated with each chord. Should contain
355 a key list for each input sequence and each should be the length
356 of the chord sequence
357 @type chord_mapping: dict
358 @param chord_mapping: a mapping from the chord labels of the corpus to
359 those we will use for this model, so that we can use the training
360 data. See L{jazzparser.misc.chordlabel.chord_vocabs} for mappings
361 and use C{get_mapping} to prepare a dict from them. This doesn't
362 have to be the same as the mapping stored in the model
363 (C{model.chord_corpus_mapping}) and won't overwrite it. If not
364 given, the model's corpus mapping will be used
365
366 """
367 self.add_history(
368 "Training transition probabilities using %d annotated chord "\
369 "sequences" % len(inputs))
370
371 if chord_mapping is None:
372 chord_mapping = self.get_mapping_from_corpus()
373
374
375 sequences = []
376 for seq in inputs:
377 sequence = []
378 for chord in seq.chords:
379 sequence.append((chord.root, chord.type, chord.duration))
380 sequences.append(sequence)
381
382
383 sequences = [ \
384 [(root, chord_mapping.get(label, label), duration) for \
385 (root, label, duration) in sequence] for sequence in sequences]
386
387
388 rep_sequences = []
389 for seq in sequences:
390 sequence = []
391 for root,label,duration in seq:
392
393 for i in range(duration):
394 sequence.append((root,label))
395 rep_sequences.append(sequence)
396
397
398 initial_chord_counts = FreqDist()
399 key_transition_counts = FreqDist()
400 chord_transition_counts = ConditionalFreqDist()
401
402 for sequence,seq_keys in zip(rep_sequences, input_keys):
403
404 root0, label0 = sequence[0]
405 key0 = seq_keys[0][1]
406 initial_chord_counts.inc(((root0-key0)%12,label0))
407
408
409 last_relroot = (root0 - key0) % 12
410 last_label = label0
411 last_key = key0
412
413 for (root,label),(chord,key) in zip(sequence[1:], seq_keys[1:]):
414 key_change = (key - last_key) % 12
415 key_transition_counts.inc(key_change)
416
417
418 relroot = (root-key) % 12
419 chord_transition_counts[(last_relroot,last_label)].inc(\
420 (relroot,label))
421
422 last_key = key
423 last_relroot = relroot
424 last_label = label
425
426 chord_transition_counts[(last_relroot,last_label)].inc(None)
427
428
429 possible_chords = [(root,label) for root in range(12) for label in \
430 list(sorted(self.chord_vocab.keys()))]
431
432
433 initial_chord_dist = prob_dist_to_dictionary_prob_dist(\
434 laplace_estimator(initial_chord_counts, \
435 len(possible_chords)),
436 samples=possible_chords)
437 key_transition_dist = prob_dist_to_dictionary_prob_dist(\
438 laplace_estimator(key_transition_counts, 12),
439 samples=range(12))
440 chord_transition_dist = cond_prob_dist_to_dictionary_cond_prob_dist(\
441 ConditionalProbDist(chord_transition_counts,
442 laplace_estimator, len(possible_chords)+1),
443 conditions=possible_chords,
444 samples=possible_chords+[None])
445
446
447 self.initial_chord_dist = initial_chord_dist
448 self.key_transition_dist = key_transition_dist
449 self.chord_transition_dist = chord_transition_dist
450
451 self.clear_cache()
452
454 """
455 Trains the distribution over the number of notes emitted from a
456 chord class. It's not conditioned on the chord class, so the only
457 training data needed is a segmented MIDI corpus.
458
459 @type inputs: list of lists
460 @param inputs: training data. The same format as is produced by
461 L{jazzparser.taggers.segmidi.midi.midi_to_emission_stream}
462
463 """
464 self.add_history(
465 "Training emission number probabilities using %d MIDI segments"\
466 % len(inputs))
467
468 emission_number_counts = FreqDist()
469 for sequence in inputs:
470 for segment in sequence:
471 notes = len(segment)
472
473 if notes <= self.max_notes:
474 emission_number_counts.inc(notes)
475
476
477 for notes in range(self.max_notes):
478 emission_number_counts.inc(notes)
479
480
481 emission_number_dist = prob_dist_to_dictionary_prob_dist(\
482 mle_estimator(emission_number_counts, None))
483 self.emission_number_dist = emission_number_dist
484
485 @staticmethod
486 - def train(data, name, logger=None, options={}, chord_data=None):
487 """
488 Initializes and trains an HMM in a supervised fashion using the given
489 training data.
490
491 """
492 if len(data) == 0:
493 raise ModelTrainError, "empty training data set"
494
495
496 if logger is None:
497 logger = create_dummy_logger()
498
499
500 options = HPChordLabeler.process_training_options(options)
501
502
503
504 input_type = detect_input_type(data[0], allowed=['segmidi', 'db-annotated'])
505
506 logger.info(">>> Beginning training of HP chord labeler model '%s'" % name)
507
508 if isinstance(data, MidiTaggerTrainingBulkInput) and \
509 data.chords is not None:
510 if chord_data is None:
511
512 logger.info("Midi training data; chord corpus data available")
513 chord_inputs = data.chords
514 else:
515
516 chord_inputs = chord_data
517 midi_inputs = data
518 elif isinstance(data, DbBulkInput):
519 logger.info("Only chord corpus training data")
520
521 chord_inputs = data
522 midi_inputs = None
523 else:
524 chord_inputs = chord_data
525
526 midi_inputs = data
527 logger.info("Midi training data; no chord data was included")
528
529
530 logger.info("Model chord vocabulary: %s" % options['vocab'])
531 vocab, vocab_mapping = CHORD_VOCABS[options['vocab']]
532
533
534 logger.info("Initializing emission distributions to favour chord "\
535 "notes with chord probability %s" % (options['chordprob']))
536 model = HPChordLabeler.initialize_chords(options['chordprob'], \
537 options['maxnotes'], vocab, \
538 vocab_mapping, name=name)
539
540
541 if chord_inputs is not None:
542 logger.info("Training using chord data")
543
544
545
546 logger.info("Preparing key data for annotated chord sequences")
547 input_keys = [keys_for_sequence(dbinput) for dbinput in chord_inputs]
548
549
550 logger.info("Training transition distribution on chord sequences")
551 model.train_transition_distribution(chord_inputs, input_keys)
552
553 if midi_inputs is not None:
554 logger.info("Training using midi data")
555
556
557 emissions = [midi_to_emission_stream(seq,
558 remove_empty=False)[0] \
559 for seq in midi_inputs]
560
561
562 logger.info("Training emission number distribution")
563 model.train_emission_number_distribution(emissions)
564
565
566
567
568 bw_opt_names = [opt.name for opt in HPBaumWelchTrainer.OPTIONS]
569 bw_opts = dict([(name,val) for (name,val) in options.items() \
570 if name in bw_opt_names])
571
572 trainer = HPBaumWelchTrainer(model, bw_opts)
573
574 model = trainer.train(emissions, logger=logger)
575 logger.info("Training complete")
576
577 return model
578
579
580
582 if state is None:
583
584
585
586 key0,root0,label0 = previous_state
587 chord_root = (root0-key0) % 12
588 return self.chord_transition_dist[(chord_root,label0)].logprob(None)
589
590 if previous_state is None:
591
592 key,root,label = state
593 chord_root = (root-key) % 12
594 return self.initial_chord_dist.logprob((chord_root,label)) + \
595 self.initial_key_dist.logprob(key)
596
597
598 (key0,root0,label0),(key1,root1,label1) = (previous_state, state)
599 key_change = (key1 - key0) % 12
600 chord_root0 = (root0-key0) % 12
601 chord_root1 = (root1-key1) % 12
602
603
604
605 key_prob = self.key_transition_dist.logprob(key_change)
606 chord_prob = self.chord_transition_dist[(chord_root0,label0)].logprob(\
607 (chord_root1,label1))
608
609 return key_prob + chord_prob
610
612 """
613 Gives the probability P(emission | label). Returned as a base 2
614 log.
615
616 The emission should be a list of emitted notes as integer pitch classes.
617
618 """
619
620 key, root, label = state
621
622
623 prob = self.note_number_dist.logprob(len(emission))
624
625
626 for note in emission:
627
628 pc = (note - root) % 12
629 prob += self.emission_dist[label].logprob(pc)
630
631 return prob
632
634 """
635 We override this to make it faster by taking advantage of where we
636 know states share probabilities
637
638 """
639 T = len(sequence)
640 N = len(self.label_dom)
641
642 ems = numpy.zeros((T, N), numpy.float64)
643
644
645 for t,emission in enumerate(sequence):
646
647
648 chord_ems = {}
649 for root in range(12):
650 for label in self.chord_vocab:
651 chord_ems[(root,label)] = \
652 self.emission_probability(emission, (0,root,label))
653
654 for i,(key,root,label) in enumerate(self.label_dom):
655 ems[t,i] = chord_ems[(root,label)]
656 return ems
657
659 """
660 Instead of building the emission matrix for every state, just
661 includes probabilities for (root,label) pairs, decomposed into
662 2 dimensions. This is useful for the forward-backward calculations.
663
664 """
665 ems = numpy.zeros((len(sequence), 12, len(self.chord_types)), numpy.float64)
666
667 for t,emission in enumerate(sequence):
668
669
670 chord_ems = {}
671 for root in range(12):
672 for c,label in enumerate(self.chord_types):
673 ems[t,root,c] = \
674 self.emission_probability(emission, (0,root,label))
675 return ems
676
678 """
679 Decomposed version of just the chord part of the transition
680 probabilities, for forward-backward calculations.
681
682 """
683 if transpose:
684 if self._small_transition_matrix_cache_trans is None:
685
686 C = len(self.chord_types)
687 chords = self.chord_types
688 trans = numpy.zeros((12,12,C,12,12,C), numpy.float64)
689
690
691 for key0 in range(12):
692 for root0 in range(12):
693 for c0,chord0 in enumerate(chords):
694 for key1 in range(12):
695 for root1 in range(12):
696 for c1,chord1 in enumerate(chords):
697 trans[key0, root0, c0, \
698 key1, root1, c1] = \
699 self.transition_probability(
700 (key1,root1,chord1),
701 (key0,root0,chord0))
702
703 self._small_transition_matrix_cache_trans = trans
704 return self._small_transition_matrix_cache_trans
705 else:
706 if self._small_transition_matrix_cache is None:
707
708 C = len(self.chord_types)
709 chords = self.chord_types
710 trans = numpy.zeros((12,12,C,12,12,C), numpy.float64)
711
712
713 for key0 in range(12):
714 for root0 in range(12):
715 for c0,chord0 in enumerate(chords):
716 for key1 in range(12):
717 for root1 in range(12):
718 for c1,chord1 in enumerate(chords):
719 trans[key1, root1, c1, \
720 key0, root0, c0] = \
721 self.transition_probability(
722 (key1,root1,chord1),
723 (key0,root0,chord0))
724
725 self._small_transition_matrix_cache = trans
726 return self._small_transition_matrix_cache
727
729 """
730 Specialized version of this to make it faster.
731
732 @note: verified that this gets identical results to the superclass
733
734 @param seq_prob: return the log probability of the whole sequence
735 as well as the array (tuple of (array,logprob)).
736 @return: 2D Numpy array.
737 The first dimension represents timesteps, the second the states.
738
739 """
740 from numpy import newaxis
741 N = len(sequence)
742 states = self.label_dom
743 S = len(states)
744 chords = self.chord_types
745 C = len(chords)
746
747
748 ems = self.get_small_emission_matrix(sequence)
749 trans = self.get_small_transition_matrix()
750
751
752 forward_matrix = numpy.zeros((N,12,12,C), numpy.float64)
753
754 coefficients = numpy.zeros((N,), numpy.float64)
755
756
757 for root in range(12):
758 for c,chord in enumerate(chords):
759 for key in range(12):
760
761 forward_matrix[0,key,root,c] = self.transition_probability(
762 (key,root,chord), None)
763
764
765 forward_matrix[0] = forward_matrix[0] * ems[0]
766
767 coefficients[0] = logprob(numpy.sum(forward_matrix[0]))
768 forward_matrix[0] /= numpy.sum(forward_matrix[0])
769
770 for time in range(1, N):
771
772 trans_step = forward_matrix[time-1] * trans
773
774 for i in range(3):
775
776 trans_step = numpy.sum(trans_step, axis=-1)
777
778
779 forward_matrix[time] = trans_step * ems[time]
780
781 coefficients[time] = logprob(numpy.sum(forward_matrix[time]))
782 forward_matrix[time] /= numpy.sum(forward_matrix[time])
783
784 if not decomposed:
785
786
787
788 forward_matrix = forward_matrix.reshape(N, 12*12*C)
789
790 if seq_prob:
791 return forward_matrix, numpy.sum(coefficients)
792 else:
793 return forward_matrix
794
796 """
797 Specialized version of this to make it faster.
798
799 @note: verified that this gets identical results to the superclass
800
801 @param seq_prob: return the log probability of the whole sequence
802 as well as the array (tuple of (array,logprob)).
803 @return: 2D Numpy array.
804 The first dimension represents timesteps, the second the states.
805
806 """
807 N = len(sequence)
808 states = self.label_dom
809 chords = self.chord_types
810 C = len(chords)
811
812
813 trans_back = self.get_small_transition_matrix(transpose=True)
814 ems = self.get_small_emission_matrix(sequence)
815
816 backward_matrix = numpy.zeros((N,12,12,C), numpy.float64)
817
818
819 for root in range(12):
820 for c,chord in enumerate(chords):
821 for key in range(12):
822
823 backward_matrix[N-1,key,root,c] = \
824 self.transition_probability(None, (key,root,chord))
825
826 backward_matrix[N-1] /= numpy.sum(backward_matrix[N-1])
827
828
829 for time in range(N-2, -1, -1):
830
831
832
833
834
835 trans_step = (trans_back * backward_matrix[time+1]) * ems[time+1]
836 for i in range(3):
837 trans_step = numpy.sum(trans_step, axis=-1)
838 backward_matrix[time] = trans_step
839
840 backward_matrix[time] /= numpy.sum(backward_matrix[time])
841
842 if decomposed:
843 return backward_matrix
844 else:
845
846
847 return backward_matrix.reshape(N, 12*12*C)
848
849 - def compute_decomposed_xi(self, sequence, forward=None, backward=None,
850 emission_matrix=None, transition_matrix=None):
851 from numpy import newaxis
852
853 if forward is None:
854 forward = self.normal_forward_probabilities(sequence, decomposed=True)
855 if backward is None:
856 backward = self.normal_backward_probabilities(sequence, decomposed=True)
857
858
859 T = forward.shape[0]
860 C = len(self.chord_types)
861
862 xi = numpy.zeros((T-1,12,12,C,12,12,C), numpy.float64)
863
864
865 if emission_matrix is None:
866 emission_matrix = self.get_small_emission_matrix(sequence)
867
868 if transition_matrix is None:
869 transition_matrix = self.get_small_transition_matrix(transpose=True)
870
871
872 for t in range(T-1):
873 total = 0.0
874
875 fwd_trans = forward[t,:,:,:, newaxis,newaxis,newaxis]
876
877 xi[t] = transition_matrix * fwd_trans * backward[t+1] * \
878 emission_matrix[t+1]
879
880
881 xi[t] /= array_sum(xi[t])
882
883 return xi
884
885
886 - def label(self, midi, options={}, corpus=False):
887 """
888 Decodes the model with the given midi data to produce a chord labeling.
889 This is in the form of a set of possible chord labels for each midi
890 segment, each with a probability.
891
892 @type options: dict
893 @param options: labeling options: see L{HPChordLabeler.LABELING_OPTIONS}
894 @type midi: L{jazzparser.data.input.SegmentedMidiInput}
895 @param midi: the midi data to label
896 @type corpus: bool
897 @param corpus: if True, applies the corpus mapping associated with
898 the model to the labels so that the returned labels are corpus
899 labels instead of those used by the labeler
900
901 """
902 options = HPChordLabeler.process_labeling_options(options)
903 N = options['n']
904
905
906 emissions = midi_to_emission_stream(midi, remove_empty=False)[0]
907
908 if corpus:
909
910 cmap = self.get_mapping_to_corpus()
911 def _labmap(lab):
912 return cmap[lab]
913 else:
914 def _labmap(lab):
915 return lab
916
917 if options['viterbi']:
918 states = self.viterbi_decode(emissions)
919
920
921 top_tags = [[
922 (ChordLabel(root,
923 _labmap(label),
924 None if options['nokey'] else key,
925 label), 1.0)] \
926 for (key,root,label) in states]
927 else:
928 gamma = self.compute_gamma(emissions)
929
930
931 T = gamma.shape[0]
932 probabilities = []
933 for t in range(T):
934 timeprobs = {}
935 for i,label in enumerate(self.label_dom):
936 timeprobs[label] = gamma[t,i]
937 probabilities.append(timeprobs)
938
939
940 top_tags = []
941 for time,probs in enumerate(probabilities):
942 ranked = list(reversed(sorted(\
943 [(prob,(key,root,_labmap(label),label)) for \
944 ((key,root,label),prob) in probs.items()])))
945
946 if options['nokey']:
947
948 filtered = []
949 seen = []
950 for prob,(key,root,label,mlabel) in ranked:
951 if (root,label) not in seen:
952 filtered.append((prob,(None,root,label,mlabel)))
953 seen.append((root,label))
954 ranked = filtered
955
956
957 ranked = ranked[:N]
958
959 ranked = [(ChordLabel(root,label,key,mlabel),prob) for (prob,(key,root,label,mlabel)) in ranked]
960 top_tags.append(ranked[:N])
961
962 return top_tags
963
965 """
966 Decodes the model and produces a lattice of the top probability
967 chord labels.
968
969 This is just L{label} with some extra wrapping to produce a
970 L{jazzparser.data.input.WeightedChordLabelInput} for the lattice.
971
972 """
973 from jazzparser.data.input import WeightedChordLabelInput
974
975
976 top_tags = self.label(*args, **kwargs)
977
978 return WeightedChordLabelInput(top_tags)
979
980
992
993 return {
994 'initial_key_dist' : _object_to_dict(self.initial_key_dist),
995 'initial_chord_dist' : _object_to_dict(self.initial_chord_dist),
996 'key_transition_dist' : _object_to_dict(self.key_transition_dist),
997 'note_number_dist' : _object_to_dict(self.note_number_dist),
998 'chord_transition_dist' : _object_to_dict(self.chord_transition_dist),
999 'emission_dist' : _object_to_dict(self.emission_dist),
1000 'chord_vocab' : self.chord_vocab,
1001 'chord_corpus_mapping' : self.chord_corpus_mapping,
1002 'max_notes' : self.max_notes,
1003 'history' : self.history,
1004 'description' : self.description,
1005 }
1006
1007 @classmethod
1009 """
1010 Reproduces an model that was converted to a picklable
1011 form using to_picklable_dict.
1012
1013 """
1014 from jazzparser.utils.nltk.storage import dict_to_object
1015
1016 return cls(dict_to_object(data['initial_key_dist']),
1017 dict_to_object(data['initial_chord_dist']),
1018 dict_to_object(data['key_transition_dist']),
1019 dict_to_object(data['chord_transition_dist']),
1020 dict_to_object(data['emission_dist']),
1021 dict_to_object(data['note_number_dist']),
1022 data['chord_vocab'],
1023 data['max_notes'],
1024 data['chord_corpus_mapping'],
1025 history=data.get('history', ''),
1026 description=data.get('description', ''),
1027 name=name)
1028
1029 @classmethod
1032 @classmethod
1037 _filename = property(__get_my_filename)
1038
1039 @staticmethod
1043
1044 @staticmethod
1048
1049 @classmethod
1051 """ Returns a list of the names of available models. """
1052 model_dir = cls._get_model_dir()
1053 if not os.path.exists(model_dir):
1054 return []
1055 model_ext = ".%s" % FILE_EXTENSION
1056 names = [name.rpartition(model_ext) for name in os.listdir(model_dir)]
1057 return [name for name,ext,right in names if ext == model_ext and len(right) == 0]
1058
1060 """
1061 Saves the model data to a file.
1062 """
1063
1064 data = self.to_picklable_dict()
1065 data = pickle.dumps(data, 2)
1066 filename = self._filename
1067
1068 filedir = os.path.dirname(filename)
1069 if not os.path.exists(filedir):
1070 os.mkdir(filedir)
1071 f = open(filename, 'w')
1072 f.write(data)
1073 f.close()
1074
1076 """
1077 Removes all the model's data. It is assumed that the model
1078 will not be used at all after this has been called.
1079
1080 """
1081 fn = self._filename
1082 if os.path.exists(fn):
1083 os.remove(fn)
1084
1085 @classmethod
1087 filename = cls.__get_filename(model_name)
1088
1089 if os.path.exists(filename):
1090 f = open(filename, 'r')
1091 model_data = f.read()
1092 model_data = pickle.loads(model_data)
1093 f.close()
1094 else:
1095 raise ModelLoadError, "the model '%s' has not been trained" % model_name
1096 return cls.from_picklable_dict(model_data, model_name)
1097
1098
1100 text = ""
1101
1102
1103 text += "HMM with %d states\n" % len(self.label_dom)
1104 text += "\nChord vocab:\n %s\n" % "\n ".join(chord for chord in self.chord_vocab)
1105
1106
1107 text += "\nEmission dist:\n"
1108 for label in self.chord_vocab:
1109 text += " %s:\n" % label
1110 probs = reversed(sorted(
1111 [(self.emission_dist[label].prob(root),root) for \
1112 root in range(12)]))
1113 for (prob,root) in probs:
1114 text += " %d: %s\n" % (root, prob)
1115
1116
1117 text += "\nInitial key dist:\n"
1118 for key in range(12):
1119 text += " %d: %s\n" % (key, self.initial_key_dist.prob(key))
1120
1121
1122 text += "\nInitial chord dist:\n"
1123 for label in self.chord_vocab:
1124 text += " %s:\n" % label
1125 for root in range(12):
1126 text += " %d: %s\n" % (root, self.initial_chord_dist.prob((root,label)))
1127
1128
1129 text += "\nKey change dist:\n"
1130 for change in range(12):
1131 text += " %d: %s\n" % (change, self.key_transition_dist.prob(change))
1132
1133
1134 text += "\nChord transition dist:\n"
1135 for label0 in self.chord_vocab:
1136 for root0 in range(12):
1137 text += " %s, %d ->\n" % (label0, root0)
1138 for label1 in self.chord_vocab:
1139 for root1 in range(12):
1140 text += " %s, %d: %s\n" % (label1, root1, \
1141 self.chord_transition_dist[(root0,label0)].prob(\
1142 (root1,label1)))
1143 text += " End: %s\n" % \
1144 self.chord_transition_dist[(root0,label0)].prob(None)
1145 return text
1146 readable_parameters = property(_get_readable_params)
1147