#!/usr/bin/env python3
import birchenv
import birchscript
#optparse is deprecated in favor of argparse as of Python 2.7. However,
# since 2.7 is not always present on many systems, at this writing,
# it is safer to stick with optparse for now. It should be easy
# to change later, since the syntax is very similar between argparse and optparse.
from optparse import OptionParser
import os
import os.path
import stat
import subprocess
import sys
import re
import shutil
from Bio import Entrez
'''
ncbiquery.py - Send an Entrez query to NCBI and return the result.
Synopsis: ncbiquery.py --db database [--qfile qfile_name] --query entrez_query_statement [--filter entrez_filter_statement] [-maxcount integer] --format sequence_format --out outfile
ncbiquery.py --qfile qfile_name --related [-maxcount integer] --format sequence_format --out outfile
ncbiquery.py --qfile qfile_name --target database [-maxcount integer] --format sequence_format --out outfile
--qfile - output from a previous ncbiquery.py run. This run will refine the previous search.
--query entrez_query_statement - search parameters
--filter entrez_filter_statement - limit search based on other parameters
--related - look up neighbors in current database
--target database - find links in a different database
--format sequence_format - format as defined by NCBI EUtilities.
--db - NCBI database as defined for EUtilities
--out - output file
@modified: July 9, 2020
@author: Brian Fristensky
@contact: frist@cc.umanitoba.ca
'''
blib = os.environ.get("BIRCHPYLIB")
sys.path.append(blib)
from birchlib import Birchmod
from birchlib import SimpleXML
PROGRAM = "ncbiquery.py : "
USAGE = "\n\tUSAGE: ncbiquery.py --db database [--qfile qfile_name] --query entrez_query_statement [--filter entrez_filter_statement] [--related] [--target database] [-maxcount integer] --format efetch_format --out outfile"
DEBUG = True
if DEBUG :
print('Debugging mode on')
BM = Birchmod(PROGRAM, USAGE)
HOMEDIR = BM.getHomeDir()
# To use an Entrez API key and email address. These variables can be set once for the Entrez class,
# and will be used for all EUtil calls.
Entrez.api_key = os.getenv("NCBI_ENTREZ_KEY", default=None)
Entrez.email = os.getenv("BL_EMAIL", default=None)
if DEBUG :
print("Entrez.api_key: " + str(Entrez.api_key))
print('Entrez.email: ' + str(Entrez.email))
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class Parameters:
"""
Wrapper class for command line parameters
"""
def __init__(self):
"""
Initializes arguments:
QFILE = ""
EQUERY=""
EFILTER=""
RELATED=False
TARGET=""
MAXCOUNT = 500
DATABASE = "nuccore"
FORMAT=""
OFN=""
Then calls read_args() to fill in their values from command line
"""
self.QFILE = ""
self.EQUERY = ""
self.EFILTER = ""
self.RELATED = False
self.TARGET = ""
self.MAXCOUNT = int(500)
self.DATABASE = "nuccore"
self.FORMAT = ""
self.PID = str(os.getpid())
self.OFN = ""
self.read_args()
def read_args(self):
"""
Read command line arguments into a Parameter object
"""
parser = OptionParser()
parser.add_option("--qfile", dest="qfile", action="store", type="string", default="",
help="ncbiquery file from a previous search")
parser.add_option("--query", dest="query", action="store", type="string", default="",
help="ncbiquery query term")
parser.add_option("--filter", dest="filter", action="store", type="string", default="",
help="filter for query")
parser.add_option("--related", dest="related", action="store_true", default=False,
help="search for Entrez neighbors in same database")
parser.add_option("--target", dest="target", action="store", type="string", default="",
help="search for Entrez neighbors in target database")
parser.add_option("--maxcount", dest="maxcount", action="store", type="int", default="500",
help="filter for query")
parser.add_option("--db", dest="database", action="store", type="string", default="nuccore",
help="database to search")
parser.add_option("--format", dest="format", action="store", type="string", default="",
help="format for output file")
parser.add_option("--out", dest="outfile", action="store", type="string", default="",
help="output filename")
(options, args) = parser.parse_args()
self.QFILE = options.qfile
self.EQUERY = options.query
self.EFILTER = options.filter
self.RELATED = options.related
self.TARGET = options.target
self.MAXCOUNT = options.maxcount
self.DATABASE = options.database
self.FORMAT = options.format
self.OFN = options.outfile
if DEBUG :
print('------------ Parameters from command line ------' + '\n')
print(' QFILE: ' + self.QFILE)
print(' EQUERY: ' + self.EQUERY)
print(' EFILTER: ' + self.EFILTER)
print(' RELATED: ' + str(self.RELATED))
print(' TARGET: ' + self.TARGET)
print(' MAXCOUNT: ' + str(self.MAXCOUNT))
print(' FORMAT: ' + self.FORMAT)
print(' DATABASE: ' + self.DATABASE)
print(' OUTFILE: ' + self.OFN)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class NCBIhistory:
"""
Classes for local storage of NCBI query and search history.
The $HOME/.ncbi_history directory stores .xml files produced
by EUtils. These files can, in turn, be used to reference results
from previous queries or searches.
"""
def __init__(self):
"""
Creates $HOME/.ncbi_history directory, if it doesn't already exist.
"""
self.HISDIR = os.path.join(HOMEDIR, '.ncbi_history')
self.PComments = []
if ( not os.path.isdir(self.HISDIR) ) :
os.mkdir(self.HISDIR,0o700)
# These fields are the payload of the history
self.Db = ""
self.WebEnv = ""
self.QueryKey = ""
self.Count = 0
# Get rid of old hisory files
self.CleanHistDir()
def CleanHistDir(self) :
"""
Removes old xml files the refer to search history items
that are no longer present at NCBI. Typically, NCBI
deletes history items after 3 days.
"""
import time
for fname in os.listdir(self.HISDIR) :
fullpath = os.path.join(self.HISDIR,fname)
mtime = os.stat(fullpath).st_mtime
now = time.time()
if (mtime < now - 3 * 86400 ) :
if DEBUG:
print('Removing ' + fullpath)
os.remove(fullpath)
def MoveToHistDir(self,HFN,WEBENV) :
"""
Adds a search file to the history directory
Use the WEBENV as the basename for the file.
"""
DEST = os.path.join(self.HISDIR, WEBENV + '.xml')
shutil.move(HFN,DEST)
os.chmod(DEST,0o600)
def ReadHistory(self,HFile) :
"""
Read pseudocomment lines from a previous history file
To facilitate reading of CSV and TSV files, double quote (") characters
are stripped out, and the line is truncated at the first comma, and
at the first tab, if those exist. That way we don't have to know if
the field separator is tab or comma.
eg. "#QUERY: Pisum [ORGN]","","",""
"""
h_HFile = open(HFile,"r")
line = h_HFile.readline()
while (line != "") and (line.find("#") > -1 ) :
line = line.replace('"','')
line = line.split(',')[0]
line = line.split('\t')[0]
line = line.strip()
self.PComments.append(line)
line = h_HFile.readline()
h_HFile.close()
if DEBUG and len(self.PComments) > 0 :
print('----------- Pseudo Comments from query file -----------------' + '\n')
for line in self.PComments :
print(line)
def WriteHistory(self,HFile) :
"""
Write an XML history file to HFile
"""
h_HFile = open(HFile,"w")
h_HFile.write('' + '\n')
h_HFile.write(' ' + self.Db + '\n')
h_HFile.write(' ' + self.WebEnv + '\n')
h_HFile.write(' ' + self.QueryKey + '\n')
h_HFile.write(' ' + str(self.Count) + '\n')
h_HFile.write('' + '\n')
h_HFile.close()
def ShowValues(self) :
"""
Print values of history variables for debugging.
"""
print(' Db: ' + self.Db)
print(' WebEnv: ' + self.WebEnv)
print(' QueryKey: ' + self.QueryKey)
print(' Count: ' + str(self.Count))
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def WriteHeader(PComments,EQUERY,WEBENV,QKEY,EFILTER,RELATED,TARGET,DATABASE,h_OUTFILE):
"""
Write header information to the beginning of OFN. This file stays open
so that we can write more information as the search proceeds.
"""
#Header(s) from previous queries, if they are supplied using --qfile
for line in PComments :
h_OUTFILE.write(line + '\n')
if len(PComments) > 0 :
h_OUTFILE.write('#------------------------------ AND ------------------------------' + '\n')
#Header info from current run
h_OUTFILE.write('#ncbiquery.py' + '\n')
if DATABASE != "" :
h_OUTFILE.write('#DATABASE: ' + DATABASE + '\n')
if EQUERY != "" :
h_OUTFILE.write('#QUERY: ' + EQUERY + '\n')
if WEBENV != "" :
h_OUTFILE.write('#WEBENV: ' + WEBENV + '\n')
if QKEY != "" :
h_OUTFILE.write('#QKEY: ' + QKEY + '\n')
if EFILTER != "" :
h_OUTFILE.write('#FILTER: ' + EFILTER + '\n')
if RELATED :
h_OUTFILE.write('#RELATED: ' + str(RELATED) + '\n')
if TARGET != "" :
h_OUTFILE.write('#TARGET: ' + TARGET + '\n')
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def ParsePseudoCom(PComments,TargetName):
"""
Get the value of the variable specified by TargetName
from the list of Pseudocomments in PComments.
Variables are in pseudocomments of the form:
#TargetName:Value\n
eg. #QUERY: Pisum [ORGN]
"""
Value = ""
Target = '#' + TargetName + ':'
LINENUM = 0
# commented line stops at first occurrence of the target
#while (LINENUM < len(PComments)) and (Value == "") :
while (LINENUM < len(PComments)) :
POSN = PComments[LINENUM].find(Target)
if POSN > -1 :
Value = PComments[LINENUM].split(' ')[1]
LINENUM = LINENUM + 1
return Value
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def RunQueryAndFilter(HIST,EQUERY,PID,EFILTER,DATABASE):
"""
Call esearch with the --query term.
Note that HIST.WebEnv and HIST.QueryKey are changed to
reference the last search done in this method.
"""
def GetValues(handle, HIST) :
"""
Use Bio.Entrez.read to retrieve the values from a handle.
The handle is returned by calls to Bio.Entrez functions such as
esearch and efetch.
There doesn't seem to be database field in the XML returned
by Entrez. We have to set that separately.
"""
result = Entrez.read(handle)
#print result
handle.close()
HIST.WebEnv = result["WebEnv"]
HIST.QueryKey = result["QueryKey"]
HIST.Count = int(result["Count"])
if (EFILTER == "" ):
if HIST.WebEnv == "" :
if DEBUG :
print('=== RunQueryAndFilter: no filter, no history ===')
handle = Entrez.esearch(db=DATABASE, term=EQUERY, usehistory="y", retmode="text", rettype="xml")
GetValues(handle,HIST)
HIST.Db = DATABASE
else :
if DEBUG :
print('=== RunQueryAndFilter: no filter, prior history ===')
handle = Entrez.esearch(db=DATABASE, term=EQUERY, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey, label=PID, retmode="text", rettype="xml")
GetValues(handle,HIST)
HIST.Db = DATABASE
else:
if HIST.WebEnv == "" :
if DEBUG :
print('=== RunQueryAndFilter: filter, no history ===')
# Search for the query
handle = Entrez.esearch(db=DATABASE, term=EQUERY, usehistory="y")
GetValues(handle,HIST)
# refine the search using the filter
handle = Entrez.esearch(db=DATABASE, term=EFILTER, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey)
GetValues(handle,HIST)
HIST.Db = DATABASE
else :
if DEBUG :
print('=== RunQueryAndFilter: filter, prior history ===')
# Search for the query
handle = Entrez.esearch(db=DATABASE, term=EQUERY, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey, label=PID, retmode="text", rettype="xml")
GetValues(handle,HIST)
# refine the search using the filter
handle = Entrez.esearch(db=DATABASE, term=EFILTER, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey)
GetValues(handle,HIST)
HIST.Db = DATABASE
if DEBUG :
HIST.ShowValues()
HFN = os.path.join(HIST.HISDIR, HIST.WebEnv + '.xml')
HIST.WriteHistory(HFN)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def FindRelated(HIST):
"""
Call elink with the --related switch
>>> We're still figuring this one out. It doesn't work yet.
"""
# There doesn't seem to be a way to get both the WebEnv and the Count in a single call to
# Elink. Therefore, we have to make the same query twice, once to get the Count, and once to get
# WebEnv
handle = Entrez.elink(cmd="neighbor", dbfrom=HIST.Db, db=HIST.Db, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey, retmode="text", rettype="xml")
print("handle obtained")
result = Entrez.read(handle, validate=False)
print('Results from first elink search - ')
print(result)
handle.close()
HIST.Count = len(result[0]["LinkSetDb"][0])
print(' HIST.Count: ' + str(HIST.Count))
#HIST.Db = DATABASE
handle = Entrez.elink(cmd="neighbor_history", dbfrom=HIST.Db, db=HIST.Db, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey, retmode="text", rettype="xml")
result = Entrez.read(handle)
print('Results from second elink search - ')
print(result)
handle.close()
HIST.WebEnv = result[0]["WebEnv"]
HIST.QueryKey = result[0]["LinkSetDbHistory"][0]["QueryKey"]
if DEBUG :
HIST.ShowValues()
HFN = os.path.join(HIST.HISDIR, HIST.WebEnv + '.xml')
h_HISTORY = open(HFN,"w")
HIST.WriteHistory(HFN)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def LinkOut(HIST,TARGET):
"""
Find links from a previous search to
entries in a new database.
"""
# There doesn't seem to be a way to get both the WebEnv and the Count in a single call to
# Elink. Therefore, we have to make the same query twice, once to get the Count, and once to get
# WebEnv
handle = Entrez.elink(dbfrom=HIST.Db, db=TARGET, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey, retmode="text", rettype="xml")
handle = Entrez.elink(dbfrom=HIST.Db, db=TARGET, usehistory="y", retmode="text", rettype="xml")
result = handle.read()
print(result)
HIST.Count = len(result[0]["LinkSetDb"][0])
print(' HIST.Count: ' + str(HIST.Count))
handle = Entrez.elink(cmd="neighbor_history", dbfrom=HIST.Db, db=TARGET, usehistory="y", WebEnv=HIST.WebEnv, query_key=HIST.QueryKey, retmode="text", rettype="xml")
result = handle.read()
print(result)
handle.close()
HIST.WebEnv = result[0]["WebEnv"]
HIST.QueryKey = result[0]["LinkSetDbHistory"][0]["QueryKey"]
HIST.Db = TARGET
if DEBUG :
HIST.ShowValues()
HFN = os.path.join(HIST.HISDIR, HIST.WebEnv + '.xml')
h_HISTORY = open(HFN,"w")
HIST.WriteHistory(HFN)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def RetrieveEntries(WEBENV,QKEY,DATABASE,FORMAT,PID,h_OUTFILE):
"""
Run efetch or esummary to retrieve the results.
The first step is to retrieve the results from the search
in RunQueryAndFilter, FindRelated, or LinkOut, as referenced
from NCBI using WEBENV and QKEY. Note that when a document
summary is retrieved from NCBI, by default, NCBI returns
an older version of the XML, that contains a small number of
fields. It is preferrable to retrieve the XML results in a
newer format, version 2.0, which has more fields, although some of the fields
have different XML tags than in version 1.0.
(See http://www.ncbi.nlm.nih.gov/books/NBK25499/#chapter4.Release_Notes.
Geeeeez, the use of version=2.0 was hard to track down in the NCBI
documentation!)
For backward compatability, it is necessary for calls to efetch or esummary
to include the version="2.0" parameter when you retrieve a document summary.
The second step is to extract the desired fields from the summary
to be written to the output file. This is currently done
using the EDirect xtract script. This is the only one EDirect
tool that uses only standard Perl libraries. Part of the reason
we are using the BioPythonBio::Entrez methods is that
all other EDirect scripts use non-standard libraries, that
creates a nightmare in terms of portability.
"""
if DEBUG :
print('Retrieving entries ---------------------' + '\n')
RESULTFILE=str(PID) +'.xml'
h_RESULTFILE = open(RESULTFILE, "w")
if (FORMAT == "docsum" ) or (FORMAT == "" ) :
handle = Entrez.esummary(db=DATABASE, query_key=QKEY, WebEnv=WEBENV, usehistory="y", rettype="docsum", version="2.0")
else :
handle = Entrez.efetch(db=DATABASE, query_key=QKEY, WebEnv=WEBENV, usehistory="y", rettype=FORMAT)
for line in handle:
h_RESULTFILE.write(line.strip())
#if DEBUG:
#print(line.strip())
h_RESULTFILE.close()
# PostProcessing
# This code block uses the NCBI EDirect xtract script. For now, we'll keep it, since it appears
# not to use any Perl modules that are not part of the standard Perl distribution.
# In the future, we may look into replacing this with a Python method.
# In September 2016, NCBI sequence records will no longer have GI numbers.
# https://www.ncbi.nlm.nih.gov/news/03-02-2016-phase-out-of-GI-numbers/
# To comply with this change, the element list extracted from the DocSummary will have Caption, rather than Gi.
# Why on earth the XML of the DocSummary labels Accession numbers as "Caption" is beyond my mortal knowledge.
# Deep within the fabric of the universe, there must be a reason. Nonetheless, it seems to work.
h_RESULTFILE = open(RESULTFILE, "r")
if ( FORMAT == 'docsum' ) or (FORMAT == "" ) :
if DATABASE == 'protein':
#ELEMENTS = 'Gi,Title,MolType,Slen'
ELEMENTS = 'Caption,Title,MolType,Slen'
else :
#ELEMENTS = 'Gi,Title,Biomol,Slen'
ELEMENTS = 'Caption,Title,Biomol,Slen'
#p = subprocess.Popen(['perl', os.path.join(birchenv.BIRCH, 'bin-' + birchenv.BIRCH_PLATFORM , 'xtract'), '-pattern', 'DocumentSummary', '-sep', '\t', '-element', ELEMENTS ], stdin=h_RESULTFILE, stdout=h_OUTFILE)
p = subprocess.Popen(['perl', os.path.join(birchenv.BIRCH, 'script', 'xtract'), '-pattern', 'DocumentSummary', '-sep', '\t', '-element', ELEMENTS ], stdin=h_RESULTFILE, stdout=h_OUTFILE)
p.wait()
else :
# no postprocessing required
for line in h_RESULTFILE :
h_OUTFILE.write(line)
h_RESULTFILE.close()
if os.path.isfile(RESULTFILE) :
os.remove(RESULTFILE)
#======================== MAIN PROCEDURE ==========================
def main():
"""
Called when not in documentation mode.
"""
# Read parameters from command line
P = Parameters()
# HIST contains the pseudocomments from the previous search found in QFILE, if provided.
HIST = NCBIhistory()
if DEBUG :
print('HISTORY_DIR: ' + HIST.HISDIR)
#Get history variables from Pseudocomments from previous history file, if the file was specified on the command line.
if P.QFILE != "" :
HIST.ReadHistory(P.QFILE)
if len(HIST.PComments) > 0 :
Value = ParsePseudoCom(HIST.PComments,'DATABASE')
if Value != "" :
HIST.Db = Value
Value = ParsePseudoCom(HIST.PComments,'WEBENV')
if Value != "" :
HIST.WebEnv = Value
Value = ParsePseudoCom(HIST.PComments,'QKEY')
if Value != "" :
HIST.QueryKey = Value
Value = ParsePseudoCom(HIST.PComments,'COUNT')
if Value != "" :
HIST.Count = int(Value)
if DEBUG :
print('---------------- Initial values from Pseudocomments (if any)-----------------' + '\n')
HIST.ShowValues()
if (P.EQUERY != "" or P.RELATED or P.TARGET != "") :
h_OUTFILE = open(P.OFN, "w")
WriteHeader(HIST.PComments,P.EQUERY,HIST.WebEnv,HIST.QueryKey,P.EFILTER,P.RELATED,P.TARGET,P.DATABASE,h_OUTFILE)
# Choose a search to run, based on parameters
if (P.EQUERY != "" ) :
RunQueryAndFilter(HIST,P.EQUERY,P.PID,P.EFILTER,P.DATABASE)
RETRIEVEDB = P.DATABASE
elif P.RELATED:
FindRelated(HIST)
RETRIEVEDB = HIST.Db
elif P.TARGET != "":
LinkOut(HIST,P.TARGET)
RETRIEVEDB = P.TARGET
h_OUTFILE.write('#WEBENV: ' + HIST.WebEnv + '\n')
h_OUTFILE.write('#QKEY: ' + HIST.QueryKey + '\n')
if HIST.Count == "" :
HIST.Count = 0
h_OUTFILE.write('#COUNT: ' + str(HIST.Count) + '\n')
if (HIST.Count <= P.MAXCOUNT) and (HIST.Count > 0) :
if RETRIEVEDB in ['nuccore','nucleotide', 'nucgss', 'nucest'] :
h_OUTFILE.write('#uid' + '\t' + 'Title' + '\t' + 'BioMol' + '\t' + 'Slen' + '\n')
else :
h_OUTFILE.write('#uid' + '\t' + 'Title' + '\t' + 'MolType' + '\t' + 'Slen' + '\n')
#It's necessary to close and reopen OUTFILE, because if we don't do that,
#popen will write the results of RetrieveEntries BEFORE the stuff that is
#already sitting in the OUTFILE buffer.
h_OUTFILE.close()
h_OUTFILE = open(P.OFN, "a")
RetrieveEntries(HIST.WebEnv,HIST.QueryKey,RETRIEVEDB,P.FORMAT,P.PID,h_OUTFILE)
h_OUTFILE.close()
else:
print('ncbiquery.py: No query statement given.')
BM.exit_success()
if (BM.documentor() or "-test" in sys.argv):
pass
else:
main()