Package jazzparser :: Package misc :: Package chordlabel :: Module hmm
[hide private]
[frames] | no frames]

Source Code for Module jazzparser.misc.chordlabel.hmm

   1  """An HMM midi chord labeler. 
   2   
   3  An HMM midi chord labeler based on the audio chord labeler of Ni, Mcvicar,  
   4  Santos-Rodriguez and De Bie, MIREX 2011, Harmony Progression (HP). 
   5   
   6  """ 
   7  """ 
   8  ============================== License ======================================== 
   9   Copyright (C) 2008, 2010-12 University of Edinburgh, Mark Granroth-Wilding 
  10    
  11   This file is part of The Jazz Parser. 
  12    
  13   The Jazz Parser is free software: you can redistribute it and/or modify 
  14   it under the terms of the GNU General Public License as published by 
  15   the Free Software Foundation, either version 3 of the License, or 
  16   (at your option) any later version. 
  17    
  18   The Jazz Parser is distributed in the hope that it will be useful, 
  19   but WITHOUT ANY WARRANTY; without even the implied warranty of 
  20   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
  21   GNU General Public License for more details. 
  22    
  23   You should have received a copy of the GNU General Public License 
  24   along with The Jazz Parser.  If not, see <http://www.gnu.org/licenses/>. 
  25   
  26  ============================ End license ====================================== 
  27   
  28  """ 
  29  __author__ = "Mark Granroth-Wilding <mark.granroth-wilding@ed.ac.uk>"  
  30   
  31  import numpy, os, re, math 
  32  from datetime import datetime 
  33  from numpy import ones, float64, sum as array_sum, zeros 
  34  import cPickle as pickle 
  35   
  36  from jazzparser.utils.nltk.ngram import DictionaryHmmModel 
  37  from jazzparser.utils.nltk.probability import logprob, add_logs, \ 
  38                          sum_logs, prob_dist_to_dictionary_prob_dist, \ 
  39                          cond_prob_dist_to_dictionary_cond_prob_dist, \ 
  40                          prob_dist_to_dictionary_prob_dist, mle_estimator, \ 
  41                          laplace_estimator 
  42  from jazzparser.utils.options import ModuleOption, choose_from_list 
  43  from jazzparser.utils.strings import str_to_bool 
  44  from jazzparser import settings 
  45  from jazzparser.data.input import detect_input_type, \ 
  46                          MidiTaggerTrainingBulkInput, DbBulkInput 
  47  from jazzparser.data.parsing import keys_for_sequence 
  48  from jazzparser.utils.loggers import create_dummy_logger 
  49   
  50  from nltk.probability import ConditionalProbDist, FreqDist, \ 
  51              ConditionalFreqDist, DictionaryProbDist, \ 
  52              DictionaryConditionalProbDist, MutableProbDist 
  53   
  54  from .chord_vocabs import CHORD_VOCABS, get_mapping 
  55  from .midi import midi_to_emission_stream 
  56  from .data import ChordLabel 
  57  from .baumwelch import HPBaumWelchTrainer 
  58  from . import ModelTrainError, ModelLoadError 
  59   
  60  FILE_EXTENSION = "mdl" 
61 62 -class HPChordLabeler(DictionaryHmmModel):
63 """ 64 HMM based on that of I{Harmony Progression Analyzer for MIREX 2011}, by 65 Ni, Mcvicar, Santos-Rodriguez, De Bie. Their audio labeling model is 66 here adapted to MIDI labeling. 67 68 The state labels are triples: C{(key,root,label)}. C{key} is the ET key 69 (C=0, C#=1, etc). C{root} is the chord root: e.g. C{root=7} denotes chord 70 root G. The chord root is not relative to the key, though the probabilities 71 stored in the distributions are. 72 73 """ 74 TRAINING_OPTIONS = [ 75 # Initialization options 76 ModuleOption('chordprob', filter=float, 77 help_text="Initialization of the emission distribution.", 78 usage="ccprob=P, P is a probability. Prob P is distributed "\ 79 "over the pitch classes that are in the chord.", 80 required=True), 81 ModuleOption('maxnotes', filter=int, 82 help_text="Maximum number of notes that can be generated from a "\ 83 "a state. Limit is required to make the distribution finite.", 84 usage="maxnotes=N, N is an integer", 85 default=100), 86 ModuleOption('vocab', filter=choose_from_list(CHORD_VOCABS.keys()), 87 help_text="Chord vocabulary for the model to use", 88 usage="vocab=V, V is one of %s" % ", ".join("'%s'" % v for v in CHORD_VOCABS.keys()), 89 required=True), 90 ModuleOption('split', filter=int, 91 help_text="Limits the length of midi inputs by splitting them into "\ 92 "fragments of at most this length.", 93 usage="split=X, where X is an int"), 94 ModuleOption('truncate', filter=int, 95 help_text="Limits the length of midi inputs by truncating them to "\ 96 "this number of timesteps. Truncation is applied before "\ 97 "splitting.", 98 usage="truncate=X, where X is an int"), 99 ] + HPBaumWelchTrainer.OPTIONS 100 LABELING_OPTIONS = [ 101 ModuleOption('n', filter=int, 102 help_text="Number of best labels to consider for each timestep", 103 usage="decoden=N, where N is an integer", 104 default=10), 105 ModuleOption('nokey', filter=str_to_bool, 106 help_text="Don't distinguish between keys: removed labels that "\ 107 "are duplicate if you ignore their key and return all chords "\ 108 "with a key of 0. Probabilities are not summed: only the "\ 109 "highest is returned", 110 usage="nokey=B, where B is 'true' or 'false'", 111 default=False), 112 ModuleOption('viterbi', filter=str_to_bool, 113 help_text="Use Viterbi decoding, instead of forward-backward. Only "\ 114 "one label will be returned for each chord, but it's likey "\ 115 "to be a more coherent sequence", 116 usage="viterbi=B, where B is 'true' or 'false'", 117 default=False), 118 ] 119
120 - def __init__(self, initial_key_dist, initial_chord_dist, 121 key_transition_dist, chord_transition_dist, 122 emission_dist, note_number_dist, 123 chord_vocab, max_notes, chord_corpus_mapping, 124 history="", description="", name="default"):
125 self.order = 2 126 self.backoff_model = None 127 128 self.emission_dom = range(12) 129 self.num_emissions = 12 130 self.chord_vocab = chord_vocab 131 self.chord_types = list(sorted(chord_vocab.keys())) 132 self.chord_corpus_mapping = chord_corpus_mapping 133 # Possible chord tags (root and label) 134 self.chord_dom = [(root,label) for root in range(12) \ 135 for label in self.chord_types] 136 self.label_dom = [(key,root,label) for key in range(12) \ 137 for root in range(12) \ 138 for label in self.chord_types] 139 self.num_labels = len(self.label_dom) 140 141 self.initial_key_dist = initial_key_dist 142 self.initial_chord_dist = initial_chord_dist 143 self.key_transition_dist = key_transition_dist 144 self.chord_transition_dist = chord_transition_dist 145 self.note_number_dist = note_number_dist 146 self.emission_dist = emission_dist 147 self.max_notes = max_notes 148 149 # Store a string with information about training, etc 150 self.history = history 151 # Store another string with an editable description 152 self.description = description 153 154 self.model_name = name 155 156 # Initialize the various caches 157 # These will be filled as we access probabilities 158 self.clear_cache()
159
160 - def copy(self, mutable=False):
161 """ 162 Creates a complete copy of the model, optionally making the 163 distributions mutable. 164 165 """ 166 # Copy all the distributions 167 initial_key_dist = prob_dist_to_dictionary_prob_dist( 168 self.initial_key_dist, mutable=mutable) 169 initial_chord_dist = prob_dist_to_dictionary_prob_dist( 170 self.initial_chord_dist, mutable=mutable) 171 key_transition_dist = prob_dist_to_dictionary_prob_dist( 172 self.key_transition_dist, mutable=mutable) 173 chord_transition_dist = cond_prob_dist_to_dictionary_cond_prob_dist( 174 self.chord_transition_dist, mutable=mutable) 175 emission_dist = cond_prob_dist_to_dictionary_cond_prob_dist( 176 self.emission_dist, mutable=mutable) 177 note_number_dist = prob_dist_to_dictionary_prob_dist( 178 self.note_number_dist, mutable=mutable) 179 180 return HPChordLabeler(initial_key_dist, 181 initial_chord_dist, 182 key_transition_dist, 183 chord_transition_dist, 184 emission_dist, 185 note_number_dist, 186 self.chord_vocab, 187 self.max_notes, 188 self.chord_corpus_mapping, 189 history = self.history, 190 description = self.description, 191 name = self.model_name)
192
193 - def clear_cache(self):
194 """ 195 Initializes or empties probability distribution caches. 196 197 Make sure to call this if you change or update the distributions. 198 199 No caches used thus far, so this does nothing for now, except call 200 the super method. 201 202 """ 203 self._small_transition_matrix_cache = None 204 self._small_transition_matrix_cache_trans = None 205 return DictionaryHmmModel.clear_cache(self)
206
207 - def add_history(self, string):
208 """ Adds a line to the end of this model's history string. """ 209 self.history += "%s: %s\n" % (datetime.now().isoformat(' '), string)
210
211 - def get_mapping_to_corpus(self):
212 """ 213 Returns a dict mapping the labels of this model's vocabulary to the 214 chord types of the chord corpus. 215 216 """ 217 return get_mapping(self.chord_corpus_mapping, reverse=True)
218
219 - def get_mapping_from_corpus(self):
220 """ 221 Returns a dict mapping the chord types of the chord corpus to this 222 model's chord labels. 223 224 """ 225 return get_mapping(self.chord_corpus_mapping)
226
227 - def map_to_corpus(self, sequence):
228 """ 229 Postprocesses the sequence, which should be a sequence in the form 230 it comes out from the labeler, to apply the corpus mapping associated 231 with this labeler. The result should be a chord sequence that uses 232 the chord corpus chord types. 233 234 """ 235 mapping = self.get_mapping_to_corpus() 236 labels = [ 237 [(ChordLabel(lab.root, mapping[lab.label], lab.key), prob) \ 238 for (lab,prob) in chords] \ 239 for chords in sequence] 240 return labels
241 242 @classmethod
243 - def initialize_chords(cls, chord_prob, max_notes, chord_vocab, 244 chord_mapping, name="default"):
245 """ 246 Creates a new model with the distributions initialized naively to 247 favour simple chord-types, in a similar way to what R&S do in their 248 paper. 249 250 The transition distributions are initialized so that everything is 251 equiprobable. 252 253 @type chord_prob: float 254 @param chord_prob: prob of a note in the chord. This prob is 255 distributed over the notes of the chord. The remaining prob 256 mass is distributed over the remaining notes. You'll want this 257 to be big enough that chord notes are more probable than others. 258 @type max_notes: int 259 @param max_notes: maximum number of notes that can be generated in 260 each emission. Usually best to set to something high, like 100 - 261 it's just to make the distribution finite. 262 @type chord_vocab: dict 263 @param chord_vocab: a vocabularly of chord types. Mapping from string 264 chord labels to a list of ints representing the notes, relative 265 to the root that are contained in the chord. 266 267 """ 268 # Condition emission distribution on chord label 269 dists = {} 270 for label,chord in chord_vocab.items(): 271 probs = {} 272 for note in range(12): 273 if note in chord: 274 # Assign the share of the chord prob to this note 275 probs[note] = chord_prob / len(chord) 276 else: 277 # Assign a share of the remaining prob to this 278 probs[note] = (1.0 - chord_prob) / (12 - len(chord)) 279 dists[label] = DictionaryProbDist(probs) 280 emission_dist = DictionaryConditionalProbDist(dists) 281 282 # Work out the state label domain 283 label_dom = [(key,root,label) for key in range(12) \ 284 for root in range(12) \ 285 for label in list(sorted(chord_vocab.keys()))] 286 # Create all the distributions that will be uniform 287 initial_key_probs = {} 288 key_transition_probs = {} 289 initial_chord_probs = {} 290 chord_transition_probs = {} 291 # Initialize transition distributions so every transition is equiprobable 292 for pitch in range(12): 293 # Each initial key (0-12) is equiprobable 294 initial_key_probs[pitch] = 1.0/12 295 # Each key transition (0-12) is equiprobable 296 key_transition_probs[pitch] = 1.0/12 297 # Each pair of chord label and root, relative to key, is equiprobable 298 num_chords = 12*len(chord_vocab) 299 for label in chord_vocab: 300 initial_chord_probs[(pitch,label)] = 1.0/num_chords 301 # Each chord transition is equiprobable 302 chord_transition_probs[(pitch,label)] = 1.0/(num_chords+1) 303 # Include the transition to the final state 304 chord_transition_probs[None] = 1.0/(num_chords+1) 305 306 initial_key_dist = DictionaryProbDist(initial_key_probs) 307 key_transition_dist = DictionaryProbDist(key_transition_probs) 308 initial_chord_dist = DictionaryProbDist(initial_chord_probs) 309 310 # Use the same (uniform) chord distribution for each previous chord 311 chord_transition_dists = {} 312 for pitch in range(12): 313 for label in chord_vocab: 314 chord_transition_dists[(pitch,label)] = DictionaryProbDist( 315 chord_transition_probs.copy()) 316 chord_transition_dist = DictionaryConditionalProbDist( 317 chord_transition_dists) 318 319 # Also initialize the notes number distribution to uniform 320 note_number_prob = 1.0 / max_notes 321 note_number_probs = {} 322 for i in range(max_notes): 323 note_number_probs[i] = note_number_prob 324 note_number_dist = DictionaryProbDist(note_number_probs) 325 326 # Create the model 327 model = cls(initial_key_dist, 328 initial_chord_dist, 329 key_transition_dist, 330 chord_transition_dist, 331 emission_dist, 332 note_number_dist, 333 chord_vocab, 334 max_notes, 335 chord_mapping, 336 name=name) 337 model.add_history(\ 338 "Initialized model to chord type probabilities, using "\ 339 "chord probability %s" % chord_prob) 340 return model
341
342 - def train_transition_distribution(self, inputs, input_keys, 343 chord_mapping=None):
344 """ 345 Train the transition distribution parameters in a supervised manner, 346 using chord corpus input. 347 348 This is used as an initialization step to set transition parameters 349 before running EM on unannotated data. 350 351 @type inputs: L{jazzparser.data.input.AnnotatedDbBulkInput} 352 @param inputs: annotated chord training data 353 @type input_keys: list of lists of ints 354 @param input_keys: the key associated with each chord. Should contain 355 a key list for each input sequence and each should be the length 356 of the chord sequence 357 @type chord_mapping: dict 358 @param chord_mapping: a mapping from the chord labels of the corpus to 359 those we will use for this model, so that we can use the training 360 data. See L{jazzparser.misc.chordlabel.chord_vocabs} for mappings 361 and use C{get_mapping} to prepare a dict from them. This doesn't 362 have to be the same as the mapping stored in the model 363 (C{model.chord_corpus_mapping}) and won't overwrite it. If not 364 given, the model's corpus mapping will be used 365 366 """ 367 self.add_history( 368 "Training transition probabilities using %d annotated chord "\ 369 "sequences" % len(inputs)) 370 371 if chord_mapping is None: 372 chord_mapping = self.get_mapping_from_corpus() 373 374 # Prepare the label sequences that we'll train on 375 sequences = [] 376 for seq in inputs: 377 sequence = [] 378 for chord in seq.chords: 379 sequence.append((chord.root, chord.type, chord.duration)) 380 sequences.append(sequence) 381 382 # Apply the mapping to the chord data 383 sequences = [ \ 384 [(root, chord_mapping.get(label, label), duration) for \ 385 (root, label, duration) in sequence] for sequence in sequences] 386 387 # Repeat values with a duration > 1 388 rep_sequences = [] 389 for seq in sequences: 390 sequence = [] 391 for root,label,duration in seq: 392 # Put it in once for each duration 393 for i in range(duration): 394 sequence.append((root,label)) 395 rep_sequences.append(sequence) 396 397 # Count up the observations 398 initial_chord_counts = FreqDist() 399 key_transition_counts = FreqDist() 400 chord_transition_counts = ConditionalFreqDist() 401 402 for sequence,seq_keys in zip(rep_sequences, input_keys): 403 # Count the initial events 404 root0, label0 = sequence[0] 405 key0 = seq_keys[0][1] 406 initial_chord_counts.inc(((root0-key0)%12,label0)) 407 # Don't count the initial key distribution: leave that uniform 408 409 last_relroot = (root0 - key0) % 12 410 last_label = label0 411 last_key = key0 412 413 for (root,label),(chord,key) in zip(sequence[1:], seq_keys[1:]): 414 key_change = (key - last_key) % 12 415 key_transition_counts.inc(key_change) 416 417 # Take the root relative to the key we're in 418 relroot = (root-key) % 12 419 chord_transition_counts[(last_relroot,last_label)].inc(\ 420 (relroot,label)) 421 422 last_key = key 423 last_relroot = relroot 424 last_label = label 425 # Note the transition to the final state from this last state 426 chord_transition_counts[(last_relroot,last_label)].inc(None) 427 428 # Build the correct domains of these distributions 429 possible_chords = [(root,label) for root in range(12) for label in \ 430 list(sorted(self.chord_vocab.keys()))] 431 432 # Estimate the prob dists from these counts 433 initial_chord_dist = prob_dist_to_dictionary_prob_dist(\ 434 laplace_estimator(initial_chord_counts, \ 435 len(possible_chords)), 436 samples=possible_chords) 437 key_transition_dist = prob_dist_to_dictionary_prob_dist(\ 438 laplace_estimator(key_transition_counts, 12), 439 samples=range(12)) 440 chord_transition_dist = cond_prob_dist_to_dictionary_cond_prob_dist(\ 441 ConditionalProbDist(chord_transition_counts, 442 laplace_estimator, len(possible_chords)+1), 443 conditions=possible_chords, 444 samples=possible_chords+[None]) 445 446 # Replace the model's transition distributions 447 self.initial_chord_dist = initial_chord_dist 448 self.key_transition_dist = key_transition_dist 449 self.chord_transition_dist = chord_transition_dist 450 # Invalidate the cache 451 self.clear_cache()
452
453 - def train_emission_number_distribution(self, inputs):
454 """ 455 Trains the distribution over the number of notes emitted from a 456 chord class. It's not conditioned on the chord class, so the only 457 training data needed is a segmented MIDI corpus. 458 459 @type inputs: list of lists 460 @param inputs: training data. The same format as is produced by 461 L{jazzparser.taggers.segmidi.midi.midi_to_emission_stream} 462 463 """ 464 self.add_history( 465 "Training emission number probabilities using %d MIDI segments"\ 466 % len(inputs)) 467 468 emission_number_counts = FreqDist() 469 for sequence in inputs: 470 for segment in sequence: 471 notes = len(segment) 472 # There should very rarely be more than the max num of notes 473 if notes <= self.max_notes: 474 emission_number_counts.inc(notes) 475 476 # Apply simple laplace smoothing 477 for notes in range(self.max_notes): 478 emission_number_counts.inc(notes) 479 480 # Make a prob dist out of this 481 emission_number_dist = prob_dist_to_dictionary_prob_dist(\ 482 mle_estimator(emission_number_counts, None)) 483 self.emission_number_dist = emission_number_dist
484 485 @staticmethod
486 - def train(data, name, logger=None, options={}, chord_data=None):
487 """ 488 Initializes and trains an HMM in a supervised fashion using the given 489 training data. 490 491 """ 492 if len(data) == 0: 493 raise ModelTrainError, "empty training data set" 494 495 # Prepare a dummy logger if none was given 496 if logger is None: 497 logger = create_dummy_logger() 498 499 # Process the options dict 500 options = HPChordLabeler.process_training_options(options) 501 502 # Work out what kind of input data we've got 503 # It should be a bulk input type: check what type the first input is 504 input_type = detect_input_type(data[0], allowed=['segmidi', 'db-annotated']) 505 506 logger.info(">>> Beginning training of HP chord labeler model '%s'" % name) 507 # If we got midi tagger training data, it may include chord data as well 508 if isinstance(data, MidiTaggerTrainingBulkInput) and \ 509 data.chords is not None: 510 if chord_data is None: 511 # Use the chord data in the input data 512 logger.info("Midi training data; chord corpus data available") 513 chord_inputs = data.chords 514 else: 515 # Use the chord data that was given explicitly 516 chord_inputs = chord_data 517 midi_inputs = data 518 elif isinstance(data, DbBulkInput): 519 logger.info("Only chord corpus training data") 520 # This was only chord input, no midi data 521 chord_inputs = data 522 midi_inputs = None 523 else: 524 chord_inputs = chord_data 525 # Presumably this is another form of midi training data 526 midi_inputs = data 527 logger.info("Midi training data; no chord data was included") 528 529 # Get the chord vocab from the options 530 logger.info("Model chord vocabulary: %s" % options['vocab']) 531 vocab, vocab_mapping = CHORD_VOCABS[options['vocab']] 532 533 # Initialize a model according to the chord types 534 logger.info("Initializing emission distributions to favour chord "\ 535 "notes with chord probability %s" % (options['chordprob'])) 536 model = HPChordLabeler.initialize_chords(options['chordprob'], \ 537 options['maxnotes'], vocab, \ 538 vocab_mapping, name=name) 539 540 # If we have chord training data, use this to train the transition dist 541 if chord_inputs is not None: 542 logger.info("Training using chord data") 543 544 # Construct the trees implicit in the annotations to get the 545 # key of every chord 546 logger.info("Preparing key data for annotated chord sequences") 547 input_keys = [keys_for_sequence(dbinput) for dbinput in chord_inputs] 548 549 # Run the supervised training of the transition distribution 550 logger.info("Training transition distribution on chord sequences") 551 model.train_transition_distribution(chord_inputs, input_keys) 552 553 if midi_inputs is not None: 554 logger.info("Training using midi data") 555 556 # Preprocess the midi inputs so they're ready for the model training 557 emissions = [midi_to_emission_stream(seq, 558 remove_empty=False)[0] \ 559 for seq in midi_inputs] 560 561 # Use the midi data to train emission number dist 562 logger.info("Training emission number distribution") 563 model.train_emission_number_distribution(emissions) 564 565 ####### EM unsupervised training on the midi data 566 # Pull out the options to pass to the trainer 567 # These are a subset of the model training options 568 bw_opt_names = [opt.name for opt in HPBaumWelchTrainer.OPTIONS] 569 bw_opts = dict([(name,val) for (name,val) in options.items() \ 570 if name in bw_opt_names]) 571 # Create a Baum-Welch trainer 572 trainer = HPBaumWelchTrainer(model, bw_opts) 573 # Do the Baum-Welch training 574 model = trainer.train(emissions, logger=logger) 575 logger.info("Training complete") 576 577 return model
578 579 580 ################## Probabilities ###################
581 - def transition_log_probability(self, state, previous_state):
582 if state is None: 583 # Final state 584 # This is represented in the distribution as the chord 585 # transition to None (and has no dependence on root) 586 key0,root0,label0 = previous_state 587 chord_root = (root0-key0) % 12 588 return self.chord_transition_dist[(chord_root,label0)].logprob(None) 589 590 if previous_state is None: 591 # Initial state: take from a different distribution 592 key,root,label = state 593 chord_root = (root-key) % 12 594 return self.initial_chord_dist.logprob((chord_root,label)) + \ 595 self.initial_key_dist.logprob(key) 596 597 # Split up the states 598 (key0,root0,label0),(key1,root1,label1) = (previous_state, state) 599 key_change = (key1 - key0) % 12 600 chord_root0 = (root0-key0) % 12 601 chord_root1 = (root1-key1) % 12 602 603 # Multiply together the probability of the key change and the chord 604 # transition 605 key_prob = self.key_transition_dist.logprob(key_change) 606 chord_prob = self.chord_transition_dist[(chord_root0,label0)].logprob(\ 607 (chord_root1,label1)) 608 609 return key_prob + chord_prob
610
611 - def emission_log_probability(self, emission, state):
612 """ 613 Gives the probability P(emission | label). Returned as a base 2 614 log. 615 616 The emission should be a list of emitted notes as integer pitch classes. 617 618 """ 619 # Condition the emission probs only on the chord class (and root) 620 key, root, label = state 621 622 # Get the probability of generating this number of notes 623 prob = self.note_number_dist.logprob(len(emission)) 624 625 # Multiply together the probabilities of each note 626 for note in emission: 627 # Take the note relative to the chord root 628 pc = (note - root) % 12 629 prob += self.emission_dist[label].logprob(pc) 630 631 return prob
632
633 - def get_emission_matrix(self, sequence):
634 """ 635 We override this to make it faster by taking advantage of where we 636 know states share probabilities 637 638 """ 639 T = len(sequence) 640 N = len(self.label_dom) 641 642 ems = numpy.zeros((T, N), numpy.float64) 643 644 # Set the probability for every timestep... 645 for t,emission in enumerate(sequence): 646 # Compute the emission probability for each root/label combo 647 # We know that the key makes no difference to this 648 chord_ems = {} 649 for root in range(12): 650 for label in self.chord_vocab: 651 chord_ems[(root,label)] = \ 652 self.emission_probability(emission, (0,root,label)) 653 # ...given each state 654 for i,(key,root,label) in enumerate(self.label_dom): 655 ems[t,i] = chord_ems[(root,label)] 656 return ems
657
658 - def get_small_emission_matrix(self, sequence):
659 """ 660 Instead of building the emission matrix for every state, just 661 includes probabilities for (root,label) pairs, decomposed into 662 2 dimensions. This is useful for the forward-backward calculations. 663 664 """ 665 ems = numpy.zeros((len(sequence), 12, len(self.chord_types)), numpy.float64) 666 # Set the probability for every timestep... 667 for t,emission in enumerate(sequence): 668 # Compute the emission probability for each root/label combo 669 # We know that the key makes no difference to this 670 chord_ems = {} 671 for root in range(12): 672 for c,label in enumerate(self.chord_types): 673 ems[t,root,c] = \ 674 self.emission_probability(emission, (0,root,label)) 675 return ems
676
677 - def get_small_transition_matrix(self, transpose=False):
678 """ 679 Decomposed version of just the chord part of the transition 680 probabilities, for forward-backward calculations. 681 682 """ 683 if transpose: 684 if self._small_transition_matrix_cache_trans is None: 685 # Compute the matrix from scratch, as we've not done it yet 686 C = len(self.chord_types) 687 chords = self.chord_types 688 trans = numpy.zeros((12,12,C,12,12,C), numpy.float64) 689 690 # Fill the matrix 691 for key0 in range(12): 692 for root0 in range(12): 693 for c0,chord0 in enumerate(chords): 694 for key1 in range(12): 695 for root1 in range(12): 696 for c1,chord1 in enumerate(chords): 697 trans[key0, root0, c0, \ 698 key1, root1, c1] = \ 699 self.transition_probability( 700 (key1,root1,chord1), 701 (key0,root0,chord0)) 702 703 self._small_transition_matrix_cache_trans = trans 704 return self._small_transition_matrix_cache_trans 705 else: 706 if self._small_transition_matrix_cache is None: 707 # Compute the matrix from scratch, as we've not done it yet 708 C = len(self.chord_types) 709 chords = self.chord_types 710 trans = numpy.zeros((12,12,C,12,12,C), numpy.float64) 711 712 # Fill the matrix 713 for key0 in range(12): 714 for root0 in range(12): 715 for c0,chord0 in enumerate(chords): 716 for key1 in range(12): 717 for root1 in range(12): 718 for c1,chord1 in enumerate(chords): 719 trans[key1, root1, c1, \ 720 key0, root0, c0] = \ 721 self.transition_probability( 722 (key1,root1,chord1), 723 (key0,root0,chord0)) 724 725 self._small_transition_matrix_cache = trans 726 return self._small_transition_matrix_cache
727
728 - def normal_forward_probabilities(self, sequence, seq_prob=False, decomposed=False):
729 """ 730 Specialized version of this to make it faster. 731 732 @note: verified that this gets identical results to the superclass 733 734 @param seq_prob: return the log probability of the whole sequence 735 as well as the array (tuple of (array,logprob)). 736 @return: 2D Numpy array. 737 The first dimension represents timesteps, the second the states. 738 739 """ 740 from numpy import newaxis 741 N = len(sequence) 742 states = self.label_dom 743 S = len(states) 744 chords = self.chord_types 745 C = len(chords) 746 747 # Prepare the transition and emission matrices 748 ems = self.get_small_emission_matrix(sequence) 749 trans = self.get_small_transition_matrix() 750 # Initialize an empty matrix 751 # The dims of the matrix are (time, key, root, label) 752 forward_matrix = numpy.zeros((N,12,12,C), numpy.float64) 753 # Create an array for the total logprobs 754 coefficients = numpy.zeros((N,), numpy.float64) 755 756 # First fill in the first columns with transitions from None 757 for root in range(12): 758 for c,chord in enumerate(chords): 759 for key in range(12): 760 # Fill in with the (None-padded) transition probability 761 forward_matrix[0,key,root,c] = self.transition_probability( 762 (key,root,chord), None) 763 # Multiply in the emission probabilities 764 # These get broadcast over the last dim, key 765 forward_matrix[0] = forward_matrix[0] * ems[0] 766 # Normalize 767 coefficients[0] = logprob(numpy.sum(forward_matrix[0])) 768 forward_matrix[0] /= numpy.sum(forward_matrix[0]) 769 770 for time in range(1, N): 771 # Multiply in the transition matrix to get the new state probabilities 772 trans_step = forward_matrix[time-1] * trans 773 # DIMS: key, root, label, key[-1], root[-1], label[-1] 774 for i in range(3): 775 # Sum over previous states 776 trans_step = numpy.sum(trans_step, axis=-1) 777 # Multiply in the emission probabilities 778 # This broadcasts over keys, since emissions don't care about key 779 forward_matrix[time] = trans_step * ems[time] 780 # Normalize the timestep 781 coefficients[time] = logprob(numpy.sum(forward_matrix[time])) 782 forward_matrix[time] /= numpy.sum(forward_matrix[time]) 783 784 if not decomposed: 785 # Reshape the array so it has only two dimensions 786 # The dimensions are ordered in the same way as the components of the 787 # labels, so we just reshape 788 forward_matrix = forward_matrix.reshape(N, 12*12*C) 789 790 if seq_prob: 791 return forward_matrix, numpy.sum(coefficients) 792 else: 793 return forward_matrix
794
795 - def normal_backward_probabilities(self, sequence, decomposed=False):
796 """ 797 Specialized version of this to make it faster. 798 799 @note: verified that this gets identical results to the superclass 800 801 @param seq_prob: return the log probability of the whole sequence 802 as well as the array (tuple of (array,logprob)). 803 @return: 2D Numpy array. 804 The first dimension represents timesteps, the second the states. 805 806 """ 807 N = len(sequence) 808 states = self.label_dom 809 chords = self.chord_types 810 C = len(chords) 811 812 # Prepare the transition and emission matrices 813 trans_back = self.get_small_transition_matrix(transpose=True) 814 ems = self.get_small_emission_matrix(sequence) 815 # Initialize an empty matrix 816 backward_matrix = numpy.zeros((N,12,12,C), numpy.float64) 817 818 # First fill in the last column with transitions to None 819 for root in range(12): 820 for c,chord in enumerate(chords): 821 for key in range(12): 822 # Fill in with the transition probability to the final state 823 backward_matrix[N-1,key,root,c] = \ 824 self.transition_probability(None, (key,root,chord)) 825 # Normalize 826 backward_matrix[N-1] /= numpy.sum(backward_matrix[N-1]) 827 828 # Work backwards, filling in the matrix 829 for time in range(N-2, -1, -1): 830 # The transition matrix is reversed so we have backwards transitions 831 # Then we multiply in the emission probabilities, broadcast over 832 # the first index so we broadcast over keys 833 # Summing over the first three axes sums over possible next states 834 # To speed up the computations, we keep the whole lot transposed 835 trans_step = (trans_back * backward_matrix[time+1]) * ems[time+1] 836 for i in range(3): 837 trans_step = numpy.sum(trans_step, axis=-1) 838 backward_matrix[time] = trans_step 839 # Normalize over the timestep 840 backward_matrix[time] /= numpy.sum(backward_matrix[time]) 841 842 if decomposed: 843 return backward_matrix 844 else: 845 # Reshape the array so it has only two dimensions 846 # Dimensions are ordered in same way as the components of the labels 847 return backward_matrix.reshape(N, 12*12*C)
848
849 - def compute_decomposed_xi(self, sequence, forward=None, backward=None, 850 emission_matrix=None, transition_matrix=None):
851 from numpy import newaxis 852 853 if forward is None: 854 forward = self.normal_forward_probabilities(sequence, decomposed=True) 855 if backward is None: 856 backward = self.normal_backward_probabilities(sequence, decomposed=True) 857 # T is the number of timesteps 858 # N is the number of states 859 T = forward.shape[0] 860 C = len(self.chord_types) 861 # Create the empty array to fill 862 xi = numpy.zeros((T-1,12,12,C,12,12,C), numpy.float64) 863 864 # Precompute all the emission probabilities 865 if emission_matrix is None: 866 emission_matrix = self.get_small_emission_matrix(sequence) 867 # And transition probabilities: we'll need these many times over 868 if transition_matrix is None: 869 transition_matrix = self.get_small_transition_matrix(transpose=True) 870 871 # Do it without logs - much faster 872 for t in range(T-1): 873 total = 0.0 874 # Add axies to the forward probabilities to represent the next state 875 fwd_trans = forward[t,:,:,:, newaxis,newaxis,newaxis] 876 # Compute the xi values by multiplying the arrays together 877 xi[t] = transition_matrix * fwd_trans * backward[t+1] * \ 878 emission_matrix[t+1] 879 # Normalize all the probabilities 880 # Sum all the probs for the timestep and divide them all by total 881 xi[t] /= array_sum(xi[t]) 882 883 return xi
884 885 ################## Labeling ###################
886 - def label(self, midi, options={}, corpus=False):
887 """ 888 Decodes the model with the given midi data to produce a chord labeling. 889 This is in the form of a set of possible chord labels for each midi 890 segment, each with a probability. 891 892 @type options: dict 893 @param options: labeling options: see L{HPChordLabeler.LABELING_OPTIONS} 894 @type midi: L{jazzparser.data.input.SegmentedMidiInput} 895 @param midi: the midi data to label 896 @type corpus: bool 897 @param corpus: if True, applies the corpus mapping associated with 898 the model to the labels so that the returned labels are corpus 899 labels instead of those used by the labeler 900 901 """ 902 options = HPChordLabeler.process_labeling_options(options) 903 N = options['n'] 904 905 # Preprocess the midi data to get an emission stream 906 emissions = midi_to_emission_stream(midi, remove_empty=False)[0] 907 908 if corpus: 909 # Prepare the corpus mapping to apply to every chord label 910 cmap = self.get_mapping_to_corpus() 911 def _labmap(lab): 912 return cmap[lab]
913 else: 914 def _labmap(lab): 915 return lab
916 917 if options['viterbi']: 918 states = self.viterbi_decode(emissions) 919 920 # A list of one top label for each timestep 921 top_tags = [[ 922 (ChordLabel(root, 923 _labmap(label), 924 None if options['nokey'] else key, 925 label), 1.0)] \ 926 for (key,root,label) in states] 927 else: 928 gamma = self.compute_gamma(emissions) 929 930 # Match up the elements in the array with their labels 931 T = gamma.shape[0] 932 probabilities = [] 933 for t in range(T): 934 timeprobs = {} 935 for i,label in enumerate(self.label_dom): 936 timeprobs[label] = gamma[t,i] 937 probabilities.append(timeprobs) 938 939 # Get just the top N labels for each timestep 940 top_tags = [] 941 for time,probs in enumerate(probabilities): 942 ranked = list(reversed(sorted(\ 943 [(prob,(key,root,_labmap(label),label)) for \ 944 ((key,root,label),prob) in probs.items()]))) 945 946 if options['nokey']: 947 # Ignore key and remove repeated labels 948 filtered = [] 949 seen = [] 950 for prob,(key,root,label,mlabel) in ranked: 951 if (root,label) not in seen: 952 filtered.append((prob,(None,root,label,mlabel))) 953 seen.append((root,label)) 954 ranked = filtered 955 956 # Cut off after the top N 957 ranked = ranked[:N] 958 # Convert all these tuples to chord label objects 959 ranked = [(ChordLabel(root,label,key,mlabel),prob) for (prob,(key,root,label,mlabel)) in ranked] 960 top_tags.append(ranked[:N]) 961 962 return top_tags 963
964 - def label_lattice(self, *args, **kwargs):
965 """ 966 Decodes the model and produces a lattice of the top probability 967 chord labels. 968 969 This is just L{label} with some extra wrapping to produce a 970 L{jazzparser.data.input.WeightedChordLabelInput} for the lattice. 971 972 """ 973 from jazzparser.data.input import WeightedChordLabelInput 974 975 # Use label() to get the labels 976 top_tags = self.label(*args, **kwargs) 977 978 return WeightedChordLabelInput(top_tags)
979 980 ################## Storage ####################
981 - def to_picklable_dict(self):
982 """ Produces a picklable representation of model as a dict. """ 983 from jazzparser.utils.nltk.storage import object_to_dict 984 985 # We could store mutable distributions, but it's nicer to convert 986 # them to unmutable ones before storing 987 def _object_to_dict(obj): 988 if type(obj) == MutableProbDist: 989 # Convert to a normal dict prob dist 990 obj = prob_dist_to_dictionary_prob_dist(obj) 991 return object_to_dict(obj)
992 993 return { 994 'initial_key_dist' : _object_to_dict(self.initial_key_dist), 995 'initial_chord_dist' : _object_to_dict(self.initial_chord_dist), 996 'key_transition_dist' : _object_to_dict(self.key_transition_dist), 997 'note_number_dist' : _object_to_dict(self.note_number_dist), 998 'chord_transition_dist' : _object_to_dict(self.chord_transition_dist), 999 'emission_dist' : _object_to_dict(self.emission_dist), 1000 'chord_vocab' : self.chord_vocab, 1001 'chord_corpus_mapping' : self.chord_corpus_mapping, 1002 'max_notes' : self.max_notes, 1003 'history' : self.history, 1004 'description' : self.description, 1005 } 1006 1007 @classmethod
1008 - def from_picklable_dict(cls, data, name):
1009 """ 1010 Reproduces an model that was converted to a picklable 1011 form using to_picklable_dict. 1012 1013 """ 1014 from jazzparser.utils.nltk.storage import dict_to_object 1015 1016 return cls(dict_to_object(data['initial_key_dist']), 1017 dict_to_object(data['initial_chord_dist']), 1018 dict_to_object(data['key_transition_dist']), 1019 dict_to_object(data['chord_transition_dist']), 1020 dict_to_object(data['emission_dist']), 1021 dict_to_object(data['note_number_dist']), 1022 data['chord_vocab'], 1023 data['max_notes'], 1024 data['chord_corpus_mapping'], 1025 history=data.get('history', ''), 1026 description=data.get('description', ''), 1027 name=name)
1028 1029 @classmethod
1030 - def _get_model_dir(cls):
1031 return os.path.join(settings.MODEL_DATA_DIR, "hp_chords")
1032 @classmethod
1033 - def __get_filename(cls, model_name):
1034 return os.path.join(cls._get_model_dir(), "%s.%s" % (model_name, FILE_EXTENSION))
1035 - def __get_my_filename(self):
1036 return type(self).__get_filename(self.model_name)
1037 _filename = property(__get_my_filename) 1038 1039 @staticmethod
1040 - def process_training_options(opts):
1041 """ Verifies and processes the training option values. """ 1042 return ModuleOption.process_option_dict(opts, HPChordLabeler.TRAINING_OPTIONS)
1043 1044 @staticmethod
1045 - def process_labeling_options(opts):
1046 """ Verifies and processes the labeling option values (dict). """ 1047 return ModuleOption.process_option_dict(opts, HPChordLabeler.LABELING_OPTIONS)
1048 1049 @classmethod
1050 - def list_models(cls):
1051 """ Returns a list of the names of available models. """ 1052 model_dir = cls._get_model_dir() 1053 if not os.path.exists(model_dir): 1054 return [] 1055 model_ext = ".%s" % FILE_EXTENSION 1056 names = [name.rpartition(model_ext) for name in os.listdir(model_dir)] 1057 return [name for name,ext,right in names if ext == model_ext and len(right) == 0]
1058
1059 - def save(self):
1060 """ 1061 Saves the model data to a file. 1062 """ 1063 # Convert to picklable data 1064 data = self.to_picklable_dict() 1065 data = pickle.dumps(data, 2) 1066 filename = self._filename 1067 # Check the directory exists 1068 filedir = os.path.dirname(filename) 1069 if not os.path.exists(filedir): 1070 os.mkdir(filedir) 1071 f = open(filename, 'w') 1072 f.write(data) 1073 f.close()
1074
1075 - def delete(self):
1076 """ 1077 Removes all the model's data. It is assumed that the model 1078 will not be used at all after this has been called. 1079 1080 """ 1081 fn = self._filename 1082 if os.path.exists(fn): 1083 os.remove(fn)
1084 1085 @classmethod
1086 - def load_model(cls, model_name):
1087 filename = cls.__get_filename(model_name) 1088 # Load the model from a file 1089 if os.path.exists(filename): 1090 f = open(filename, 'r') 1091 model_data = f.read() 1092 model_data = pickle.loads(model_data) 1093 f.close() 1094 else: 1095 raise ModelLoadError, "the model '%s' has not been trained" % model_name 1096 return cls.from_picklable_dict(model_data, model_name)
1097 1098 ############### Output ##############
1099 - def _get_readable_params(self):
1100 text = "" 1101 1102 # General information about the model 1103 text += "HMM with %d states\n" % len(self.label_dom) 1104 text += "\nChord vocab:\n %s\n" % "\n ".join(chord for chord in self.chord_vocab) 1105 1106 # Emission distribution 1107 text += "\nEmission dist:\n" 1108 for label in self.chord_vocab: 1109 text += " %s:\n" % label 1110 probs = reversed(sorted( 1111 [(self.emission_dist[label].prob(root),root) for \ 1112 root in range(12)])) 1113 for (prob,root) in probs: 1114 text += " %d: %s\n" % (root, prob) 1115 1116 # Initial key distribution 1117 text += "\nInitial key dist:\n" 1118 for key in range(12): 1119 text += " %d: %s\n" % (key, self.initial_key_dist.prob(key)) 1120 1121 # Initial chord distribution 1122 text += "\nInitial chord dist:\n" 1123 for label in self.chord_vocab: 1124 text += " %s:\n" % label 1125 for root in range(12): 1126 text += " %d: %s\n" % (root, self.initial_chord_dist.prob((root,label))) 1127 1128 # Key change distribution 1129 text += "\nKey change dist:\n" 1130 for change in range(12): 1131 text += " %d: %s\n" % (change, self.key_transition_dist.prob(change)) 1132 1133 # Chord transition distribution 1134 text += "\nChord transition dist:\n" 1135 for label0 in self.chord_vocab: 1136 for root0 in range(12): 1137 text += " %s, %d ->\n" % (label0, root0) 1138 for label1 in self.chord_vocab: 1139 for root1 in range(12): 1140 text += " %s, %d: %s\n" % (label1, root1, \ 1141 self.chord_transition_dist[(root0,label0)].prob(\ 1142 (root1,label1))) 1143 text += " End: %s\n" % \ 1144 self.chord_transition_dist[(root0,label0)].prob(None) 1145 return text
1146 readable_parameters = property(_get_readable_params) 1147