# Copyright 2007 by Tiago Antao.  All rights reserved.
# This code is part of the Biopython distribution and governed by its
# license.  Please see the LICENSE file that should have been included
# as part of this package.

"""Asynchronous execution of Fdist and splitting of loads.

FDistAsync allows for the asynchronous execution of FDist.

SplitFDist splits a single Fdist execution in several, taking advantage
of multi-core architectures.
"""

import os
import _thread
from time import sleep

from Bio.PopGen.Async import Local
from Bio.PopGen.FDist.Controller import FDistController


class FDistAsync(FDistController):
    """Asynchronous FDist execution."""

    def __init__(self, fdist_dir="", ext=None):
        """Constructor.

        Parameters:
        fdist_dir - Where fdist can be found, if = "", then it
                    should be on the path.
        ext       - Extension of binary names
                    (e.g. nothing on Unix, ".exe" on Windows).
        """
        FDistController.__init__(self, fdist_dir, ext)

    def run_job(self, parameters, input_files):
        """Run FDist from a dictionary of parameters.

        Gets typical Fdist parameters from a dictionary and makes a
        "normal" call to run_fdist. This is run, normally, inside a
        separate thread.

        Parameters:
        parameters  - Dictionary with the mandatory keys 'npops',
                      'nsamples', 'fst' and 'sample_size', and the
                      optional keys 'mut' (default 0), 'num_sims'
                      (default 20000) and 'data_dir' (default '.').
        input_files - Not used by this implementation.

        Returns a tuple (fst, output_files) where fst is the value
        returned by run_fdist and output_files maps 'out.dat' to an
        open, readable file handle on <data_dir>/out.dat. The handle is
        deliberately left open: the consumer (see SplitFDist.monitor)
        is responsible for closing it.
        """
        npops = parameters['npops']
        nsamples = parameters['nsamples']
        fst = parameters['fst']
        sample_size = parameters['sample_size']
        mut = parameters.get('mut', 0)
        num_sims = parameters.get('num_sims', 20000)
        data_dir = parameters.get('data_dir', '.')
        fst = self.run_fdist(npops, nsamples, fst, sample_size,
                             mut, num_sims, data_dir)
        output_files = {}
        output_files['out.dat'] = open(data_dir + os.sep + 'out.dat', 'r')
        return fst, output_files


class SplitFDist:
    """Splits a FDist run.

    The idea is to split a certain number of simulations in smaller
    numbers (e.g. 30.000 sims split in 30 packets of 1.000). This
    allows to run simulations in parallel, thus taking advantage
    of multi-core CPUs.

    Each SplitFDist object can only be used to run a single FDist
    simulation.
    """

    def __init__(self, report_fun=None, num_thr=2, split_size=1000,
                 fdist_dir='', ext=None):
        """Constructor.

        Parameters:
        report_fun - Function that is called when a single packet is
                     run, it should have a single parameter: Fst.
        num_thr    - Number of desired threads, typically the number
                     of cores.
        split_size - Size that a full simulation will be split in.
        fdist_dir  - Passed on to FDistAsync (where fdist can be found).
        ext        - Binary extension name (e.g. nothing on Unix,
                     '.exe' on Windows).
        """
        # ``async`` is a reserved keyword since Python 3.7, so the
        # executor is stored as ``async_``.
        self.async_ = Local.Local(num_thr)
        # Keep the historical attribute name reachable through
        # getattr(obj, "async") for code written before the rename.
        setattr(self, "async", self.async_)
        self.async_.hooks['fdist'] = FDistAsync(fdist_dir, ext)
        self.report_fun = report_fun
        self.split_size = split_size

    # There might be races when reporting...
    def monitor(self):
        """Monitors and reports (using report_fun) execution.

        Once per second, collects every finished partial simulation:
        its out.dat is appended to <data_dir>/out.dat, its temporary
        directory is emptied and removed, and report_fun (if any) is
        called with its Fst. Returns when no job is waiting, running
        or done.

        IMPORTANT: monitor calls can be concurrent with other
        events, i.e., a task might end while report_fun is being
        called. This means that report_fun should consider that
        other events might be happening while it is running (it
        can call acquire/release if necessary). The lock is never held
        while report_fun runs, nor after monitor returns.
        """
        while True:
            sleep(1)

            # Snapshot the finished job ids under the lock; the dict
            # may be changed by worker threads at any time.
            self.async_.access_ds.acquire()
            try:
                keys = list(self.async_.done.keys())
            finally:
                self.async_.access_ds.release()

            for done in keys:
                self.async_.access_ds.acquire()
                try:
                    fst, files = self.async_.done[done]
                    del self.async_.done[done]
                    out_dat = files['out.dat']
                    try:
                        merged_path = self.data_dir + os.sep + 'out.dat'
                        with open(merged_path, 'a') as merged:
                            merged.writelines(out_dat.readlines())
                    finally:
                        out_dat.close()
                finally:
                    # Always give the lock back, even if the merge fails.
                    self.async_.access_ds.release()

                # Clean up the per-part working directory.
                part_dir = self.parts[done]
                for name in os.listdir(part_dir):
                    os.remove(part_dir + os.sep + name)
                os.rmdir(part_dir)

                if self.report_fun:
                    self.report_fun(fst)

            # Evaluate the stop condition under the lock, but release
            # the lock *before* leaving the loop so that it is not left
            # held forever once monitoring ends.
            self.async_.access_ds.acquire()
            try:
                finished = (len(self.async_.waiting) == 0
                            and len(self.async_.running) == 0
                            and len(self.async_.done) == 0)
            finally:
                self.async_.access_ds.release()
            if finished:
                break

    def acquire(self):
        """Allows the external acquisition of the lock."""
        self.async_.access_ds.acquire()

    def release(self):
        """Allows the external release of the lock."""
        self.async_.access_ds.release()

    # You can only run a fdist case at a time
    def run_fdist(self, npops, nsamples, fst, sample_size,
                  mut=0, num_sims=20000, data_dir='.'):
        """Runs FDist.

        Parameters can be seen on FDistController.run_fdist.

        It will split a single execution in several parts and
        create separated data directories (<data_dir>/0,
        <data_dir>/1, ...), one per part of split_size simulations.
        Only whole parts are scheduled: num_sims is divided by
        split_size with integer division. A monitor thread is started
        once all parts have been submitted.
        """
        # Integer division: range() needs an int (true division yields
        # a float on Python 3).
        num_parts = num_sims // self.split_size
        self.parts = {}
        self.data_dir = data_dir
        for directory in range(num_parts):
            full_path = data_dir + os.sep + str(directory)
            try:
                os.mkdir(full_path)
            except OSError:
                pass  # It's ok, if it is already there
            job_id = self.async_.run_program('fdist', {
                'npops': npops,
                'nsamples': nsamples,
                'fst': fst,
                'sample_size': sample_size,
                'mut': mut,
                'num_sims': self.split_size,
                'data_dir': full_path,
            }, {})
            self.parts[job_id] = full_path
        _thread.start_new_thread(self.monitor, ())