# Copyright 2012 by Wibowo Arindrarto. All rights reserved. # This file is part of the Biopython distribution and governed by your # choice of the "Biopython License Agreement" or the "BSD 3-Clause License". # Please see the LICENSE file that should have been included as part of this # package. """Bio.SearchIO abstract base parser for Exonerate standard output format.""" import re from functools import reduce from Bio.SearchIO._index import SearchIndexer from Bio.SearchIO._model import QueryResult, Hit, HSP, HSPFragment from Bio.SeqUtils import seq1 # strand char-value mapping _STRAND_MAP = {'+': 1, '-': -1, '.': 0} _RE_SHIFTS = re.compile(r'(#+)') # regex for checking whether a vulgar line has protein/translated components _RE_TRANS = re.compile(r'[53ISCF]') def _set_frame(frag): """Set the HSPFragment frames (PRIVATE).""" frag.hit_frame = (frag.hit_start % 3 + 1) * frag.hit_strand frag.query_frame = (frag.query_start % 3 + 1) * frag.query_strand def _make_triplets(seq, phase=0): """Select a valid amino acid sequence given a 3-letter code input (PRIVATE). This function takes a single three-letter amino acid sequence and the phase of the sequence to return the longest intact amino acid sequence possible. Parts of the input sequence before and after the selected sequence are also returned. This is an internal private function and is meant for parsing Exonerate's three-letter amino acid output. >>> from Bio.SearchIO.ExonerateIO._base import _make_triplets >>> _make_triplets('GlyThrSerAlaPro') ('', ['Gly', 'Thr', 'Ser', 'Ala', 'Pro'], '') >>> _make_triplets('yThSerAla', phase=1) ('y', ['Thr', 'Ser', 'Ala'], '') >>> _make_triplets('yThSerAlaPr', phase=1) ('y', ['Thr', 'Ser', 'Ala'], 'Pr') """ pre = seq[:phase] np_seq = seq[phase:] non_triplets = len(np_seq) % 3 post = "" if not non_triplets else np_seq[-1 * non_triplets:] intacts = [np_seq[3 * i:3 * (i + 1)] for i in range(len(np_seq) // 3)] return pre, intacts, post def _get_fragments_coord(frags): """Return the letter coordinate of the given list of fragments (PRIVATE). This function takes a list of three-letter amino acid sequences and returns a list of coordinates for each fragment had all the input sequences been flattened. This is an internal private function and is meant for parsing Exonerate's three-letter amino acid output. >>> from Bio.SearchIO.ExonerateIO._base import _get_fragments_coord >>> _get_fragments_coord(['Thr', 'Ser', 'Ala']) [0, 3, 6] >>> _get_fragments_coord(['Thr', 'SerAlaPro', 'GlyLeu']) [0, 3, 12, ] >>> _get_fragments_coord(['Thr', 'SerAlaPro', 'GlyLeu', 'Cys']) [0, 3, 12, 18] """ if not frags: return [] # first fragment always starts from position 0 init = [0] return reduce(lambda acc, frag: acc + [acc[-1] + len(frag)], frags[:-1], init) def _get_fragments_phase(frags): """Return the phases of the given list of 3-letter amino acid fragments (PRIVATE). This is an internal private function and is meant for parsing Exonerate's three-letter amino acid output. >>> from Bio.SearchIO.ExonerateIO._base import _get_fragments_phase >>> _get_fragments_phase(['Thr', 'Ser', 'Ala']) [0, 0, 0] >>> _get_fragments_phase(['ThrSe', 'rAla']) [0, 1] >>> _get_fragments_phase(['ThrSe', 'rAlaLeu', 'ProCys']) [0, 1, 0] >>> _get_fragments_phase(['ThrSe', 'rAlaLeuP', 'roCys']) [0, 1, 2] >>> _get_fragments_phase(['ThrSe', 'rAlaLeuPr', 'oCys']) [0, 1, 1] """ return [(3 - (x % 3)) % 3 for x in _get_fragments_coord(frags)] def _adjust_aa_seq(fraglist): """Transform 3-letter AA codes of input fragments to one-letter codes (PRIVATE). Argument fraglist should be a list of HSPFragments objects. """ custom_map = {'***': '*', '<->': '-'} hsp_hstart = fraglist[0].hit_start hsp_qstart = fraglist[0].query_start frag_phases = _get_fragments_phase(fraglist) for frag, phase in zip(fraglist, frag_phases): assert frag.query_strand == 0 or frag.hit_strand == 0 # hit step may be -1 as we're aligning to DNA hstep = 1 if frag.hit_strand >= 0 else -1 # set fragment phase frag.phase = phase # fragment should have a length that is a multiple of 3 # assert len(frag) % 3 == 0 qseq = str(frag.query.seq) q_triplets_pre, q_triplets, q_triplets_post = \ _make_triplets(qseq, phase) hseq = str(frag.hit.seq) h_triplets_pre, h_triplets, h_triplets_post = \ _make_triplets(hseq, phase) # get one letter codes # and replace gap codon markers and termination characters hseq1_pre = "X" if h_triplets_pre else "" hseq1_post = "X" if h_triplets_post else "" hseq1 = seq1("".join(h_triplets), custom_map=custom_map) hstart = hsp_hstart + (len(hseq1_pre) * hstep) hend = hstart + len(hseq1.replace('-', '')) * hstep qseq1_pre = "X" if q_triplets_pre else "" qseq1_post = "X" if q_triplets_post else "" qseq1 = seq1("".join(q_triplets), custom_map=custom_map) qstart = hsp_qstart + len(qseq1_pre) qend = qstart + len(qseq1.replace('-', '')) # replace the old frag sequences with the new ones frag.hit = None frag.query = None frag.hit = hseq1_pre + hseq1 + hseq1_post frag.query = qseq1_pre + qseq1 + qseq1_post # set coordinates for the protein sequence if frag.query_strand == 0: frag.query_start, frag.query_end = qstart, qend elif frag.hit_strand == 0: frag.hit_start, frag.hit_end = hstart, hend # update alignment annotation # by turning them into list of triplets for annot, annotseq in frag.aln_annotation.items(): pre, intact, post = _make_triplets(annotseq, phase) frag.aln_annotation[annot] = \ list(filter(None, [pre])) + intact + list(filter(None, [post])) # update values for next iteration hsp_hstart, hsp_qstart = hend, qend return fraglist def _split_fragment(frag): """Split one HSPFragment containing frame-shifted alignment into two (PRIVATE).""" # given an HSPFragment object with frameshift(s), this method splits it # into fragments without frameshifts by sequentially chopping it off # starting from the beginning simil = frag.aln_annotation['similarity'] # we should have at least 1 frame shift for splitting assert simil.count('#') > 0 split_frags = [] qstep = 1 if frag.query_strand >= 0 else -1 hstep = 1 if frag.hit_strand >= 0 else -1 qpos = min(frag.query_range) if qstep >= 0 else max(frag.query_range) hpos = min(frag.hit_range) if qstep >= 0 else max(frag.hit_range) abs_pos = 0 # split according to hit, then query while simil: try: shifts = re.search(_RE_SHIFTS, simil).group(1) s_start = simil.find(shifts) s_stop = s_start + len(shifts) split = frag[abs_pos:abs_pos + s_start] except AttributeError: # no '#' in simil, i.e. last frag shifts = '' s_start = 0 s_stop = len(simil) split = frag[abs_pos:] # coordinates for the split strand qstart, hstart = qpos, hpos qpos += (len(split) - sum(str(split.query.seq).count(x) for x in ('-', '<', '>'))) * qstep hpos += (len(split) - sum(str(split.hit.seq).count(x) for x in ('-', '<', '>'))) * hstep split.hit_start = min(hstart, hpos) split.query_start = min(qstart, qpos) split.hit_end = max(hstart, hpos) split.query_end = max(qstart, qpos) # account for frameshift length abs_slice = slice(abs_pos + s_start, abs_pos + s_stop) if len(frag.aln_annotation) == 2: seqs = (str(frag[abs_slice].query.seq), str(frag[abs_slice].hit.seq)) elif len(frag.aln_annotation) == 3: seqs = (frag[abs_slice].aln_annotation['query_annotation'], frag[abs_slice].aln_annotation['hit_annotation'],) if '#' in seqs[0]: qpos += len(shifts) * qstep elif '#' in seqs[1]: hpos += len(shifts) * hstep # set frame _set_frame(split) split_frags.append(split) # set similarity string and absolute position for the next loop simil = simil[s_stop:] abs_pos += s_stop return split_frags def _create_hsp(hid, qid, hspd): """Return a list of HSP objects from the given parsed HSP values (PRIVATE).""" frags = [] # we are iterating over query_ranges, but hit_ranges works just as well for idx, qcoords in enumerate(hspd['query_ranges']): # get sequences, create object hseqlist = hspd.get('hit') hseq = '' if hseqlist is None else hseqlist[idx] qseqlist = hspd.get('query') qseq = '' if qseqlist is None else qseqlist[idx] frag = HSPFragment(hid, qid, hit=hseq, query=qseq) # coordinates frag.query_start = qcoords[0] frag.query_end = qcoords[1] frag.hit_start = hspd['hit_ranges'][idx][0] frag.hit_end = hspd['hit_ranges'][idx][1] # alignment annotation try: aln_annot = hspd.get('aln_annotation', {}) for key, value in aln_annot.items(): frag.aln_annotation[key] = value[idx] except IndexError: pass # strands frag.query_strand = hspd['query_strand'] frag.hit_strand = hspd['hit_strand'] # and append the hsp object to the list if frag.aln_annotation.get('similarity') is not None: if '#' in frag.aln_annotation['similarity']: frags.extend(_split_fragment(frag)) continue # try to set frame if there are translation in the alignment if len(frag.aln_annotation) > 1 or \ frag.query_strand == 0 or \ ('vulgar_comp' in hspd and re.search(_RE_TRANS, hspd['vulgar_comp'])): _set_frame(frag) frags.append(frag) # if the query is protein, we need to change the hit and query sequences # from three-letter amino acid codes to one letter, and adjust their # coordinates accordingly if len(frags[0].aln_annotation) == 2: # 2 annotations == protein query frags = _adjust_aa_seq(frags) hsp = HSP(frags) # set hsp-specific attributes for attr in ('score', 'hit_split_codons', 'query_split_codons', 'model', 'vulgar_comp', 'cigar_comp', 'alphabet'): if attr in hspd: setattr(hsp, attr, hspd[attr]) return hsp def _parse_hit_or_query_line(line): """Parse the 'Query:' line of exonerate alignment outputs (PRIVATE).""" try: mark, id, desc = line.split(' ', 2) except ValueError: # no desc mark, id = line.split(' ', 1) desc = '' return id, desc class _BaseExonerateParser(object): """Abstract iterator for exonerate format.""" _ALN_MARK = None def __init__(self, handle): self.handle = handle self.has_c4_alignment = False def __iter__(self): # read line until the first alignment block or cigar/vulgar lines while True: self.line = self.handle.readline() # flag for human-readable alignment block if self.line.startswith('C4 Alignment:') and not \ self.has_c4_alignment: self.has_c4_alignment = True if self.line.startswith('C4 Alignment:') or \ self.line.startswith('vulgar:') or \ self.line.startswith('cigar:'): break elif not self.line or self.line.startswith('-- completed '): return for qresult in self._parse_qresult(): qresult.program = 'exonerate' # HACK: so that all descriptions are set qresult.description = qresult.description for hit in qresult: hit.description = hit.description yield qresult def read_until(self, bool_func): """Read the file handle until the given bool function returns True.""" while True: if not self.line or bool_func(self.line): return else: self.line = self.handle.readline() def parse_alignment_block(self, header): raise NotImplementedError("Subclass must implement this") def _parse_alignment_header(self): # read all header lines and store them aln_header = [] # header is everything before the first empty line while self.line.strip(): aln_header.append(self.line.strip()) self.line = self.handle.readline() # then parse them qresult, hit, hsp = {}, {}, {} for line in aln_header: # query line if line.startswith('Query:'): qresult['id'], qresult['description'] = \ _parse_hit_or_query_line(line) # target line elif line.startswith('Target:'): hit['id'], hit['description'] = \ _parse_hit_or_query_line(line) # model line elif line.startswith('Model:'): qresult['model'] = line.split(' ', 1)[1] # score line elif line.startswith('Raw score:'): hsp['score'] = line.split(' ', 2)[2] # query range line elif line.startswith('Query range:'): # line is always 'Query range: \d+ -> \d+', so we can pluck # the numbers directly hsp['query_start'], hsp['query_end'] = line.split(' ', 4)[2:5:2] # hit range line elif line.startswith('Target range:'): # same logic with query range hsp['hit_start'], hsp['hit_end'] = line.split(' ', 4)[2:5:2] # determine strand if qresult['description'].endswith(':[revcomp]'): hsp['query_strand'] = '-' qresult['description'] = qresult['description'].replace(':[revcomp]', '') elif 'protein2' in qresult['model']: hsp['query_strand'] = '.' else: hsp['query_strand'] = '+' if hit['description'].endswith(':[revcomp]'): hsp['hit_strand'] = '-' hit['description'] = hit['description'].replace(':[revcomp]', '') elif '2protein' in qresult['model']: hsp['hit_strand'] = '.' else: hsp['hit_strand'] = '+' # NOTE: we haven't processed the coordinates types # and the strands are not yet Biopython's standard (1 / -1 / 0) # since it's easier if we do the conversion later return {'qresult': qresult, 'hit': hit, 'hsp': hsp} def _parse_qresult(self): # state values state_EOF = 0 state_QRES_NEW = 1 state_QRES_SAME = 3 state_HIT_NEW = 2 state_HIT_SAME = 4 # initial dummies qres_state, hit_state = None, None file_state = None cur_qid, cur_hid = None, None prev_qid, prev_hid = None, None cur, prev = None, None hit_list, hsp_list = [], [] # if the file has c4 alignments, use that as the alignment mark if self.has_c4_alignment: self._ALN_MARK = 'C4 Alignment:' while True: self.read_until(lambda line: line.startswith(self._ALN_MARK)) if cur is not None: prev = cur prev_qid = cur_qid prev_hid = cur_hid # only parse the result row if it's not EOF if self.line: assert self.line.startswith(self._ALN_MARK), self.line # create temp dicts for storing parsed values header = {'qresult': {}, 'hit': {}, 'hsp': {}} # if the file has c4 alignments, try to parse the header if self.has_c4_alignment: self.read_until(lambda line: line.strip().startswith('Query:')) header = self._parse_alignment_header() # parse the block contents cur = self.parse_alignment_block(header) cur_qid = cur['qresult']['id'] cur_hid = cur['hit']['id'] elif not self.line or self.line.startswith('-- completed '): file_state = state_EOF cur_qid, cur_hid = None, None # get the state of hit and qresult if prev_qid != cur_qid: qres_state = state_QRES_NEW else: qres_state = state_QRES_SAME # new hits are hits with different ids or hits in a new query if prev_hid != cur_hid or qres_state == state_QRES_NEW: hit_state = state_HIT_NEW else: hit_state = state_HIT_SAME if prev is not None: hsp = _create_hsp(prev_hid, prev_qid, prev['hsp']) hsp_list.append(hsp) if hit_state == state_HIT_NEW: hit = Hit(hsp_list) for attr, value in prev['hit'].items(): setattr(hit, attr, value) hit_list.append(hit) hsp_list = [] if qres_state == state_QRES_NEW or file_state == state_EOF: qresult = QueryResult(id=prev_qid) for hit in hit_list: # not using append since Exonerate may separate the # same hit if it has different strands qresult.absorb(hit) for attr, value in prev['qresult'].items(): setattr(qresult, attr, value) yield qresult if file_state == state_EOF: break hit_list = [] # only readline() here if we're not parsing C4 alignments # C4 alignments readline() is handled by its parse_alignment_block # function if not self.has_c4_alignment: self.line = self.handle.readline() class _BaseExonerateIndexer(SearchIndexer): """Indexer class for Exonerate plain text.""" _parser = None # should be defined by subclass _query_mark = None # this one too def get_qresult_id(self, pos): raise NotImplementedError("Should be defined by subclass") def __iter__(self): """Iterate over the file handle; yields key, start offset, and length.""" handle = self._handle handle.seek(0) qresult_key = None while True: start_offset = handle.tell() line = handle.readline() if line.startswith(self._query_mark): if qresult_key is None: qresult_key = self.get_qresult_id(start_offset) qresult_offset = start_offset else: curr_key = self.get_qresult_id(start_offset) if curr_key != qresult_key: yield qresult_key, qresult_offset, \ start_offset - qresult_offset qresult_key = curr_key qresult_offset = start_offset handle.seek(qresult_offset) elif not line: yield qresult_key, qresult_offset, \ start_offset - qresult_offset break # if not used as a module, run the doctest if __name__ == "__main__": from Bio._utils import run_doctest run_doctest()