package jgi;

import java.io.File;
import java.io.PrintStream;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

import kmer.KCountArray;
import kmer.KmerCount7MTA;

import stream.ConcurrentGenericReadInputStream;
import stream.ConcurrentReadStreamInterface;
import stream.FASTQ;
import stream.FastaReadInputStream;
import stream.RTextOutputStream3;
import stream.Read;

import align2.ListNum;
import align2.Shared;
import align2.Tools;

import dna.AminoAcid;
import dna.Data;
import dna.Timer;

import fileIO.ReadWrite;
import fileIO.FileFormat;
import fileIO.TextStreamWriter;

/**
 * This class is designed to visualize the distribution of kmer depths across individual reads.
 * @author Brian Bushnell
 * @date May 15, 2013
 *
 */
public class ReadKmerDepthDistribution {

	public static void main(String[] args){

		for(String s : args){if(s.contains("=standardout") || s.contains("=stdout")){outstream=System.err;}}
		outstream.println("Executing "+(new Object() { }.getClass().getEnclosingClass().getName())+" "+Arrays.toString(args)+"\n");

		if(args.length<1){throw new RuntimeException("No parameters.");}

		String reads1=(args[0].indexOf("=")>0 ? null : args[0]);
		String reads2=(reads1!=null && args.length>1 ? args[1] : null);
		if(reads2!=null && "null".equalsIgnoreCase(reads2)){reads2=null;}

		{
			if(reads1!=null && !reads1.contains(",")){
				File f=new File(reads1);
				if(!f.exists() || !f.isFile()){throw new RuntimeException(reads1+" does not exist.");}
			}
			if(reads2!=null && !reads2.contains(",")){
				File f=new File(reads2);
				if(!f.exists() || !f.isFile()){throw new RuntimeException(reads2+" does not exist.");}
				if(reads1.equalsIgnoreCase(reads2)){
					throw new RuntimeException("Both input files are the same.");
				}
			}
		}

		FASTQ.PARSE_CUSTOM=false;
		KmerCount7MTA.minQuality=4;
		KmerCount7MTA.minProb=0.4f;

		int k=31;
		int cbits=32;
		int gap=0;
		int hashes=3;
//		int matrixbits=-1;
		long cells=-1;
		long maxReads=-1;
		int buildpasses=1;
		long tablereads=-1; //How many reads to process when building the hashtable
		int buildStepsize=4;
		String outKeep=null;
		int prehashes=-1;
		long precells=-1;
		String histFile=null;
		int threads=-1;
		ReadWrite.ZIPLEVEL=2;
		int minq=KmerCount7MTA.minQuality;
		KmerCount7MTA.CANONICAL=true;
		boolean auto=true;
		boolean deterministic=true;
		FastaReadInputStream.TARGET_READ_LEN=Integer.MAX_VALUE;
		FASTQ.PARSE_CUSTOM=false;
		List<String> extra=null;

		long memory=Runtime.getRuntime().maxMemory();
		long tmemory=Runtime.getRuntime().totalMemory();
//		assert(false) : memory+", "+tmemory;

		for(int i=(reads1==null ? 0 : 1); i16 ?
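		/*
		 * Note on the minProb setting above (illustrative sketch; the helper below is hypothetical
		 * and not part of this file): kmers are typically weighted by the probability that all k
		 * bases were called correctly, i.e. the product of (1 - error) over the kmer's base
		 * qualities, and kmers falling below minProb (0.4 here) are not counted.
		 *
		 *   static double kmerCorrectProb(byte[] quals, int start, int k){
		 *       double prob=1.0;
		 *       for(int i=start; i<start+k; i++){
		 *           double e=Math.pow(10, -quals[i]/10.0); // Phred-scaled error probability
		 *           prob*=(1-e);
		 *       }
		 *       return prob;
		 *   }
		 */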
Integer.MAX_VALUE : (1L<0); HIST_LEN_PRINT=Tools.max(1, Tools.min(HIST_LEN_PRINT, maxCount)); assert(HIST_LEN_PRINT<=Integer.MAX_VALUE) : HIST_LEN_PRINT+", "+Integer.MAX_VALUE; HIST_LEN=(int)Tools.min(maxCount, Tools.max(HIST_LEN_PRINT, HIST_LEN)); THREAD_HIST_LEN=Tools.min(THREAD_HIST_LEN, HIST_LEN); histogram_total=new AtomicLongArray(HIST_LEN); } if(extra!=null){ for(String s : extra){ File f=new File(s); if(!f.exists() || !f.isFile()){throw new RuntimeException(s+" does not exist.");} assert(!s.equalsIgnoreCase(reads1) && (reads2==null || !s.equalsIgnoreCase(reads2))) : "\nInput file "+s+" should not be included as an extra file.\n"; } } // outstream.println("ForceInterleaved = "+FASTQ.FORCE_INTERLEAVED); // assert(false) : reads1+", "+reads2+", "+output; // if(FASTQ.FORCE_INTERLEAVED && in2==null){ // outstream.println() // } if(threads<=0){ if(auto){THREADS=Data.LOGICAL_PROCESSORS;} else{THREADS=8;} }else{ THREADS=threads; } // KmerCount7MTA.THREADS=Tools.min(THREADS,6); KmerCount7MTA.THREADS=THREADS; // System.err.println("THREADS="+THREADS+", KmerCount7MTA.THREADS="+KmerCount7MTA.THREADS); if(auto && cells==-1){ final long usable=(long)Tools.max(((memory-96000000)*.73), memory*0.45); long mem=usable-(USE_HISTOGRAM ? (HIST_LEN*8*(1)) : 0); if(buildpasses>1){mem/=2;} cells=(mem*8)/cbits; // // long tablebytes=((1L<0 && prehashes>0 ? Tools.toKMG(precells) : "?")); outstream.println("prefilter hashes: \t"+(precells>0 && prehashes>0 ? ""+prehashes : "?")); } outstream.println("base min quality: \t"+KmerCount7MTA.minQuality); outstream.println("kmer min prob: \t"+KmerCount7MTA.minProb); outstream.println(); outstream.println("target depth: \t"+TARGET_DEPTH); outstream.println("min depth: \t"+MIN_DEPTH); outstream.println("max depth: \t"+MAX_DEPTH); outstream.println("min good kmers: \t"+MIN_KMERS_OVER_MIN_DEPTH); outstream.println("depth percentile: \t"+String.format("%.1f", 100*DEPTH_PERCENTILE)); outstream.println("remove duplicates:\t"+!KmerCount7MTA.KEEP_DUPLICATE_KMERS); outstream.println("fix spikes: \t"+FIX_SPIKES); if(USE_HISTOGRAM && HIST_LEN>0){ outstream.println("histogram length: \t"+(USE_HISTOGRAM ? HIST_LEN : 0)); } if(histFile!=null){ outstream.println("print zero cov: \t"+PRINT_ZERO_COVERAGE); } outstream.println(); } if(!prefilter && k<32 && cells>(1L<<(2*k))){cells=(1L<<(2*k));} assert(cells>0); // KmerCount7MTA.THREADS=Tools.max(THREADS/2, KmerCount7MTA.THREADS); //Seems like 4 is actually optimal... FastaReadInputStream.MIN_READ_LEN=k; Timer t=new Timer(); Timer ht=new Timer(); t.start(); ht.start(); KCountArray kca; KCountArray prefilterArray=null; // outstream.println(); if(prefilter){ prefilterArray=KmerCount7MTA.makeKca(reads1, reads2, extra, k, 2, gap, precells, prehashes, minq, true, tablereads, 1, buildStepsize, 1, 1, null); outstream.println("Made prefilter: \t"+prefilterArray.toShortString(prehashes)); double uf=prefilterArray.usedFraction(); if(uf>0.6){ outstream.println("Warning: This table is "+(uf>0.995 ? "totally" : uf>0.99 ? "crazy" : uf>0.95 ? "incredibly" : uf>0.9 ? "extremely" : uf>0.8 ? "very" : uf>0.7 ? "fairly" : "somewhat")+" full, which may reduce accuracy for kmers of depth under 3. Ideal load is under 60% used." 
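		/*
		 * Sizing sketch for the auto-sized table above: the cell count is simply the number of
		 * counters that fit in the usable memory, cells = (bytes*8)/cbits. The figures below are
		 * hypothetical and only illustrate the arithmetic.
		 *
		 *   long usableBytes=24L*1024*1024*1024;     // assume ~24 GB usable after overhead
		 *   int cbits=32;                            // bits per counter cell
		 *   long cells=(usableBytes*8)/cbits;        // ~6.4 billion cells
		 */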
+ "\nFor better accuracy, run on a node with more memory; quality-trim or error-correct reads; " + "or increase the values of the minprob flag to reduce spurious kmers."); } } kca=KmerCount7MTA.makeKca(reads1, reads2, extra, k, cbits, gap, cells, hashes, minq, true, tablereads, buildpasses, buildStepsize, 2, 2, prefilterArray); ht.stop(); outstream.println("Made hash table: \t"+kca.toShortString(hashes)); double uf=kca.usedFraction(); if(uf>0.6){ outstream.println("Warning: This table is "+(uf>0.995 ? "totally" : uf>0.99 ? "crazy" : uf>0.95 ? "incredibly" : uf>0.9 ? "extremely" : uf>0.8 ? "very" : uf>0.7 ? "fairly" : "somewhat")+" full, which may reduce accuracy. Ideal load is under 60% used." + "\nFor better accuracy, use the 'prefilter' flag; run on a node with more memory; quality-trim or error-correct reads; " + "or increase the values of the minprob flag to reduce spurious kmers. In practice you should still get good normalization results " + "even with loads over 90%, but the histogram and statistics will be off."); } long estUnique; outstream.println(); if(prefilterArray!=null){ int lim1=prefilterArray.maxValue, lim2=prefilterArray.maxValue+1; double a=prefilterArray.estimateUniqueKmers(prehashes); double b=kca.estimateUniqueKmers(hashes, lim2); a=a-b; if(CANONICAL){ // a=(a*KCountArray.canonMask)/(KCountArray.canonMask+1); // b=(b*KCountArray.canonMask)/(KCountArray.canonMask+1); }else{ a/=2; b/=2; } estUnique=((long)((a+b))); outstream.println("Estimated kmers of depth 1-"+lim1+": \t"+(long)a); outstream.println("Estimated kmers of depth "+lim2+"+ : \t"+(long)b); }else{ // double est=kca.cells*(1-Math.pow(1-Math.sqrt(kca.usedFraction()), 1.0/hashes)); // double est=kca.cells*(1-Math.pow(1-kca.usedFraction(), 1.0/hashes)); double est=kca.estimateUniqueKmers(hashes); // outstream.println("Used cells: "+kca.cellsUsed(1)); if(CANONICAL){ // est=(est*KCountArray.canonMask)/(KCountArray.canonMask+1); }else{ est/=2; } estUnique=((long)((est))); } outstream.println("Estimated unique kmers: \t"+estUnique);//+", or "+estUnique+" counting forward kmers only."); // outstream.println("(Includes forward and reverse kmers)"); outstream.println(); outstream.println("Table creation time:\t\t"+ht);//+" \t"+String.format("%.2f", totalBases*1000000.0/(ht.elapsed))+" kb/sec"); long bases=0; ListNum.setDeterministicRandom(deterministic); if(reads1!=null && reads1.contains(",") && !new File(reads1).exists()){ throw new RuntimeException("This class is not designed to deal with lists of input files."); }else{ bases=count(reads1, reads2, kca, k, maxReads, outKeep, overwrite, histFile, estUnique); } printTopology(); t.stop(); outstream.println("\nTotal time: \t\t"+t+" \t"+String.format("%.2f", bases*1000000.0/(t.elapsed))+" kb/sec"); } public static void printTopology(){ long total=peaks.get()+spikes.get()+flats.get()+valleys.get()+slopes.get(); double mult=100.0/total; long sp=spikes.get(); long pe=peaks.get(); long va=valleys.get(); long sl=slopes.get(); long fl=flats.get(); double dsp=mult*sp; double dpe=mult*pe; double dva=mult*va; double dsl=mult*sl; double dfl=mult*fl; System.err.println("\nDepth Topology:\t"); System.err.println("Spikes: \t\t\t"+(dsp<10 ? " " : "")+String.format("%.3f%% \t%d",dsp,sp)); System.err.println("Peaks: \t\t\t"+(dpe<10 ? " " : "")+String.format("%.3f%% \t%d",dpe,pe)); System.err.println("Valleys: \t\t\t"+(dva<10 ? " " : "")+String.format("%.3f%% \t%d",dva,va)); System.err.println("Slopes: \t\t\t"+(dsl<10 ? 
" " : "")+String.format("%.3f%% \t%d",dsl,sl)); System.err.println("Flats: \t\t\t"+(dfl<10 ? " " : "")+String.format("%.3f%% \t%d",dfl,fl)); } public static long count(String in1, String in2, KCountArray kca, int k, long maxReads, String outKeep, boolean overwrite, String histFile, long estUnique) { final ConcurrentReadStreamInterface cris; { FileFormat ff1=FileFormat.testInput(in1, FileFormat.FASTQ, null, true, true); FileFormat ff2=FileFormat.testInput(in2, FileFormat.FASTQ, null, true, true); cris=ConcurrentGenericReadInputStream.getReadInputStream(maxReads, false, true, ff1, ff2); if(verbose){System.err.println("Started cris");} Thread th=new Thread(cris); th.start(); } boolean paired=cris.paired(); if(verbose){System.err.println("Paired: "+paired);} RTextOutputStream3 rosKeep=null; if(outKeep!=null){ final int buff=(!ordered ? 8 : Tools.max(16, 2*THREADS)); String out1=outKeep.replaceFirst("#", "1"); String out2=null; if(cris.paired()){ if(outKeep.contains("#")){ out2=outKeep.replaceFirst("#", "2"); }else{ outstream.println("Writing interleaved."); } } assert(!out1.equalsIgnoreCase(in1) && !out1.equalsIgnoreCase(in1)); assert(out2==null || (!out2.equalsIgnoreCase(in1) && !out2.equalsIgnoreCase(in2))); // assert(false) : out1+", "+out2; FileFormat ff1=FileFormat.testOutput(out1, FileFormat.FASTQ, "attachment", true, overwrite, ordered); FileFormat ff2=FileFormat.testOutput(out2, FileFormat.FASTQ, "attachment", true, overwrite, ordered); rosKeep=new RTextOutputStream3(ff1, ff2, buff, null, true); } if(rosKeep!=null){ rosKeep.start(); outstream.println("Started output threads."); } long bases=downsample(cris, kca, k, maxReads, rosKeep, histFile, overwrite, estUnique); ReadWrite.closeStreams(cris, rosKeep); if(verbose){System.err.println("Closed streams");} return bases; } public static long downsample(ConcurrentReadStreamInterface cris, KCountArray kca, int k, long maxReads, RTextOutputStream3 rosKeep, String histFile, boolean overwrite, long estUnique) { Timer tdetect=new Timer(); tdetect.start(); long totalBases=0; long totalReads=0; long basesKept=0; long readsKept=0; long basesTossed=0; long readsTossed=0; // assert(false) : THREADS; ProcessThread[] pta=new ProcessThread[THREADS]; for(int i=0; i1){ histogram_total.addAndGet(1, histogram_total.get(0)); histogram_total.set(0, 0); } // outstream.println(); tdetect.stop(); outstream.println("Table read time: \t\t"+tdetect+" \t"+String.format("%.2f", totalBases*1000000.0/(tdetect.elapsed))+" kb/sec"); { String pad=""; String s=""+totalReads; while(pad.length()+s.length()<9){pad+=" ";} outstream.println("Total reads in: \t\t"+totalReads+pad+String.format("\t(%.3f%% Kept)", (readsKept*100.0/totalReads))); s=""+totalBases; while(pad.length()+s.length()<9){pad+=" ";} outstream.println("Total bases in: \t\t"+totalBases+pad+String.format("\t(%.3f%% Kept)", (basesKept*100.0/totalBases))); } // outstream.println(); if(histogram_total!=null){ TextStreamWriter tswh=null; StringBuilder sb=new StringBuilder(100); if(USE_HISTOGRAM){ tswh=new TextStreamWriter(histFile, overwrite, false, false); tswh.start(); tswh.print("#Depth\tRaw_Count\tUnique_Kmers\n"); } int lim=(int)(HIST_LEN_PRINT-1); long remaining=Tools.sum(histogram_total); long sumRaw1=0; long sumRaw2=0; long sum1=0; long sum2=0; long sumsquare=0; for(int i=0; i0*/ || y>0){ sb.append(i).append('\t'); sb.append(x).append('\t'); sb.append(y).append('\n'); } tswh.print(sb.toString()); sb.setLength(0); } if(sumRaw1>=remaining){break;} //Stop once there is no more coverage, even if 
PRINT_ZERO_COVERAGE is not set. } for(int i=lim; i0 || sum2>0){ sb.append(lim).append('\t'); sb.append(sumRaw2).append('\t'); sb.append(sum2).append('\n'); } tswh.print(sb.toString()); tswh.poison(); tswh.waitForFinish(); outstream.println("Wrote histogram to "+histFile); } long histCount=Tools.sum(histogram_total); //Total number of kmers counted long halfCount=(histCount+1)/2; double histCountU=0; //Unique kmers counted long temp1=0; double temp2=0; int median_all=-1; int median_unique=-1; for(int i=0; i=halfCount && median_all<0){median_all=i;} // histSum+=(x*(double)i); histCountU+=(x/(double)Tools.max(1, i)); } double halfCount2=(histCountU)/2; for(int i=0; i=halfCount2 && median_unique<0){ median_unique=i; break; } } if(median_all<0){median_all=0;} double avg_all=sumsquare/(double)histCount; double avg_unique=histCount/histCountU; double stdev_unique=Tools.standardDeviationHistogramKmer(histogram_total); double stdev_all=Tools.standardDeviationHistogram(histogram_total); outstream.println("Total kmers counted: \t"+(sumRaw1+sumRaw2)); double uniqueC=((sum1+sum2)*100.0/(sumRaw1+sumRaw2)); double uniqueE=((estUnique)*100.0/(sumRaw1+sumRaw2)); double uniqueM=Tools.max(uniqueC, uniqueE); outstream.println("Total unique kmer count: \t"+(sum1+sum2)); if(CANONICAL){outstream.println("Includes forward kmers only.");} outstream.println("The unique kmer estimate can be more accurate than the unique count, if the tables are very full."); outstream.println("The most accurate value is the greater of the two."); outstream.println(); outstream.println("Percent unique: \t"+(uniqueM<10 ? " " : "")+String.format("%.2f%%", uniqueM)); outstream.println("Depth average: \t"+String.format("%.2f\t(unique kmers)", avg_unique)); outstream.println("Depth median: \t"+String.format("%d\t(unique kmers)", median_unique)); outstream.println("Depth standard deviation: \t"+String.format("%.2f\t(unique kmers)", stdev_unique)); outstream.println("\nDepth average: \t"+String.format("%.2f\t(all kmers)", avg_all)); outstream.println("Depth median: \t"+String.format("%d\t(all kmers)", median_all)); outstream.println("Depth standard deviation: \t"+String.format("%.2f\t(all kmers)", stdev_all)); } return totalBases; } /** * Locates and fixes spikes in a coverage profile (potentially) caused by false positives in a bloom filter. * Theory: If a high-count kmer is adjacent on both sides to low-count kmers, it may be a false positive. * It could either be reduced to the max of the two flanking points or examined in more detail. * @param array An array of kmer counts for adjacent kmers in a read. 
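 * <p>Worked example (assuming, as the surrounding code indicates, that a and c are the
 * immediate neighbors of b): for counts {3, 40, 4}, the middle value is a peak (40>3 and 40>4)
 * and a spike (40>=2*3 and 40>=2*4), so it is replaced by max(3,4)=4, giving {3, 4, 4}.
 * <pre>
 *   int[] cov={3, 40, 4};
 *   fixSpikes(cov);   // cov becomes {3, 4, 4}
 * </pre>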
*/ private static void fixSpikes(int[] array){ for(int i=1; i1 && b>a && b>c){ //peak if((b>=2*a || b>a+2) && (b>=2*c || b>c+2)){ //spike array[i]=(int)Tools.max(a, c); } } } } private static void fixSpikes(int[] array, long[] kmers, KCountArray kca, int k){ if(array.length<3){return;} if(array[1]-array[0]>1){ array[0]=kca.readPrecise(kmers[0], k, CANONICAL); } if(array[array.length-1]-array[array.length-2]>1){ array[array.length-1]=kca.readPrecise(kmers[array.length-1], k, CANONICAL); } for(int i=1; i1){ long a=Tools.max(1, array[i-1]); long c=Tools.max(1, array[i+1]); long key=kmers[i]; if(b>a && b>c){ //peak if(b<6 || b>a+1 || b>c+1){ array[i]=kca.readPreciseMin(key, k, CANONICAL); } // if((b>=2*a || b>a+2) && (b>=2*c || b>c+2)){ // //spike // int b1=(int)((a+c)/2); // int b2=kca.readLeft(key, k, CANONICAL); // int b3=kca.readRight(key, k, CANONICAL); // array[i]=Tools.min(b, b1, b2, b3); // } // else // { //// array[i]=kca.readPreciseMin(key, k, CANONICAL); // } } // else // if(Tools.max(ada, adc)>=Tools.max(2, Tools.min((int)a, b, (int)c)/4)){ // array[i]=kca.readPrecise(key, k, CANONICAL); // } // else // if(b>a+1 || b>c+1){ // //steep // array[i]=kca.readPrecise(key, k, CANONICAL); // } } } } private static void analyzeSpikes(int[] array, int width){ if(array.length<3){return;} int peakcount=0, valleycount=0, spikecount=0, flatcount=0, slopecount=0; for(int i=1; ia && b>c){ peakcount++; if((b>=2*a || b>a+2) && (b>=2*c || b>c+2)){ spikecount++; } }else if(b0){peaks.addAndGet(peakcount);} if(valleycount>0){valleys.addAndGet(valleycount);} if(spikecount>0){spikes.addAndGet(spikecount);} if(flatcount>0){flats.addAndGet(flatcount);} if(slopecount>0){slopes.addAndGet(slopecount);} } /** * @param r * @param kca * @return */ public static int[] generateCoverage(Read r, KCountArray kca, int k, int[] out, long[] kmers) { if(k>31){return generateCoverageLong(r, kca, k, out);} if(kca.gap>0){throw new RuntimeException("Gapped reads: TODO");} if(r==null || r.bases==null || r.bases.length=k){ // int count=kca.readPrecise(kmer, k, CANONICAL); int count=kca.read(kmer, k, CANONICAL); out[i-k+1]=count; if(kmers!=null){kmers[i-k+1]=kmer;} } } } if(FIX_SPIKES){fixSpikes(out, kmers, kca, k);} // fixSpikes(out, 1); analyzeSpikes(out, 1); return out; } /** * @param r * @param kca * @return */ public static int[] generateCoverageLong(Read r, KCountArray kca, int k, int[] out) { assert(k>31); if(kca.gap>0){throw new RuntimeException();} if(r==null || r.bases==null || r.bases.lengthk){ long x2=AminoAcid.baseToNumber[bases[i-k]]; kmer=kmer^(x2<=k){ int count=kca.read(kmer); out[i-k+1]=count; } } } fixSpikes(out); analyzeSpikes(out, 1); return out; } private static class ProcessThread extends Thread{ ProcessThread(ConcurrentReadStreamInterface cris_, KCountArray kca_, int k_, RTextOutputStream3 rosk_){ cris=cris_; kca=kca_; k=k_; rosk=rosk_; } public void run(){ countInThread(); } void countInThread() { ListNum ln=cris.nextList(); ArrayList reads=(ln!=null ? 
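		/*
		 * Sketch of the per-read coverage generation performed by generateCoverage above, using the
		 * standard rolling 2-bit kmer encoding for k<=31. This standalone helper is an illustration
		 * under that assumption, not the original inner loop:
		 *
		 *   static int[] coverageSketch(byte[] bases, int k, KCountArray kca, boolean canonical){
		 *       int[] out=new int[Math.max(0, bases.length-k+1)];
		 *       long kmer=0;
		 *       long mask=~((-1L)<<(2*k));                      // keep the low 2k bits
		 *       int len=0;
		 *       for(int i=0; i<bases.length; i++){
		 *           int x=AminoAcid.baseToNumber[bases[i]];     // A/C/G/T -> 0..3, others -> -1
		 *           if(x<0){len=0; kmer=0;}                     // reset at ambiguous bases
		 *           else{
		 *               kmer=((kmer<<2)|x)&mask;                // append the new base
		 *               if(++len>=k){out[i-k+1]=kca.read(kmer, k, canonical);}
		 *           }
		 *       }
		 *       return out;
		 *   }
		 */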
ln.list : null); final ArrayList keep=new ArrayList(Shared.READ_BUFFER_LENGTH); int[] cov1=null; long[] kmers1=null; while(reads!=null && reads.size()>0){ for(int rnum=0; rnum=k){ if(verbose){outstream.println();} if(FIX_SPIKES && k<32){ final int arraylen=r.bases.length-k+1; if(kmers1==null || kmers1.length!=arraylen){kmers1=new long[arraylen];} kmers=kmers1; } cov=getSortedCoverageAndIncrementHistogram(r, cov1, kmers1); if(cov!=null){; int i=cov.length-1; while(i>=0 && cov[i]=MIN_KMERS_OVER_MIN_DEPTH){depth=cov[(int)(i*(1-DEPTH_PERCENTILE))];} cov1=cov; min=cov[cov.length-1]; max=cov[(int)(cov.length*0.05f)]; } } } totalReads+=readcount; totalBases+=basecount; if(max>TARGET_DEPTH && max>2*min){ readsKept+=readcount; basesKept+=basecount; StringBuilder sb=new StringBuilder(); sb.append(cov[0]); for(int i=1; i=k) : r; cov=generateCoverage(r, kca, k, cov, kmers); if(cov!=null){ Arrays.sort(cov); Tools.reverseInPlace(cov); incrementHistogramSorted(cov); } return cov; } private final void incrementHistogramSorted(int[] cov){ if(hist==null || cov==null || cov.length==0){return;} // outstream.println(Arrays.toString(cov)); int last=cov[0]; long sum=0; // long sum2=0; for(int x : cov){ // outstream.println("Processing "+x); if(x<0){break;} int y=Tools.min(x, HIST_LEN-1); if(y==last){sum++;} else if(sum>0){ // outstream.println("Incrementing "+last+" by "+sum); // sum2+=sum; if(last0){ // outstream.println("Incrementing "+last+" by "+sum); // sum2+=sum; if(last
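		/*
		 * Depth-selection sketch for the logic above: cov is sorted in descending order, so indexing
		 * at i*(1-DEPTH_PERCENTILE), where i is roughly the count of kmers at or above the minimum
		 * depth, picks the DEPTH_PERCENTILE-th percentile of the read's kmer depths. A standalone
		 * illustration (names are hypothetical):
		 *
		 *   static int percentileDepth(int[] covDescending, int validKmers, double percentile){
		 *       int idx=(int)(validKmers*(1-percentile));   // e.g. percentile=0.5 -> median depth
		 *       return covDescending[idx];
		 *   }
		 */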