#!/usr/bin/env python3 #optparse is deprecated in favor of argparse as of Python 2.7. However, # since 2.7 is not always present on many systems, at this writing, # it is safer to stick with optparse for now. It should be easy # to change later, since the syntax is very similar between argparse and optparse. from optparse import OptionParser import os import os.path import stat import subprocess import sys import re import shutil from Bio import Entrez ''' ncbiquery.py - Send an Entrez query to NCBI and return the result. Synopsis: ncbiquery.py --db database [--qfile qfile_name] --query entrez_query_statement [--filter entrez_filter_statement] [-maxcount integer] --format sequence_format --out outfile ncbiquery.py --qfile qfile_name --related [-maxcount integer] --format sequence_format --out outfile ncbiquery.py --qfile qfile_name --target database [-maxcount integer] --format sequence_format --out outfile --qfile - output from a previous ncbiquery.py run. This run will refine the previous search. --query entrez_query_statement - search parameters --filter entrez_filter_statement - limit search based on other parameters --related - look up neighbors in current database --target database - find links in a different database --format sequence_format - format as defined by NCBI EUtilities. --db - NCBI database as defined for EUtilities --out - output file @modified: July 12, 2020 @author: Brian Fristensky @contact: frist@cc.umanitoba.ca ''' blib = os.environ.get("BIRCHPYLIB") sys.path.append(blib) from birchlib import Birchmod from birchlib import SimpleXML PROGRAM = "ncbiquery.py : " USAGE = "\n\tUSAGE: ncbiquery.py --db database [--qfile qfile_name] --query entrez_query_statement [--filter entrez_filter_statement] [--related] [--target database] [-maxcount integer] --format efetch_format --out outfile" DEBUG = True if DEBUG : print('Debugging mode on') BM = Birchmod(PROGRAM, USAGE) HOMEDIR = BM.getHomeDir() # To use an Entrez API key and email address. These variables can be set once for the Entrez class, # and will be used for all EUtil calls. Entrez.api_key = os.getenv("NCBI_ENTREZ_KEY", default=None) Entrez.email = os.getenv("BL_EMAIL", default=None) if DEBUG : print("Entrez.api_key: " + str(Entrez.api_key)) print('Entrez.email: ' + str(Entrez.email)) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - class Parameters: """ Wrapper class for command line parameters """ def __init__(self): """ Initializes arguments: QFILE = "" EQUERY="" EFILTER="" RELATED=False TARGET="" MAXCOUNT = 500 DATABASE = "nuccore" FORMAT="" OFN="" Then calls read_args() to fill in their values from command line """ self.QFILE = "" self.EQUERY = "" self.EFILTER = "" self.RELATED = False self.TARGET = "" self.MAXCOUNT = int(500) self.DATABASE = "nuccore" self.FORMAT = "" self.PID = str(os.getpid()) self.OFN = "" self.read_args() def read_args(self): """ Read command line arguments into a Parameter object """ parser = OptionParser() parser.add_option("--qfile", dest="qfile", action="store", type="string", default="", help="ncbiquery file from a previous search") parser.add_option("--query", dest="query", action="store", type="string", default="", help="ncbiquery query term") parser.add_option("--filter", dest="filter", action="store", type="string", default="", help="filter for query") parser.add_option("--related", dest="related", action="store_true", default=False, help="search for Entrez neighbors in same database") parser.add_option("--target", dest="target", action="store", type="string", default="", help="search for Entrez neighbors in target database") parser.add_option("--maxcount", dest="maxcount", action="store", type="int", default="500", help="filter for query") parser.add_option("--db", dest="database", action="store", type="string", default="nuccore", help="database to search") parser.add_option("--format", dest="format", action="store", type="string", default="", help="format for output file") parser.add_option("--out", dest="outfile", action="store", type="string", default="", help="output filename") (options, args) = parser.parse_args() self.QFILE = options.qfile self.EQUERY = options.query self.EFILTER = options.filter self.RELATED = options.related self.TARGET = options.target self.MAXCOUNT = options.maxcount self.DATABASE = options.database self.FORMAT = options.format self.OFN = options.outfile if DEBUG : print('------------ Parameters from command line ------' + '\n') print(' QFILE: ' + self.QFILE) print(' EQUERY: ' + self.EQUERY) print(' EFILTER: ' + self.EFILTER) print(' RELATED: ' + str(self.RELATED)) print(' TARGET: ' + self.TARGET) print(' MAXCOUNT: ' + str(self.MAXCOUNT)) print(' FORMAT: ' + self.FORMAT) print(' DATABASE: ' + self.DATABASE) print(' OUTFILE: ' + self.OFN) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - class NCBIhistory: """ Classes for local storage of NCBI query and search history. The $HOME/.ncbi_history directory stores .xml files produced by EUtils. These files can, in turn, be used to reference results from previous queries or searches. An important reference regarding use of histories for Entrez functions can be found at https://www.ncbi.nlm.nih.gov/books/NBK25497/ under the heading "Using the Entrez History Server". """ def __init__(self): """ Creates $HOME/.ncbi_history directory, if it doesn't already exist. """ self.HISDIR = os.path.join(HOMEDIR, '.ncbi_history') self.PComments = [] if ( not os.path.isdir(self.HISDIR) ) : os.mkdir(self.HISDIR,0o700) # These fields are the payload of the history self.Db = "" self.WebEnv = "" self.QueryKey = "" self.Count = 0 # Get rid of old hisory files self.CleanHistDir() def CleanHistDir(self) : """ Removes old xml files the refer to search history items that are no longer present at NCBI. Typically, NCBI deletes history items after 3 days. """ import time for fname in os.listdir(self.HISDIR) : fullpath = os.path.join(self.HISDIR,fname) mtime = os.stat(fullpath).st_mtime now = time.time() if (mtime < now - 3 * 86400 ) : if DEBUG: print('Removing ' + fullpath) os.remove(fullpath) def MoveToHistDir(self,HFN,WEBENV) : """ Adds a search file to the history directory Use the WEBENV as the basename for the file. """ DEST = os.path.join(self.HISDIR, WEBENV + '.xml') shutil.move(HFN,DEST) os.chmod(DEST,0o600) def ReadHistory(self,HFile) : """ Read pseudocomment lines from a previous history file To facilitate reading of CSV and TSV files, double quote (") characters are stripped out, and the line is truncated at the first comma, and at the first tab, if those exist. That way we don't have to know if the field separator is tab or comma. eg. "#QUERY: Pisum [ORGN]","","","" """ h_HFile = open(HFile,"r") line = h_HFile.readline() while (line != "") and (line.find("#") > -1 ) : line = line.replace('"','') line = line.split(',')[0] line = line.split('\t')[0] line = line.strip() self.PComments.append(line) line = h_HFile.readline() h_HFile.close() if DEBUG and len(self.PComments) > 0 : print('----------- Pseudo Comments from query file -----------------' + '\n') for line in self.PComments : print(line) def GetValues(self,handle) : """ Use Bio.Entrez.read to retrieve the values from a handle. The handle is returned by calls to Bio.Entrez functions such as esearch and efetch. There doesn't seem to be database field in the XML returned by Entrez. We have to set that separately. """ result = Entrez.read(handle) handle.close() self.WebEnv = result["WebEnv"] self.QueryKey = result["QueryKey"] self.Count = int(result["Count"]) def WriteHistory(self,HFile) : """ Write an XML history file to HFile """ h_HFile = open(HFile,"w") h_HFile.write('' + '\n') h_HFile.write(' ' + self.Db + '\n') h_HFile.write(' ' + self.WebEnv + '\n') h_HFile.write(' ' + self.QueryKey + '\n') h_HFile.write(' ' + str(self.Count) + '\n') h_HFile.write('' + '\n') h_HFile.close() def ShowValues(self) : """ Print values of history variables for debugging. """ print(' Db: ' + self.Db) print(' WebEnv: ' + self.WebEnv) print(' QueryKey: ' + self.QueryKey) print(' Count: ' + str(self.Count)) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def WriteHeader(PComments,EQUERY,WEBENV,QKEY,EFILTER,RELATED,TARGET,DATABASE,h_OUTFILE): """ Write header information to the beginning of OFN. This file stays open so that we can write more information as the search proceeds. """ #Header(s) from previous queries, if they are supplied using --qfile for line in PComments : h_OUTFILE.write(line + '\n') if len(PComments) > 0 : h_OUTFILE.write('#------------------------------ AND ------------------------------' + '\n') #Header info from current run h_OUTFILE.write('#ncbiquery.py' + '\n') if DATABASE != "" : h_OUTFILE.write('#DATABASE: ' + DATABASE + '\n') if EQUERY != "" : h_OUTFILE.write('#QUERY: ' + EQUERY + '\n') if WEBENV != "" : h_OUTFILE.write('#WEBENV: ' + WEBENV + '\n') if QKEY != "" : h_OUTFILE.write('#QKEY: ' + QKEY + '\n') if EFILTER != "" : h_OUTFILE.write('#FILTER: ' + EFILTER + '\n') if RELATED : h_OUTFILE.write('#RELATED: ' + str(RELATED) + '\n') if TARGET != "" : h_OUTFILE.write('#TARGET: ' + TARGET + '\n') # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def ParsePseudoCom(PComments,TargetName): """ Get the value of the variable specified by TargetName from the list of Pseudocomments in PComments. Variables are in pseudocomments of the form: #TargetName:Value\n eg. #QUERY: Pisum [ORGN] """ Value = "" Target = '#' + TargetName + ':' LINENUM = 0 # commented line stops at first occurrence of the target #while (LINENUM < len(PComments)) and (Value == "") : while (LINENUM < len(PComments)) : POSN = PComments[LINENUM].find(Target) if POSN > -1 : Value = PComments[LINENUM].split(' ')[1] LINENUM = LINENUM + 1 return Value # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def RunQueryAndFilter(HIST,EQUERY,PID,EFILTER,DATABASE): """ Call esearch with the --query term. Note that HIST.WebEnv and HIST.QueryKey are changed to reference the last search done in this method. """ if (EFILTER == "" ): if HIST.WebEnv == "" : if DEBUG : print('=== RunQueryAndFilter: no filter, no history ===') handle = Entrez.esearch(db=DATABASE, term=EQUERY, usehistory="y", retmode="text", rettype="xml") HIST.GetValues(handle) HIST.Db = DATABASE else : if DEBUG : print('=== RunQueryAndFilter: no filter, prior history ===') handle = Entrez.esearch(db=DATABASE, term=EQUERY, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey, label=PID, retmode="text", rettype="xml") Hist.GetValues(handle) HIST.Db = DATABASE else: if HIST.WebEnv == "" : if DEBUG : print('=== RunQueryAndFilter: filter, no history ===') # Search for the query handle = Entrez.esearch(db=DATABASE, term=EQUERY, usehistory="y") Hist.GetValues(handle) # refine the search using the filter handle = Entrez.esearch(db=DATABASE, term=EFILTER, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey) HIST.GetValues(handle) HIST.Db = DATABASE else : if DEBUG : print('=== RunQueryAndFilter: filter, prior history ===') # Search for the query handle = Entrez.esearch(db=DATABASE, term=EQUERY, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey, label=PID, retmode="text", rettype="xml") HIST.GetValues(handle) # refine the search using the filter handle = Entrez.esearch(db=DATABASE, term=EFILTER, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey) HIST.GetValues(handle) HIST.Db = DATABASE if DEBUG : HIST.ShowValues() HFN = os.path.join(HIST.HISDIR, HIST.WebEnv + '.xml') HIST.WriteHistory(HFN) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def FindRelated(HIST): """ Call elink with the --related switch >>> We're still figuring this one out. It doesn't work yet. """ # There doesn't seem to be a way to get both the WebEnv and the Count in a single call to # Elink. Therefore, we have to make the same query twice, once to get the Count, and once to get # WebEnv handle = Entrez.elink(cmd="neighbor", dbfrom=HIST.Db, db=HIST.Db, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey, retmode="text", rettype="xml") print("handle obtained") result = Entrez.read(handle, validate=False) print('Results from first elink search - ') print(result) handle.close() HIST.Count = len(result[0]["LinkSetDb"][0]) print(' HIST.Count: ' + str(HIST.Count)) #HIST.Db = DATABASE handle = Entrez.elink(cmd="neighbor_history", dbfrom=HIST.Db, db=HIST.Db, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey, retmode="text", rettype="xml") result = Entrez.read(handle) print('Results from second elink search - ') print(result) handle.close() HIST.WebEnv = result[0]["WebEnv"] HIST.QueryKey = result[0]["LinkSetDbHistory"][0]["QueryKey"] if DEBUG : HIST.ShowValues() HFN = os.path.join(HIST.HISDIR, HIST.WebEnv + '.xml') h_HISTORY = open(HFN,"w") HIST.WriteHistory(HFN) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def LinkOut(HIST,TARGET): """ Find links from a previous search to entries in a new database. """ # There doesn't seem to be a way to get both the WebEnv and the Count in a single call to # Elink. Therefore, we have to make the same query twice, once to get the Count, and once to get # WebEnv handle = Entrez.elink(dbfrom=HIST.Db, db=TARGET, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey, retmode="text", rettype="xml") print(handle) HIST.GetValues(handle) HIST.Db = DATABASE #result = handle.read() #print(result) HIST.Count = len(result[0]["LinkSetDb"][0]) print(' HIST.Count: ' + str(HIST.Count)) handle = Entrez.elink(cmd="neighbor_history", dbfrom=HIST.Db, db=TARGET, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey, retmode="text", rettype="xml") result = handle.read() print(result) handle.close() HIST.WebEnv = result[0]["WebEnv"] HIST.QueryKey = result[0]["LinkSetDbHistory"][0]["QueryKey"] HIST.Db = TARGET if DEBUG : HIST.ShowValues() HFN = os.path.join(HIST.HISDIR, HIST.WebEnv + '.xml') h_HISTORY = open(HFN,"w") HIST.WriteHistory(HFN) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def RetrieveEntries(WEBENV,QKEY,DATABASE,FORMAT,PID,h_OUTFILE): """ Run efetch or esummary to retrieve the results. The first step is to retrieve the results from the search in RunQueryAndFilter, FindRelated, or LinkOut, as referenced from NCBI using WEBENV and QKEY. Note that when a document summary is retrieved from NCBI, by default, NCBI returns an older version of the XML, that contains a small number of fields. It is preferrable to retrieve the XML results in a newer format, version 2.0, which has more fields, although some of the fields have different XML tags than in version 1.0. (See http://www.ncbi.nlm.nih.gov/books/NBK25499/#chapter4.Release_Notes. Geeeeez, the use of version=2.0 was hard to track down in the NCBI documentation!) For backward compatability, it is necessary for calls to efetch or esummary to include the version="2.0" parameter when you retrieve a document summary. The second step is to extract the desired fields from the summary to be written to the output file. This is currently done using the EDirect xtract script. This is the only one EDirect tool that uses only standard Perl libraries. Part of the reason we are using the BioPythonBio::Entrez methods is that all other EDirect scripts use non-standard libraries, that creates a nightmare in terms of portability. """ if DEBUG : print('Retrieving entries ---------------------' + '\n') RESULTFILE=str(PID) +'.xml' h_RESULTFILE = open(RESULTFILE, "w") if (FORMAT == "docsum" ) or (FORMAT == "" ) : handle = Entrez.esummary(db=DATABASE, query_key=QKEY, WebEnv=WEBENV, usehistory="y", rettype="docsum", version="2.0") else : handle = Entrez.efetch(db=DATABASE, query_key=QKEY, WebEnv=WEBENV, usehistory="y", rettype=FORMAT) for line in handle: h_RESULTFILE.write(line.strip()) #if DEBUG: #print(line.strip()) h_RESULTFILE.close() # PostProcessing # This code block uses the NCBI EDirect xtract script. For now, we'll keep it, since it appears # not to use any Perl modules that are not part of the standard Perl distribution. # In the future, we may look into replacing this with a Python method. # In September 2016, NCBI sequence records will no longer have GI numbers. # https://www.ncbi.nlm.nih.gov/news/03-02-2016-phase-out-of-GI-numbers/ # To comply with this change, the element list extracted from the DocSummary will have Caption, rather than Gi. # Why on earth the XML of the DocSummary labels Accession numbers as "Caption" is beyond my mortal knowledge. # Deep within the fabric of the universe, there must be a reason. Nonetheless, it seems to work. h_RESULTFILE = open(RESULTFILE, "r") if ( FORMAT == 'docsum' ) or (FORMAT == "" ) : if DATABASE == 'protein': #ELEMENTS = 'Gi,Title,MolType,Slen' ELEMENTS = 'Caption,Title,MolType,Slen' else : #ELEMENTS = 'Gi,Title,Biomol,Slen' ELEMENTS = 'Caption,Title,Biomol,Slen' p = subprocess.Popen(['perl', os.path.join(os.environ.get("BIRCH"), 'script', 'xtract'), '-pattern', 'DocumentSummary', '-sep', '\t', '-element', ELEMENTS ], stdin=h_RESULTFILE, stdout=h_OUTFILE) p.wait() else : # no postprocessing required for line in h_RESULTFILE : h_OUTFILE.write(line) h_RESULTFILE.close() if os.path.isfile(RESULTFILE) : os.remove(RESULTFILE) #======================== MAIN PROCEDURE ========================== def main(): """ Called when not in documentation mode. """ # Read parameters from command line P = Parameters() # HIST contains the pseudocomments from the previous search found in QFILE, if provided. HIST = NCBIhistory() if DEBUG : print('HISTORY_DIR: ' + HIST.HISDIR) #Get history variables from Pseudocomments from previous history file, if the file was specified on the command line. if P.QFILE != "" : HIST.ReadHistory(P.QFILE) if len(HIST.PComments) > 0 : Value = ParsePseudoCom(HIST.PComments,'DATABASE') if Value != "" : HIST.Db = Value Value = ParsePseudoCom(HIST.PComments,'WEBENV') if Value != "" : HIST.WebEnv = Value Value = ParsePseudoCom(HIST.PComments,'QKEY') if Value != "" : HIST.QueryKey = Value Value = ParsePseudoCom(HIST.PComments,'COUNT') if Value != "" : HIST.Count = int(Value) if DEBUG : print('---------------- Initial values from Pseudocomments (if any)-----------------' + '\n') HIST.ShowValues() if (P.EQUERY != "" or P.RELATED or P.TARGET != "") : h_OUTFILE = open(P.OFN, "w") WriteHeader(HIST.PComments,P.EQUERY,HIST.WebEnv,HIST.QueryKey,P.EFILTER,P.RELATED,P.TARGET,P.DATABASE,h_OUTFILE) # Choose a search to run, based on parameters if (P.EQUERY != "" ) : RunQueryAndFilter(HIST,P.EQUERY,P.PID,P.EFILTER,P.DATABASE) RETRIEVEDB = P.DATABASE elif P.RELATED: FindRelated(HIST) RETRIEVEDB = HIST.Db elif P.TARGET != "": LinkOut(HIST,P.TARGET) RETRIEVEDB = P.TARGET h_OUTFILE.write('#WEBENV: ' + HIST.WebEnv + '\n') h_OUTFILE.write('#QKEY: ' + HIST.QueryKey + '\n') if HIST.Count == "" : HIST.Count = 0 h_OUTFILE.write('#COUNT: ' + str(HIST.Count) + '\n') if (HIST.Count <= P.MAXCOUNT) and (HIST.Count > 0) : if RETRIEVEDB in ['nuccore','nucleotide', 'nucgss', 'nucest'] : h_OUTFILE.write('#uid' + '\t' + 'Title' + '\t' + 'BioMol' + '\t' + 'Slen' + '\n') else : h_OUTFILE.write('#uid' + '\t' + 'Title' + '\t' + 'MolType' + '\t' + 'Slen' + '\n') #It's necessary to close and reopen OUTFILE, because if we don't do that, #popen will write the results of RetrieveEntries BEFORE the stuff that is #already sitting in the OUTFILE buffer. h_OUTFILE.close() h_OUTFILE = open(P.OFN, "a") RetrieveEntries(HIST.WebEnv,HIST.QueryKey,RETRIEVEDB,P.FORMAT,P.PID,h_OUTFILE) h_OUTFILE.close() else: print('ncbiquery.py: No query statement given.') BM.exit_success() if (BM.documentor() or "-test" in sys.argv): pass else: main()