""" This is an implementation of a state-emitting MarkovModel. I am using terminology similar to Manning and Schutze. Functions: train_bw Train a markov model using the Baum-Welch algorithm. train_visible Train a visible markov model using MLE. find_states Find the a state sequence that explains some observations. load Load a MarkovModel. save Save a MarkovModel. Classes: MarkovModel Holds the description of a markov model """ import numpy def itemindex(values): d = {} entries = enumerate(values[::-1]) n = len(values)-1 for index, key in entries: d[key] = n-index return d numpy.random.seed() VERY_SMALL_NUMBER = 1E-300 LOG0 = numpy.log(VERY_SMALL_NUMBER) class MarkovModel: def __init__(self, states, alphabet, p_initial=None, p_transition=None, p_emission=None): self.states = states self.alphabet = alphabet self.p_initial = p_initial self.p_transition = p_transition self.p_emission = p_emission def __str__(self): import io handle = io.StringIO() save(self, handle) handle.seek(0) return handle.read() def _readline_and_check_start(handle, start): line = handle.readline() if not line.startswith(start): raise ValueError("I expected %r but got %r" % (start, line)) return line def load(handle): """load(handle) -> MarkovModel()""" # Load the states. line = _readline_and_check_start(handle, "STATES:") states = line.split()[1:] # Load the alphabet. line = _readline_and_check_start(handle, "ALPHABET:") alphabet = line.split()[1:] mm = MarkovModel(states, alphabet) N, M = len(states), len(alphabet) # Load the initial probabilities. mm.p_initial = numpy.zeros(N) line = _readline_and_check_start(handle, "INITIAL:") for i in range(len(states)): line = _readline_and_check_start(handle, " %s:" % states[i]) mm.p_initial[i] = float(line.split()[-1]) # Load the transition. mm.p_transition = numpy.zeros((N, N)) line = _readline_and_check_start(handle, "TRANSITION:") for i in range(len(states)): line = _readline_and_check_start(handle, " %s:" % states[i]) mm.p_transition[i,:] = list(map(float, line.split()[1:])) # Load the emission. mm.p_emission = numpy.zeros((N, M)) line = _readline_and_check_start(handle, "EMISSION:") for i in range(len(states)): line = _readline_and_check_start(handle, " %s:" % states[i]) mm.p_emission[i,:] = list(map(float, line.split()[1:])) return mm def save(mm, handle): """save(mm, handle)""" # This will fail if there are spaces in the states or alphabet. w = handle.write w("STATES: %s\n" % ' '.join(mm.states)) w("ALPHABET: %s\n" % ' '.join(mm.alphabet)) w("INITIAL:\n") for i in range(len(mm.p_initial)): w(" %s: %g\n" % (mm.states[i], mm.p_initial[i])) w("TRANSITION:\n") for i in range(len(mm.p_transition)): x = list(map(str, mm.p_transition[i])) w(" %s: %s\n" % (mm.states[i], ' '.join(x))) w("EMISSION:\n") for i in range(len(mm.p_emission)): x = list(map(str, mm.p_emission[i])) w(" %s: %s\n" % (mm.states[i], ' '.join(x))) # XXX allow them to specify starting points def train_bw(states, alphabet, training_data, pseudo_initial=None, pseudo_transition=None, pseudo_emission=None, update_fn=None, ): """train_bw(states, alphabet, training_data[, pseudo_initial] [, pseudo_transition][, pseudo_emission][, update_fn]) -> MarkovModel Train a MarkovModel using the Baum-Welch algorithm. states is a list of strings that describe the names of each state. alphabet is a list of objects that indicate the allowed outputs. training_data is a list of observations. Each observation is a list of objects from the alphabet. pseudo_initial, pseudo_transition, and pseudo_emission are optional parameters that you can use to assign pseudo-counts to different matrices. They should be matrices of the appropriate size that contain numbers to add to each parameter matrix, before normalization. update_fn is an optional callback that takes parameters (iteration, log_likelihood). It is called once per iteration. """ N, M = len(states), len(alphabet) if not training_data: raise ValueError("No training data given.") if pseudo_initial!=None: pseudo_initial = numpy.asarray(pseudo_initial) if pseudo_initial.shape != (N,): raise ValueError("pseudo_initial not shape len(states)") if pseudo_transition!=None: pseudo_transition = numpy.asarray(pseudo_transition) if pseudo_transition.shape != (N,N): raise ValueError("pseudo_transition not shape " + \ "len(states) X len(states)") if pseudo_emission!=None: pseudo_emission = numpy.asarray(pseudo_emission) if pseudo_emission.shape != (N,M): raise ValueError("pseudo_emission not shape " + \ "len(states) X len(alphabet)") # Training data is given as a list of members of the alphabet. # Replace those with indexes into the alphabet list for easier # computation. training_outputs = [] indexes = itemindex(alphabet) for outputs in training_data: training_outputs.append([indexes[x] for x in outputs]) # Do some sanity checking on the outputs. lengths = list(map(len, training_outputs)) if min(lengths) == 0: raise ValueError("I got training data with outputs of length 0") # Do the training with baum welch. x = _baum_welch(N, M, training_outputs, pseudo_initial=pseudo_initial, pseudo_transition=pseudo_transition, pseudo_emission=pseudo_emission, update_fn=update_fn) p_initial, p_transition, p_emission = x return MarkovModel(states, alphabet, p_initial, p_transition, p_emission) MAX_ITERATIONS = 1000 def _baum_welch(N, M, training_outputs, p_initial=None, p_transition=None, p_emission=None, pseudo_initial=None, pseudo_transition=None, pseudo_emission=None, update_fn=None): # Returns (p_initial, p_transition, p_emission) if p_initial==None: p_initial = _random_norm(N) else: p_initial = _copy_and_check(p_initial, (N,)) if p_transition==None: p_transition = _random_norm((N,N)) else: p_transition = _copy_and_check(p_transition, (N,N)) if p_emission==None: p_emission = _random_norm((N,M)) else: p_emission = _copy_and_check(p_emission, (N,M)) # Do all the calculations in log space to avoid underflows. lp_initial, lp_transition, lp_emission = list(map( numpy.log, (p_initial, p_transition, p_emission))) if pseudo_initial!=None: lpseudo_initial = numpy.log(pseudo_initial) else: lpseudo_initial = None if pseudo_transition!=None: lpseudo_transition = numpy.log(pseudo_transition) else: lpseudo_transition = None if pseudo_emission!=None: lpseudo_emission = numpy.log(pseudo_emission) else: lpseudo_emission = None # Iterate through each sequence of output, updating the parameters # to the HMM. Stop when the log likelihoods of the sequences # stops varying. prev_llik = None for i in range(MAX_ITERATIONS): llik = LOG0 for outputs in training_outputs: x = _baum_welch_one( N, M, outputs, lp_initial, lp_transition, lp_emission, lpseudo_initial, lpseudo_transition, lpseudo_emission,) llik += x if update_fn is not None: update_fn(i, llik) if prev_llik is not None and numpy.fabs(prev_llik-llik) < 0.1: break prev_llik = llik else: raise RuntimeError("HMM did not converge in %d iterations" \ % MAX_ITERATIONS) # Return everything back in normal space. return list(map(numpy.exp, (lp_initial, lp_transition, lp_emission))) def _baum_welch_one(N, M, outputs, lp_initial, lp_transition, lp_emission, lpseudo_initial, lpseudo_transition, lpseudo_emission): # Do one iteration of Baum-Welch based on a sequence of output. # NOTE: This will change the values of lp_initial, lp_transition, # and lp_emission in place. T = len(outputs) fmat = _forward(N, T, lp_initial, lp_transition, lp_emission, outputs) bmat = _backward(N, T, lp_transition, lp_emission, outputs) # Calculate the probability of traversing each arc for any given # transition. lp_arc = numpy.zeros((N, N, T)) for t in range(T): k = outputs[t] lp_traverse = numpy.zeros((N, N)) # P going over one arc. for i in range(N): for j in range(N): # P(getting to this arc) # P(making this transition) # P(emitting this character) # P(going to the end) lp = fmat[i][t] + \ lp_transition[i][j] + \ lp_emission[i][k] + \ bmat[j][t+1] lp_traverse[i][j] = lp # Normalize the probability for this time step. lp_arc[:,:,t] = lp_traverse - _logsum(lp_traverse) # Sum of all the transitions out of state i at time t. lp_arcout_t = numpy.zeros((N, T)) for t in range(T): for i in range(N): lp_arcout_t[i][t] = _logsum(lp_arc[i,:,t]) # Sum of all the transitions out of state i. lp_arcout = numpy.zeros(N) for i in range(N): lp_arcout[i] = _logsum(lp_arcout_t[i,:]) # UPDATE P_INITIAL. lp_initial = lp_arcout_t[:,0] if lpseudo_initial!=None: lp_initial = _logvecadd(lp_initial, lpseudo_initial) lp_initial = lp_initial - _logsum(lp_initial) # UPDATE P_TRANSITION. p_transition[i][j] is the sum of all the # transitions from i to j, normalized by the sum of the # transitions out of i. for i in range(N): for j in range(N): lp_transition[i][j] = _logsum(lp_arc[i,j,:]) - lp_arcout[i] if lpseudo_transition!=None: lp_transition[i] = _logvecadd(lp_transition[i], lpseudo_transition) lp_transition[i] = lp_transition[i] - _logsum(lp_transition[i]) # UPDATE P_EMISSION. lp_emission[i][k] is the sum of all the # transitions out of i when k is observed, divided by the sum of # the transitions out of i. for i in range(N): ksum = numpy.zeros(M)+LOG0 # ksum[k] is the sum of all i with k. for t in range(T): k = outputs[t] for j in range(N): ksum[k] = _logadd(ksum[k], lp_arc[i,j,t]) ksum = ksum - _logsum(ksum) # Normalize if lpseudo_emission!=None: ksum = _logvecadd(ksum, lpseudo_emission[i]) ksum = ksum - _logsum(ksum) # Renormalize lp_emission[i,:] = ksum # Calculate the log likelihood of the output based on the forward # matrix. Since the parameters of the HMM has changed, the log # likelihoods are going to be a step behind, and we might be doing # one extra iteration of training. The alternative is to rerun # the _forward algorithm and calculate from the clean one, but # that may be more expensive than overshooting the training by one # step. return _logsum(fmat[:,T]) def _forward(N, T, lp_initial, lp_transition, lp_emission, outputs): # Implement the forward algorithm. This actually calculates a # Nx(T+1) matrix, where the last column is the total probability # of the output. matrix = numpy.zeros((N, T+1)) # Initialize the first column to be the initial values. matrix[:,0] = lp_initial for t in range(1, T+1): k = outputs[t-1] for j in range(N): # The probability of the state is the sum of the # transitions from all the states from time t-1. lprob = LOG0 for i in range(N): lp = matrix[i][t-1] + \ lp_transition[i][j] + \ lp_emission[i][k] lprob = _logadd(lprob, lp) matrix[j][t] = lprob return matrix def _backward(N, T, lp_transition, lp_emission, outputs): matrix = numpy.zeros((N, T+1)) for t in range(T-1, -1, -1): k = outputs[t] for i in range(N): # The probability of the state is the sum of the # transitions from all the states from time t+1. lprob = LOG0 for j in range(N): lp = matrix[j][t+1] + \ lp_transition[i][j] + \ lp_emission[i][k] lprob = _logadd(lprob, lp) matrix[i][t] = lprob return matrix def train_visible(states, alphabet, training_data, pseudo_initial=None, pseudo_transition=None, pseudo_emission=None): """train_visible(states, alphabet, training_data[, pseudo_initial] [, pseudo_transition][, pseudo_emission]) -> MarkovModel Train a visible MarkovModel using maximum likelihoood estimates for each of the parameters. states is a list of strings that describe the names of each state. alphabet is a list of objects that indicate the allowed outputs. training_data is a list of (outputs, observed states) where outputs is a list of the emission from the alphabet, and observed states is a list of states from states. pseudo_initial, pseudo_transition, and pseudo_emission are optional parameters that you can use to assign pseudo-counts to different matrices. They should be matrices of the appropriate size that contain numbers to add to each parameter matrix """ N, M = len(states), len(alphabet) if pseudo_initial!=None: pseudo_initial = numpy.asarray(pseudo_initial) if pseudo_initial.shape != (N,): raise ValueError("pseudo_initial not shape len(states)") if pseudo_transition!=None: pseudo_transition = numpy.asarray(pseudo_transition) if pseudo_transition.shape != (N,N): raise ValueError("pseudo_transition not shape " + \ "len(states) X len(states)") if pseudo_emission!=None: pseudo_emission = numpy.asarray(pseudo_emission) if pseudo_emission.shape != (N,M): raise ValueError("pseudo_emission not shape " + \ "len(states) X len(alphabet)") # Training data is given as a list of members of the alphabet. # Replace those with indexes into the alphabet list for easier # computation. training_states, training_outputs = [], [] states_indexes = itemindex(states) outputs_indexes = itemindex(alphabet) for toutputs, tstates in training_data: if len(tstates) != len(toutputs): raise ValueError("states and outputs not aligned") training_states.append([states_indexes[x] for x in tstates]) training_outputs.append([outputs_indexes[x] for x in toutputs]) x = _mle(N, M, training_outputs, training_states, pseudo_initial, pseudo_transition, pseudo_emission) p_initial, p_transition, p_emission = x return MarkovModel(states, alphabet, p_initial, p_transition, p_emission) def _mle(N, M, training_outputs, training_states, pseudo_initial, pseudo_transition, pseudo_emission): # p_initial is the probability that a sequence of states starts # off with a particular one. p_initial = numpy.zeros(N) if pseudo_initial: p_initial = p_initial + pseudo_initial for states in training_states: p_initial[states[0]] += 1 p_initial = _normalize(p_initial) # p_transition is the probability that a state leads to the next # one. C(i,j)/C(i) where i and j are states. p_transition = numpy.zeros((N,N)) if pseudo_transition: p_transition = p_transition + pseudo_transition for states in training_states: for n in range(len(states)-1): i, j = states[n], states[n+1] p_transition[i, j] += 1 for i in range(len(p_transition)): p_transition[i,:] = p_transition[i,:] / sum(p_transition[i,:]) # p_emission is the probability of an output given a state. # C(s,o)|C(s) where o is an output and s is a state. p_emission = numpy.zeros((N,M)) if pseudo_emission: p_emission = p_emission + pseudo_emission p_emission = numpy.ones((N,M)) for outputs, states in zip(training_outputs, training_states): for o, s in zip(outputs, states): p_emission[s, o] += 1 for i in range(len(p_emission)): p_emission[i,:] = p_emission[i,:] / sum(p_emission[i,:]) return p_initial, p_transition, p_emission def _argmaxes(vector, allowance=None): return [numpy.argmax(vector)] def find_states(markov_model, output): """find_states(markov_model, output) -> list of (states, score)""" mm = markov_model N = len(mm.states) # _viterbi does calculations in log space. Add a tiny bit to the # matrices so that the logs will not break. x = mm.p_initial + VERY_SMALL_NUMBER y = mm.p_transition + VERY_SMALL_NUMBER z = mm.p_emission + VERY_SMALL_NUMBER lp_initial, lp_transition, lp_emission = list(map(numpy.log, (x, y, z))) # Change output into a list of indexes into the alphabet. indexes = itemindex(mm.alphabet) output = [indexes[x] for x in output] # Run the viterbi algorithm. results = _viterbi(N, lp_initial, lp_transition, lp_emission, output) for i in range(len(results)): states, score = results[i] results[i] = [mm.states[x] for x in states], numpy.exp(score) return results def _viterbi(N, lp_initial, lp_transition, lp_emission, output): # The Viterbi algorithm finds the most likely set of states for a # given output. Returns a list of states. T = len(output) # Store the backtrace in a NxT matrix. backtrace = [] # list of indexes of states in previous timestep. for i in range(N): backtrace.append([None] * T) # Store the best scores. scores = numpy.zeros((N, T)) scores[:,0] = lp_initial + lp_emission[:,output[0]] for t in range(1, T): k = output[t] for j in range(N): # Find the most likely place it came from. i_scores = scores[:,t-1] + \ lp_transition[:,j] + \ lp_emission[j,k] indexes = _argmaxes(i_scores) scores[j,t] = i_scores[indexes[0]] backtrace[j][t] = indexes # Do the backtrace. First, find a good place to start. Then, # we'll follow the backtrace matrix to find the list of states. # In the event of ties, there may be multiple paths back through # the matrix, which implies a recursive solution. We'll simulate # it by keeping our own stack. in_process = [] # list of (t, states, score) results = [] # return values. list of (states, score) indexes = _argmaxes(scores[:,T-1]) # pick the first place for i in indexes: in_process.append((T-1, [i], scores[i][T-1])) while in_process: t, states, score = in_process.pop() if t == 0: results.append((states, score)) else: indexes = backtrace[states[0]][t] for i in indexes: in_process.append((t-1, [i]+states, score)) return results def _normalize(matrix): # Make sure numbers add up to 1.0 if len(matrix.shape) == 1: matrix = matrix / float(sum(matrix)) elif len(matrix.shape) == 2: # Normalize by rows. for i in range(len(matrix)): matrix[i,:] = matrix[i,:] / sum(matrix[i,:]) else: raise ValueError("I cannot handle matrixes of that shape") return matrix def _uniform_norm(shape): matrix = numpy.ones(shape) return _normalize(matrix) def _random_norm(shape): matrix = numpy.random.random(shape) return _normalize(matrix) def _copy_and_check(matrix, desired_shape): # Copy the matrix. matrix = numpy.array(matrix, copy=1) # Check the dimensions. if matrix.shape != desired_shape: raise ValueError("Incorrect dimension") # Make sure it's normalized. if len(matrix.shape) == 1: if numpy.fabs(sum(matrix)-1.0) > 0.01: raise ValueError("matrix not normalized to 1.0") elif len(matrix.shape) == 2: for i in range(len(matrix)): if numpy.fabs(sum(matrix[i])-1.0) > 0.01: raise ValueError("matrix %d not normalized to 1.0" % i) else: raise ValueError("I don't handle matrices > 2 dimensions") return matrix def _logadd(logx, logy): if logy - logx > 100: return logy elif logx - logy > 100: return logx minxy = min(logx, logy) return minxy + numpy.log(numpy.exp(logx-minxy) + numpy.exp(logy-minxy)) def _logsum(matrix): if len(matrix.shape) > 1: vec = numpy.reshape(matrix, (numpy.product(matrix.shape),)) else: vec = matrix sum = LOG0 for num in vec: sum = _logadd(sum, num) return sum def _logvecadd(logvec1, logvec2): assert len(logvec1) == len(logvec2), "vectors aren't the same length" sumvec = numpy.zeros(len(logvec1)) for i in range(len(logvec1)): sumvec[i] = _logadd(logvec1[i], logvec2[i]) return sumvec def _exp_logsum(numbers): sum = _logsum(numbers) return numpy.exp(sum) try: import cMarkovModel except ImportError as x: pass else: import sys this_module = sys.modules[__name__] for name in list(cMarkovModel.__dict__.keys()): if not name.startswith("__"): this_module.__dict__[name] = cMarkovModel.__dict__[name]