# Copyright 2015-2015 by Eric Rasche. All rights reserved. # # This file is part of the Biopython distribution and governed by your # choice of the "Biopython License Agreement" or the "BSD 3-Clause License". # Please see the LICENSE file that should have been included as part of this # package. """Bio.AlignIO support for "xmfa" output from Mauve/ProgressiveMauve. You are expected to use this module via the Bio.AlignIO functions (or the Bio.SeqIO functions if you want to work directly with the gapped sequences). For example, consider a progressiveMauve alignment file containing the following:: #FormatVersion Mauve1 #Sequence1File a.fa #Sequence1Entry 1 #Sequence1Format FastA #Sequence2File b.fa #Sequence2Entry 2 #Sequence2Format FastA #Sequence3File c.fa #Sequence3Entry 3 #Sequence3Format FastA #BackboneFile three.xmfa.bbcols > 1:0-0 + a.fa -------------------------------------------------------------------------------- -------------------------------------------------------------------------------- -------------------------------------------------------------------------------- > 2:5417-5968 + b.fa TTTAAACATCCCTCGGCCCGTCGCCCTTTTATAATAGCAGTACGTGAGAGGAGCGCCCTAAGCTTTGGGAAATTCAAGC- -------------------------------------------------------------------------------- CTGGAACGTACTTGCTGGTTTCGCTACTATTTCAAACAAGTTAGAGGCCGTTACCTCGGGCGAACGTATAAACCATTCTG > 3:9476-10076 - c.fa TTTAAACACCTTTTTGGATG--GCCCAGTTCGTTCAGTTGTG-GGGAGGAGATCGCCCCAAACGTATGGTGAGTCGGGCG TTTCCTATAGCTATAGGACCAATCCACTTACCATACGCCCGGCGTCGCCCAGTCCGGTTCGGTACCCTCCATGACCCACG ---------------------------------------------------------AAATGAGGGCCCAGGGTATGCTT = > 2:5969-6015 + b.fa ----------------------- GGGCGAACGTATAAACCATTCTG > 3:9429-9476 - c.fa TTCGGTACCCTCCATGACCCACG AAATGAGGGCCCAGGGTATGCTT This is a multiple sequence alignment with multiple aligned sections, so you would probably load this using the Bio.AlignIO.parse() function: >>> from Bio import AlignIO >>> align = AlignIO.parse("Mauve/simple.xmfa", "mauve") >>> alignments = list(align) >>> for aln in alignments: ... print(align) SingleLetterAlphabet() alignment with 3 rows and 240 columns --------------------------------------------...--- 1 TTTAAACATCCCTCGGCCCGTCGCCCTTTTATAATAGCAGTACG...CTG 2 TTTAAACACCTTTTTGGATG--GCCCAGTTCGTTCAGTTGTG-G...CTT 3 SingleLetterAlphabet() alignment with 3 rows and 46 columns ---------------------------------------------- 1 -----------------------GGGCGAACGTATAAACCATTCTG 2 TTCGGTACCCTCCATGACCCACGAAATGAGGGCCCAGGGTATGCTT 3 Additional information is extracted from the XMFA file and available through the annotation attribute of each record:: >>> for record in alignments[0]: ... print record.id, len(record), record.annotations 1 240 {'start': 0, 'end': 0, 'strand': 1} 2 240 {'start': 5417, 'end': 5968, 'strand': 1} 3 240 {'start': 9476, 'end': 10076, 'strand': -1} """ from __future__ import print_function import re from Bio.Seq import Seq from Bio.SeqRecord import SeqRecord from Bio.Align import MultipleSeqAlignment from .Interfaces import AlignmentIterator from .Interfaces import SequentialAlignmentWriter XMFA_HEADER_REGEX = re.compile(r"> (?P\d+):(?P\d+)-(?P\d+) (?P[+-]) (?P.*)") XMFA_HEADER_REGEX_BIOPYTHON = re.compile(r"> (?P\d+):(?P\d+)-(?P\d+) (?P[+-]) (?P[^#]*) # (?P.*)") ID_LINE_FMT = "> {seq_name}:{start}-{end} {strand} {file} # {ugly_hack}\n" def _identifier_split(identifier): """Return (name, start, end) string tuple from an identifier (PRIVATE).""" id, loc, strand = identifier.split(':') start, end = map(int, loc.split('-')) start -= 1 return id, start, end, strand class MauveWriter(SequentialAlignmentWriter): """Mauve/XMFA alignment writer.""" def __init__(self, *args, **kwargs): """Initialize.""" super(MauveWriter, self).__init__(*args, **kwargs) self._wrote_header = False self._wrote_first = False def write_alignment(self, alignment): """Use this to write (another) single alignment to an open file. Note that sequences and their annotation are recorded together (rather than having a block of annotation followed by a block of aligned sequences). """ count = len(alignment) self._length_of_sequences = alignment.get_alignment_length() # NOTE - For now, the alignment object does not hold any per column # or per alignment annotation - only per sequence. if count == 0: raise ValueError("Must have at least one sequence") if self._length_of_sequences == 0: raise ValueError("Non-empty sequences are required") if not self._wrote_header: self._wrote_header = True self.handle.write("#FormatVersion Mauve1\n") # There are some more headers, but we ignore those for now. # Sequence1File unknown.fa # Sequence1Entry 1 # Sequence1Format FastA for i in range(1, count + 1): self.handle.write('#Sequence%sEntry\t%s\n' % (i, i)) for idx, record in enumerate(alignment): self._write_record(record, record_idx=idx) self.handle.write('=\n') def _write_record(self, record, record_idx=0): """Write a single SeqRecord to the file (PRIVATE).""" if self._length_of_sequences != len(record.seq): raise ValueError("Sequences must all be the same length") seq_name = record.name try: seq_name = str(int(record.name)) except ValueError: seq_name = str(record_idx + 1) # We remove the "/{start}-{end}" before writing, as it cannot be part # of the produced XMFA file. if "start" in record.annotations and "end" in record.annotations: suffix0 = "/%s-%s" % (str(record.annotations["start"]), str(record.annotations["end"])) suffix1 = "/%s-%s" % (str(record.annotations["start"] + 1), str(record.annotations["end"])) if seq_name[-len(suffix0):] == suffix0: seq_name = seq_name[:-len(suffix0)] if seq_name[-len(suffix1):] == suffix1: seq_name = seq_name[:-len(suffix1)] if "start" in record.annotations \ and "end" in record.annotations \ and "strand" in record.annotations: id_line = ID_LINE_FMT.format( seq_name=seq_name, start=record.annotations["start"] + 1, end=record.annotations["end"], strand=("+" if record.annotations["strand"] == 1 else "-"), file=record.name + '.fa', ugly_hack=record.id ) lacking_annotations = False else: id_line = ID_LINE_FMT.format( seq_name=seq_name, start=0, end=0, strand='+', file=record.name + '.fa', ugly_hack=record.id ) lacking_annotations = True # If the sequence is an empty one, skip writing it out if (':0-0 ' in id_line or ':1-0 ' in id_line) and not lacking_annotations: # Except in the first LCB if not self._wrote_first: self._wrote_first = True # The first LCB we write out is special, and must list ALL # sequences, for the Mauve GUI # http://darlinglab.org/mauve/user-guide/files.html#non-standard-xmfa-formatting-used-by-the-mauve-gui id_line = ID_LINE_FMT.format( seq_name=seq_name, start=0, end=0, strand='+', file=record.name + '.fa', ugly_hack=record.id ) self.handle.write(id_line + '\n') # Alignments lacking a start/stop/strand were generated by # BioPython on load, and shouldn't exist according to XMFA else: # In other blocks, we only write sequences if they exist in a given # alignment. self.handle.write(id_line) for i in range(0, len(record.seq), 80): self.handle.write("%s\n" % str(record.seq[i:i + 80])) class MauveIterator(AlignmentIterator): """Mauve xmfa alignment iterator.""" _ids = [] # for caching IDs between __next__ calls def __next__(self): """Parse the next alignment from the handle.""" handle = self.handle line = handle.readline() if not line: raise StopIteration # Strip out header comments while line and line.strip().startswith('#'): line = handle.readline() seqs = {} seq_regions = {} passed_end_alignment = False latest_id = None while True: if not line: break # end of file line = line.strip() if line.startswith('='): # There may be more data, but we've reached the end of this # alignment break elif line.startswith('>'): m = XMFA_HEADER_REGEX_BIOPYTHON.match(line) if not m: m = XMFA_HEADER_REGEX.match(line) if not m: raise ValueError("Malformed header line: %s", line) parsed_id = m.group('id') parsed_data = {} for key in ('start', 'end', 'id', 'strand', 'name', 'realname'): try: value = m.group(key) if key == 'start': value = int(value) # Convert to zero based counting if value > 0: value -= 1 if key == 'end': value = int(value) parsed_data[key] = value except IndexError: # This will occur if we're asking for a group that # doesn't exist. It's fine. pass seq_regions[parsed_id] = parsed_data if parsed_id not in self._ids: self._ids.append(parsed_id) seqs.setdefault(parsed_id, '') latest_id = parsed_id else: assert not passed_end_alignment if latest_id is None: raise ValueError("Saw sequence before definition line") seqs[latest_id] += line line = handle.readline() assert len(seqs) <= len(self._ids) self.ids = self._ids self.sequences = seqs if self._ids and seqs: alignment_length = max(map(len, list(seqs.values()))) records = [] for id in self._ids: if id not in seqs or len(seqs[id]) == 0 \ or len(seqs[id]) == 0: seq = '-' * alignment_length else: seq = seqs[id] if alignment_length != len(seq): raise ValueError("Sequences have different lengths, or repeated identifier") # Sometimes we don't see a particular sequence in the # alignment, so we skip that record since it isn't present in # that LCB/alignment if id not in seq_regions: continue if (seq_regions[id]['start'] != 0 or seq_regions[id]['end'] != 0): suffix = '/{start}-{end}'.format(**seq_regions[id]) if 'realname' in seq_regions[id]: corrected_id = seq_regions[id]['realname'] else: corrected_id = seq_regions[id]['name'] if corrected_id.count(suffix) == 0: corrected_id += suffix else: if 'realname' in seq_regions[id]: corrected_id = seq_regions[id]['realname'] else: corrected_id = seq_regions[id]['name'] record = SeqRecord( Seq(seq, self.alphabet), id=corrected_id, name=id ) record.annotations["start"] = seq_regions[id]['start'] record.annotations["end"] = seq_regions[id]['end'] record.annotations["strand"] = 1 if seq_regions[id]['strand'] == '+' else -1 records.append(record) return MultipleSeqAlignment(records, self.alphabet) else: raise StopIteration