# kate: syntax Python; # cython: profile=False, emit_code_comments=False from __future__ import print_function, division, absolute_import from xopen import xopen from .seqio import _shorten, FormatError, SequenceReader cimport cython # It would be nice to be able to have the first parameter be a # unsigned char[:] (memory view), but this fails with a BufferError # when a bytes object is passed in. # See ctypedef fused bytes_or_bytearray: bytes bytearray #@cython.boundscheck(False) def head(bytes_or_bytearray buf, Py_ssize_t lines): """ Skip forward by a number of lines in the given buffer and return how many bytes this corresponds to. """ cdef: Py_ssize_t pos = 0 Py_ssize_t linebreaks_seen = 0 Py_ssize_t length = len(buf) unsigned char* data = buf while linebreaks_seen < lines and pos < length: if data[pos] == '\n': linebreaks_seen += 1 pos += 1 return pos def fastq_head(bytes_or_bytearray buf, Py_ssize_t end=-1): """ Return an integer length such that buf[:length] contains the highest possible number of complete four-line records. If end is -1, the full buffer is searched. Otherwise only buf[:end]. """ cdef: Py_ssize_t pos = 0 Py_ssize_t linebreaks = 0 Py_ssize_t length = len(buf) unsigned char* data = buf Py_ssize_t record_start = 0 if end != -1: length = min(length, end) while True: while pos < length and data[pos] != '\n': pos += 1 if pos == length: break pos += 1 linebreaks += 1 if linebreaks == 4: linebreaks = 0 record_start = pos # Reached the end of the data block return record_start def two_fastq_heads(bytes_or_bytearray buf1, bytes_or_bytearray buf2, Py_ssize_t end1, Py_ssize_t end2): """ Skip forward in the two buffers by multiples of four lines. Return a tuple (length1, length2) such that buf1[:length1] and buf2[:length2] contain the same number of lines (where the line number is divisible by four). """ cdef: Py_ssize_t pos1 = 0, pos2 = 0 Py_ssize_t linebreaks = 0 unsigned char* data1 = buf1 unsigned char* data2 = buf2 Py_ssize_t record_start1 = 0 Py_ssize_t record_start2 = 0 while True: while pos1 < end1 and data1[pos1] != '\n': pos1 += 1 if pos1 == end1: break pos1 += 1 while pos2 < end2 and data2[pos2] != '\n': pos2 += 1 if pos2 == end2: break pos2 += 1 linebreaks += 1 if linebreaks == 4: linebreaks = 0 record_start1 = pos1 record_start2 = pos2 # Hit the end of the data block return record_start1, record_start2 cdef class Sequence(object): """ A record in a FASTQ file. Also used for FASTA (then the qualities attribute is None). qualities is a string and it contains the qualities encoded as ascii(qual+33). If an adapter has been matched to the sequence, the 'match' attribute is set to the corresponding Match instance. """ cdef: public str name public str sequence public str qualities public bint second_header public object match def __init__(self, str name, str sequence, str qualities=None, bint second_header=False, match=None): """Set qualities to None if there are no quality values""" self.name = name self.sequence = sequence self.qualities = qualities self.second_header = second_header self.match = match if qualities is not None and len(qualities) != len(sequence): rname = _shorten(name) raise FormatError("In read named {0!r}: length of quality sequence ({1}) and length " "of read ({2}) do not match".format( rname, len(qualities), len(sequence))) def __getitem__(self, key): """slicing""" return self.__class__( self.name, self.sequence[key], self.qualities[key] if self.qualities is not None else None, self.second_header, self.match) def __repr__(self): qstr = '' if self.qualities is not None: qstr = ', qualities={0!r}'.format(_shorten(self.qualities)) return ''.format(_shorten(self.name), _shorten(self.sequence), qstr) def __len__(self): return len(self.sequence) def __richcmp__(self, other, int op): if 2 <= op <= 3: eq = self.name == other.name and \ self.sequence == other.sequence and \ self.qualities == other.qualities if op == 2: return eq else: return not eq else: raise NotImplementedError() def __reduce__(self): return (Sequence, (self.name, self.sequence, self.qualities, self.second_header, self.match)) class FastqReader(SequenceReader): """ Reader for FASTQ files. Does not support multi-line FASTQ files. """ def __init__(self, file, sequence_class=Sequence): """ file is a filename or a file-like object. If file is a filename, then .gz files are supported. """ super(FastqReader, self).__init__(file) self.sequence_class = sequence_class self.delivers_qualities = True def __iter__(self): """ Yield Sequence objects """ cdef int i = 0 cdef int strip cdef str line, name, qualities, sequence, name2 sequence_class = self.sequence_class it = iter(self._file) line = next(it) if not (line and line[0] == '@'): raise FormatError("Line {0} in FASTQ file is expected to start with '@', but found {1!r}".format(i+1, line[:10])) strip = -2 if line.endswith('\r\n') else -1 name = line[1:strip] i = 1 for line in it: if i == 0: if not (line and line[0] == '@'): raise FormatError("Line {0} in FASTQ file is expected to start with '@', but found {1!r}".format(i+1, line[:10])) name = line[1:strip] elif i == 1: sequence = line[:strip] elif i == 2: if line == '+\n': # check most common case first name2 = '' else: line = line[:strip] if not (line and line[0] == '+'): raise FormatError("Line {0} in FASTQ file is expected to start with '+', but found {1!r}".format(i+1, line[:10])) if len(line) > 1: if not line[1:] == name: raise FormatError( "At line {0}: Sequence descriptions in the FASTQ file don't match " "({1!r} != {2!r}).\n" "The second sequence description must be either empty " "or equal to the first description.".format(i+1, name, line[1:])) second_header = True else: second_header = False elif i == 3: if len(line) == len(sequence) - strip: qualities = line[:strip] else: qualities = line.rstrip('\r\n') yield sequence_class(name, sequence, qualities, second_header=second_header) i = (i + 1) % 4 if i != 0: raise FormatError("FASTQ file ended prematurely")