package kmer;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;

import align2.Tools;
import dna.Timer;

// NOTE(review): this copy of the file arrived with its line structure collapsed and with
// several spans of text missing (apparently everything between a '<' and a later '>' was
// stripped). The damaged statements are kept verbatim below and individually marked with
// NOTE(review); they will not compile until restored from the original source.

/**
 * Count array whose counters ("cells") are packed into {@code int} words held in
 * {@code matrix}; each {@code matrix[n]} is allocated and updated by its own
 * {@link WriteThread}, which receives batches of hashed keys through a blocking queue.
 *
 * Uses prime numbers for array lengths.
 * Supports a prefilter that is checked before looking at the main filter.
 *
 * @author Brian Bushnell
 * @date Aug 17, 2012
 *
 */
public class KCountArray8MT extends KCountArray {

    /**
     * Small manual smoke test: builds an array from four numeric arguments and prints the
     * count of keys 0, 1, 100 and 150 before and after incrementing them.
     *
     * NOTE(review): as visible here this cannot run with assertions enabled - it passes a
     * null prefilter although the constructor asserts {@code prefilter!=null}, and it calls
     * {@code read} (which asserts {@code finished}) without any visible start-up or
     * shutdown call. Confirm against the original file.
     *
     * @param args args[0]=cells, args[1]=bits, args[2]=gap, args[3]=hashes
     */
    public static void main(String[] args){
        long cells=Long.parseLong(args[0]);
        int bits=Integer.parseInt(args[1]);
        int gap=Integer.parseInt(args[2]);
        int hashes=Integer.parseInt(args[3]);

        verbose=false;

        KCountArray8MT kca=new KCountArray8MT(cells, bits, gap, hashes, null);

        System.out.println(kca.read(0));
        kca.increment(0);
        System.out.println(kca.read(0));
        kca.increment(0);
        System.out.println(kca.read(0));
        System.out.println();

        System.out.println(kca.read(1));
        kca.increment(1);
        System.out.println(kca.read(1));
        kca.increment(1);
        System.out.println(kca.read(1));
        System.out.println();

        System.out.println(kca.read(100));
        kca.increment(100);
        System.out.println(kca.read(100));
        kca.increment(100);
        System.out.println(kca.read(100));
        kca.increment(100);
        System.out.println(kca.read(100));
        System.out.println();

        System.out.println(kca.read(150));
        kca.increment(150);
        System.out.println(kca.read(150));
        System.out.println();
    }

    /**
     * Sizes the structure and allocates the (still empty) outer matrix. The per-array
     * storage itself is allocated later, inside {@link WriteThread#run()}.
     *
     * @param cells_ requested total number of cells; adjusted by {@code getPrimeCells}
     *               before being handed to the superclass
     * @param bits_ passed to the superclass and to the sizing helpers
     *              (presumably bits per cell - confirm in KCountArray)
     * @param gap_ passed through to the superclass unchanged
     * @param hashes_ number of hash rounds; asserted to be in 1..hashMasks.length
     * @param prefilter_ filter consulted before this one; asserted non-null
     */
    public KCountArray8MT(long cells_, int bits_, int gap_, int hashes_, KCountArray prefilter_){
        super(getPrimeCells(cells_, bits_), bits_, gap_, getDesiredArrays(cells_, bits_));
//        verbose=false;
//        assert(false);
        // cells, numArrays and cellsPerWord are not declared in this file; presumably
        // inherited from KCountArray.
        cellsPerArray=cells/numArrays;
        // Words needed per array: cellsPerArray/cellsPerWord, rounded up.
        wordsPerArray=(int)((cellsPerArray%cellsPerWord)==0 ? (cellsPerArray/cellsPerWord) : (cellsPerArray/cellsPerWord+1));
        cellMod=cellsPerArray;
        hashes=hashes_;
//        System.out.println("cells="+cells+", words="+words+", wordsPerArray="+wordsPerArray+", numArrays="+numArrays+", hashes="+hashes);
//        assert(false);
        // Rows stay null here; each WriteThread fills in its own row when it starts running.
        matrix=new int[numArrays][];
        prefilter=prefilter_;
        assert(prefilter!=null);
        assert(hashes>0 && hashes<=hashMasks.length);
    }

    /**
     * Chooses how many sub-arrays to split the storage into: starts at {@code minArrays}
     * and doubles until the 32-bit words per array drop below {@code Integer.MAX_VALUE}.
     *
     * @param desiredCells requested number of cells
     * @param bits multiplier applied to desiredCells to get a total bit count
     * @return the number of arrays
     */
    private static int getDesiredArrays(long desiredCells, int bits){

        // Total 32-bit words required (rounded up), but never fewer than minArrays.
        long words=Tools.max((desiredCells*bits+31)/32, minArrays);
        int arrays=minArrays;
        while(words/arrays>=Integer.MAX_VALUE){
            arrays*=2;
        }
        return arrays;
    }

    /**
     * Computes the total cell count actually used: the per-array share of
     * {@code desiredCells} (rounded up) is replaced by {@code Primes.primeAtMost(x)}
     * (presumably the largest prime not exceeding x - confirm in Primes) and multiplied
     * back by the number of arrays.
     *
     * @param desiredCells requested number of cells
     * @param bits forwarded to {@code getDesiredArrays}
     * @return per-array prime length times the number of arrays
     */
    private static long getPrimeCells(long desiredCells, int bits){

        int arrays=getDesiredArrays(desiredCells, bits);

        long x=(desiredCells+arrays-1)/arrays;
        long x2=Primes.primeAtMost(x);
        return x2*arrays;
    }

    /**
     * Reads the count for a raw (unhashed) key. Requires {@code finished} to be true.
     * The prefilter, when present, is read first; the visible remainder takes the minimum
     * of {@code readHashed} over successive rotate-and-hash rounds of the key.
     *
     * @param rawKey unhashed key
     * @return the value left in {@code min} after the loop
     */
    public int read(final long rawKey){
        assert(finished);
        if(verbose){System.err.println("Reading raw key "+rawKey);}

        int pre=0;
        if(prefilter!=null){
            pre=prefilter.read(rawKey);
            // NOTE(review): text is missing between "pre" and "0;" on the next line. The lost
            // span evidently held the prefilter comparison, the first hash of the key, the
            // declarations of key2 and min, and the head of the for-loop. Restore from the
            // original source; braces are unbalanced until then.
            if(pre0; i++){
            if(verbose){System.err.println("Reading. i="+i+", key2="+key2);}
            // Each round rotates the previous hashed key before hashing with row i.
            key2=Long.rotateRight(key2, hashBits);
            key2=hash(key2, i);
            if(verbose){System.err.println("Rot/hash. i="+i+", key2="+key2);}
            min=min(min, readHashed(key2));
        }
        return min;
    }

    /**
     * Reads the cell addressed by an already-hashed key.
     *
     * @param key hashed key; bits selected by {@code arrayMask} pick the sub-array, the
     *            remaining bits (shifted down by {@code arrayBits}) are reduced modulo
     *            {@code cellMod} to pick the cell
     * @return the cell's value, masked with {@code valueMask}
     */
    private int readHashed(long key){
        if(verbose){System.err.print("Reading hashed key "+key);}
//        System.out.println("key="+key);
        int arrayNum=(int)(key&arrayMask);
        key=(key>>>arrayBits)%(cellMod);
//        key=(key>>>(arrayBits+1))%(cellMod);
//        System.out.println("array="+arrayNum);
//        System.out.println("key2="+key);
        // NOTE(review): matrix[arrayNum] is only assigned inside WriteThread.run(), so it is
        // null until that thread has started.
        int[] array=matrix[arrayNum];
        int index=(int)(key>>>indexShift);
//        assert(false) : indexShift;
//        System.out.println("index="+index);
        int word=array[index];
//        System.out.println("word="+Integer.toHexString(word));
        // Checks that shifting by the unmasked product gives the same result as shifting by
        // the masked one; Java uses only the low five bits of an int shift distance.
        assert(word>>>(cellBits*key) == word>>>(cellBits*(key&cellMask)));
//        int cellShift=(int)(cellBits*(key&cellMask));
        int cellShift=(int)(cellBits*key);
        if(verbose){System.err.println(", array="+arrayNum+", index="+index+", cellShift="+(cellShift%32)+", value="+((int)((word>>>cellShift)&valueMask)));}
//        System.out.println("cellShift="+cellShift);
        return (int)((word>>>cellShift)&valueMask);
    }

    /**
     * Direct writes are not supported by this class.
     *
     * @throws RuntimeException always
     */
    public void write(final long key, int value){
        throw new RuntimeException("Not allowed for this class.");
    }

    @Override
    /**
     * Increments the counts for a batch of keys.
     * This should increase speed by doing the first hash outside the critical section, but it does not seem to help.
     *
     * NOTE(review): almost the whole body is missing from this copy; see the marker below.
     */
    public void increment(long[] keys){
        if(prefilter==null){
            // NOTE(review): text is missing between "i" and ">>>=cellBits" on the next line.
            // The lost span covers the rest of this method and everything up to the inner
            // loop of what looks like a contents-to-string method that builds "sb" (possibly
            // including the single-key increment(long) that main() calls, which is not
            // visible anywhere in this copy). Restore from the original source.
            for(int i=0; i>>=cellBits;
                        comma=", ";
                    }
                }
            }
        sb.append("]");
        return sb.toString();
    }

    /** @return cellsUsed divided by cells, as a double. */
    public double usedFraction(){return cellsUsed/(double)cells;}

    /** @return cellsUsed(mindepth) divided by cells, as a double. */
    public double usedFraction(int mindepth){return cellsUsed(mindepth)/(double)cells;}

    /**
     * Scans every allocated word and counts the cells whose value is at least
     * {@code mindepth}. Sub-arrays that are still null are skipped.
     *
     * NOTE(review): the inner loop runs while {@code word>0}. A word with its top bit set is
     * negative as an int, so none of its cells are examined; because the shift is the
     * unsigned {@code >>>=}, {@code word!=0} looks like the intended condition - confirm.
     * The loop also stops once the remaining bits are all zero, so empty high cells are
     * never visited (only matters for mindepth &lt;= 0).
     *
     * @param mindepth minimum cell value to count
     * @return number of cells counted
     */
    public long cellsUsed(int mindepth){
        long count=0;
//        System.out.println("A: "+cellBits+", "+Integer.toBinaryString(valueMask));
        for(int[] array : matrix){
//            System.out.println("B");
            if(array!=null){
//                System.out.println("C");
                for(int word : array){
//                    System.out.println("D: "+Integer.toBinaryString(word));
                    while(word>0){
                        // Low cell of the remaining word.
                        int x=word&valueMask;
//                        System.out.println("E: "+x+", "+mindepth);
                        if(x>=mindepth){count++;}
                        word>>>=cellBits;
                    }
                }
            }
        }
        return count;
    }

    /**
     * Hashes a key by XOR-ing it with an entry of the random mask table.
     *
     * @param key value to hash
     * @param row row of {@code hashMasks} to use for the final XOR
     * @return key XOR hashMasks[row][cell]
     */
    final long hash(long key, int row){
        // Initial column: non-negative part of the key modulo (hashArrayLength-1).
        int cell=(int)((Long.MAX_VALUE&key)%(hashArrayLength-1));
//        int cell=(int)(hashCellMask&(key));

        if(row==0){//Doublehash only first time
            // Extra XOR with row (row+4)%hashMasks.length, then the column is re-derived from
            // the scrambled key before the final XOR below.
            key=key^hashMasks[(row+4)%hashMasks.length][cell];
            cell=(int)(hashCellMask&(key>>5));
//            cell=(int)(hashCellMask&(key>>hashBits));
//            cell=(int)((Long.MAX_VALUE&key)%(hashArrayLength-1));
        }

        return key^hashMasks[row][cell];
    }

    /**
     * Builds the table of random hash masks. The Random seed is taken from the static
     * {@code counter}, which is read and incremented while holding the class lock, so each
     * call gets a different seed.
     *
     * @param rows number of rows in the returned table
     * @param cols number of columns in the returned table
     * @return the rows x cols table
     */
    private static long[][] makeMasks(int rows, int cols) {

        long seed;
        synchronized(KCountArray8MT.class){
            seed=counter;
            counter++;
        }

        Timer t=new Timer();
        t.start();
        long[][] r=new long[rows][cols];
        Random randy=new Random(seed);
        // NOTE(review): text is missing between "i" and "200000000L" on the next line. The
        // lost span presumably held the loop that fills each row (via fillMasks) and the
        // start of a timing check guarding the message below. Restore from the original.
        for(int i=0; i200000000L){System.out.println("Mask-creation time: "+t);}
        return r;
    }

    /**
     * Fills one row of the mask table in place with values drawn from {@code randy}.
     * In the visible part, a candidate x is rejected (reset to 0) when {@code count1[y]}
     * or {@code count2[z]} is already positive, where y and z are two masked bit fields of
     * x; an accepted value is stored with its sign bit cleared and both counters are bumped.
     *
     * NOTE(review): the start of the body is missing from this copy; see the marker below.
     *
     * @param r row to fill
     * @param randy source of random values
     */
    private static void fillMasks(long[] r, Random randy) {
        // NOTE(review): text is missing in the next two lines (after "i" and after "1L<").
        // The lost spans presumably held the declarations of mask, count1, count2, x, y and
        // z, the outer loop over r, and the candidate-generation loop. It is unclear where
        // the original commented-out line ended. Restore from the original source.
        // for(int i=0; i16){
                    x&=(~(1L<16){
                    x&=(~(1L<<(randy.nextInt(32)+32)));
                }

//                System.out.print(".");
//                y=(((int)(x&mask))^i);
                y=(((int)(x&mask)));
                z=(int)((x>>hashBits)&mask);
                if(count1[y]>0 || count2[z]>0){
                    x=0;
                }
            }
//            System.out.println(Long.toBinaryString(x));
            r[i]=(x&Long.MAX_VALUE);
            count1[y]++;
            count2[z]++;
        }

    }

    /**
     * NOTE(review): nearly all of this method is missing from this copy. The text after the
     * marker below appears to be the tail of a separate shutdown routine: it hands leftover
     * buffered keys to the writers, sends each writer the poison array, waits for every
     * writer thread to end, adds up their cellsUsedPersonal counters into cellsUsed, and
     * finally sets {@code finished}.
     */
    public void initialize(){
        // NOTE(review): text is missing between "i" and "0){" on the next line (the rest of
        // initialize(), the header of the following method and the buffer-flush loop head).
        for(int i=0; i0){
                    writers[i].add(array);
                }
            }

            //Add poison
            for(WriteThread wt : writers){
                wt.add(poison);
            }

            //Wait for termination
            for(WriteThread wt : writers){
//                System.out.println("wt"+wt.num+" is alive: "+wt.isAlive());
                while(wt.isAlive()){
//                    System.out.println("wt"+wt.num+" is alive: "+wt.isAlive());
                    try {
                        // Wait at most 10 seconds per attempt so a stuck thread gets reported.
                        wt.join(10000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        // The interrupt is only logged; the loop keeps waiting and the interrupt
                        // status is not restored.
                        e.printStackTrace();
                    }
                    if(wt.isAlive()){System.err.println(wt.getClass().getCanonicalName()+" is taking a long time to die.");}
                }
                cellsUsed+=wt.cellsUsedPersonal;
//                System.out.println("cellsUsed="+cellsUsed);
            }

            assert(!finished);
            finished=true;
        }
    }

    /**
     * Worker thread that owns one sub-array of the matrix. It allocates {@code matrix[num]}
     * itself, then drains batches of hashed keys from its queue and increments the
     * corresponding cells until it receives the {@code poison} array.
     */
    private class WriteThread extends Thread{

        /** @param tnum index of the sub-array this thread is responsible for */
        public WriteThread(int tnum){
            num=tnum;
        }

        /**
         * Allocates this thread's sub-array and processes queued key batches until the
         * poison array arrives; drops the local array reference on exit.
         */
        @Override
        public void run(){
            assert(matrix[num]==null);
            array=new int[wordsPerArray]; //Makes NUMA systems use local memory.

            matrix[num]=array;

            long[] keys=null;
            while(!shutdown){

                if(verbose){System.err.println(" - Reading keys for wt"+num+".");}
                // Retry take() until a batch is obtained; an interrupt is logged and ignored.
                while(keys==null){
                    try {
                        keys=writeQueue.take();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                // poison is compared by reference: it is a sentinel array, not a batch of keys.
                if(keys==poison){
//                    assert(false);
                    shutdown=true;
                }else{
                    for(long key : keys){
                        incrementHashedLocal(key);
                    }
                }
//                System.out.println(" -- Read keys for wt"+num+". poison="+(keys==poison)+", len="+keys.length);
                if(verbose){System.err.println(" -- Read keys for wt"+num+". (success)");}
                keys=null;
                if(verbose){System.err.println("shutdown="+shutdown);}
            }

//            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I died: "+shutdown+", "+(keys==null)+".");
//            assert(false) : ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I died: "+shutdown+", "+(keys==null)+".";

            // Only the thread-local reference is cleared; matrix[num] still refers to the data.
            array=null;
        }

        /**
         * Enqueues a batch of keys (or the poison array) for this thread, blocking while the
         * queue is full. Does nothing if the thread has already flagged shutdown.
         *
         * NOTE(review): {@code shutdown} is a plain (non-volatile) field written by run() and
         * read here from the calling thread - confirm that stale reads are acceptable.
         *
         * @param keys batch to enqueue
         */
        private void add(long[] keys){
//            assert(isAlive());
            assert(!shutdown);
            if(shutdown){return;}
//            assert(keys!=poison);
            if(verbose){System.err.println(" + Adding keys to wt"+num+".");}
            boolean success=false;
            // Retry put() until it succeeds; an interrupt is logged and ignored.
            while(!success){
                try {
                    writeQueue.put(keys);
                    success=true;
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if(verbose){System.err.println(" ++ Added keys to wt"+num+". (success)");}
        }

        /**
         * Increments, saturating at {@code maxValue}, the cell addressed by a hashed key in
         * this thread's own array. A cell going from zero is counted in
         * {@code cellsUsedPersonal}.
         *
         * NOTE(review): the end of this method is missing from this copy; see the marker.
         *
         * @param key hashed key whose arrayMask bits must equal this thread's num
         */
        private int incrementHashedLocal(long key){
            assert((key&arrayMask)==num);
            key=(key>>>arrayBits)%(cellMod);
//            key=(key>>>(arrayBits+1))%(cellMod);
            int index=(int)(key>>>indexShift);
            int word=array[index];
            int cellShift=(int)(cellBits*key);
            int value=((word>>>cellShift)&valueMask);
            if(value==0){cellsUsedPersonal++;}
            value=min(value+1, maxValue);
            // NOTE(review): text is missing between "value<" and "writeQueue" on the next line.
            // The lost span held the rest of this method (writing the new value back and the
            // return) and the declarations of array, num, cellsUsedPersonal and the start of
            // the writeQueue field. Restore from the original source.
            word=(value< writeQueue=new ArrayBlockingQueue(16);
        /** Set by run() once the poison array has been received. */
        public boolean shutdown=false;

    }


    /** @return the cellsUsed total (accumulated from the writer threads' cellsUsedPersonal). */
    public long cellsUsed(){return cellsUsed;}

    /** Set to true at the end of the shutdown sequence; read() asserts it. */
    private boolean finished=false;

    /** Sum of the writers' cellsUsedPersonal counters. */
    private long cellsUsed;
    /** Cell storage; row n is allocated by WriteThread n when it runs. */
    private final int[][] matrix;
    /** One writer slot per sub-array. */
    private final WriteThread[] writers=new WriteThread[numArrays];
    /** Number of hash rounds; the constructor asserts 1..hashMasks.length. */
    private final int hashes;
    /** Length of each sub-array in 32-bit words. */
    private final int wordsPerArray;
    /** cells/numArrays, as computed in the constructor. */
    private final long cellsPerArray;
    /** Modulus applied to a hashed key to pick a cell; set equal to cellsPerArray. */
    private final long cellMod;
    /** Random XOR masks used by hash(): 8 rows of hashArrayLength entries. */
    private final long[][] hashMasks=makeMasks(8, hashArrayLength);

    /** Per-sub-array key buffers of 500 entries each. */
    private final long[][] buffers=new long[numArrays][500];
    /** One int per sub-array (presumably the fill level of the matching buffer - confirm). */
    private final int[] bufferlen=new int[numArrays];

    /** Filter consulted by read() before this array; may be checked for null there. */
    public final KCountArray prefilter;

    /** Rotation distance used between hash rounds and bit offset used in fillMasks. */
    private static final int hashBits=6;
    // NOTE(review): the file is truncated in the middle of the next declaration; the rest of
    // the initializer, any remaining members (hashCellMask, poison, minArrays and counter are
    // referenced above but not declared in the visible text) and the closing brace of the
    // class are missing from this copy.
    private static final int hashArrayLength=1<