# Copyright 2012 by Wibowo Arindrarto. All rights reserved.
# This code is part of the Biopython distribution and governed by its
# license. Please see the LICENSE file that should have been included
# as part of this package.

"""Bio.SearchIO parser for Exonerate plain text output format."""

import re
from itertools import chain

from Bio._py3k import _as_bytes, _bytes_to_string
from Bio._py3k import zip
from Bio.Alphabet import generic_protein

from ._base import _BaseExonerateParser, _BaseExonerateIndexer, _STRAND_MAP, \
    _parse_hit_or_query_line
from .exonerate_vulgar import _RE_VULGAR


__all__ = ['ExonerateTextParser', 'ExonerateTextIndexer']


# for capturing sequences in alignment blocks
# e.g. ' 529 : ATCCCTTATCTCTTTATCTTGTA : 472'
_RE_ALN_ROW = re.compile(r'\s*\d+\s+: (.*) :\s+\d+')
# for splitting the line based on intron annotations
# e.g. ' >>>> Target Intron 1 >>>> ' or 'gt.........................ag'
_RE_EXON = re.compile(r'[atgc ]{2,}?(?:(?:[<>]+ \w+ Intron \d+ [<>]+)|(?:\.+))[atgc ]{2,}?')
# captures the intron length
# from e.g. '61 bp // 154295 bp' (joint intron lengths) or '177446 bp'
_RE_EXON_LEN = re.compile(r'(?:(\d+) bp // (\d+) bp)|(?:(\d+) bp)')
# for splitting lines in the NER model
_RE_NER = re.compile(r'--<\s+\d+\s+>--')
# for capturing NER gap lengths
_RE_NER_LEN = re.compile(r'--<\s+(\d+)\s+>--')
# regexes for capturing the letters inside curly braces
# no. of letters is either 1 or 2, since they are split codons
_RE_SCODON_START = re.compile(r'\{(\w{1,2})\}$')
_RE_SCODON_END = re.compile(r'^\{(\w{1,2})\}')


def _flip_codons(codon_seq, target_seq):
    """Flips the codon characters from one seq to another."""
    a, b = '', ''
    for char1, char2 in zip(codon_seq, target_seq):
        # no need to do anything if the codon seq line has nothing
        if char1 == ' ':
            a += char1
            b += char2
        else:
            a += char2
            b += char1

    return a, b


def _get_block_coords(parsed_seq, row_dict, has_ner=False):
    """Returns a list of start, end coordinates for each given block
    in the sequence."""
    start = 0
    coords = []
    if not has_ner:
        splitter = _RE_EXON
    else:
        splitter = _RE_NER

    # use the query line for reference
    seq = parsed_seq[row_dict['query']]

    for block in re.split(splitter, seq):
        start += seq[start:].find(block)
        end = start + len(block)
        coords.append((start, end))

    return coords


def _get_inter_coords(coords, strand=1):
    """From the given pairs of coordinates, returns a list of pairs
    covering the intervening ranges."""
    # adapted from Python's itertools guide
    # if strand is -1, adjust coords so that the ends and starts are chained
    if strand == -1:
        sorted_coords = [(max(a, b), min(a, b)) for a, b in coords]
        inter_coords = list(chain(*sorted_coords))[1:-1]
        return list(zip(inter_coords[1::2], inter_coords[::2]))
    else:
        inter_coords = list(chain(*coords))[1:-1]
        return list(zip(inter_coords[::2], inter_coords[1::2]))
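

# Worked example (annotation added for clarity; the coordinate values are
# invented, not from real Exonerate output). On the plus strand the
# intervening ranges are simply the gaps between consecutive blocks; on the
# minus strand each pair is first flipped so the larger coordinate comes
# first before the gaps are computed:
#
#     >>> _get_inter_coords([(0, 5), (10, 15), (20, 30)])
#     [(5, 10), (15, 20)]
#     >>> _get_inter_coords([(30, 20), (15, 10), (5, 0)], strand=-1)
#     [(15, 20), (5, 10)]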


def _stitch_rows(raw_rows):
    """Stitches together the parsed alignment rows and returns them in a list."""
    # deal with possible codon surprise!
    # (i.e. alignments with codons using cdna2genome model)
    # by creating additional rows to contain the codons
    try:
        max_len = max(len(x) for x in raw_rows)
        for row in raw_rows:
            assert len(row) == max_len
    except AssertionError:
        for idx, row in enumerate(raw_rows):
            if len(row) != max_len:
                # codons must be present in the query and hit (so +2)
                assert len(row) + 2 == max_len
                # add additional empty lines to contain codons
                raw_rows[idx] = [' ' * len(row[0])] + row + [' ' * len(row[0])]

    cmbn_rows = []
    for idx, row in enumerate(raw_rows[0]):
        cmbn_row = ''.join(aln_row[idx] for aln_row in raw_rows)
        cmbn_rows.append(cmbn_row)

    # the real aligned sequence is always the 'outer' one, so we want
    # to flip them with their 'inner' pairs
    if len(cmbn_rows) == 5:
        # flip query sequence
        cmbn_rows[0], cmbn_rows[1] = \
            _flip_codons(cmbn_rows[0], cmbn_rows[1])
        # flip hit sequence
        cmbn_rows[4], cmbn_rows[3] = \
            _flip_codons(cmbn_rows[4], cmbn_rows[3])

    return cmbn_rows


def _get_row_dict(row_len, model):
    """Returns a dictionary of row indices for parsing alignment blocks."""
    idx = {}
    # 3 lines, usually in dna vs dna models
    if row_len == 3:
        idx['query'] = 0
        idx['midline'] = 1
        idx['hit'] = 2
        idx['qannot'], idx['hannot'] = None, None
    # 4 lines, in protein vs dna models or dna vs protein models
    # TODO: currently we check this from the model string; is there
    # a better way to do it?
    elif row_len == 4:
        if 'protein2' in model:
            idx['query'] = 0
            idx['midline'] = 1
            idx['hit'] = 2
            idx['hannot'] = 3
            idx['qannot'] = None
        elif '2protein' in model:
            idx['query'] = 1
            idx['midline'] = 2
            idx['hit'] = 3
            idx['hannot'] = None
            idx['qannot'] = 0
        else:
            raise ValueError("Unexpected model: " + model)
    # 5 lines, translated dna vs translated dna
    elif row_len == 5:
        # set sequence indexes
        idx['qannot'] = 0
        idx['query'] = 1
        idx['midline'] = 2
        idx['hit'] = 3
        idx['hannot'] = 4
    else:
        raise ValueError("Unexpected row count in alignment block: "
                         "%i" % row_len)
    return idx


def _get_blocks(rows, coords, idx):
    """Returns a list of dictionaries of sequences split by the coordinates."""
    for idx_name in ('query', 'hit', 'midline', 'qannot', 'hannot'):
        assert idx_name in idx
    blocks = []
    for start, end in coords:
        block = {}
        # get seqs according to index
        block['query'] = rows[idx['query']][start:end]
        block['hit'] = rows[idx['hit']][start:end]
        block['similarity'] = rows[idx['midline']][start:end]
        if idx['qannot'] is not None:
            block['query_annotation'] = rows[idx['qannot']][start:end]
        if idx['hannot'] is not None:
            block['hit_annotation'] = rows[idx['hannot']][start:end]
        blocks.append(block)

    return blocks


def _get_scodon_moves(tmp_seq_blocks):
    """Returns a dictionary of split codon locations relative to each
    fragment's end."""
    scodon_moves = {'query': [], 'hit': []}
    for seq_type in scodon_moves:
        scoords = []
        for block in tmp_seq_blocks:
            # check both ends of the sequence for residues in curly braces
            m_start = re.search(_RE_SCODON_START, block[seq_type])
            m_end = re.search(_RE_SCODON_END, block[seq_type])
            if m_start:
                m_start = len(m_start.group(1))
                scoords.append((m_start, 0))
            else:
                scoords.append((0, 0))
            if m_end:
                m_end = len(m_end.group(1))
                scoords.append((0, m_end))
            else:
                scoords.append((0, 0))
        scodon_moves[seq_type] = scoords

    return scodon_moves


def _clean_blocks(tmp_seq_blocks):
    """Removes curly braces (split codon markers) from the given sequences."""
    seq_blocks = []
    for seq_block in tmp_seq_blocks:
        for line_name in seq_block:
            seq_block[line_name] = \
                seq_block[line_name].replace('{', '').replace('}', '')
        seq_blocks.append(seq_block)

    return seq_blocks
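

# Worked example (annotation added for clarity; the sequence is invented).
# Split codon lengths are read from the curly-brace markers by
# _get_scodon_moves before _clean_blocks strips the braces themselves:
#
#     >>> _clean_blocks([{'query': 'ATG{TT}'}])
#     [{'query': 'ATGTT'}]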


def _comp_intron_lens(seq_type, inter_blocks, raw_inter_lens):
    """Returns the length of introns between fragments."""
    # set opposite type, for setting introns
    opp_type = 'hit' if seq_type == 'query' else 'query'
    # list of flags to denote if an intron follows a block
    # it reads e.g. this line:
    # "ATGTT{TT} >>>> Target Intron 1 >>>> {G}TGTGTGTACATT"
    # and sets the opposing sequence type's intron (since this
    # line is present on the opposite sequence type line)
    has_intron_after = ['Intron' in x[seq_type] for x in inter_blocks]
    assert len(has_intron_after) == len(raw_inter_lens)
    # create list containing coord adjustments incorporating
    # intron lengths
    inter_lens = []
    for flag, parsed_len in zip(has_intron_after, raw_inter_lens):
        if flag:
            # joint introns
            if all(parsed_len[:2]):
                # intron len is [0] if opp_type is query, otherwise it's [1]
                intron_len = int(parsed_len[0]) if opp_type == 'query' \
                    else int(parsed_len[1])
            # single hit/query introns
            elif parsed_len[2]:
                intron_len = int(parsed_len[2])
            else:
                raise ValueError("Unexpected intron parsing "
                                 "result: %r" % parsed_len)
        else:
            intron_len = 0

        inter_lens.append(intron_len)

    return inter_lens


def _comp_coords(hsp, seq_type, inter_lens):
    """Fill the block coordinates of the given hsp dictionary."""
    assert seq_type in ('hit', 'query')
    # manually fill the first coord
    seq_step = 1 if hsp['%s_strand' % seq_type] >= 0 else -1
    fstart = hsp['%s_start' % seq_type]
    # fend is fstart + number of residues in the sequence, minus gaps
    fend = fstart + len(
        hsp[seq_type][0].replace('-', '').replace('>', '').replace('<', '')) * seq_step
    coords = [(fstart, fend)]
    # and start from the second block, after the first inter seq
    for idx, block in enumerate(hsp[seq_type][1:]):
        bstart = coords[-1][1] + inter_lens[idx] * seq_step
        bend = bstart + seq_step * len(block.replace('-', ''))
        coords.append((bstart, bend))

    # adjust the coords so the smallest is [0], if strand is -1
    # couldn't do this in the previous steps since we need the initial
    # block ordering
    if seq_step != 1:
        for idx, coord in enumerate(coords):
            coords[idx] = coords[idx][1], coords[idx][0]

    return coords


def _comp_split_codons(hsp, seq_type, scodon_moves):
    """Computes the positions of split codons and puts the values in the
    given HSP dictionary."""
    scodons = []
    for idx in range(len(scodon_moves[seq_type])):
        pair = scodon_moves[seq_type][idx]
        if not any(pair):
            continue
        else:
            assert not all(pair)
        a, b = pair
        anchor_pair = hsp['%s_ranges' % seq_type][idx // 2]
        strand = 1 if hsp['%s_strand' % seq_type] >= 0 else -1

        if a:
            func = max if strand == 1 else min
            anchor = func(anchor_pair)
            start_c, end_c = anchor + a * strand * -1, anchor
        elif b:
            func = min if strand == 1 else max
            anchor = func(anchor_pair)
            start_c, end_c = anchor + b * strand, anchor
        scodons.append((min(start_c, end_c), max(start_c, end_c)))

    return scodons
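

# Worked example (annotation added for clarity; the sequences and lengths are
# invented). For a plus-strand query starting at 10 with two ungapped
# fragments of lengths 5 and 3 separated by a 100 bp intron, _comp_coords
# returns one (start, end) pair per fragment:
#
#     >>> hsp = {'query_strand': 1, 'query_start': 10,
#     ...        'query': ['ATGCC', 'GGT']}
#     >>> _comp_coords(hsp, 'query', [100])
#     [(10, 15), (115, 118)]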


class ExonerateTextParser(_BaseExonerateParser):
    """Parser for Exonerate plain text output."""

    _ALN_MARK = 'C4 Alignment:'

    def parse_alignment_block(self, header):
        qresult = header['qresult']
        hit = header['hit']
        hsp = header['hsp']
        # check for values that must have been set by previous methods
        for val_name in ('query_start', 'query_end', 'hit_start', 'hit_end',
                         'query_strand', 'hit_strand'):
            assert val_name in hsp, hsp
        # get the alignment rows
        # and stitch them so we have the full sequences in single strings
        raw_aln_blocks, vulgar_comp = self._read_alignment()
        # cmbn_rows still has split codon markers (curly braces)
        cmbn_rows = _stitch_rows(raw_aln_blocks)
        row_dict = _get_row_dict(len(cmbn_rows), qresult['model'])
        # get the sequence blocks
        has_ner = 'NER' in qresult['model'].upper()
        seq_coords = _get_block_coords(cmbn_rows, row_dict, has_ner)
        tmp_seq_blocks = _get_blocks(cmbn_rows, seq_coords, row_dict)
        # get split codon temp coords for later use
        # this results in pairs of base movements for both ends of each row
        scodon_moves = _get_scodon_moves(tmp_seq_blocks)
        # remove the split codon markers
        seq_blocks = _clean_blocks(tmp_seq_blocks)

        # adjust strands
        hsp['query_strand'] = _STRAND_MAP[hsp['query_strand']]
        hsp['hit_strand'] = _STRAND_MAP[hsp['hit_strand']]
        # cast coords into ints
        hsp['query_start'] = int(hsp['query_start'])
        hsp['query_end'] = int(hsp['query_end'])
        hsp['hit_start'] = int(hsp['hit_start'])
        hsp['hit_end'] = int(hsp['hit_end'])
        # cast score into int
        hsp['score'] = int(hsp['score'])
        # set sequences
        hsp['query'] = [x['query'] for x in seq_blocks]
        hsp['hit'] = [x['hit'] for x in seq_blocks]
        hsp['aln_annotation'] = {}
        # set the alphabet
        # currently only limited to models with protein queries
        if 'protein2' in qresult['model'] or 'coding2' in qresult['model'] \
                or '2protein' in qresult['model']:
            hsp['alphabet'] = generic_protein
        # get the annotations if they exist
        for annot_type in ('similarity', 'query_annotation', 'hit_annotation'):
            try:
                hsp['aln_annotation'][annot_type] = \
                    [x[annot_type] for x in seq_blocks]
            except KeyError:
                pass

        # use vulgar coordinates if vulgar line is present and return
        #if vulgar_comp is not None:
        #    hsp = parse_vulgar_comp(hsp, vulgar_comp)
        #    return {'qresult': qresult, 'hit': hit, 'hsp': hsp}
        # otherwise we need to get the coordinates from the alignment

        # get the intervening blocks first, so we can use them
        # to adjust the coordinates
        if not has_ner:
            # get intervening coordinates and blocks, only if model is not ner
            # ner models have a much simpler coordinate calculation
            inter_coords = _get_inter_coords(seq_coords)
            inter_blocks = _get_blocks(cmbn_rows, inter_coords, row_dict)
            # returns a three-component tuple of intron lengths
            # first two components filled == intron in both hit and query
            # last component filled == intron in hit or query
            raw_inter_lens = re.findall(_RE_EXON_LEN,
                                        cmbn_rows[row_dict['midline']])

        # compute start and end coords for each block
        for seq_type in ('query', 'hit'):

            # ner blocks and intron blocks require different adjustments
            if not has_ner:
                opp_type = 'hit' if seq_type == 'query' else 'query'
                inter_lens = _comp_intron_lens(seq_type, inter_blocks,
                                               raw_inter_lens)
            else:
                # for NER blocks, the length of the inter-fragment gaps is
                # written on the same strand, so opp_type is seq_type
                opp_type = seq_type
                inter_lens = [int(x) for x in
                              re.findall(_RE_NER_LEN,
                                         cmbn_rows[row_dict[seq_type]])]

            # check that the number of inter-block lengths is one less than
            # the number of sequence blocks being filled
            assert len(inter_lens) == len(hsp[opp_type]) - 1, \
                "%r vs %r" % (len(inter_lens), len(hsp[opp_type]) - 1)
            # fill the hsp query and hit coordinates
            hsp['%s_ranges' % opp_type] = \
                _comp_coords(hsp, opp_type, inter_lens)
            # and fill the split codon coordinates, if model != ner
            # can't do this in the if-else clause above since we need to
            # compute the ranges first
            if not has_ner:
                hsp['%s_split_codons' % opp_type] = \
                    _comp_split_codons(hsp, opp_type, scodon_moves)

        # now that we've finished parsing coords, we can set the start and end
        # coords according to Biopython's convention (start <= end)
        for seq_type in ('query', 'hit'):
            if hsp['%s_strand' % seq_type] == -1:
                n_start = '%s_start' % seq_type
                n_end = '%s_end' % seq_type
                hsp[n_start], hsp[n_end] = hsp[n_end], hsp[n_start]

        return {'qresult': qresult, 'hit': hit, 'hsp': hsp}
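
    # Note (annotation added for clarity, not from the original author):
    # _read_alignment below gathers the printed alignment rows. A block opens
    # at a coordinate-bearing row matched by _RE_ALN_ROW, e.g.
    # ' 529 : ATCCCTTATCTCTTTATCTTGTA : 472', and closes at the next
    # coordinate-bearing row; every line in between is captured with the same
    # column slice so the stitched rows stay aligned.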

    def _read_alignment(self):
        """Reads the raw alignment block strings, returns them in a list."""
        raw_aln_blocks = []
        # flag to check whether we're in an alignment row
        in_aln_row = False
        # flag for vulgar line, if present, we can parse coordinates from it
        vulgar_comp = None
        while True:

            match = re.search(_RE_ALN_ROW, self.line.strip())
            # if we have a match, set flags and values
            if match and not in_aln_row:
                start_idx = self.line.index(match.group(1))
                row_len = len(match.group(1))
                in_aln_row = True
                raw_aln_block = []
            # if we're in an alignment row, grab the sequence
            if in_aln_row:
                raw_aln_block.append(self.line[start_idx:start_idx+row_len])
            # reset flags and values if the line matches, we're in an
            # alignment row, and the block already contains more than one row
            if match and in_aln_row and len(raw_aln_block) > 1:
                raw_aln_blocks.append(raw_aln_block)
                start_idx = None
                row_len = None
                in_aln_row = False

            self.line = self.handle.readline()
            # try to parse vulgar line if present
            if self.line.startswith('vulgar'):
                vulgar = re.search(_RE_VULGAR, self.line)
                vulgar_comp = vulgar.group(10)
            if not self.line or self.line.startswith(self._ALN_MARK):
                # HACK: this is so that the parse_qresult method does not
                # yield the objects before appending the last HSP. We are
                # doing this to keep the parser compatible with outputs
                # without human-readable alignment outputs. This also relies
                # on the fact that repeated readline() always returns '' on
                # EOF.
                if not self.line:
                    self.line = 'mock'
                break

        return raw_aln_blocks, vulgar_comp


class ExonerateTextIndexer(_BaseExonerateIndexer):
    """Indexer class for Exonerate plain text."""

    _parser = ExonerateTextParser
    _query_mark = _as_bytes('C4 Alignment')

    def get_qresult_id(self, pos):
        """Returns the query ID from the nearest "Query:" line."""
        handle = self._handle
        handle.seek(pos)
        sentinel = _as_bytes('Query:')

        while True:
            line = handle.readline().strip()
            if line.startswith(sentinel):
                break
            if not line:
                raise StopIteration
        qid, desc = _parse_hit_or_query_line(_bytes_to_string(line))

        return qid

    def get_raw(self, offset):
        """Returns the raw string of a QueryResult object from the given offset."""
        handle = self._handle
        handle.seek(offset)
        qresult_key = None
        qresult_raw = _as_bytes('')

        while True:
            line = handle.readline()
            if not line:
                break
            elif line.startswith(self._query_mark):
                cur_pos = handle.tell()
                if qresult_key is None:
                    qresult_key = self.get_qresult_id(cur_pos)
                else:
                    curr_key = self.get_qresult_id(cur_pos)
                    if curr_key != qresult_key:
                        break
                handle.seek(cur_pos)
            qresult_raw += line

        return qresult_raw


# if not used as a module, run the doctest
if __name__ == "__main__":
    from Bio._utils import run_doctest
    run_doctest()
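

# Usage sketch (added for illustration; this module is normally reached
# through Bio.SearchIO rather than imported directly; the format name
# 'exonerate-text' is the assumed SearchIO key and the file name below is
# hypothetical):
#
#     from Bio import SearchIO
#     for qresult in SearchIO.parse('exonerate_output.txt', 'exonerate-text'):
#         for hit in qresult:
#             for hsp in hit:
#                 print(hsp.query_range, hsp.hit_range)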