#!/usr/bin/env perl
# PROJECT: TaskManager
# AUTHOR:  Lukasz Szajkowski
#
# Copyright (c) 2008 Illumina
# This software is covered by the "Illumina Genome Analyzer Software
# License Agreement" and the "Illumina Source Code License Agreement",
# and certain third party copyright/licenses, and any user of this
# source file is bound by the terms therein (see accompanying files
# Illumina_Genome_Analyzer_Software_License_Agreement.pdf and
# Illumina_Source_Code_License_Agreement.pdf and third party
# copyright/license notices).
use warnings FATAL => 'all';
use strict;
use POSIX qw(strftime);
use IO::File;
use Sys::Hostname;
use Socket;
use IO::Socket;
use IO::Select;
use Getopt::Long;
use Sys::Hostname;


use lib '/home/psgendb/local/pkg/CASAVA_v1.8.2-build/lib/CASAVA-1.8.2/perl';
use Casava::Common::Log;
use Casava::Common::IOLib
  qw(executeCmd testIfSupports);
use Casava::TaskManager qw(%taskFields);
use Casava::TaskManager::Sge qw(submitJobs2SGE submit2SGE);
my $sgeQ     = undef;    
my $ports    = "8001-8100";
my $hostname = hostname();
my $sgeQsubFlags = "-V -S /usr/bin/perl";
my $sgePriority       = 0;
my $timeCmdSprintf = '/usr/bin/time -v -o %2$s %1$s';
my $timeCmdSprintf2 = '(time %1$s 2>&3 ; ) 3>&2 2>%2$s';
my $clientDelay = 60;
my $clientRetries = 3;
my $shell = "bash -o pipefail";

my $usage    = '' . << "USAGE_END";
taskServer.pl [options]\n"
    -t, --tasksFile=FILE       - workflow FILE name or full path
    --ports=PORT|START-END     - port or range - task server will search for an empty port to listen on (default $ports).
    --host=HOSTNAME|IP         - HOSTNAME or IP of network interface to listen on (default $hostname).
    --clientPort=PORT          - force agents to connect to this PORT (default is the one taskServer is listening on).
    --clientHost=HOSTNAME|IP   - force agents to connect to this HOSTNAME (default same as --host).
    --clientDelay=SECONDS      - Maximum number of seconds the agents will sleep before attempting to contact the server (default $clientDelay).
    --clientRetries=INT        - Maximum number retries the agents will perform before failing to contact the server (default $clientRetries).
    --mode={local|sge}         - 'local' (default) submits agents as child processes. 'sge' uses qsub to start agents.
    --sgeQueue=QUEUE           - SGE QUEUE name (no effect unless --mode=sge).
    --sgePriority=VALUE        - SGE task priority see qsub manual for details on VALUE (default $sgePriority).
    --sgeQsubFlags=FLAGS       - extra switches to pass to qsub. default is '$sgeQsubFlags'
    -j, --jobs=LIMIT           - limit number of parallel jobs. -1 means no limit. With --mode=sge default is -1. Otherwise - a required parameter.
    --time                     - record timing statistics for each task
    --timeCmdSprintf           - command line sprintf-style format string to use for timing. Default is '$timeCmdSprintf'
                                 Alternatively '$timeCmdSprintf2' can be used
    --noColorLog               - Stop coloring the log output
  
    --help                     - display this help.
EXAMPLES
Run tasks.txt using 7 local processes (total number of processors minus one recommended)
    taskServer.pl -t tasks.txt -j 7
Run tasks.txt on Sun Grid Engine cluster. Don't limit the number of scheduled jobs (recommended)
    taskServer.pl -t tasks.txt --sgeQueue all.q
Run tasks.txt on Sun Grid Engine cluster. Keep number of jobs no greater than 200
    taskServer.pl -t tasks.txt -j 200 --sgeQueue all.q
USAGE_END

my $help               = 0;
my $tasksFile          = "";
my $managedAgents       = 0;

my $clientHostname     = undef;
my $clientPort         = undef;
my $jobsLimit = -1;
my $mode = undef;
my $time = 0;
my @allowedModes = ('local', 'sge');
my $verbose = 2;
my $noColorLog = 0;
my $logDateFormat = "%F %T";
my $taskAgentShell = "bash -o pipefail";

my $result = GetOptions(
    "tasksFile|t=s"         => \$tasksFile,
    "ports=s"               => \$ports,
    "host|h=s"              => \$hostname,
    "clientPort=i"          => \$clientPort,
    "clientHost=s"          => \$clientHostname,
    'clientDelay=i'         => \$clientDelay,
    'clientRetries=i'       => \$clientRetries,
    "mode=s"                => \$mode,
    "sgeQueue=s"            => \$sgeQ,
    "sgePriority=i"         => \$sgePriority,
    "sgeQsubFlags=s"        => \$sgeQsubFlags,
    "jobsLimit=i"           => \$jobsLimit,
    'time'                  => \$time,
    'timeCmdSprintf=s'       => \$timeCmdSprintf,
    'verbose|v=i'           => \$verbose,
    "logDateFormat=s"       => \$logDateFormat,
    'noColorLog'            => \$noColorLog,
    "taskAgentShell=s"      => \$taskAgentShell,
    "help"                  => \$help
);

initLog( undef, $verbose, 0, !$noColorLog );

if ( $result == 0 || $tasksFile eq "" ) {
    die "\n\n$usage";
}

errorExit "Invalid mode $mode\n\n$usage" unless grep (/^$mode$/, @allowedModes);
errorExit "-j must be specified for --mode=local\n\n$usage" if ('local' eq $mode and 0 >= $jobsLimit);

sub readWorkflowFile($\%);
sub getNextTask(\%$\@);
sub setStatus(\%$$$$);
sub countTasksToSchedule(\%);

$clientHostname = $hostname unless defined $clientHostname;

my %worflowMap   = ();

readWorkflowFile( $tasksFile, %worflowMap );

# Create the receiving socket

my ($startPort, $dummy, $endPort) = $ports =~ /^(\d+)(\-(\d+))?$/;

$endPort = $startPort unless defined $endPort;

my $soket = undef;
my $port = undef;
for ($port = $startPort; $port <= $endPort; ++$port)
{
    $soket = new IO::Socket::INET(
        LocalPort => $port,
        LocalHost => $hostname,
        Proto     => 'tcp',
        Listen    => 16,
        Reuse     => 1,
        ReuseAddr => 1,
    );
    last if (defined $soket);
}
errorExit "ERROR: Could not create socket: $!" unless $soket;

$clientPort = $port unless ( defined $clientPort );

# Create an IO::Select handler
my $select = new IO::Select($soket);

logInfo( "Using port $port to set up server on host $hostname...", 0 );
logInfo( "Server ready.  Waiting for connections . . . ",          0 );
my $isQsub = 0;

if ('local' eq $mode)
{
    my $agentCmd = File::Spec->catfile('/home/psgendb/local/pkg/CASAVA_v1.8.2-build/libexec/CASAVA-1.8.2', 'TaskManager', 'taskAgent.pl');
    my $kidpid;
    for ( my $i = 0 ; $i < $jobsLimit ; $i++ ) {
        errorExit "can't fork: $!" unless defined($kidpid = fork());
        
        if ( !$kidpid ) {
            sleep(1);
            my $cmd = $agentCmd . " -h $clientHostname -p $clientPort --delay $clientDelay --retries $clientRetries --verbose $verbose --logDateFormat '$logDateFormat' --shell '$taskAgentShell'";
            print $cmd . "\n";
            exec($cmd);
            exit(0);
        }
    }
    $managedAgents = $jobsLimit;
    
}
else
{
    $isQsub = testIfSupports('qstat');
    if ( $isQsub == 1 ) {
        logInfo( "Support for qsub ON", 0 );
    }
    else {
        logWarning( "Support for qsub OFF" );
        errorExit "taskServer.pl must be run on SGE submit host if --mode=sge is specified";
    }
}

my $timeFolder = "$tasksFile.time";
if ($time)
{
    mkdir $timeFolder;
}

my %OUT_BUF = ();
my @data    = ();
my $new;
my $newState        = 1;
my @taskDescStorage = ();
my $timeToExit = 0;
my @read_ready = ();
my $failedAgents=0;
do
{
    my $newTasksLimit = ('local' eq $mode || $jobsLimit <= 0) ? -1 : $jobsLimit - $managedAgents;
    my $found = getNextTask( %worflowMap, $newTasksLimit, @taskDescStorage );
    logInfo( "found $found new task(s) ready to run", 1 ) if ($found);
    if ( $isQsub == 1 && !$failedAgents) {
        submitJobs2SGE(
            $sgeQ,
            $sgePriority, "-p $clientPort -h $clientHostname --delay $clientDelay --retries $clientRetries --verbose $verbose --logDateFormat '$logDateFormat' --shell '$taskAgentShell'",
            $tasksFile,  $found, $sgeQsubFlags
        );
        $managedAgents += $found;
    }

    foreach my $fh (@read_ready) {

        # if it is the main socket then we have an incoming connection and
        # we should accept() it and then add the new socket to the $read_set
        # Create a new socket
        if ( $fh == $soket ) {
            $new = $soket->accept;
            $select->add($new);
            push( @data, fileno($new) . " has joined." );
            logInfo( "Connection from " . $new->peerhost . ".", 2 );
            next;
        }

 # otherwise it is an ordinary socket and we should read and process the request
        my $buf = $fh->getline;

        if ( !defined $buf || $buf eq '' ) {
            $select->remove($fh);
            $fh->close;
            next;
        }
        chomp $buf;
        if ($buf) {    # we get normal input ... process $buf ...
            logInfo( "Line = [$buf]", 2 );
            chomp($buf);
            my @clientRequest = split '\t',
              $buf;    # output stuff from client here and...
            if ( scalar(@clientRequest) == 0 ) {

                # print "Badly formed line = [$buf]\n";
                next;
            }
            my $type       = $clientRequest[0];
            my $taskId     = -1;
            my $queueId    = -1;
            my $clientType = '';
            if ( $type eq 'msg' ) {
                logInfo( "Msg:[$buf]", 2 );
            }
            elsif ( $type eq 'status' ) {
                my $systemCmd1 = $clientRequest[1];
                if (   $systemCmd1 eq 'setStatus'
                    || $systemCmd1 eq 'setStatusSge' )
                {
                    if ( $clientRequest[3] =~ /(\S+)\[(\S+)\]/ ) {
                        $queueId = $1;
                        $taskId  = $2;
                        my $status = $clientRequest[4];
                        setStatus( %worflowMap, $queueId, $taskId, $status, undef );
                        $newState = 1;
                        if ( $status =~ /ERROR/ ) {
                            ++$failedAgents;
                            my $cmd = @{$worflowMap{$queueId}->{queueData}->{$taskId}}[$taskFields{cmd}];
                            my $host = @{$worflowMap{$queueId}->{queueData}->{$taskId}}[$taskFields{host}];
                            $host = 'unknown' unless $host;
                            logWarning( "An agent has failed on host '$host'\n"
                                     ."Command: $cmd\n"
                                     ."Process exit code information: $status\n"
                                     ."Waiting for other agents to terminate...");
#                            exit(0);
                        }
                    }
                    if ( $systemCmd1 eq 'setStatusSge' ) {
                        $clientType = 'sge';
                        $managedAgents--;
                        $OUT_BUF{$fh} = "exit\t0";
                        if (!$managedAgents && ($failedAgents || !countTasksToSchedule(%worflowMap)))
                        {
                            logInfo( "All agents are done. Failed agents: $failedAgents. Exiting...", 0);
                            $timeToExit = 1;
                        }
                    }
                }
                if ( $systemCmd1 eq 'sge_no_job' ) {
                    $clientType = 'sge';
                    $managedAgents--;
                    $OUT_BUF{$fh} = "exit\t0";
                    if (!$managedAgents && ($failedAgents || !countTasksToSchedule(%worflowMap)))
                    {
                        logInfo( "All agents are done. Failed agents: $failedAgents. Exiting...", 0);
                        $timeToExit = 1;
                    }
                }
            }
            if ( ( $type eq 'system' || $type eq 'status' )
                && $clientType ne 'sge' )
            {
                my $cmd             = $clientRequest[1];
                my $msg             = "-";
                my $tasksToSchedule = $failedAgents ? 0 : countTasksToSchedule(%worflowMap);
                if ( scalar(@taskDescStorage) > 0 ) {
                    if (!$failedAgents) #stop kicking off new stuff if an agent has failed
                    {
                        my @taskDescOut = @{ shift @taskDescStorage };
                        my ($jobPrefix, $jobCmd) = $taskDescOut[$taskFields{cmd}] =~ /^(\`?)(.*)/;
                        if ($time) {
                            my $taskTimeFileName = "$taskDescOut[$taskFields{queueId}]_$taskDescOut[$taskFields{taskId}]";
                            $taskTimeFileName =~ s/\//_/g;
                            my $timeFile = File::Spec->catfile($timeFolder, $taskTimeFileName);
                            $jobCmd = sprintf $timeCmdSprintf, $jobCmd, $timeFile;
                            logInfo "TIMED: $jobCmd", 1;
                        }
                        else
                        {
                            $jobCmd = "$jobCmd";
                        }
                        $msg = "task\t" . join( "\t", @taskDescOut[$taskFields{queueId} .. ($taskFields{cmd}-1)], 
                               "$jobPrefix$jobCmd" );
    
                        setStatus( %worflowMap, $taskDescOut[$taskFields{queueId}], 
                                   $taskDescOut[$taskFields{taskId}], 'running', $clientRequest[2] );
                    }
                    else #tell the finished agent to exit once one of the agents fails
                    {
                        if(!--$managedAgents)
                        {
                            logInfo( "All agents are done. Failed agents: $failedAgents Exiting...", 0);
                            $timeToExit = 1;
                        }
                        $msg = "exit\t0";
                    }
                    
                }
                elsif ( $tasksToSchedule == 0 ) {
                    if(!--$managedAgents)
                    {
                        logInfo( "Nothing to schedule. Failed agents: $failedAgents. Exiting...", 0);
                        $timeToExit = 1;
                    }
                    $msg = "exit\t0";
                }
                else {
                    if ( $newState == 0 ) {
                        $msg = "wait\t1";
                    }
                    else {
                        $msg = "wait\t0";
                    }
                    $newState = 0;
                }
                $OUT_BUF{$fh} = $msg;
            }
        }

        # Write to the clients that are available
        foreach my $fh ( my @write_ready = $select->can_write(0) ) {
            if ( defined $OUT_BUF{$fh} && $OUT_BUF{$fh} ne "" ) {

                #if ($OUT_BUF{$fh})
                print $fh $OUT_BUF{$fh} . "\n";
                $OUT_BUF{$fh} = "";
            }
        }
    }
        
    if ($timeToExit)
    {
        sleep(2);
        exit(0);
    }
}
while (@read_ready = $select->can_read );

=pod

=head1 The procedure reads tasks definition from the workflow file.

=over 4

=item readWorkflowFile($fileName, $tasksRef)

The procedure stores all tasks in the workflow file.

Parameters:
    $fileName     - workflow filename
    $tasksRef     - Reference to HASH MAP with all tasks

Returns:
    nothing
=back

=cut

sub readWorkflowFile($\%) {
    my ( $fileName, $tasksRef ) = @_;
    open( FILE, "<$fileName" ) || errorExit "ERROR: Couldn't open file handle for $fileName $!";

    my $queueId;
    while (<FILE>) {
        my $line = $_;
        chomp $line;

        #queue
        my @taskDesc = split '\t', $line;
        if ( 'finished' ne $taskDesc[ $taskFields{status} ] )
        {
            $taskDesc[ $taskFields{status} ] = 'waiting';
        }
        $queueId = $taskDesc[ $taskFields{queueId} ];
        my $taskId = $taskDesc[ $taskFields{taskId} ];
        ${$tasksRef}{$queueId}->{queueData}->{$taskId} = \@taskDesc;
    }    # while
    foreach $queueId ( sort keys %{$tasksRef} ) {
        my @taskIds = sort keys %{ ${$tasksRef}{$queueId}->{queueData} };
        ${$tasksRef}{$queueId}->{taskIds} = \@taskIds;
        if ( scalar(@taskIds) > 0 ) {

            ${$tasksRef}{$queueId}->{currentTaskNumber} = 0;
            my $currentTaskNumber = 0;
            my @taskDesc          =
              @{ ${$tasksRef}{$queueId}->{queueData}
                  ->{ $taskIds[$currentTaskNumber] } };
            while ($taskDesc[ $taskFields{status} ] eq 'finished'
                && $currentTaskNumber < scalar(@taskIds) - 1 )
            {
                $currentTaskNumber++;
                @taskDesc =
                  @{ ${$tasksRef}{$queueId}->{queueData}
                      ->{ $taskIds[$currentTaskNumber] } };
            }
            ${$tasksRef}{$queueId}->{currentTaskNumber} = $currentTaskNumber;
        }
        else {
            ${$tasksRef}{$queueId}->{currentTaskNumber} = -1;
        }
    }
    close FILE;
}

=pod

=head1 The procedure stores all tasks in the workflow file.

=over 4

=item writeWorkflowFile($fileName, $tasksRef)

The procedure stores all tasks in the workflow file.

Parameters:
    $fileName     - workflow filename
    $tasksRef     - Reference to HASH MAP with all tasks

Returns:
    nothing
=back

=cut

sub writeWorkflowFile($\%) {
    my ( $fileName, $tasksRef ) = @_;
    open( FILE, ">$fileName" ) || errorExit "readWorkflowFile: Couldn't open file handle for $fileName $!";
    
    my $queueId;
    foreach my $queueId ( sort keys %{$tasksRef} ) {
        my @taskIds           = @{ ${$tasksRef}{$queueId}->{taskIds} };
        my $currentTaskNumber = ${$tasksRef}{$queueId}->{currentTaskNumber};
        foreach my $taskId (@taskIds) {
            my @task = @{ ${$tasksRef}{$queueId}->{queueData}->{$taskId} };
            print FILE "" . join( "\t", @task ) . "\n";
        }
    }
    close FILE;
}

=pod

=head1 The procedure looks for tasks ready to run.

=over 4

=item getNextTask($tasksRef, $maxTasksToGet, $nextTaskRef)

The procedure looks for tasks ready to run.

Parameters:
    $tasksRef       - Reference to HASH MAP with all tasks
    $maxTasksToGet  - limit the amount of new tasks to this number. if -1, get 
                      all the tasks that are ready to run
    $nextTaskRef    - Reference to ARRAY where list of task ready to schedule 
                      will be stored

Returns:
    nothing
=back

=cut

sub getNextTask(\%$\@) {
    my ( $tasksRef, $maxTasksToGet, $nextTaskRef ) = @_;
    my $NA    = 'N/A';
    my $found = 0;

    # print "getNextTask start \n";
    foreach my $queueId ( keys %{$tasksRef} ) {
        my @taskIds           = @{ ${$tasksRef}{$queueId}->{taskIds} };
        my $currentTaskNumber = ${$tasksRef}{$queueId}->{currentTaskNumber};
        if ( $currentTaskNumber >= scalar(@taskIds) ) {
            next;
        }
        my $taskId = $taskIds[$currentTaskNumber];

        #print "$queueId";
        #print "\[$taskId\]\n";
        if ( ${$tasksRef}{$queueId}->{queueData}->{$taskId}
            ->[ $taskFields{status} ] ne 'waiting' )
        {
            next;
        }
        my $dontWait = 0;
        my $taskType =
          ${$tasksRef}{$queueId}->{queueData}->{$taskId}
          ->[ $taskFields{taskType} ];
        if ( $taskType eq 'task' ) {
            my $waitFor =
              ${$tasksRef}{$queueId}->{queueData}->{$taskId}
              ->[ $taskFields{waitFor} ];
            if ( $waitFor =~ /$NA/ ) {
                $dontWait = 1;
            }
            else {
                my $waitForCurrentTastNumber =
                  ${$tasksRef}{$waitFor}->{currentTaskNumber};

                errorExit "ERROR: cannot find taskIds for: '$waitFor'\n" unless(defined ${$tasksRef}{$waitFor}->{taskIds});
                my @waitForTaskIds = @{ ${$tasksRef}{$waitFor}->{taskIds} };


                if ( $waitForCurrentTastNumber < scalar(@waitForTaskIds) ) {
                    my @tastDeskTmp1 =
                      @{ ${$tasksRef}{$waitFor}->{queueData}
                          ->{ $waitForTaskIds[$waitForCurrentTastNumber] } };
                    my $statusTmp = $tastDeskTmp1[ $taskFields{status} ];
                    if ( $statusTmp eq 'finished' ) {
                        $dontWait = 1;
                    }
                    else {

                        #print "wait because statusTmp = [$statusTmp]\n";
                    }
                }
                else {
                    $dontWait = 1;
                }
            }
        }
        elsif ( $taskType eq 'synch' ) {
            my @tastDeskTmp2 =
              @{ ${$tasksRef}{$queueId}->{queueData}->{$currentTaskNumber} };
            my $cmd = $tastDeskTmp2[ $taskFields{cmd} ];
            if ( $cmd =~ /TASKS\s+\((\S+)\)/ ) {
                my @tasksList = split ',', $1;
                my $foundWaiting = 0;
                foreach my $item (@tasksList) {
                    if ( $item =~ /(\S+)\[(\S+)\]/ ) {
                        my $queueIdTmp = $1;
                        my $taskIdTmp  = $2;
                        my $status     =
                          ${$tasksRef}{$queueIdTmp}->{queueData}->{$taskIdTmp}
                          ->[ $taskFields{status} ];
                        errorExit "ERROR: undefined status for task: $item"  unless(defined $status);
                        if ( $status ne 'finished' ) {
                            $foundWaiting = 1;
                        }
                    }
                }
                if ( $foundWaiting == 0 ) {

                    #print "wait because $foundWaiting = $foundWaiting\n";
                    $dontWait = 1;
                }
            }
        }
        if ( $dontWait == 0 ) {
            next;
        }
        if (-1 != $maxTasksToGet && !$maxTasksToGet--)
        {
            last;
        }
        my $tastDeskRef = $tasksRef->{$queueId}->{queueData}->{$taskId};
        push @{$nextTaskRef}, \@{$tastDeskRef};
        setStatus( %{$tasksRef}, $queueId, $taskId, 'pending', undef );
        ++$found;
    }
    return $found;
}

=pod

=head1 The procedure sets the status of the task.

=over 4

=item setStatus($tasksRef, $queueId, $taskId, $status)

The procedure sets the status of the task..

Parameters:
    $tasksRef     - Reference to HASH MAP with all tasks
    $queueId      - queue id
    $taskId       - task id
    $status       - value of the status to be set

Returns:
    nothing
=back

=cut

sub setStatus(\%$$$$) {
    my ( $tasksRef, $queueId, $taskId, $status, $hostname ) = @_;

    #print "setStatus([$queueId\[$taskId\] = $status])\n";
    my $tastDeskRef =
      $tasksRef->{$queueId}->{queueData}->{$taskId};
    $tastDeskRef->[ $taskFields{status} ] = $status;
    
    if ( 'pending' eq $status )
    {
        $tastDeskRef->[ $taskFields{submitTime} ] = strftime "%Y/%m/%d %H:%M:%S", gmtime;
    }
    elsif ( 'running' eq $status )
    {
        $tastDeskRef->[ $taskFields{startTime} ] = strftime "%Y/%m/%d %H:%M:%S", gmtime;
        $tastDeskRef->[ $taskFields{host} ] = $hostname;
    }
    elsif ( $status eq 'finished' )
    {
        $tastDeskRef->[ $taskFields{finishTime} ] = strftime "%Y/%m/%d %H:%M:%S", gmtime;

        my $hostname = $tastDeskRef->[ $taskFields{host} ];
        if ( $tastDeskRef->[ $taskFields{taskType} ] eq 'synch' )
        {
            logInfo("Checkpoint completed on $hostname: $queueId", 1);
        }
        else 
        {
            logInfo("Job completed on $hostname: $tastDeskRef->[ $taskFields{cmd} ]", 1);
        }
        ${$tasksRef}{$queueId}->{currentTaskNumber}++;
    }

    writeWorkflowFile( $tasksFile . '.current', %{$tasksRef} );
}

=pod

=head1 The procedure counts number of task remaining to be scheduled.

=over 4

=item countTasksToSchedule($tasksRef)

The procedure counts number of task remaining to be scheduled.

Parameters:
    $tasksRef     - Reference to HASH MAP with all tasks

Returns:
    nothing
=back

=cut

sub countTasksToSchedule(\%) {
    my ($tasksRef) = @_;
    my $tasksToSchedule = 0;
    foreach my $queueId ( keys %{$tasksRef} ) {
        my @taskIds           = @{ ${$tasksRef}{$queueId}->{taskIds} };
        my $currentTaskNumber = ${$tasksRef}{$queueId}->{currentTaskNumber};
        if ( $currentTaskNumber >= scalar(@taskIds) ) {
            next;
        }
        else {
            foreach my $taskId (@taskIds)
            {
                if ( 'waiting' ne ${$tasksRef}{$queueId}->{queueData}->{$taskId}->[ $taskFields{status} ] )
                {
                    next;
                }
                ++$tasksToSchedule;
            }
        }
    }
    return $tasksToSchedule;
}

