# Copyright 2010-2013 by Peter Cock. # All rights reserved. # This code is part of the Biopython distribution and governed by its # license. Please see the LICENSE file that should have been included # as part of this package. """Test code for working with BGZF files (used in BAM files). See also the doctests in bgzf.py which are called via run_tests.py """ import unittest import gzip import os from random import shuffle from Bio._py3k import _as_bytes, _as_string _empty_bytes_string = _as_bytes("") from Bio import bgzf def _have_bug17666(): """Debug function to check if Python's gzip is broken (PRIVATE). Checks for http://bugs.python.org/issue17666 expected in Python 2.7.4, 3.2.4 and 3.3.1 only. """ from io import BytesIO h = gzip.GzipFile(fileobj=BytesIO(bgzf._bgzf_eof)) try: data = h.read() h.close() assert not data, "Should be zero length, not %i" % len(data) return False except TypeError as err: #TypeError: integer argument expected, got 'tuple' return True if _have_bug17666(): from Bio import MissingPythonDependencyError raise MissingPythonDependencyError("Your Python has a broken gzip library, see " "http://bugs.python.org/issue17666 for details") class BgzfTests(unittest.TestCase): def setUp(self): self.temp_file = "temp.bgzf" if os.path.isfile(self.temp_file): os.remove(self.temp_file) def tearDown(self): if os.path.isfile(self.temp_file): os.remove(self.temp_file) def rewrite(self, compressed_input_file, output_file): h = gzip.open(compressed_input_file, "rb") data = h.read() h.close() with bgzf.BgzfWriter(output_file, "wb") as h: h.write(data) self.assertFalse(h.seekable()) self.assertFalse(h.isatty()) self.assertEqual(h.fileno(), h._handle.fileno()) #Context manager should call close(), #Gives empty BGZF block as BAM EOF marker h = gzip.open(output_file) new_data = h.read() h.close() #Check the decompressed files agree self.assertTrue(new_data, "Empty BGZF file?") self.assertEqual(len(data), len(new_data)) self.assertEqual(data, new_data) def check_blocks(self, old_file, new_file): h = open(old_file, "rb") old = list(bgzf.BgzfBlocks(h)) h.close() h = open(new_file, "rb") new = list(bgzf.BgzfBlocks(h)) h.close() self.assertEqual(len(old), len(new)) self.assertEqual(old, new) def check_text(self, old_file, new_file): """Check text mode using explicit open/close""" h = open(old_file) # text mode! old_line = h.readline() old = old_line + h.read() h.close() h = bgzf.BgzfReader(new_file, "r") # Text mode! new_line = h.readline() new = new_line + h.read(len(old)) h.close() self.assertEqual(old_line, new_line) self.assertEqual(len(old), len(new)) self.assertEqual(old, new) def check_text_with(self, old_file, new_file): """Check text mode using context manager (with statement)""" with open(old_file) as h: # text mode! old_line = h.readline() old = old_line + h.read() with bgzf.BgzfReader(new_file, "r") as h: # Text mode! new_line = h.readline() new = new_line + h.read(len(old)) self.assertEqual(old_line, new_line) self.assertEqual(len(old), len(new)) self.assertEqual(old, new) def check_by_line(self, old_file, new_file, old_gzip=False): for mode in ["r", "rb"]: if old_gzip: h = gzip.open(old_file, mode) else: h = open(old_file, mode) old = h.read() #Seems gzip can return bytes even if mode="r", #perhaps a bug in Python 3.2? if "b" in mode: old = _as_bytes(old) else: old = _as_string(old) h.close() for cache in [1, 10]: h = bgzf.BgzfReader(new_file, mode, max_cache=cache) if "b" in mode: new = _empty_bytes_string.join(line for line in h) else: new = "".join(line for line in h) h.close() self.assertEqual(len(old), len(new)) self.assertEqual(old[:10], new[:10], "%r vs %r, mode %r" % (old[:10], new[:10], mode)) self.assertEqual(old, new) def check_by_char(self, old_file, new_file, old_gzip=False): for mode in ["r", "rb"]: if old_gzip: h = gzip.open(old_file, mode) else: h = open(old_file, mode) old = h.read() #Seems gzip can return bytes even if mode="r", #perhaps a bug in Python 3.2? if "b" in mode: old = _as_bytes(old) else: old = _as_string(old) h.close() for cache in [1, 10]: h = bgzf.BgzfReader(new_file, mode, max_cache=cache) temp = [] while True: char = h.read(1) if not char: break temp.append(char) if "b" in mode: new = _empty_bytes_string.join(temp) else: new = "".join(temp) del temp h.close() self.assertEqual(len(old), len(new)) #If bytes vs unicode mismatch, give a short error message: self.assertEqual(old[:10], new[:10], "%r vs %r, mode %r" % (old[:10], new[:10], mode)) self.assertEqual(old, new) def check_random(self, filename): """Check BGZF random access by reading blocks in forward & reverse order""" h = gzip.open(filename, "rb") old = h.read() h.close() h = open(filename, "rb") blocks = list(bgzf.BgzfBlocks(h)) h.close() #Forward, using explicit open/close new = _empty_bytes_string h = bgzf.BgzfReader(filename, "rb") self.assertTrue(h.seekable()) self.assertFalse(h.isatty()) self.assertEqual(h.fileno(), h._handle.fileno()) for start, raw_len, data_start, data_len in blocks: h.seek(bgzf.make_virtual_offset(start, 0)) data = h.read(data_len) self.assertEqual(len(data), data_len) #self.assertEqual(start + raw_len, h._handle.tell()) self.assertEqual(len(new), data_start) new += data h.close() self.assertEqual(len(old), len(new)) self.assertEqual(old, new) #Reverse, using with statement new = _empty_bytes_string with bgzf.BgzfReader(filename, "rb") as h: for start, raw_len, data_start, data_len in blocks[::-1]: h.seek(bgzf.make_virtual_offset(start, 0)) data = h.read(data_len) self.assertEqual(len(data), data_len) #self.assertEqual(start + raw_len, h._handle.tell()) new = data + new self.assertEqual(len(old), len(new)) self.assertEqual(old, new) #Jump back - non-sequential seeking if len(blocks) >= 3: h = bgzf.BgzfReader(filename, "rb", max_cache = 1) #Seek to a late block in the file, #half way into the third last block start, raw_len, data_start, data_len = blocks[-3] voffset = bgzf.make_virtual_offset(start, data_len // 2) h.seek(voffset) self.assertEqual(voffset, h.tell()) data = h.read(1000) self.assertTrue(data in old) self.assertEqual(old.find(data), data_start + data_len // 2) #Now seek to an early block in the file, #half way into the second block start, raw_len, data_start, data_len = blocks[1] h.seek(bgzf.make_virtual_offset(start, data_len // 2)) voffset = bgzf.make_virtual_offset(start, data_len // 2) h.seek(voffset) self.assertEqual(voffset, h.tell()) #Now read all rest of this block and start of next block data = h.read(data_len + 1000) self.assertTrue(data in old) self.assertEqual(old.find(data), data_start + data_len // 2) h.close() #Check seek/tell at block boundaries v_offsets = [] for start, raw_len, data_start, data_len in blocks: for within_offset in [0, 1, data_len // 2, data_len - 1]: if within_offset < 0 or data_len <= within_offset: continue voffset = bgzf.make_virtual_offset(start, within_offset) real_offset = data_start + within_offset v_offsets.append((voffset, real_offset)) shuffle(v_offsets) h = bgzf.BgzfReader(filename, "rb", max_cache = 1) for voffset, real_offset in v_offsets: h.seek(0) self.assertTrue(voffset >= 0 and real_offset >= 0) self.assertEqual(h.read(real_offset), old[:real_offset]) self.assertEqual(h.tell(), voffset) for voffset, real_offset in v_offsets: h.seek(voffset) self.assertEqual(h.tell(), voffset) h.close() def test_random_bam_ex1(self): """Check random access to SamBam/ex1.bam""" self.check_random("SamBam/ex1.bam") def test_random_bam_ex1_refresh(self): """Check random access to SamBam/ex1_refresh.bam""" self.check_random("SamBam/ex1_refresh.bam") def test_random_bam_ex1_header(self): """Check random access to SamBam/ex1_header.bam""" self.check_random("SamBam/ex1_header.bam") def test_random_wnts_xml(self): """Check random access to Blast/wnts.xml.bgz""" self.check_random("Blast/wnts.xml.bgz") def test_random_example_fastq(self): """Check random access to Quality/example.fastq.bgz (Unix newlines)""" self.check_random("Quality/example.fastq.bgz") def test_random_example_dos_fastq(self): """Check random access to Quality/example_dos.fastq.bgz (DOS newlines)""" self.check_random("Quality/example_dos.fastq.bgz") def test_random_example_cor6(self): """Check random access to GenBank/cor6_6.gb.bgz""" self.check_random("GenBank/cor6_6.gb.bgz") def test_text_wnts_xml(self): """Check text mode access to Blast/wnts.xml.bgz""" self.check_text("Blast/wnts.xml", "Blast/wnts.xml.bgz") self.check_text_with("Blast/wnts.xml", "Blast/wnts.xml.bgz") def test_text_example_fastq(self): """Check text mode access to Quality/example.fastq.bgz""" self.check_text("Quality/example.fastq", "Quality/example.fastq.bgz") self.check_text_with("Quality/example.fastq", "Quality/example.fastq.bgz") def test_iter_wnts_xml(self): """Check iteration over Blast/wnts.xml.bgz""" self.check_by_line("Blast/wnts.xml", "Blast/wnts.xml.bgz") self.check_by_char("Blast/wnts.xml", "Blast/wnts.xml.bgz") def test_iter_example_fastq(self): """Check iteration over Quality/example.fastq.bgz""" self.check_by_line("Quality/example.fastq", "Quality/example.fastq.bgz") self.check_by_char("Quality/example.fastq", "Quality/example.fastq.bgz") def test_iter_example_cor6(self): """Check iteration over GenBank/cor6_6.gb.bgz""" self.check_by_line("GenBank/cor6_6.gb", "GenBank/cor6_6.gb.bgz") self.check_by_char("GenBank/cor6_6.gb", "GenBank/cor6_6.gb.bgz") def test_iter_example_gb(self): """Check iteration over GenBank/NC_000932.gb.bgz""" self.check_by_line("GenBank/NC_000932.gb", "GenBank/NC_000932.gb.bgz") self.check_by_char("GenBank/NC_000932.gb", "GenBank/NC_000932.gb.bgz") def test_bam_ex1(self): """Reproduce BGZF compression for BAM file""" temp_file = self.temp_file #Note this example is from an old version of samtools #and all the blocks are full (except the last one) self.rewrite("SamBam/ex1.bam", temp_file) #Now check the blocks agree (using the fact that #this example BAM file has simple block usage) self.check_blocks("SamBam/ex1.bam", temp_file) def test_iter_bam_ex1(self): """Check iteration over SamBam/ex1.bam""" self.check_by_char("SamBam/ex1.bam", "SamBam/ex1.bam", True) def test_example_fastq(self): """Reproduce BGZF compression for a FASTQ file""" temp_file = self.temp_file self.rewrite("Quality/example.fastq.gz", temp_file) self.check_blocks("Quality/example.fastq.bgz", temp_file) def test_example_gb(self): """Reproduce BGZF compression for NC_000932 GenBank file""" temp_file = self.temp_file self.rewrite("GenBank/NC_000932.gb.bgz", temp_file) self.check_blocks("GenBank/NC_000932.gb.bgz", temp_file) def test_example_cor6(self): """Reproduce BGZF compression for cor6_6.gb GenBank file""" temp_file = self.temp_file self.rewrite("GenBank/cor6_6.gb.bgz", temp_file) self.check_blocks("GenBank/cor6_6.gb.bgz", temp_file) def test_example_wnts_xml(self): """Reproduce BGZF compression for wnts.xml BLAST file""" temp_file = self.temp_file self.rewrite("Blast/wnts.xml.bgz", temp_file) self.check_blocks("Blast/wnts.xml.bgz", temp_file) def test_write_tell(self): """Check offset works during BGZF writing""" temp_file = self.temp_file h = bgzf.open(temp_file, "w") #Text mode! #When opening new file, offset should be 0 self.assertEqual(h.tell(), 0) h.write("X" * 100000) offset = h.tell() self.assertNotEqual(offset, 100000) #Should be a virtual offset! #After writing the same data two times, size of the first and the second #write should be equal also in terms of offsets #(This is because the flush ensures two identical blocks written) h.flush() offset1 = h.tell() #Note 'offset' and 'offset1' effectively the same, but not equal #due to the flush - 'offet' is at the end of the first BGZF block, #while 'offset1' is at the start of the second BGZF block. In terms #of the decompressed data, they point to the same location! self.assertNotEqual(offset, offset1) #New block started h.write("Magic" + "Y" * 100000) h.flush() offset2 = h.tell() h.write("Magic" + "Y" * 100000) h.flush() offset3 = h.tell() self.assertEqual(((offset3 << 16) - (offset2 << 16)), ((offset2 << 16) - (offset1 << 16))) #Flushing should change the offset h.flush() self.assertNotEqual(offset3, h.tell()) h.close() h = bgzf.open(temp_file, "r") #Text mode! h.seek(offset) #i.e. End of first BGZF block self.assertEqual(offset1, h.tell()) #Note *not* seek offset #Now at start of second BGZF block self.assertEqual(h.read(5), "Magic") h.seek(offset2) self.assertEqual(offset2, h.tell()) self.assertEqual(h.read(5), "Magic") #Now go back in the file, h.seek(offset1) self.assertEqual(offset1, h.tell()) self.assertEqual(h.read(5), "Magic") h.close() if __name__ == "__main__": runner = unittest.TextTestRunner(verbosity = 2) unittest.main(testRunner=runner)