# Copyright 2009-2013 by Peter Cock. All rights reserved. # This code is part of the Biopython distribution and governed by its # license. Please see the LICENSE file that should have been included # as part of this package. """Additional unit tests for Bio.SeqIO.QualityIO (covering FASTQ and QUAL).""" from __future__ import print_function import os import unittest import warnings from Bio._py3k import range from Bio._py3k import StringIO from Bio._py3k import _universal_read_mode from io import BytesIO from Bio import BiopythonWarning, BiopythonParserWarning from Bio.Alphabet import generic_dna from Bio.SeqIO import QualityIO from Bio import SeqIO from Bio.Seq import Seq, UnknownSeq, MutableSeq from Bio.SeqRecord import SeqRecord from Bio.Data.IUPACData import ambiguous_dna_letters, ambiguous_rna_letters BINARY_FORMATS = ["sff", "sff-trim"] def truncation_expected(format): if format in ["fastq-solexa", "fastq-illumina"] : return 62 elif format in ["fastq", "fastq-sanger"]: return 93 else: assert format in ["fasta", "qual", "phd", "sff"] return None #Top level function as this makes it easier to use for debugging: def write_read(filename, in_format, out_format): if in_format in BINARY_FORMATS: mode = "rb" else: mode = "r" with open(filename, mode) as handle: records = list(SeqIO.parse(handle, in_format)) #Write it out... if out_format in BINARY_FORMATS: handle = BytesIO() else : handle = StringIO() SeqIO.write(records, handle, out_format) handle.seek(0) #Now load it back and check it agrees, records2 = list(SeqIO.parse(handle, out_format)) compare_records(records, records2, truncation_expected(out_format)) def compare_record(old, new, truncate=None): """Quality aware SeqRecord comparison. This will check the mapping between Solexa and PHRED scores. It knows to ignore UnknownSeq objects for string matching (i.e. QUAL files). """ if old.id != new.id: raise ValueError("'%s' vs '%s' " % (old.id, new.id)) if old.description != new.description \ and (old.id+" "+old.description).strip() != new.description: raise ValueError("'%s' vs '%s' " % (old.description, new.description)) if len(old.seq) != len(new.seq): raise ValueError("%i vs %i" % (len(old.seq), len(new.seq))) if isinstance(old.seq, UnknownSeq) or isinstance(new.seq, UnknownSeq): pass elif str(old.seq) != str(new.seq): if len(old.seq) < 200: raise ValueError("'%s' vs '%s'" % (old.seq, new.seq)) else: raise ValueError("'%s...' vs '%s...'" % (old.seq[:100], new.seq[:100])) if "phred_quality" in old.letter_annotations \ and "phred_quality" in new.letter_annotations \ and old.letter_annotations["phred_quality"] != new.letter_annotations["phred_quality"]: if truncate and [min(q, truncate) for q in old.letter_annotations["phred_quality"]] == \ [min(q, truncate) for q in new.letter_annotations["phred_quality"]]: pass else: raise ValuerError("Mismatch in phred_quality") if "solexa_quality" in old.letter_annotations \ and "solexa_quality" in new.letter_annotations \ and old.letter_annotations["solexa_quality"] != new.letter_annotations["solexa_quality"]: if truncate and [min(q, truncate) for q in old.letter_annotations["solexa_quality"]] == \ [min(q, truncate) for q in new.letter_annotations["solexa_quality"]]: pass else: raise ValueError("Mismatch in phred_quality") if "phred_quality" in old.letter_annotations \ and "solexa_quality" in new.letter_annotations: #Mapping from Solexa to PHRED is lossy, but so is PHRED to Solexa. #Assume "old" is the original, and "new" has been converted. converted = [round(QualityIO.solexa_quality_from_phred(q)) for q in old.letter_annotations["phred_quality"]] if truncate: converted = [min(q, truncate) for q in converted] if converted != new.letter_annotations["solexa_quality"]: print("") print(old.letter_annotations["phred_quality"]) print(converted) print(new.letter_annotations["solexa_quality"]) raise ValueError("Mismatch in phred_quality vs solexa_quality") if "solexa_quality" in old.letter_annotations \ and "phred_quality" in new.letter_annotations: #Mapping from Solexa to PHRED is lossy, but so is PHRED to Solexa. #Assume "old" is the original, and "new" has been converted. converted = [round(QualityIO.phred_quality_from_solexa(q)) for q in old.letter_annotations["solexa_quality"]] if truncate: converted = [min(q, truncate) for q in converted] if converted != new.letter_annotations["phred_quality"]: print(old.letter_annotations["solexa_quality"]) print(converted) print(new.letter_annotations["phred_quality"]) raise ValueError("Mismatch in solexa_quality vs phred_quality") return True def compare_records(old_list, new_list, truncate_qual=None): """Check two lists of SeqRecords agree, raises a ValueError if mismatch.""" if len(old_list) != len(new_list): raise ValueError("%i vs %i records" % (len(old_list), len(new_list))) for old, new in zip(old_list, new_list): if not compare_record(old, new, truncate_qual): return False return True class TestFastqErrors(unittest.TestCase): """Test reject invalid FASTQ files.""" def check_fails(self, filename, good_count, formats=None, raw=True): if not formats: formats = ["fastq-sanger", "fastq-solexa", "fastq-illumina"] for format in formats: handle = open(filename, _universal_read_mode) records = SeqIO.parse(handle, format) for i in range(good_count): record = next(records) # Make sure no errors! self.assertTrue(isinstance(record, SeqRecord)) self.assertRaises(ValueError, next, records) handle.close() def check_general_fails(self, filename, good_count): handle = open(filename, _universal_read_mode) tuples = QualityIO.FastqGeneralIterator(handle) for i in range(good_count): title, seq, qual = next(tuples) # Make sure no errors! self.assertRaises(ValueError, next, tuples) handle.close() def check_general_passes(self, filename, record_count): handle = open(filename, _universal_read_mode) tuples = QualityIO.FastqGeneralIterator(handle) #This "raw" parser doesn't check the ASCII characters which means #certain invalid FASTQ files will get parsed without errors. count = 0 for title, seq, qual in tuples: self.assertEqual(len(seq), len(qual)) count += 1 self.assertEqual(count, record_count) handle.close() def check_all_fail(self, filename, count): self.check_fails(filename, count) self.check_general_fails(filename, count) def check_qual_char(self, filename, good_count, count): self.check_fails(filename, good_count) self.check_general_passes(filename, count) #Now add methods at run time... these FASTQ files will be rejected #by both the low level parser AND the high level SeqRecord parser: tests = [("diff_ids", 2), ("no_qual", 0), ("long_qual", 3), ("short_qual", 2), ("double_seq", 3), ("double_qual", 2), ("tabs", 0), ("spaces", 0), ("trunc_in_title", 4), ("trunc_in_seq", 4), ("trunc_in_plus", 4), ("trunc_in_qual", 4), ("trunc_at_seq", 4), ("trunc_at_plus", 4), ("trunc_at_qual", 4)] for base_name, good_count in tests: def funct(name, c): f = lambda x : x.check_all_fail("Quality/error_%s.fastq" % name, c) f.__doc__ = "Reject FASTQ with %s" % name.replace("_", " ") return f setattr(TestFastqErrors, "test_%s" % (base_name), funct(base_name, good_count)) del funct #Now add methods for FASTQ files which will be rejected by the high #level SeqRecord parser, but will be accepted by the low level parser: tests = [("del", 3, 5), ("space", 3, 5), ("vtab", 0, 5), ("escape", 4, 5), ("unit_sep", 2, 5), ("tab", 4, 5), ("null", 0, 5)] for base_name, good_count, full_count in tests: def funct(name, c1, c2): f = lambda x : x.check_qual_char("Quality/error_qual_%s.fastq"%name, c1, c2) f.__doc__ = "Reject FASTQ with %s in quality" % name.replace("_", " ") return f setattr(TestFastqErrors, "test_qual_%s" % (base_name), funct(base_name, good_count, full_count)) del funct class TestReferenceSffConversions(unittest.TestCase): def check(self, sff_name, sff_format, out_name, format) : wanted = list(SeqIO.parse(out_name, format)) data = StringIO() count = SeqIO.convert(sff_name, sff_format, data, format) self.assertEqual(count, len(wanted)) data.seek(0) converted = list(SeqIO.parse(data, format)) self.assertEqual(len(wanted), len(converted)) for old, new in zip(wanted, converted) : self.assertEqual(old.id, new.id) self.assertEqual(old.name, new.name) if format!="qual" : self.assertEqual(str(old.seq), str(new.seq)) elif format!="fasta" : self.assertEqual(old.letter_annotations["phred_quality"], new.letter_annotations["phred_quality"]) def check_sff(self, sff_name): self.check(sff_name, "sff", "Roche/E3MFGYR02_random_10_reads_no_trim.fasta", "fasta") self.check(sff_name, "sff", "Roche/E3MFGYR02_random_10_reads_no_trim.qual", "qual") self.check(sff_name, "sff-trim", "Roche/E3MFGYR02_random_10_reads.fasta", "fasta") self.check(sff_name, "sff-trim", "Roche/E3MFGYR02_random_10_reads.qual", "qual") def test_original(self) : """Test converting E3MFGYR02_random_10_reads.sff into FASTA+QUAL""" self.check_sff("Roche/E3MFGYR02_random_10_reads.sff") def test_no_manifest(self) : """Test converting E3MFGYR02_no_manifest.sff into FASTA+QUAL""" self.check_sff("Roche/E3MFGYR02_no_manifest.sff") def test_alt_index_at_start(self) : """Test converting E3MFGYR02_alt_index_at_start into FASTA+QUAL""" self.check_sff("Roche/E3MFGYR02_alt_index_at_start.sff") def test_alt_index_in_middle(self) : """Test converting E3MFGYR02_alt_index_in_middle into FASTA+QUAL""" self.check_sff("Roche/E3MFGYR02_alt_index_in_middle.sff") def test_alt_index_at_end(self) : """Test converting E3MFGYR02_alt_index_at_end into FASTA+QUAL""" self.check_sff("Roche/E3MFGYR02_alt_index_at_end.sff") def test_index_at_start(self) : """Test converting E3MFGYR02_index_at_start into FASTA+QUAL""" self.check_sff("Roche/E3MFGYR02_index_at_start.sff") def test_index_at_end(self) : """Test converting E3MFGYR02_index_in_middle into FASTA+QUAL""" self.check_sff("Roche/E3MFGYR02_index_in_middle.sff") class TestReferenceFastqConversions(unittest.TestCase): """Tests where we have reference output.""" def simple_check(self, base_name, in_variant): for out_variant in ["sanger", "solexa", "illumina"]: in_filename = "Quality/%s_original_%s.fastq" \ % (base_name, in_variant) self.assertTrue(os.path.isfile(in_filename)) # Load the reference output... with open("Quality/%s_as_%s.fastq" % (base_name, out_variant), _universal_read_mode) as handle: expected = handle.read() with warnings.catch_warnings(): if out_variant != "sanger": # Ignore data loss warnings from max qualities warnings.simplefilter("ignore", BiopythonWarning) warnings.simplefilter("ignore", UserWarning) # Check matches using convert... handle = StringIO() SeqIO.convert(in_filename, "fastq-"+in_variant, handle, "fastq-"+out_variant) self.assertEqual(expected, handle.getvalue()) # Check matches using parse/write handle = StringIO() SeqIO.write(SeqIO.parse(in_filename, "fastq-"+in_variant), handle, "fastq-"+out_variant) self.assertEqual(expected, handle.getvalue()) #Now add methods at run time... tests = [("illumina_full_range", "illumina"), ("sanger_full_range", "sanger"), ("longreads", "sanger"), ("solexa_full_range", "solexa"), ("misc_dna", "sanger"), ("wrapping", "sanger"), ("misc_rna", "sanger")] for base_name, variant in tests: assert variant in ["sanger", "solexa", "illumina"] def funct(bn, var): f = lambda x : x.simple_check(bn, var) f.__doc__ = "Reference conversions of %s file %s" % (var, bn) return f setattr(TestReferenceFastqConversions, "test_%s_%s" % (base_name, variant), funct(base_name, variant)) del funct class TestQual(unittest.TestCase): """Tests with QUAL files.""" def test_paired(self): """Check FASTQ parsing matches FASTA+QUAL parsing""" with open("Quality/example.fasta") as f: with open("Quality/example.qual") as q: records1 = list(QualityIO.PairedFastaQualIterator(f, q )) records2 = list(SeqIO.parse("Quality/example.fastq", "fastq")) self.assertTrue(compare_records(records1, records2)) def test_qual(self): """Check FASTQ parsing matches QUAL parsing""" records1 = list(SeqIO.parse("Quality/example.qual", "qual")) records2 = list(SeqIO.parse("Quality/example.fastq", "fastq")) #Will ignore the unknown sequences :) self.assertTrue(compare_records(records1, records2)) def test_qual_out(self): """Check FASTQ to QUAL output""" records = SeqIO.parse("Quality/example.fastq", "fastq") h = StringIO() SeqIO.write(records, h, "qual") with open("Quality/example.qual") as expected: self.assertEqual(h.getvalue(), expected.read()) def test_fasta(self): """Check FASTQ parsing matches FASTA parsing""" records1 = list(SeqIO.parse("Quality/example.fasta", "fasta")) records2 = list(SeqIO.parse("Quality/example.fastq", "fastq")) self.assertTrue(compare_records(records1, records2)) def test_fasta_out(self): """Check FASTQ to FASTA output""" records = SeqIO.parse("Quality/example.fastq", "fastq") h = StringIO() SeqIO.write(records, h, "fasta") with open("Quality/example.fasta") as expected: self.assertEqual(h.getvalue(), expected.read()) def test_qual_negative(self): """Check QUAL negative scores mapped to PHRED zero""" data = """>1117_10_107_F3 23 31 -1 -1 -1 29 -1 -1 20 32 -1 18 25 7 -1 6 -1 -1 -1 30 -1 20 13 7 -1 -1 21 30 -1 24 -1 22 -1 -1 22 14 -1 12 26 21 -1 5 -1 -1 -1 20 -1 -1 12 28 >1117_10_146_F3 20 33 -1 -1 -1 29 -1 -1 28 28 -1 7 16 5 -1 30 -1 -1 -1 14 -1 4 13 4 -1 -1 11 13 -1 5 -1 7 -1 -1 10 16 -1 4 12 15 -1 8 -1 -1 -1 16 -1 -1 10 4 >1117_10_1017_F3 33 33 -1 -1 -1 27 -1 -1 17 16 -1 28 24 11 -1 6 -1 -1 -1 29 -1 8 29 24 -1 -1 8 8 -1 20 -1 13 -1 -1 8 13 -1 28 10 24 -1 10 -1 -1 -1 4 -1 -1 7 6 >1117_11_136_F3 16 22 -1 -1 -1 33 -1 -1 30 27 -1 27 28 32 -1 29 -1 -1 -1 27 -1 18 9 6 -1 -1 23 16 -1 26 -1 5 7 -1 22 7 -1 18 14 8 -1 8 -1 -1 -1 11 -1 -1 4 24""" h = StringIO(data) h2 = StringIO() with warnings.catch_warnings(): warnings.simplefilter("ignore", BiopythonParserWarning) self.assertEqual(4, SeqIO.convert(h, "qual", h2, "fastq")) self.assertEqual(h2.getvalue(), """@1117_10_107_F3 ?????????????????????????????????????????????????? + 8@!!!>!!5A!3:(!'!!!?!5.(!!6?!9!7!!7/!-;6!&!!!5!!-= @1117_10_146_F3 ?????????????????????????????????????????????????? + 5B!!!>!!==!(1&!?!!!/!%.%!!,.!&!(!!+1!%-0!)!!!1!!+% @1117_10_1017_F3 ?????????????????????????????????????????????????? + BB!!!!)>9!!))!5!.!!).!=+9!+!!!%!!(' @1117_11_136_F3 ?????????????????????????????????????????????????? + 17!!!B!!?!!!