#!/usr/bin/env python3
"""
bl_trim_galore.py - Run trim_galore on the read files listed in a TSV file.

Synopsis:
    bl_trim_galore.py --tsvfile tsvfile [options]

The --adapter, --illumina, --nextera and --small_rna options of trim_galore
have been distilled into a single option:

    --adapter illumina|nextera|small_rna|sequence

where sequence is any adapter sequence. The first three choices will call
the --illumina, --nextera and --small_rna options. If anything else is
specified, the --adapter option will be included.

@modified: Feb. 29, 2020
@author: Brian Fristensky
@contact: Brian.Fristensky@umanitoba.ca
"""

# optparse is deprecated in favor of argparse as of Python 2.7. However,
# since 2.7 is not always present on many systems, at this writing, it is
# safer to stick with optparse for now. It should be easy to change later,
# since the syntax is very similar between argparse and optparse.
from optparse import OptionParser
import datetime
import getpass
import os
import re
import stat
import subprocess
import sys
import time

#blib = os.environ.get("BIRCHPYLIB")
#sys.path.append(blib)
#from birchlib import Birchmod

PROGRAM = "bl_trim_galore.py : "
USAGE = "\n\tUSAGE: bl_trim_galore.py [options] filenames..."

DEBUG = True
if DEBUG :
    print('bl_trim_galore: Debugging mode on')

#BM = Birchmod(PROGRAM, USAGE)


# - - - - - - - - - - - - - Utility functions - - - - - - - - - - - - - - - -
def chmod_ar(filename):
    """
    Make a file world-readable.

    Adds read permission for user, group and others to the file's current
    mode. Does nothing if filename does not exist.
    """
    if os.path.exists(filename):
        st = os.stat(filename)
        os.chmod(filename, st.st_mode | stat.S_IREAD
                 | stat.S_IRGRP | stat.S_IROTH)


def chmod_arx(filename):
    """
    Make a file or directory world-readable and world-executable/searchable.

    Adds read and execute permission for user, group and others to the
    current mode. Does nothing if filename does not exist.
    """
    if os.path.exists(filename):
        st = os.stat(filename)
        os.chmod(filename, st.st_mode | stat.S_IEXEC | stat.S_IREAD
                 | stat.S_IXGRP | stat.S_IRGRP | stat.S_IXOTH
                 | stat.S_IROTH)


def LocalHostname():
    """
    Return the name of the local machine.

    Tries a number of methods to get a name other than 'localhost'
    or a null result: the HOSTNAME environment variable, then
    platform.uname(), then the socket module.
    """
    import socket
    import platform

    def CheckName(name) :
        # A usable name is non-empty and does not start with 'localhost'.
        return name is not None and name != "" \
               and not name.startswith("localhost")

    name = os.getenv('HOSTNAME')
    if not CheckName(name) :
        name = platform.uname()[1]
    if not CheckName(name) :
        hostname = socket.gethostname()
        if hostname.find('.') >= 0 :
            name = hostname
        else :
            try :
                name = socket.gethostbyaddr(hostname)[0]
            except OSError :
                # Reverse lookup is not always available; fall back to the
                # unqualified name rather than failing the caller.
                name = hostname
    return name


def SendEmail(From,To,Subject,Text) :
    """
    Send a simple email through the SMTP server on localhost.

    Very simple email method adapted from:
    http://stackoverflow.com/questions/882712/sending-html-email-using-python
    There are more elaborate examples on this site for sending
    HTML messages and attachments.

    From    - sender address
    To      - recipient address, or list of recipient addresses
    Subject - subject line
    Text    - message body; sent both as a plain-text part and, wrapped in
              a minimal HTML page, as an HTML part

    Delivery failures are reported on stdout; no exception is raised for
    SMTP or connection errors.
    """
    import smtplib
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText

    Host = 'localhost'

    msg = MIMEMultipart('alternative')
    msg['Subject'] = Subject
    msg['From'] = From
    if isinstance(To, str) :
        msg['To'] = To
    else :
        msg['To'] = ', '.join(To)

    # NOTE(review): only the %s placeholder of this template was legible;
    # the surrounding markup is a minimal HTML page - confirm against the
    # upstream copy of this script.
    Html = """\
<html>
  <head></head>
  <body>
    %s
  </body>
</html>
""" % (Text)
    part1 = MIMEText(Text, 'plain')
    part2 = MIMEText(Html, 'html')
    msg.attach(part1)
    msg.attach(part2)

    try:
        server = smtplib.SMTP(Host)
        try:
            server.sendmail(From, To, msg.as_string())
        finally:
            server.quit()
        print("Successfully sent email")
    except (smtplib.SMTPException, OSError):
        # Notification is best-effort: report the failure and carry on.
        print("Error: unable to send email")


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class Parameters:
    """
    Wrapper class for command line parameters
    """

    def __init__(self):
        """
        Initializes arguments:
            TSVFILE = ""
            QUALITY = 20
            FASTQC = False
            FQCTHREADS = 0
            ADAPTER = ""
            ADAPTER2 = ""
            STRINGENCY = 1
            ERRORRATE = 0.1
            GZIP = False
            DONT_GZIP = False
            LENGTH = 20
            OUTDIR = ""
            TRIM1 = False
            EMAIL = ""
        Then calls read_args() to fill in their values from command line
        """
        self.TSVFILE = ""
        self.QUALITY = 20
        self.FASTQC = False
        self.FQCTHREADS = 0
        self.ADAPTER = ""
        self.ADAPTER2 = ""
        self.STRINGENCY = 1
        self.ERRORRATE = 0.1
        self.GZIP = False
        self.DONT_GZIP = False
        self.LENGTH = 20
        self.OUTDIR = ""
        self.TRIM1 = False
        self.EMAIL = ""
        self.read_args()

        if DEBUG :
            print('------------ Parameters from command line ------')
            print('    TSVFILE: ' + self.TSVFILE)
            print('    QUALITY: ' + str(self.QUALITY))
            print('    FASTQC: ' + str(self.FASTQC))
            print('    FQCTHREADS: ' + str(self.FQCTHREADS))
            print('    ADAPTER: ' + self.ADAPTER)
            print('    ADAPTER2: ' + self.ADAPTER2)
            print('    STRINGENCY: ' + str(self.STRINGENCY))
            print('    ERRORRATE: ' + str(self.ERRORRATE))
            print('    GZIP: ' + str(self.GZIP))
            print('    DONT_GZIP: ' + str(self.DONT_GZIP))
            print('    LENGTH: ' + str(self.LENGTH))
            print('    OUTDIR: ' + self.OUTDIR)
            print('    TRIM1: ' + str(self.TRIM1))
            print('    EMAIL: ' + self.EMAIL)
            print()

    def read_args(self):
        """
        Read command line arguments into a Parameter object

        Numeric options are declared with an optparse type so that a
        malformed value is rejected by the parser with a usage error
        instead of being passed on to trim_galore unchecked.
        """
        parser = OptionParser()
        parser.add_option("--tsvfile", dest="tsvfile", action="store",
                          default="",
                          help="list of paired/unpaired read files")
        parser.add_option("--quality", dest="quality", action="store",
                          type="int", default=20,
                          help="trim N bases in addition to adapter removal")
        parser.add_option("--fastqc", dest="fastqc", action="store_true",
                          default=False,
                          help="run FASTQC on trimmed reads")
        parser.add_option("--fqcthreads", dest="fqcthreads", action="store",
                          type="int", default=0,
                          help="number of threads to use when running fastqc")
        parser.add_option("--adapter", dest="adapter", action="store",
                          default="",
                          help="adapter to be trimmed")
        parser.add_option("--adapter2", dest="adapter2", action="store",
                          default="",
                          help="optional adapter to trim from 2nd read in paired-end files")
        parser.add_option("--stringency", dest="stringency", action="store",
                          type="int", default=1,
                          help="nucleotides to trim from overlap with adapter")
        parser.add_option("-e", dest="errorrate", action="store",
                          type="float", default=0.1,
                          help="Max. allowed errors")
        parser.add_option("--gzip", dest="gzip", action="store_true",
                          default=False,
                          help="Compress output using gzip")
        parser.add_option("--dont_gzip", dest="dont_gzip", action="store_true",
                          default=False,
                          help="Do not compress output using gzip")
        parser.add_option("--length", dest="length", action="store",
                          type="int", default=20,
                          help="discard reads shorter than N")
        parser.add_option("--trim1", dest="trim1", action="store_true",
                          default=False,
                          help="trim 1 bp off every read at 3' end.")
        parser.add_option("--email", dest="email", action="store",
                          default="",
                          help="send email upon completion")
        parser.add_option("--output_dir", dest="outdir", action="store",
                          default="",
                          help="output directory")

        (options, args) = parser.parse_args()
        self.TSVFILE = options.tsvfile
        self.QUALITY = options.quality
        self.FASTQC = options.fastqc
        self.FQCTHREADS = int(options.fqcthreads)
        self.ADAPTER = options.adapter
        self.ADAPTER2 = options.adapter2
        self.STRINGENCY = options.stringency
        self.ERRORRATE = options.errorrate
        self.GZIP = options.gzip
        self.DONT_GZIP = options.dont_gzip
        self.LENGTH = options.length
        self.OUTDIR = options.outdir
        self.EMAIL = options.email
        self.TRIM1 = options.trim1


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class TSVFiles :
    """
    Methods for reading lists of paired read TSV files, and for
    writing lists to output.
    """

    def __init__(self):
        """
        Initializes arguments:
            READPAIRS = []
        """
        self.READPAIRS = []

    def ReadTSVfile(self,FN) :
        """
        TSV file containing names of paired-end and/or single end read files.

        Paired-end files are on lines such as

            leftreadfile.fq.gz<TAB>rightreadfile.fq.gz

        Single-end files have a each file on a separate line

            reads1.fq.gz
            reads2.fq.gz
            reads3.fq.gz

        Blank lines and lines beginning with '#' are skipped. Each remaining
        line appends a list of one or two filenames to self.READPAIRS.
        """
        TAB = '\t'
        with open(FN,"r") as F :
            for line in F :
                line = line.strip()
                if len(line) == 0 or line.startswith('#') :
                    continue
                # get rid of double quotes that enclose fields when some
                # programs write output, and then split by TABs.
                tokens = line.replace('"','').split(TAB)
                # ignore blank fields. Add either single or pair of
                # filenames to list. Only process names from first two
                # fields on a line and ignore other fields.
                fnames = [tok.strip() for tok in tokens[:2]
                          if len(tok.strip()) > 0]
                if len(fnames) > 0 :
                    self.READPAIRS.append(fnames)
        if DEBUG :
            print(str(self.READPAIRS))


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def BuildTrimGaloreCommand(P,PR) :
    """
    Build the trim_galore argument list for one entry of read files.

    P  - object carrying the settings as attributes (QUALITY, FASTQC,
         FQCTHREADS, ADAPTER, ADAPTER2, STRINGENCY, ERRORRATE, GZIP,
         DONT_GZIP, LENGTH, OUTDIR, TRIM1), e.g. a Parameters instance
    PR - list of one filename (single-end) or two filenames (paired-end)

    Returns the command as a list of strings, suitable for subprocess
    without a shell.
    """
    COMMAND = ["trim_galore","--quality", str(P.QUALITY)]

    if P.FASTQC :
        if P.FQCTHREADS > 0 :
            # --fastqc_args must always be followed by its value, otherwise
            # it swallows the next option. The value is a single argv
            # element, so no literal quote characters are needed (or
            # wanted) when no shell is involved.
            COMMAND.extend(['--fastqc_args',
                            '--threads ' + str(P.FQCTHREADS)])
        else :
            COMMAND.append('--fastqc')

    # The three named presets map onto dedicated trim_galore switches;
    # anything else is passed through as an adapter sequence.
    if P.ADAPTER in ('illumina', 'nextera', 'small_rna') :
        COMMAND.append('--' + P.ADAPTER)
    elif P.ADAPTER != "" :
        COMMAND.extend(['--adapter',P.ADAPTER])
    if P.ADAPTER2 != "" :
        COMMAND.extend(['--adapter2',P.ADAPTER2])

    COMMAND.extend(['--stringency',str(P.STRINGENCY)])
    COMMAND.extend(['-e',str(P.ERRORRATE)])
    if P.GZIP :
        COMMAND.append('--gzip')
    # --dont_gzip overrides --gzip
    if P.DONT_GZIP :
        COMMAND.append('--dont_gzip')
    COMMAND.extend(['--length',str(P.LENGTH)])
    if P.OUTDIR != "" :
        COMMAND.extend(['--output_dir',P.OUTDIR])

    if len(PR) == 2 :
        COMMAND.append('--paired')
        # --trim1 is only passed for paired-end input.
        if P.TRIM1 :
            COMMAND.append('--trim1')
        COMMAND.extend([PR[0],PR[1]])
    else :
        COMMAND.extend([PR[0]])
    return COMMAND


def RunTrimGalore(P,PR,LOGFILE) :
    """
    Run trim_galore on one single-end file or one pair of paired-end files.

    P       - Parameters object with the command line settings
    PR      - list of one or two read filenames
    LOGFILE - open, writable file object backed by a real file; receives
              the command, start/finish/elapsed times, and the standard
              output of trim_galore

    Returns the exit status of trim_galore.
    """
    # Construct the command - - - - - - - - - - - - - - - - - -
    COMMAND = BuildTrimGaloreCommand(P,PR)

    # Run the command - - - - - - - - - - - - - - - - - - - - -
    LOGFILE.write('======== trim_galore ==========' + '\n')
    LOGFILE.write(str(COMMAND) + '\n')
    StartTime = datetime.datetime.now()
    LOGFILE.write('Start time: ' + str(StartTime) + '\n')
    LOGFILE.write('\n')
    # Flush before handing the file to the child process, so the header
    # written above lands ahead of the child's output.
    LOGFILE.flush()
    p = subprocess.Popen(COMMAND,stdout=LOGFILE)
    p.wait()
    FinishTime = datetime.datetime.now()
    LOGFILE.write('\n')
    LOGFILE.write('Finish time: ' + str(FinishTime) + '\n')
    ElapsedTime = FinishTime - StartTime
    LOGFILE.write('Elapsed time: ' + str(ElapsedTime) + '\n')
    return p.returncode


#======================== MAIN PROCEDURE ==========================
def main():
    """
    Called when not in documentation mode.

    Reads the command line, runs trim_galore once per entry in the TSV
    file, writes bl_trim_galore.log into the output directory (or the
    current directory when --output_dir is not given) and, if --email was
    supplied, mails the log to that address.
    """
    # Read parameters from command line
    P = Parameters()
    TF = TSVFiles()
    if not P.TSVFILE == "" :
        TF.ReadTSVfile(P.TSVFILE)
        OKAY = True
    else :
        OKAY = False
        sys.stderr.write(PROGRAM + 'no --tsvfile given; nothing to run.'
                         + USAGE + '\n')

    # Create output directory, if it doesn't already exist. The log file
    # is written there as well, so it is needed even when nothing is run.
    if P.OUTDIR != "" :
        os.makedirs(P.OUTDIR, exist_ok=True)

    LOGFN = os.path.join(P.OUTDIR,'bl_trim_galore.log')
    with open(LOGFN,'w') as LOGFILE :
        LOGFILE.write('\n')

        # Run trim_galore
        if OKAY :
            for PR in TF.READPAIRS :
                RunTrimGalore(P,PR,LOGFILE)

    # Notify user when job is done, if email address was
    # supplied using --email
    if P.EMAIL != "" :
        Sender = getpass.getuser() + '@' + LocalHostname()
        Subject = 'bl_trim_galore.py completed'
        # NOTE(review): the line separator in these literals was not
        # legible; '<br>' is used because the message is also sent as an
        # HTML part - confirm against the upstream copy of this script.
        Message = 'bl_trim_galore.py: Completed<br>'
        # Read back the same log that was just written, which lives in
        # the output directory when --output_dir is given.
        with open(LOGFN,'r') as LOGFILE :
            Message = Message + ''.join(line + '<br>' for line in LOGFILE)
        SendEmail(Sender,[P.EMAIL],Subject,Message)


if __name__ == "__main__":
    main()
#else:
    #used to generate documentation
#    import doctest
#    doctest.testmod()

#if (BM.documentor() or "-test" in sys.argv):
#    pass
#else:
#    main()