#! /usr/bin/env python3 # Copyright 2010, 2011, 2013, 2014 Martin C. Frith # SPDX-License-Identifier: GPL-3.0-or-later # Seems to work with Python 2.x, x>=6 # By "MAF" we mean "multiple alignment format" described in the UCSC # Genome FAQ, not e.g. "MIRA assembly format". from __future__ import print_function from itertools import * import collections import functools import gzip import logging import math import operator import optparse import os import signal import sys try: from future_builtins import map, zip except ImportError: pass aminoAcidCodes = {"ALA": "A", "CYS": "C", "ASP": "D", "GLU": "E", "PHE": "F", "GLY": "G", "HIS": "H", "ILE": "I", "LYS": "K", "LEU": "L", "MET": "M", "ASN": "N", "PRO": "P", "GLN": "Q", "ARG": "R", "SER": "S", "THR": "T", "VAL": "V", "TRP": "W", "TYR": "Y", "***": "*", "XAA": "X"} def standardGeneticCode(): aa = "FFLLSSSSYY**CC*WLLLLPPPPHHQQRRRRIIIMTTTTNNKKSSRRVVVVAAAADDEEGGGG" b1 = "TTTTTTTTTTTTTTTTCCCCCCCCCCCCCCCCAAAAAAAAAAAAAAAAGGGGGGGGGGGGGGGG" b2 = "TTTTCCCCAAAAGGGGTTTTCCCCAAAAGGGGTTTTCCCCAAAAGGGGTTTTCCCCAAAAGGGG" b3 = "TCAGTCAGTCAGTCAGTCAGTCAGTCAGTCAGTCAGTCAGTCAGTCAGTCAGTCAGTCAGTCAG" return {x + y + z: a for a, x, y, z in zip(aa, b1, b2, b3)} def myOpen(fileName): if fileName == "-": return sys.stdin if fileName.endswith(".gz"): return gzip.open(fileName, "rt") # xxx dubious for Python2 return open(fileName) def maxlen(s): return max(map(len, s)) def pairOrDie(sLines, formatName): if len(sLines) != 2: e = "for %s, each alignment must have 2 sequences" % formatName raise Exception(e) return sLines def isMatch(alignmentColumn): # No special treatment of ambiguous bases/residues: same as NCBI BLAST. first = alignmentColumn[0].upper() for i in alignmentColumn[1:]: if i.upper() != first: return False return True def gapRunCount(row): """Get the number of runs of gap characters.""" return sum(k == "-" for k, v in groupby(row)) def alignmentRowsFromColumns(columns): return map(''.join, zip(*columns)) def symbolSize(symbol, letterSize): if symbol == "\\": return 1 if symbol == "/": return -1 return letterSize def insertSize(row, letterSize): """Get the length of sequence included in the row.""" x = len(row) - row.count("-") if letterSize: return x * letterSize - 4 * row.count("/") - 2 * row.count("\\") return x // 3 # assume it's divisible by 3 def matchAndInsertSizes(alignmentColumns, letterSizes): """Get sizes of gapless blocks, and of the inserts between them.""" letterSizeA, letterSizeB = letterSizes delInc = max(letterSizeA, 1) delDiv = 1 if letterSizeA else 3 insInc = max(letterSizeB, 1) insDiv = 1 if letterSizeB else 3 sizeDiv = max(delDiv, insDiv) delSize = insSize = subSize = 0 for x, y in alignmentColumns: if x == "-": if subSize: if delSize or insSize: yield str(delSize // delDiv) + ":" + str(insSize // insDiv) yield str(subSize // sizeDiv) delSize = insSize = subSize = 0 insSize += symbolSize(y, insInc) elif y == "-": if subSize: if delSize or insSize: yield str(delSize // delDiv) + ":" + str(insSize // insDiv) yield str(subSize // sizeDiv) delSize = insSize = subSize = 0 delSize += symbolSize(x, delInc) else: subSize += 1 if delSize or insSize: yield str(delSize // delDiv) + ":" + str(insSize // insDiv) if subSize: yield str(subSize // sizeDiv) ##### Routines for reading MAF format: ##### def updateEvalueParameters(opts, fields): for field in fields: try: k, v = field.split("=") x = float(v) if k == "lambda": opts.bitScoreA = x / math.log(2) if k == "K": opts.bitScoreB = math.log(x, 2) except ValueError: pass def scoreAndEvalue(aLine): score = evalue = None for i in aLine.split(): if i.startswith("score="): score = i[6:] elif i.startswith("E="): evalue = i[2:] return score, evalue def mafInput(opts, lines): opts.scoreMatrix = {} matrixColumnNames = [] aLine = "" sLines = [] qLines = [] pLines = [] for line in lines: if line[0] == "s": junk, seqName, beg, span, strand, seqLen, row = line.split() beg = int(beg) span = int(span) seqLen = int(seqLen) rowLetters = len(row) - row.count("-") if "\\" in row or "/" in row or rowLetters < span: letterSize = 3 elif rowLetters > span: letterSize = 0 # xxx means 3-letter codes like AlaCysAsp else: letterSize = 1 fields = seqName, seqLen, strand, letterSize, beg, beg + span, row sLines.append(fields) elif line[0] == "a": aLine = line opts.headerMode &= 1 elif line[0] == "q": qLines.append(line) elif line[0] == "p": pLines.append(line) elif line.isspace(): if sLines: yield aLine, sLines, qLines, pLines aLine = "" sLines = [] qLines = [] pLines = [] elif line[0] == "#": fields = line.split() updateEvalueParameters(opts, fields) if len(fields) == 65 and all(len(i) == 3 for i in fields[1:]): matrixColumnNames = [i.upper() for i in fields[1:]] if len(fields) == 66 and len(fields[1]) == 1: r = fields[1].upper() for c, x in zip(matrixColumnNames, fields[2:]): opts.scoreMatrix[r, c] = int(x) if opts.headerMode == 1: print(line, end="") if opts.headerMode == 2 and line.startswith("# LAST version "): print("@PG\tID:lastal\tPN:lastal\tVN:" + fields[3]) if sLines: yield aLine, sLines, qLines, pLines def isJoinable(opts, isTouchingInQuery, oldMaf, newMaf): x = oldMaf[1] y = newMaf[1] if x[-1][2] == "-": x, y = y, x if isTouchingInQuery and x[1][5] < y[1][4]: return False return all(i[:4] == j[:4] and i[5] <= j[4] and j[4] - i[5] <= opts.join for i, j in zip(x, y)) def fixOrder(mafs): sLines = mafs[0][1] if sLines[-1][2] == "-": mafs.reverse() def mafGroupInput(opts, isTouchingInQuery, lines): x = [] for i in mafInput(opts, lines): if x and not isJoinable(opts, isTouchingInQuery, x[-1], i): fixOrder(x) yield x x = [] x.append(i) if x: fixOrder(x) yield x def linkedMafBegSortKey(sequenceNumber, mafAndLinks): return operator.itemgetter(0, 2, 4)(mafAndLinks[0][1][sequenceNumber]) def colinearMafInput(opts, lines): mafs = list(mafInput(opts, lines)) if not mafs: return numOfSeqs = max(len(maf[1]) for maf in mafs) linkedMafs = [(maf, [None] * numOfSeqs) for maf in mafs] for s in range(numOfSeqs): begFunc = functools.partial(linkedMafBegSortKey, s) linkedMafs.sort(key=begFunc) maxEnd = "", "+", 0 for j in range(1, len(linkedMafs)): xMaf, xLinks = linkedMafs[j - 1] yMaf, yLinks = linkedMafs[j] x = xMaf[1][s] y = yMaf[1][s] newEnd = x[0], x[2], x[5] if newEnd > maxEnd: maxEnd = newEnd if (x[:4] == y[:4] and 0 <= y[4] - x[5] <= opts.Join): k = j + 1 yBeg = y[0], y[2], y[4] if k == len(linkedMafs) or yBeg < begFunc(linkedMafs[k]): yLinks[s] = xMaf colinearMafs = [] for maf, links in linkedMafs: if colinearMafs: if any(i is not colinearMafs[-1] for i in links): yield colinearMafs colinearMafs = [] colinearMafs.append(maf) yield colinearMafs ##### Routines for converting to AXT format: ##### axtCounter = count() def writeAxt(maf): aLine, sLines, qLines, pLines = maf if sLines[0][2] != "+": raise Exception("for AXT, the 1st strand in each alignment must be +") # Convert to AXT's 1-based coordinates: ranges = [(i[0], str(i[4] + 1), str(i[5]), i[2]) for i in sLines] head, body = ranges[0], ranges[1:] outWords = [str(next(axtCounter))] outWords.extend(head[:3]) for i in body: outWords.extend(i) score, evalue = scoreAndEvalue(aLine) if score: outWords.append(score) print(*outWords) for i in sLines: print(i[6]) print() # print a blank line at the end def mafConvertToAxt(opts, lines): for maf in mafInput(opts, lines): writeAxt(maf) ##### Routines for converting to tabular format: ##### def writeTab(maf): aLine, sLines, qLines, pLines = maf score = "0" endWords = [] for i in aLine.split(): if i.startswith("score="): score = i[6:] elif len(i) > 1: endWords.append(i) outWords = [score] for seqName, seqLen, strand, letterSize, beg, end, row in sLines: x = seqName, str(beg), str(end - beg), strand, str(seqLen) outWords.extend(x) letterSizes = [i[3] for i in sLines] rows = [i[6] for i in sLines] alignmentColumns = zip(*rows) gapWord = ",".join(matchAndInsertSizes(alignmentColumns, letterSizes)) outWords.append(gapWord) print("\t".join(outWords + endWords)) def mafConvertToTab(opts, lines): for maf in mafInput(opts, lines): writeTab(maf) ##### Routines for converting to chain format: ##### def writeChain(maf): aLine, sLines, qLines, pLines = maf score = "0" for i in aLine.split(): if i.startswith("score="): score = i[6:] outWords = ["chain", score] for seqName, seqLen, strand, letterSize, beg, end, row in sLines: x = seqName, str(seqLen), strand, str(beg), str(end) outWords.extend(x) outWords.append(str(next(axtCounter) + 1)) print(*outWords) letterSizes = [i[3] for i in sLines] rows = [i[6] for i in sLines] alignmentColumns = zip(*rows) size = "0" for i in matchAndInsertSizes(alignmentColumns, letterSizes): if ":" in i: print(size + "\t" + i.replace(":", "\t")) size = "0" else: size = i print(size) def mafConvertToChain(opts, lines): for maf in mafInput(opts, lines): writeChain(maf) ##### Routines for converting to BED format: ##### def bedBlocksFromMafs(mafs, ref): for i, x in enumerate(mafs): beg, end = x[1][ref][4:6] if not i: origin = beg yield end - beg, beg - origin def writeBed(opts, mafs): headLines = mafs[0][1] tailLines = mafs[-1][1] if opts.subject: ref = opts.subject - 1 elif len(headLines) > 1 and headLines[0][3] < headLines[1][3]: ref = 1 # protein-to-DNA alignment else: ref = 0 if len(headLines) <= ref: raise RuntimeError("alignment has too few sequences") qry = 0 if ref > 0 else 1 qryName = headLines[qry][0] if len(headLines) > qry else "." qryStrand = headLines[qry][2] if len(headLines) > qry else "+" seqName, seqLen, strand, junk, beg = headLines[ref][:5] end = tailLines[ref][5] blocks = list(bedBlocksFromMafs(mafs, ref)) if strand == "-": beg, end = seqLen - end, seqLen - beg origin = sum(blocks[-1]) blocks = [(s, origin - b - s) for s, b in reversed(blocks)] outStrand = "+" if qryStrand == strand else "-" blockLens = ",".join(str(i[0]) for i in blocks) blockBegs = ",".join(str(i[1]) for i in blocks) print(seqName, beg, end, qryName, 0, outStrand, beg, beg, 0, len(blocks), blockLens, blockBegs, sep="\t") def mafConvertToBed(opts, lines): if opts.Join: for i in colinearMafInput(opts, lines): writeBed(opts, i) elif opts.join: for i in mafGroupInput(opts, False, lines): writeBed(opts, i) else: for i in mafInput(opts, lines): writeBed(opts, [i]) ##### Routines for converting to PSL format: ##### def pslBlocks(opts, geneticCode, mafs, outCounts): """Get sizes and start coordinates of gapless blocks in an alignment.""" # repMatches is always zero # for proteins, nCount is always zero, because that's what BLATv34 does normalBases = "ACGTU" mismatches = repMatches = nCount = 0 for maf in mafs: sLines = maf[1] fieldsA, fieldsB = pairOrDie(sLines, "PSL") letterSizeA, begA, endA, rowA = fieldsA[3:7] letterSizeB, begB, endB, rowB = fieldsB[3:7] rowA = rowA.upper() rowB = rowB.upper() if letterSizeA * letterSizeB < 1: letterSpanA = 3 - letterSizeA * 2 letterSpanB = 3 - letterSizeB * 2 codeA = geneticCode if letterSizeA else aminoAcidCodes codeB = geneticCode if letterSizeB else aminoAcidCodes loopEnd = len(rowA) i = j = 0 while j < loopEnd: if rowA[j] == "-": if j > i: size = j - i yield size // 3, begA, begB begA += size // letterSpanA begB += size // letterSpanB begB += 1 j += letterSpanB i = j elif rowB[j] == "-": if j > i: size = j - i yield size // 3, begA, begB begA += size // letterSpanA begB += size // letterSpanB begA += 1 j += letterSpanA i = j else: k = j + 3 if codeA.get(rowA[j:k], "X") != codeB.get(rowB[j:k], "X"): mismatches += 1 j = k if j > i: yield (j - i) // 3, begA, begB else: isProtein = opts.protein or letterSizeA * letterSizeB > 1 size = 0 for x, y in zip(rowA, rowB): if x == "-": if size: yield size, begA, begB begA += size * letterSizeA begB += size * letterSizeB size = 0 begB += symbolSize(y, letterSizeB) elif y == "-": if size: yield size, begA, begB begA += size * letterSizeA begB += size * letterSizeB size = 0 begA += symbolSize(x, letterSizeA) else: size += 1 if x in normalBases and y in normalBases or isProtein: if x != y: mismatches += 1 else: nCount += 1 if size: yield size, begA, begB outCounts[0:3] = mismatches, repMatches, nCount def pslNumInserts(blocks, letterSizeA, letterSizeB): numInsertA = numInsertB = 0 for i, x in enumerate(blocks): size, begA, begB = x if i: if begA > endA: numInsertA += 1 if begB > endB: numInsertB += 1 endA = begA + size * letterSizeA endB = begB + size * letterSizeB return numInsertA, numInsertB def pslCommaString(things): # UCSC software seems to prefer a trailing comma return ",".join(map(str, things)) + "," def pslEnds(seqLen, strand, beg, end): if strand == "-": return seqLen - end, seqLen - beg return beg, end def writePsl(opts, geneticCode, mafs): matchCounts = [0] * 3 blocks = list(pslBlocks(opts, geneticCode, mafs, matchCounts)) mismatches, repMatches, nCount = matchCounts numGaplessColumns = sum(i[0] for i in blocks) matches = numGaplessColumns - sum(matchCounts) if not blocks: return fieldsA, fieldsB = mafs[0][1] if opts.subject == 2 or fieldsA[3] < fieldsB[3] and opts.subject != 1: # default for protein-to-DNA alignment fieldsA, fieldsB = fieldsB, fieldsA blocks = [(s, b, a) for s, a, b in blocks] seqNameA, seqLenA, strandA, letterSizeA = fieldsA[0:4] seqNameB, seqLenB, strandB, letterSizeB = fieldsB[0:4] sizeMulA = 3 if letterSizeA > letterSizeB else 1 sizeMulB = 3 if letterSizeB > letterSizeA else 1 headSize, headBegA, headBegB = blocks[0] tailSize, tailBegA, tailBegB = blocks[-1] tailEndA = tailBegA + tailSize * sizeMulA begA, endA = pslEnds(seqLenA, strandA, headBegA, tailEndA) baseInsertA = endA - begA - numGaplessColumns * sizeMulA tailEndB = tailBegB + tailSize * sizeMulB begB, endB = pslEnds(seqLenB, strandB, headBegB, tailEndB) baseInsertB = endB - begB - numGaplessColumns * sizeMulB numInsertA, numInsertB = pslNumInserts(blocks, sizeMulA, sizeMulB) if sizeMulA > 1 or sizeMulB > 1: strand = strandB + strandA else: strand = "+" if strandA == strandB else "-" if strandA == "-": blocks = [(s, seqLenA - a - s, seqLenB - b - s) for s, a, b in reversed(blocks)] blockCount = len(blocks) blockSizes, blockStartsA, blockStartsB = map(pslCommaString, zip(*blocks)) print(matches, mismatches, repMatches, nCount, numInsertB, baseInsertB, numInsertA, baseInsertA, strand, seqNameB, seqLenB, begB, endB, seqNameA, seqLenA, begA, endA, blockCount, blockSizes, blockStartsB, blockStartsA, sep="\t") def mafConvertToPsl(opts, lines): geneticCode = standardGeneticCode() if opts.Join: for i in colinearMafInput(opts, lines): writePsl(opts, geneticCode, i) elif opts.join: for i in mafGroupInput(opts, False, lines): writePsl(opts, geneticCode, i) else: for i in mafInput(opts, lines): writePsl(opts, geneticCode, [i]) ##### Routines for converting to SAM format: ##### def readGroupId(readGroupItems): for i in readGroupItems: if i.startswith("ID:"): return i[3:] raise Exception("readgroup must include ID") def readSequenceLengths(fileNames): """Read name & length of topmost sequence in each maf block.""" for i in fileNames: f = myOpen(i) fields = None for line in f: if fields: if line.isspace(): fields = None else: if line[0] == "s": fields = line.split() yield fields[1], fields[5] f.close() def naturalSortKey(s): """Return a key that sorts strings in "natural" order.""" return [(str, int)[k]("".join(v)) for k, v in groupby(s, str.isdigit)] def karyotypicSortKey(s): """Attempt to sort chromosomes in GATK's ridiculous order.""" if s == "chrM": return [] if s == "MT": return ["~"] return naturalSortKey(s) def copyDictFile(lines): for line in lines: if line.startswith("@SQ"): sys.stdout.write(line) elif not line[0] == "@": break def writeSamHeader(opts, fileNames): print("@HD\tVN:1.3\tSO:unsorted") if opts.dictionary: sequenceLengths = dict(readSequenceLengths(fileNames)) for k in sorted(sequenceLengths, key=karyotypicSortKey): print("@SQ\tSN:%s\tLN:%s" % (k, sequenceLengths[k])) if opts.dictfile: f = myOpen(opts.dictfile) copyDictFile(f) f.close() if opts.readgroup: print("@RG\t" + "\t".join(opts.readgroup.split())) mapqMissing = "255" mapqMaximum = "254" mapqMaximumNum = float(mapqMaximum) def mismapProbabilityFromText(probString): try: p = float(probString) except ValueError: raise ValueError("bad probability: " + probString) if math.isnan(p): logging.warning("bad probability: " + probString) return 2.0 if p < 0 or p > 1: raise ValueError("bad probability: " + probString) return p def cigarParts(alignmentColumns): # (doesn't handle translated alignments) # uses "read-ahead" technique, aiming to be as fast as possible: isActive = True for x, y in alignmentColumns: break else: isActive = False while isActive: size = 1 if x == y: # xxx assumes no gap-gap columns, ignores ambiguous bases for x, y in alignmentColumns: if x != y: break size += 1 else: isActive = False yield str(size) + "=" elif x == "-": for x, y in alignmentColumns: if x != "-": break size += 1 else: isActive = False yield str(size) + "I" elif y == "-": for x, y in alignmentColumns: if y != "-": break size += 1 else: isActive = False yield str(size) + "D" else: for x, y in alignmentColumns: if x == y or x == "-" or y == "-": break size += 1 else: isActive = False yield str(size) + "X" def writeSam(readGroup, mafs): seq = qual = cigar = score = evalue = "" mismapProb = 2.0 editDistance = 0 for maf in mafs: aLine, sLines, qLines, pLines = maf fieldsA, fieldsB = pairOrDie(sLines, "SAM") seqNameA, seqLenA, strandA, letterSizeA, begA, endA, rowA = fieldsA seqNameB, seqLenB, strandB, letterSizeB, begB, endB, rowB = fieldsB if letterSizeA > 1 or letterSizeB > 1: raise Exception("this looks like translated DNA - can't convert to SAM format") if strandA != "+": raise Exception("for SAM, the 1st strand in each alignment must be +") isSplice = False for i in aLine.split(): if i.startswith("score="): score = i[6:] elif i.startswith("E="): evalue = i[2:] elif i.startswith("mismap="): mismapProb = min(mismapProb, mismapProbabilityFromText(i[7:])) elif i.startswith(("don=", "acc=")): isSplice = True if seq: d = begA - oldEndA cigar += str(d) + "DN"[isSplice] if not isSplice: editDistance += d else: pos = str(begA + 1) # convert to 1-based coordinate if begB: cigar += str(begB) + "H" oldEndA = endA alignmentColumns = list(zip(rowA.upper(), rowB.upper())) cigar += "".join(cigarParts(iter(alignmentColumns))) editDistance += sum(x != y for x, y in alignmentColumns) # no special treatment of ambiguous bases: might be a minor bug seq += rowB.replace("-", "") if qLines: qFields = qLines[-1].split() if qFields[1] == seqNameB: z = zip(rowB, qFields[2]) qual += ''.join(j for i, j in z if i != "-") revBegB = seqLenB - endB if revBegB: cigar += str(revBegB) + "H" if mismapProb > 1: mapq = mapqMissing elif mismapProb <= 0: mapq = mapqMaximum else: mapq = -10 * math.log(mismapProb, 10) mapq = str(int(round(mapq))) if mapq < mapqMaximumNum else mapqMaximum # It's hard to get all the pair info, so this is very # incomplete, but hopefully good enough. # I'm not sure whether to add 2 and/or 8 to flag. if seqNameB.endswith("/1"): seqNameB = seqNameB[:-2] if strandB == "+": flag = "99" # 1 + 2 + 32 + 64 else: flag = "83" # 1 + 2 + 16 + 64 elif seqNameB.endswith("/2"): seqNameB = seqNameB[:-2] if strandB == "+": flag = "163" # 1 + 2 + 32 + 128 else: flag = "147" # 1 + 2 + 16 + 128 else: if strandB == "+": flag = "0" else: flag = "16" if len(qual) < len(seq): qual = "*" out = [seqNameB, flag, seqNameA, pos, mapq, cigar, "*\t0\t0", seq, qual] out.append("NM:i:" + str(editDistance)) if len(mafs) < 2: if score.isdigit(): out.append("AS:i:" + score) # must be an integer if evalue: out.append("EV:Z:" + evalue) if readGroup: out.append(readGroup) print("\t".join(out)) def mafConvertToSam(opts, lines): readGroup = "" if opts.readgroup: readGroup = "RG:Z:" + readGroupId(opts.readgroup.split()) if opts.join: for i in mafGroupInput(opts, True, lines): writeSam(readGroup, i) else: for i in mafInput(opts, lines): writeSam(readGroup, [i]) ##### Routines for converting to BLAST-like format: ##### def codonMatchSymbols(opts, geneticCode, aaSeq, ntSeq): scoreSymbols = " ", "...", ":::" seqLen = len(aaSeq) i = 0 while i < seqLen: if aaSeq[i] == "-": yield " " i += 1 else: j = i + 3 a = aminoAcidCodes.get(aaSeq[i:j], "X") b = ntSeq[i:j] if geneticCode.get(b, "Z") == a: yield "|||" else: score = opts.scoreMatrix.get((a, b), -1) yield scoreSymbols[(score >= 0) + (score > 0)] i = j def translatedSeq(codon2triplet, aaSeq, ntSeq): seqLen = len(aaSeq) i = 0 while i < seqLen: if aaSeq[i] == "-": j = i + 1 while j < seqLen and aaSeq[j] == "-": j += 1 if (j - i) % 3 == 0: for b in range(i, j, 3): e = b + 3 yield codon2triplet.get(ntSeq[b:e], " ") else: yield " " * (j - i) else: j = i + 3 yield codon2triplet.get(ntSeq[i:j], " ") i = j def strandText(strand): if strand == "+": return "Plus" else: return "Minus" def blastBegCoordinate(zeroBasedCoordinate, strand, seqLen): if strand == "+": return str(zeroBasedCoordinate + 1) else: return str(seqLen - zeroBasedCoordinate) def blastEndCoordinate(zeroBasedCoordinate, strand, seqLen): if strand == "+": return str(zeroBasedCoordinate) else: return str(seqLen - zeroBasedCoordinate + 1) def nextCoordinate(coordinate, row, letterSize): return coordinate + insertSize(row, letterSize) def chunker(things, chunkSize): for i in range(0, len(things), chunkSize): yield things[i:i+chunkSize] def blastChunker(sLines, lineSize, alignmentColumns): seqLens = [i[1] for i in sLines] strands = [i[2] for i in sLines] letterSizes = [i[3] for i in sLines] coords = [i[4] for i in sLines] for chunkCols in chunker(alignmentColumns, lineSize): chunkRows = list(alignmentRowsFromColumns(chunkCols)) begs = list(map(blastBegCoordinate, coords, strands, seqLens)) coords = list(map(nextCoordinate, coords, chunkRows, letterSizes)) ends = list(map(blastEndCoordinate, coords, strands, seqLens)) yield chunkCols, chunkRows, begs, ends def blastDataFromMafFields(fields): seqName, seqLen, strand, letterSize, beg, end, row = fields maxPos = end if strand == "-": beg -= seqLen maxPos = -beg return seqName, seqLen, strand, letterSize, beg, maxPos, row, row.upper() def blastLineData(letterSize, beg, line): end = beg + insertSize(line, letterSize) begStr = str(beg + 1 if beg >= 0 else -beg) endStr = str(end if end > 0 else 1 - end) return begStr, line, endStr, end def writeBlast(opts, geneticCode, codon2triplet, maf, oldQueryName): aLine, sLines, qLines, pLines = maf fieldsA, fieldsB = pairOrDie(sLines, "Blast") if opts.subject == 2: fieldsA, fieldsB = fieldsB, fieldsA dataA = blastDataFromMafFields(fieldsA) dataB = blastDataFromMafFields(fieldsB) seqNameA, seqLenA, strandA, letterSizeA, begA, maxPosA, rowA, upA = dataA seqNameB, seqLenB, strandB, letterSizeB, begB, maxPosB, rowB, upB = dataB if seqNameB != oldQueryName: print("Query= " + seqNameB) print(" (%s letters)" % seqLenB) print() print(">" + seqNameA) print(" Length = %s" % seqLenA) print() score, evalue = scoreAndEvalue(aLine) if score and opts.bitScoreA is not None and opts.bitScoreB is not None: bitScore = opts.bitScoreA * float(score) - opts.bitScoreB scoreLine = " Score = %.3g bits (%s)" % (bitScore, score) else: scoreLine = " Score = %s" % score if evalue: scoreLine += ", Expect = %s" % evalue print(scoreLine) if letterSizeA * letterSizeB < 1: c = codonMatchCounts(geneticCode, letterSizeA, letterSizeB, upA, upB) matches, mismatches, gaps, dangles = c alnSize = matches + mismatches + gaps (aaSeq, ntSeq) = (upA, upB) if letterSizeB else (upB, upA) sySeq = "".join(codonMatchSymbols(opts, geneticCode, aaSeq, ntSeq)) txSeq = "".join(translatedSeq(codon2triplet, aaSeq, ntSeq)) lineSize = max(opts.linesize, 3) else: sySeq = "".join(" |"[x == y] for x, y in zip(upA, upB)) matches = sySeq.count("|") gaps = upA.count("-") + upB.count("-") alnSize = len(sySeq) lineSize = opts.linesize matchPercent = 100 * matches // alnSize # round down, like BLAST identLine = " Identities = %s/%s (%s%%)" % (matches, alnSize, matchPercent) if gaps: gapPercent = 100 * gaps // alnSize # round down, like BLAST identLine += ", Gaps = %s/%s (%s%%)" % (gaps, alnSize, gapPercent) print(identLine) print(" Strand = %s / %s" % (strandText(strandB), strandText(strandA))) print() begWidth = len(str(max(maxPosA, maxPosB))) pad = " " * (begWidth + 7) numOfColumns = len(sySeq) i = 0 while i < numOfColumns: j = min(i + lineSize, numOfColumns) rem = 0 if letterSizeA * letterSizeB < 1 and j < numOfColumns: rem = ((j - i - aaSeq.count("-", i, j)) % 3 or (j - i - txSeq.count(" ", i, j)) % 3) j -= rem bA, lineA, eA, begA = blastLineData(letterSizeA, begA, rowA[i:j]) bB, lineB, eB, begB = blastLineData(letterSizeB, begB, rowB[i:j]) if letterSizeA == 0: print(pad, txSeq[i:j]) print("Query: %-*s" % (begWidth, bB), lineB + " " * rem, eB) print(pad, sySeq[i:j]) print("Sbjct: %-*s" % (begWidth, bA), lineA + " " * rem, eA) if letterSizeB == 0: print(pad, txSeq[i:j]) print() i = j def mafConvertToBlast(opts, lines): geneticCode = standardGeneticCode() aa2triplet = dict((j, i.title()) for i, j in aminoAcidCodes.items()) codon2triplet = dict((i, aa2triplet[j]) for i, j in geneticCode.items()) qryNum = 0 if opts.subject == 2 else 1 oldQueryName = "" for maf in mafInput(opts, lines): writeBlast(opts, geneticCode, codon2triplet, maf, oldQueryName) sLines = maf[1] oldQueryName = sLines[qryNum][0] def btabDataFromMafFields(fields): seqName, seqLen, strand, letterSize, beg, end, row = fields if strand == "+": beg += 1 else: beg = seqLen - beg end = seqLen - end + 1 return seqName, letterSize, str(beg), str(end), row.upper() def codonMatchCounts(geneticCode, letterSizeA, letterSizeB, rowA, rowB): codeA = geneticCode if letterSizeA else aminoAcidCodes codeB = geneticCode if letterSizeB else aminoAcidCodes loopEnd = len(rowA) matches = mismatches = gaps = dangles = 0 i = 0 while i < loopEnd: if rowA[i] == "-": gaps += 1 if rowA[i+1] == "-" and rowA[i+2] == "-": i += 3 else: if rowB[i-1] == "-": dangles += 1 i += 1 elif rowB[i] == "-": gaps += 1 if rowB[i+1] == "-" and rowB[i+2] == "-": i += 3 else: if rowA[i-1] == "-": dangles += 1 i += 1 else: j = i + 3 if codeA.get(rowA[i:j], "X") == codeB.get(rowB[i:j], "X"): matches += 1 else: mismatches += 1 i = j return matches, mismatches, gaps, dangles def writeBlastTab(opts, geneticCode, maf): aLine, sLines, qLines, pLines = maf fieldsA, fieldsB = pairOrDie(sLines, "BlastTab") if opts.subject == 2: fieldsA, fieldsB = fieldsB, fieldsA seqNameA, letterSizeA, begA, endA, rowA = btabDataFromMafFields(fieldsA) seqNameB, letterSizeB, begB, endB, rowB = btabDataFromMafFields(fieldsB) gapOpens = gapRunCount(rowA) + gapRunCount(rowB) if letterSizeA * letterSizeB < 1: c = codonMatchCounts(geneticCode, letterSizeA, letterSizeB, rowA, rowB) matches, mismatches, gaps, dangles = c alnSize = matches + mismatches + gaps gapOpens -= dangles else: alignmentColumns = list(zip(rowA, rowB)) alnSize = len(alignmentColumns) matches = sum(x == y for x, y in alignmentColumns) mismatches = alnSize - matches - rowA.count("-") - rowB.count("-") matchPercent = "%.2f" % (100.0 * matches / alnSize) out = [seqNameB, seqNameA, matchPercent, alnSize, mismatches, gapOpens, begB, endB, begA, endA] score, evalue = scoreAndEvalue(aLine) if evalue: out.append(evalue) if score and opts.bitScoreA is not None and opts.bitScoreB is not None: bitScore = opts.bitScoreA * float(score) - opts.bitScoreB out.append("%.3g" % bitScore) print(*out, sep="\t") def mafConvertToBlastTab(opts, lines): geneticCode = standardGeneticCode() for maf in mafInput(opts, lines): writeBlastTab(opts, geneticCode, maf) ##### Routines for converting to GFF format: ##### def writeGffHeader(): print("##gff-version 3") def gffFromMaf(opts, maf): aLine, sLines, qLines, pLines = maf fieldsA, fieldsB = pairOrDie(sLines, "GFF") if opts.subject == 2 or fieldsA[3] < fieldsB[3] and opts.subject != 1: # default for protein-to-DNA alignment fieldsA, fieldsB = fieldsB, fieldsA seqNameA, seqLenA, strandA, letterSizeA, begA, endA, rowA = fieldsA seqNameB, seqLenB, strandB, letterSizeB, begB, endB, rowB = fieldsB score = "." for i in aLine.split(): if i.startswith("score="): score = i[6:] if strandA == "-": begA, endA = seqLenA - endA, seqLenA - begA if strandB == "-": begB, endB = seqLenB - endB, seqLenB - begB begA += 1 begB += 1 strand = "+" if strandA == strandB else "-" return seqNameA, begA, endA, strand, score, seqNameB, begB, endB def writeOneGff(gff, typeOfThing, parentId): seqNameA, begA, endA, strand, score, seqNameB, begB, endB = gff target = "Target={0} {1} {2}".format(seqNameB, begB, endB) name = "Name={0}:{1}-{2}".format(seqNameB, begB, endB) attributes = [target, name] if parentId: attributes.append("Parent=" + parentId) print(seqNameA, "maf-convert", typeOfThing, begA, endA, score, strand, ".", ";".join(attributes), sep="\t") def writeGffGroup(opts, qryNameCounts, mafs): gffs = sorted(gffFromMaf(opts, i) for i in mafs) seqNameA, begA, endA, strand, score, seqNameB, begB, endB = gffs[0] endA = gffs[-1][2] if strand == "+": endB = gffs[-1][7] else: begB = gffs[-1][6] qryNameCounts[seqNameB] += 1 parentId = "{0}.{1}".format(seqNameB, qryNameCounts[seqNameB]) myId = "ID=" + parentId name = "Name={0}:{1}-{2}".format(seqNameB, begB, endB) attributes = [myId, name] print(seqNameA, "maf-convert", "match", begA, endA, ".", strand, ".", ";".join(attributes), sep="\t") for i in gffs: writeOneGff(i, "match_part", parentId) def mafConvertToGff(opts, lines): qryNameCounts = collections.defaultdict(int) if opts.Join: for i in colinearMafInput(opts, lines): writeGffGroup(opts, qryNameCounts, i) elif opts.join: for i in mafGroupInput(opts, False, lines): writeGffGroup(opts, qryNameCounts, i) else: for i in mafInput(opts, lines): writeOneGff(gffFromMaf(opts, i), "match", None) ##### Routines for converting to HTML format: ##### def writeHtmlHeader(): print('''
prob > 0.999
prob > 0.99
prob > 0.95
prob > 0.9
prob > 0.5
prob ≤ 0.5
') for chunk in blastChunker(sLines, opts.linesize, alignmentColumns): cols, rows, begs, ends = chunk begWidth = maxlen(begs) endWidth = maxlen(ends) matchSymbols = ''.join(map(multipleMatchSymbol, cols)) classChunk = islice(classes, opts.linesize) classRuns = list(identicalRuns(classChunk)) for n, b, r, e in zip(seqNames, begs, rows, ends): spans = [htmlSpan(r, i) for i in classRuns] spans = ''.join(spans) formatParams = nameWidth, n, begWidth, b, spans, endWidth, e print('%-*s %*s %s %*s' % formatParams) print(' ' * nameWidth, ' ' * begWidth, matchSymbols) print() print('') def mafConvertToHtml(opts, lines): for maf in mafInput(opts, lines): writeHtml(opts, maf) ##### Main program: ##### def isFormat(myString, myFormat): return myFormat.startswith(myString) def mafConvertOneFile(opts, formatName, lines): if isFormat(formatName, "axt"): mafConvertToAxt(opts, lines) elif isFormat(formatName, "bed"): mafConvertToBed(opts, lines) elif isFormat(formatName, "blast"): mafConvertToBlast(opts, lines) elif isFormat(formatName, "blasttab"): mafConvertToBlastTab(opts, lines) elif isFormat(formatName, "chain"): mafConvertToChain(opts, lines) elif isFormat(formatName, "gff"): mafConvertToGff(opts, lines) elif isFormat(formatName, "html"): mafConvertToHtml(opts, lines) elif isFormat(formatName, "psl"): mafConvertToPsl(opts, lines) elif isFormat(formatName, "sam"): mafConvertToSam(opts, lines) elif isFormat(formatName, "tabular"): mafConvertToTab(opts, lines) else: raise Exception("unknown format: " + formatName) def mafConvert(opts, args): logging.basicConfig(format="%(filename)s: %(message)s") formatName = args[0].lower() fileNames = args[1:] opts.headerMode = 0 opts.bitScoreA = None opts.bitScoreB = None if not opts.noheader: if isFormat(formatName, "gff"): writeGffHeader() if isFormat(formatName, "html"): writeHtmlHeader() if isFormat(formatName, "sam"): writeSamHeader(opts, fileNames) opts.headerMode = 2 if isFormat(formatName, "tabular"): opts.headerMode = 1 if not fileNames: fileNames = ["-"] for i in fileNames: f = myOpen(i) mafConvertOneFile(opts, formatName, f) f.close() if not opts.noheader: if isFormat(formatName, "html"): print("") if __name__ == "__main__": signal.signal(signal.SIGPIPE, signal.SIG_DFL) # avoid silly error message usage = """ %prog --help %prog axt mafFile(s) %prog bed mafFile(s) %prog blast mafFile(s) %prog blasttab mafFile(s) %prog chain mafFile(s) %prog gff mafFile(s) %prog html mafFile(s) %prog psl mafFile(s) %prog sam mafFile(s) %prog tab mafFile(s)""" description = "Read MAF-format alignments & write them in another format." op = optparse.OptionParser(usage=usage, description=description) op.add_option("-s", "--subject", type="int", metavar="N", help="which sequence to use as subject/reference") op.add_option("-p", "--protein", action="store_true", help="assume protein alignments, for psl match counts") op.add_option("-j", "--join", type="float", metavar="N", help="join " "consecutive co-linear alignments separated by <= N letters") op.add_option("-J", "--Join", type="float", metavar="N", help= "join nearest co-linear alignments separated by <= N letters") op.add_option("-n", "--noheader", action="store_true", help="omit any header lines from the output") op.add_option("-d", "--dictionary", action="store_true", help="include dictionary of sequence lengths in sam format") op.add_option("-f", "--dictfile", help="get sequence dictionary from DICTFILE") op.add_option("-r", "--readgroup", help="read group info for sam format") op.add_option("-l", "--linesize", type="int", default=60, #metavar="CHARS", help="line length for blast and html formats (default: %default)") opts, args = op.parse_args() if opts.linesize <= 0: op.error("option -l: should be >= 1") if opts.dictionary and opts.dictfile: op.error("can't use both -d and -f") if len(args) < 1: op.error("I need a format-name and some MAF alignments") if opts.dictionary and (len(args) == 1 or "-" in args[1:]): op.error("need file (not pipe) with option -d") try: mafConvert(opts, args) except Exception as e: prog = os.path.basename(sys.argv[0]) sys.exit(prog + ": error: " + str(e))