#!/usr/bin/env python
import re
import yaml
import logging
import math
import svgwrite
from pkg_resources import resource_stream
logging.basicConfig()
log = logging.getLogger()
class Dnadigest():
    """Digest DNA strings with restriction enzymes.

    By default a dataset based on the Wikipedia enzyme list is loaded
    from the packaged ``rebase.yaml`` file.
    """

    def __init__(self, enzyme_data_file=None):
        """Initialise translation tables and load enzyme data.

        :param enzyme_data_file: optional path to a YAML enzyme data
            file; when ``None`` the bundled ``rebase.yaml`` is used.
        """
        self.data = ''
        # Map IUPAC ambiguity codes to regular-expression character
        # classes so recognition sequences can be matched with ``re``.
        self.dna_regex_translations = {
            'A': 'A',
            'T': 'T',
            'C': 'C',
            'G': 'G',
            'N': '.',
            'M': '[AC]',
            'R': '[AG]',
            'W': '[AT]',
            'Y': '[CT]',
            'S': '[CG]',
            'K': '[GT]',
            'H': '[^G]',
            'B': '[^A]',
            'V': '[^T]',
            'D': '[^C]',
        }
        # These provide translations between the ambiguity codes.
        self.complement_regex = {
            'A': 'T',
            'C': 'G',
            'N': 'N',
            'H': 'D',
            'B': 'V',
            'M': 'K',
            'R': 'Y',
            'W': 'S',
        }
        # Less typing: mirror each pair so the table works in both
        # directions.  Iterate over a snapshot (``list``) because the
        # dict is mutated inside the loop -- iterating the live view
        # raises RuntimeError on Python 3.
        for key, value in list(self.complement_regex.items()):
            self.complement_regex[value] = key
        if enzyme_data_file is None:
            handle = resource_stream(__name__, 'rebase.yaml')
            self.load_enzyme_data(handle)
        else:
            with open(enzyme_data_file, 'r') as handle:
                self.load_enzyme_data(handle)

    def load_enzyme_data(self, data_handle):
        """Parse enzyme data from an open YAML handle into
        ``self.enzyme_dict``, keyed by enzyme name.

        Records without exactly two matching-length cut strings are
        skipped with a warning.
        """
        # safe_load: the data file contains plain mappings only, and
        # yaml.load without an explicit Loader is unsafe/deprecated.
        data_structure = yaml.safe_load(data_handle)
        self.enzyme_dict = {}
        for enzyme_key in data_structure:
            enzyme = data_structure[enzyme_key]
            # Check the number of cut strings BEFORE indexing them;
            # comparing cut[0]/cut[1] first would raise IndexError for
            # records with fewer than two entries.
            if len(enzyme['cut']) != 2:
                log.warning("Cannot use %s; too many cut sites" %
                            enzyme['enzyme'])
            elif len(enzyme['cut'][0]) != len(enzyme['cut'][1]):
                log.warning("Cannot use %s; no support for non-matching cuts" %
                            enzyme['enzyme'])
            else:
                # Convert
                #   d['k'] = ["5' asdfasdf", "3' asdfasdf"]
                # to
                #   d['k'] = {"5": "asdfasdf", "3": "asdfasdf"}
                enzyme['recognition_sequence'] = {
                    x[0]: x[3:] for x in enzyme['recognition_sequence']
                }
                # Same keying, also dropping the trailing 3-char suffix.
                enzyme['cut'] = {
                    x[0]: x[3:-3] for x in enzyme['cut']
                }
                enzyme['sense_cut_idx'] = self.determine_cut_index(enzyme)
                self.enzyme_dict[enzyme['enzyme']] = enzyme

    @classmethod
    def determine_cut_index(cls, enzyme):
        """Return the 0-based cut offset within the sense (5') cut
        string: the index of the space once padding dashes are removed.
        """
        return enzyme['cut']['5'].strip('-').index(' ')

    def generate_regex_str(self, recognition_sequence):
        """Translate an IUPAC recognition sequence into a regex string."""
        return ''.join([self.dna_regex_translations[x] for x in
                        recognition_sequence])

    def matcher(self, sequence, recognition_sequence):
        """Return True if ``recognition_sequence`` occurs in ``sequence``."""
        regex = re.compile(self.generate_regex_str(recognition_sequence))
        # search() suffices for a boolean answer; findall() would
        # needlessly collect every match first.
        return regex.search(sequence) is not None

    def __merged_iter(self, *args):
        """Treat multiple iterators as a single iterator."""
        for arg in args:
            for item in arg:
                yield item

    def string_cutter(self, sequence, recognition_fr, recog_nucl_index,
                      status):
        """Cut a sequence with a 5'+3' cut recognition site.

        :param sequence: DNA string to digest
        :param recognition_fr: dict with '5' and '3' recognition strings
        :param recog_nucl_index: cut offset within the recognition site
        :param status: 'circular' or 'linear'
        :returns: list of fragments; for a circular sequence an empty
            list means "no cut site found, molecule left uncut".
        """
        rec_f_exp = self.expand_multiple(recognition_fr['5'])
        rec_r_exp = self.expand_multiple(recognition_fr['3'])
        rec_seq_f = re.compile(self.generate_regex_str(rec_f_exp))
        rec_seq_r = re.compile(self.generate_regex_str(rec_r_exp))
        # TODO: try and make this appx. the length of the cut site, we
        # don't want to have a case where we match TWO times within the
        # wrapped around section
        wrap_around = 15
        if status == 'circular':
            # Add a little bit on the end where it'd "wrap"
            mod_sequence = sequence + sequence[0:wrap_around]
            match_list = self.__merged_iter(
                rec_seq_f.finditer(mod_sequence),
                rec_seq_r.finditer(mod_sequence)
            )
            for match in match_list:
                adjusted_recog = \
                    self.__adjust_recog_for_strand(recog_nucl_index,
                                                   rec_f_exp,
                                                   match.group(0))
                cut_location = match.start() + adjusted_recog
                # To handle the wrap-around corner cases more nicely,
                # reopen the circle at the first cut site and recurse on
                # the resulting linear sequence.
                if cut_location < len(sequence):
                    reopened_sequence = sequence[cut_location:] + \
                        sequence[0:cut_location]
                else:
                    # The cut fell inside the wrapped-on tail.
                    reopened_sequence = mod_sequence[cut_location:] + \
                        mod_sequence[wrap_around:cut_location]
                return self.string_cutter(reopened_sequence,
                                          recognition_fr, adjusted_recog,
                                          'linear')
            # No recognition site anywhere on the circle.
            return []
        # Linear case: cut between consecutive match positions.
        # NOTE(review): forward- and reverse-pattern matches are
        # consumed iterator-by-iterator, not merged by position --
        # presumably both patterns match at the same places; confirm if
        # mixed-strand sites ever differ.
        fragments = []
        prev_index = 0
        match_list = self.__merged_iter(
            rec_seq_f.finditer(sequence),
            rec_seq_r.finditer(sequence)
        )
        for match in match_list:
            adjusted_recog = \
                self.__adjust_recog_for_strand(recog_nucl_index, rec_f_exp,
                                               match.group(0))
            cut_location = match.start() + adjusted_recog
            fragments.append(sequence[prev_index:cut_location])
            prev_index = cut_location
        fragments.append(sequence[prev_index:])
        return fragments

    def find_cut_sites(self, sequence, enzyme_list, status='circular'):
        """Map cut positions to the enzymes that cut there.

        Primarily for use with the drawer() method.

        :returns: dict of {position: [enzyme name, ...]}
        """
        enzymes = self.enzyme_dict_filter(self.enzyme_dict, enzyme_list)
        cut_sites = {}
        for enzyme in enzymes:
            # Pass the caller-supplied topology instead of hard-coding
            # 'circular'; __find_cut_sites returns status unchanged.
            sites, status = self.__find_cut_sites(
                sequence,
                self.enzyme_dict[enzyme]['recognition_sequence'],
                self.enzyme_dict[enzyme]['sense_cut_idx'],
                status)
            for site in sites:
                cut_sites.setdefault(site, []).append(enzyme)
        return cut_sites

    def __find_cut_sites(self, sequence, recognition_fr, recog_nucl_index,
                         status):
        """Find all cut locations in a sequence with a 5'+3' cut
        recognition site.

        :returns: (list of cut positions, status)
        """
        rec_f_exp = self.expand_multiple(recognition_fr['5'])
        rec_r_exp = self.expand_multiple(recognition_fr['3'])
        rec_seq_f = re.compile(self.generate_regex_str(rec_f_exp))
        rec_seq_r = re.compile(self.generate_regex_str(rec_r_exp))
        cut_locations = []
        if status == 'circular':
            # Add a little bit on the end where it'd "wrap"
            mod_sequence = sequence + sequence[0:15]
            match_list = self.__merged_iter(
                rec_seq_f.finditer(mod_sequence),
                rec_seq_r.finditer(mod_sequence)
            )
            for match in match_list:
                adjusted_recog = \
                    self.__adjust_recog_for_strand(recog_nucl_index,
                                                   rec_f_exp,
                                                   match.group(0))
                cut_location = match.start() + adjusted_recog
                if cut_location < len(sequence):
                    cut_locations.append(cut_location)
                else:
                    # A match in the wrapped-on tail maps back onto the
                    # start of the sequence; avoid double-counting it.
                    wrapped = cut_location - len(sequence)
                    if wrapped not in cut_locations:
                        cut_locations.append(wrapped)
        else:
            match_list = self.__merged_iter(
                rec_seq_f.finditer(sequence),
                rec_seq_r.finditer(sequence)
            )
            for match in match_list:
                adjusted_recog = \
                    self.__adjust_recog_for_strand(recog_nucl_index,
                                                   rec_f_exp,
                                                   match.group(0))
                cut_location = match.start() + adjusted_recog
                cut_locations.append(cut_location)
        return cut_locations, status

    def __adjust_recog_for_strand(self, recog_nucl_index, plus_reference,
                                  matchstr):
        """Return the cut offset appropriate for ``matchstr``.

        If the match is on the plus (sense) strand the offset is used
        as-is; otherwise it is mirrored against the match length.
        """
        plus_ref_re = re.compile(self.generate_regex_str(plus_reference))
        if plus_ref_re.match(matchstr):
            return recog_nucl_index
        else:
            # Otherwise, invert it against length of matchstr
            return len(matchstr) - recog_nucl_index

    def string_processor(self, fragment_list, recognition_fr, recog_nucl_index,
                         status):
        """Digest every fragment in ``fragment_list`` with one enzyme.

        :returns: (new fragment list, status, did_cut flag)
        """
        new_fragment_list = []
        did_cut = False
        for fragment in fragment_list:
            fragments = self.string_cutter(fragment, recognition_fr,
                                           recog_nucl_index, status)
            # A circular molecule that produced fragments is now linear.
            if status == 'circular' and len(fragments) > 0:
                status = 'linear'
            # NOTE(review): a linear fragment with no cut site still
            # comes back as a single fragment, so this flags did_cut
            # even when nothing was cut -- confirm intent before
            # tightening (a circular single-cut legitimately returns
            # one fragment).
            if len(fragments) > 0:
                did_cut = True
            new_fragment_list += fragments
        # Ensure we return a complete fragment list and not empty
        if len(new_fragment_list) == 0:
            return fragment_list, status, False
        else:
            return new_fragment_list, status, did_cut

    def expand_multiple(self, base_str):
        """Expand run-length notation like ``N5`` into ``NNNNN``.

        e.g. ``GCN5GC`` -> ``GCNNNNNGC``.  Recurses until no
        letter+count pairs remain.
        """
        # The named groups <base>/<count> are required below; without
        # them '(?P[A-Z])' is a regex syntax error.
        m = re.search(r'(?P<base>[A-Z])(?P<count>[0-9]+)', base_str)
        try:
            # Get position of first match
            base = m.group('base')
            count = int(m.group('count'))
            # Create a fixed string with those bases replaced properly
            replaced = base_str[0:m.start('base')] + \
                base * count + \
                base_str[m.end('count'):]
            # Recurse to replace any more instances of [A-Z][0-9]+
            return self.expand_multiple(replaced)
        except AttributeError:
            # No match: m is None, so .group() raised AttributeError.
            return base_str

    def enzyme_dict_filter(self, data, cut_list):
        """Return the subset of ``data`` whose enzyme (or one of its
        listed isoschizomers) appears in ``cut_list``.
        """
        # TODO: need to include isoscizomers, but current data structure
        # doesn't allow for that.
        #
        # For the time being, just remove all enzymes that the user didn't
        # request
        good = {}
        for enzyme in data:
            if enzyme in cut_list:
                good[enzyme] = data[enzyme]
            elif 'isoscizomers' in data[enzyme]:
                for iso in data[enzyme]['isoscizomers']:
                    if iso in cut_list:
                        good[enzyme] = data[enzyme]
                        # One matching isoschizomer is enough.
                        break
        return good

    def process_data(self, seq, cut_with, status='circular'):
        """Digest ``seq`` with every requested enzyme in turn.

        :param seq: DNA sequence string
        :param cut_with: iterable of enzyme names to cut with
        :param status: 'circular' or 'linear'
        :returns: dict with 'fragment_list', 'cut_with' (the enzymes
            that reported a cut) and the final 'status'
        """
        filtered_enzyme_dict = self.enzyme_dict_filter(self.enzyme_dict,
                                                       cut_with)
        fragment_list = [seq]
        cuts = []
        for enzyme in filtered_enzyme_dict:
            log.info('Cutting [%s] with %s' % (','.join(fragment_list),
                                               enzyme))
            (fragment_list, status, did_cut) = \
                self.string_processor(
                    fragment_list,
                    filtered_enzyme_dict[enzyme]['recognition_sequence'],
                    filtered_enzyme_dict[enzyme]['sense_cut_idx'],
                    status)
            if did_cut:
                cuts.append(enzyme)
        return {
            'fragment_list': fragment_list,
            'cut_with': cuts,
            'status': status,
        }

    @classmethod
    def drawer(cls, sequence_length, cut_sites, sequence_id="PhiX", radius=250,
               border=50):
        """Print SVG in plasmid digest style.

        :param sequence_length: total sequence length in bases
        :param cut_sites: dict of {position: [enzyme names]}
        :returns: the SVG document as a string
        """
        image_size = 2 * (radius + border)
        center = (radius + border, radius + border)
        svg_document = svgwrite.Drawing(size=("%spx" % image_size, "%spx" %
                                              image_size))
        svg_document.add(svg_document.circle(center=center, r=radius,
                                             fill="rgb(255, 255, 255)",
                                             stroke="black"))
        # TODO: offset text based on length of string
        svg_document.add(svg_document.text('>' + sequence_id, insert=center))
        tau = 2 * math.pi
        for cut_site in sorted(cut_sites.keys()):
            angle_in_radians = (float(cut_site) / float(sequence_length)) * tau
            point_from = cls.__cast_ray(center, angle_in_radians, radius - 20)
            point_to = cls.__cast_ray(center, angle_in_radians, radius + 20)
            svg_document.add(svg_document.line(start=point_from, end=point_to,
                                               stroke="black"))
            text_loc = cls.__cast_ray(center, angle_in_radians, radius + 40)
            for i, enzyme in enumerate(cut_sites[cut_site]):
                # Stack labels 10px apart, each offset computed from the
                # base point; the old in-place "+= i * 10" accumulated
                # the offsets (0, 10, 30, 60, ...).
                label_loc = (text_loc[0], text_loc[1] + i * 10)
                svg_document.add(svg_document.text(enzyme, insert=label_loc))
        return svg_document.tostring()

    @classmethod
    def __cast_ray(cls, center, angle, length):
        """Return the [x, y] point ``length`` away from ``center`` along
        ``angle`` (radians).
        """
        return [length * math.cos(angle) + center[0],
                length * math.sin(angle) + center[1]]