/*
 * Copyright (c) 2013 Genome Research Ltd.
 * Author(s): James Bonfield
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *    1. Redistributions of source code must retain the above copyright notice,
 *       this list of conditions and the following disclaimer.
 *
 *    2. Redistributions in binary form must reproduce the above
 *       copyright notice, this list of conditions and the following
 *       disclaimer in the documentation and/or other materials provided
 *       with the distribution.
 *
 *    3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
 *       Institute nor the names of its contributors may be used to endorse
 *       or promote products derived from this software without specific
 *       prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS
 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH
 * LTD OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#ifdef HAVE_CONFIG_H
#include "io_lib_config.h"
#endif

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <sys/time.h>
#include <pthread.h>

#include "io_lib/thread_pool.h"

//#define DEBUG
//#define DEBUG_TIME

#define IN_ORDER

#ifdef DEBUG
static int worker_id(t_pool *p) {
    int i;
    pthread_t s = pthread_self();
    for (i = 0; i < p->tsize; i++) {
        if (pthread_equal(s, p->t[i].tid))
            return i;
    }
    return -1;
}
#endif

// Visual Studio does not provide gettimeofday
#ifdef _MSC_VER
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <stdint.h> // portable: uint64_t   MSVC: __int64

int gettimeofday(struct timeval *tp, struct timezone *tzp) {
    // Note: some broken versions only have 8 trailing zeros; the correct
    // epoch has 9 trailing zeros.
    static const uint64_t EPOCH = ((uint64_t) 116444736000000000ULL);

    SYSTEMTIME system_time;
    FILETIME   file_time;
    uint64_t   time;

    GetSystemTime(&system_time);
    SystemTimeToFileTime(&system_time, &file_time);
    time  = ((uint64_t)file_time.dwLowDateTime);
    time += ((uint64_t)file_time.dwHighDateTime) << 32;

    tp->tv_sec  = (long) ((time - EPOCH) / 10000000L);
    tp->tv_usec = (long) (system_time.wMilliseconds * 1000);

    return 0;
}
#endif

/* ----------------------------------------------------------------------------
 * A queue to hold results from the thread pool.
 *
 * Each thread pool may have jobs of multiple types being queued up and
 * interleaved, so we allow several results queues per pool.
 *
 * The jobs themselves are expected to push their results onto their
 * appropriate results queue.
 */
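/*
 * Illustrative sketch only, not part of the library: one pool servicing
 * two job types, each with its own results queue, as described above.
 * The THREAD_POOL_EXAMPLES guard and the example_job_a/example_job_b
 * stand-in job functions are hypothetical and exist purely to show the
 * intended usage of the public API declared in io_lib/thread_pool.h.
 */
#ifdef THREAD_POOL_EXAMPLES
static void *example_job_a(void *arg) { return arg; }
static void *example_job_b(void *arg) { return arg; }

static void example_two_queues(void) {
    t_pool *p = t_pool_init(/* qsize */ 8, /* tsize */ 4);
    t_results_queue *qa = t_results_queue_init();
    t_results_queue *qb = t_results_queue_init();
    static int va = 1, vb = 2;

    // Jobs of either type may interleave on the same pool; each pushes
    // its result onto the queue it was dispatched with.
    t_pool_dispatch(p, qa, example_job_a, &va);
    t_pool_dispatch(p, qb, example_job_b, &vb);

    t_pool_flush(p);

    // free_data is 0 as the example results point at static storage.
    t_pool_delete_result(t_pool_next_result(qa), 0);
    t_pool_delete_result(t_pool_next_result(qb), 0);

    t_pool_destroy(p, 0);
    t_results_queue_destroy(qa);
    t_results_queue_destroy(qb);
}
#endif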
/*
 * Adds a result to the end of the result queue.
 *
 * Returns 0 on success;
 *        -1 on failure
 */
static int t_pool_add_result(t_pool_job *j, void *data) {
    t_results_queue *q = j->q;
    t_pool_result *r;

#ifdef DEBUG
    fprintf(stderr, "%d: Adding result to queue %p, serial %d\n",
            worker_id(j->p), q, j->serial);
#endif

    /* No results queue is fine if we don't want any results back */
    if (!q)
        return 0;

    if (!(r = malloc(sizeof(*r))))
        return -1;

    r->next = NULL;
    r->data = data;
    r->serial = j->serial;

    pthread_mutex_lock(&q->result_m);
    if (q->result_tail) {
        q->result_tail->next = r;
        q->result_tail = r;
    } else {
        q->result_head = q->result_tail = r;
    }
    q->queue_len++;
    q->pending--;

#ifdef DEBUG
    fprintf(stderr, "%d: Broadcasting result_avail (id %d)\n",
            worker_id(j->p), r->serial);
#endif
    pthread_cond_signal(&q->result_avail_c);
#ifdef DEBUG
    fprintf(stderr, "%d: Broadcast complete\n", worker_id(j->p));
#endif

    pthread_mutex_unlock(&q->result_m);

    return 0;
}

/* Core of t_pool_next_result() */
static t_pool_result *t_pool_next_result_locked(t_results_queue *q) {
    t_pool_result *r, *last;

    for (last = NULL, r = q->result_head; r; last = r, r = r->next) {
        if (r->serial == q->next_serial)
            break;
    }

    if (r) {
        if (q->result_head == r)
            q->result_head = r->next;
        else
            last->next = r->next;

        if (q->result_tail == r)
            q->result_tail = last;

        if (!q->result_head)
            q->result_tail = NULL;

        q->next_serial++;
        q->queue_len--;
    }

    return r;
}

/*
 * Pulls a result off the head of the result queue. Caller should
 * free it (and any internals as appropriate) after use. This doesn't
 * wait for a result to be present.
 *
 * Results will be returned in strict order.
 *
 * Returns t_pool_result pointer if a result is ready.
 *         NULL if not.
 */
t_pool_result *t_pool_next_result(t_results_queue *q) {
    t_pool_result *r;

#ifdef DEBUG
    fprintf(stderr, "Requesting next result on queue %p\n", q);
#endif

    pthread_mutex_lock(&q->result_m);
    r = t_pool_next_result_locked(q);
    pthread_mutex_unlock(&q->result_m);

#ifdef DEBUG
    fprintf(stderr, "(q=%p) Found %p\n", q, r);
#endif

    return r;
}

/*
 * As t_pool_next_result(), but blocks until the next in-order result
 * becomes available.
 */
t_pool_result *t_pool_next_result_wait(t_results_queue *q) {
    t_pool_result *r;

#ifdef DEBUG
    fprintf(stderr, "Waiting for result %d...\n", q->next_serial);
#endif

    pthread_mutex_lock(&q->result_m);
    while (!(r = t_pool_next_result_locked(q))) {
        /* Possible race here now avoided via _locked() call, but in case... */
        struct timeval now;
        struct timespec timeout;

        gettimeofday(&now, NULL);
        timeout.tv_sec  = now.tv_sec + 10;
        timeout.tv_nsec = now.tv_usec * 1000;

        pthread_cond_timedwait(&q->result_avail_c, &q->result_m, &timeout);
    }
    pthread_mutex_unlock(&q->result_m);

    return r;
}

/*
 * Returns true if there are no items on the finished results queue and
 * also none still pending.
 */
int t_pool_results_queue_empty(t_results_queue *q) {
    int empty;

    pthread_mutex_lock(&q->result_m);
    empty = q->queue_len == 0 && q->pending == 0;
    pthread_mutex_unlock(&q->result_m);

    return empty;
}

/*
 * Returns the number of completed jobs on the results queue.
 */
int t_pool_results_queue_len(t_results_queue *q) {
    int len;

    pthread_mutex_lock(&q->result_m);
    len = q->queue_len;
    pthread_mutex_unlock(&q->result_m);

    return len;
}

/*
 * Returns the total number of jobs associated with this results queue:
 * those already completed plus those still pending.
 */
int t_pool_results_queue_sz(t_results_queue *q) {
    int len;

    pthread_mutex_lock(&q->result_m);
    len = q->queue_len + q->pending;
    pthread_mutex_unlock(&q->result_m);

    return len;
}

/*
 * Frees a result 'r' and if free_data is true also frees
 * the internal r->data result too.
 */
void t_pool_delete_result(t_pool_result *r, int free_data) {
    if (!r)
        return;

    if (free_data && r->data)
        free(r->data);

    free(r);
}
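/*
 * Illustrative sketch only (hypothetical THREAD_POOL_EXAMPLES guard, not
 * part of the library): draining a results queue in dispatch order.
 * t_pool_next_result() polls without blocking, t_pool_next_result_wait()
 * blocks until the next serial number is ready, and every returned result
 * must be released with t_pool_delete_result().
 */
#ifdef THREAD_POOL_EXAMPLES
static void example_drain_results(t_results_queue *q) {
    t_pool_result *r;

    // Non-blocking: handle whatever has already completed.
    while ((r = t_pool_next_result(q))) {
        /* ...consume r->data... */
        t_pool_delete_result(r, 1);   // 1 => also free r->data
    }

    // Blocking: wait for the remaining results, still in strict order.
    while (!t_pool_results_queue_empty(q)) {
        r = t_pool_next_result_wait(q);
        /* ...consume r->data... */
        t_pool_delete_result(r, 1);
    }
}
#endif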
/*
 * Initialises a results queue.
 *
 * Results queue pointer on success;
 * NULL on failure
 */
t_results_queue *t_results_queue_init(void) {
    t_results_queue *q = malloc(sizeof(*q));

    pthread_mutex_init(&q->result_m, NULL);
    pthread_cond_init(&q->result_avail_c, NULL);

    q->result_head = NULL;
    q->result_tail = NULL;
    q->next_serial = 0;
    q->curr_serial = 0;
    q->queue_len   = 0;
    q->pending     = 0;

    return q;
}

/* Deallocates memory for a results queue */
void t_results_queue_destroy(t_results_queue *q) {
#ifdef DEBUG
    fprintf(stderr, "Destroying results queue %p\n", q);
#endif

    if (!q)
        return;

    pthread_mutex_destroy(&q->result_m);
    pthread_cond_destroy(&q->result_avail_c);

    memset(q, 0xbb, sizeof(*q));
    free(q);

#ifdef DEBUG
    fprintf(stderr, "Destroyed results queue %p\n", q);
#endif
}

/* ----------------------------------------------------------------------------
 * The thread pool.
 */

#define TDIFF(t2,t1) ((t2.tv_sec-t1.tv_sec)*1000000 + t2.tv_usec-t1.tv_usec)

/*
 * A worker thread.
 *
 * Each thread waits for the pool to be non-empty.
 * As soon as this applies, one of them succeeds in getting the lock
 * and then executes the job.
 */
static void *t_pool_worker(void *arg) {
    t_pool_worker_t *w = (t_pool_worker_t *)arg;
    t_pool *p = w->p;
    t_pool_job *j;
#ifdef DEBUG_TIME
    struct timeval t1, t2, t3;
#endif

    for (;;) {
        // Pop an item off the pool queue
#ifdef DEBUG_TIME
        gettimeofday(&t1, NULL);
#endif

        pthread_mutex_lock(&p->pool_m);

#ifdef DEBUG_TIME
        gettimeofday(&t2, NULL);
        p->wait_time += TDIFF(t2,t1);
        w->wait_time += TDIFF(t2,t1);
#endif

        // If there is something on the job list and a higher priority
        // thread waiting, let it handle this instead.
//      while (p->head && p->t_stack_top != -1 && p->t_stack_top < w->idx) {
//          pthread_mutex_unlock(&p->pool_m);
//          pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
//          pthread_mutex_lock(&p->pool_m);
//      }

        while (!p->head && !p->shutdown) {
            p->nwaiting++;

            if (p->njobs == 0)
                pthread_cond_signal(&p->empty_c);
#ifdef DEBUG_TIME
            gettimeofday(&t2, NULL);
#endif

#ifdef IN_ORDER
            // Push this thread to the top of the waiting stack
            if (p->t_stack_top == -1 || p->t_stack_top > w->idx)
                p->t_stack_top = w->idx;

            p->t_stack[w->idx] = 1;
            pthread_cond_wait(&w->pending_c, &p->pool_m);
            p->t_stack[w->idx] = 0;

            /* Find new t_stack_top */
            {
                int i;
                p->t_stack_top = -1;
                for (i = 0; i < p->tsize; i++) {
                    if (p->t_stack[i]) {
                        p->t_stack_top = i;
                        break;
                    }
                }
            }
#else
            pthread_cond_wait(&p->pending_c, &p->pool_m);
#endif

#ifdef DEBUG_TIME
            gettimeofday(&t3, NULL);
            p->wait_time += TDIFF(t3,t2);
            w->wait_time += TDIFF(t3,t2);
#endif
            p->nwaiting--;
        }

        if (p->shutdown) {
#ifdef DEBUG_TIME
            p->total_time += TDIFF(t3,t1);
#endif
#ifdef DEBUG
            fprintf(stderr, "%d: Shutting down\n", worker_id(p));
#endif
            pthread_mutex_unlock(&p->pool_m);
            pthread_exit(NULL);
        }

        j = p->head;
        if (!(p->head = j->next))
            p->tail = NULL;

        if (p->njobs-- >= p->qsize)
            pthread_cond_signal(&p->full_c);

        if (p->njobs == 0)
            pthread_cond_signal(&p->empty_c);

        pthread_mutex_unlock(&p->pool_m);

        // We have job 'j' - now execute it.
        t_pool_add_result(j, j->func(j->arg));
#ifdef DEBUG_TIME
        pthread_mutex_lock(&p->pool_m);
        gettimeofday(&t3, NULL);
        p->total_time += TDIFF(t3,t1);
        pthread_mutex_unlock(&p->pool_m);
#endif
        memset(j, 0xbb, sizeof(*j));
        free(j);
    }

    return NULL;
}
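/*
 * Illustrative sketch only (hypothetical THREAD_POOL_EXAMPLES guard, not
 * part of the library): the shape of a job function the worker loop above
 * expects.  The worker calls j->func(j->arg) and hands the return value to
 * t_pool_add_result(), so a job whose output should be collected can return
 * a heap-allocated block that the consumer later frees via
 * t_pool_delete_result(r, 1).  Freeing arg inside the job is one possible
 * ownership convention assumed here, not a requirement of the pool.
 */
#ifdef THREAD_POOL_EXAMPLES
static void *example_square_job(void *arg) {
    int v = *(int *)arg;
    int *out = malloc(sizeof(*out));

    free(arg);                // this sketch assumes the job owns its input

    if (out)
        *out = v * v;         // becomes r->data on the results queue

    return out;               // NULL is also legal; it is stored as-is
}
#endif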
/*
 * Creates a worker pool with a job queue of length qsize and tsize
 * worker threads.
 *
 * Returns pool pointer on success;
 *         NULL on failure
 */
t_pool *t_pool_init(int qsize, int tsize) {
    int i;
    t_pool *p = malloc(sizeof(*p));
    p->qsize = qsize;
    p->tsize = tsize;
    p->njobs = 0;
    p->nwaiting = 0;
    p->shutdown = 0;
    p->head = p->tail = NULL;
    p->t_stack = NULL;
#ifdef DEBUG_TIME
    p->total_time = p->wait_time = 0;
#endif

    p->t = malloc(tsize * sizeof(p->t[0]));

    pthread_mutex_init(&p->pool_m, NULL);
    pthread_cond_init(&p->empty_c, NULL);
    pthread_cond_init(&p->full_c, NULL);

    pthread_mutex_lock(&p->pool_m);

#ifdef IN_ORDER
    // rANS needs ~3Mb unless we rewrite to use malloc.
    pthread_attr_t attr;
    if (pthread_attr_init(&attr) < 0)
        return NULL;
    pthread_attr_setstacksize(&attr, 4*1024*1024);

    if (!(p->t_stack = malloc(tsize * sizeof(*p->t_stack))))
        return NULL;
    p->t_stack_top = -1;

    for (i = 0; i < tsize; i++) {
        t_pool_worker_t *w = &p->t[i];
        p->t_stack[i] = 0;
        w->p = p;
        w->idx = i;
        w->wait_time = 0;
        pthread_cond_init(&w->pending_c, NULL);
        if (0 != pthread_create(&w->tid, &attr, t_pool_worker, w))
            return NULL;
    }
    pthread_attr_destroy(&attr);
#else
    pthread_cond_init(&p->pending_c, NULL);

    for (i = 0; i < tsize; i++) {
        t_pool_worker_t *w = &p->t[i];
        w->p = p;
        w->idx = i;
        pthread_cond_init(&w->pending_c, NULL);
        if (0 != pthread_create(&w->tid, NULL, t_pool_worker, w))
            return NULL;
    }
#endif

    pthread_mutex_unlock(&p->pool_m);

    return p;
}

/*
 * Adds an item to the work pool.
 *
 * FIXME: Maybe return 1,0,-1 and distinguish between job dispatched vs
 * result returned. I.e. rather than blocking on a full queue we're permitted
 * to return early on a "result available" event too.
 * The caller would then have a while loop around t_pool_dispatch.
 * Or, return -1 and set errno to EAGAIN to indicate the job was not yet
 * submitted.
 *
 * Returns 0 on success
 *        -1 on failure
 */
int t_pool_dispatch(t_pool *p, t_results_queue *q,
                    void *(*func)(void *arg), void *arg) {
    t_pool_job *j = malloc(sizeof(*j));

    if (!j)
        return -1;
    j->func = func;
    j->arg = arg;
    j->next = NULL;
    j->p = p;
    j->q = q;
    if (q) {
        pthread_mutex_lock(&q->result_m);
        j->serial = q->curr_serial++;
        q->pending++;
        pthread_mutex_unlock(&q->result_m);
    } else {
        j->serial = 0;
    }

#ifdef DEBUG
    fprintf(stderr, "Dispatching job %p for queue %p, serial %d\n",
            j, q, j->serial);
#endif

    pthread_mutex_lock(&p->pool_m);

    // Check if queue is full
    while (p->njobs >= p->qsize)
        pthread_cond_wait(&p->full_c, &p->pool_m);

    p->njobs++;

    if (p->tail) {
        p->tail->next = j;
        p->tail = j;
    } else {
        p->head = p->tail = j;
    }

    // Let a worker know we have data.
#ifdef IN_ORDER
    if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting)
        pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
#else
    pthread_cond_signal(&p->pending_c);
#endif

    pthread_mutex_unlock(&p->pool_m);

#ifdef DEBUG
    fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
#endif

    return 0;
}
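/*
 * Illustrative sketch only (hypothetical THREAD_POOL_EXAMPLES guard, not
 * part of the library): a typical producer loop around t_pool_dispatch().
 * Because dispatch blocks once qsize jobs are queued, interleaving a
 * non-blocking result check keeps both queues bounded while work is in
 * flight.  example_square_job is the hypothetical job sketched earlier.
 */
#ifdef THREAD_POOL_EXAMPLES
static int example_producer(t_pool *p, t_results_queue *q, int njobs) {
    int i;
    t_pool_result *r;

    for (i = 0; i < njobs; i++) {
        int *ip = malloc(sizeof(*ip));
        if (!ip)
            return -1;
        *ip = i;

        if (t_pool_dispatch(p, q, example_square_job, ip) != 0) {
            free(ip);            // job never queued, so we still own ip
            return -1;
        }

        // Reap anything already finished so the results queue stays short.
        while ((r = t_pool_next_result(q)))
            t_pool_delete_result(r, 1);
    }

    return 0;
}
#endif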
/*
 * As above but with an optional non-blocking flag.
 *
 * nonblock  0 => block if input queue is full
 * nonblock +1 => don't block if input queue is full, but do not add task
 * nonblock -1 => add task regardless of whether queue is full (over-size)
 */
int t_pool_dispatch2(t_pool *p, t_results_queue *q,
                     void *(*func)(void *arg), void *arg, int nonblock) {
    t_pool_job *j;

#ifdef DEBUG
    fprintf(stderr, "Dispatching job for queue %p, serial %d\n",
            q, q->curr_serial);
#endif

    pthread_mutex_lock(&p->pool_m);

    if (p->njobs >= p->qsize && nonblock == 1) {
        pthread_mutex_unlock(&p->pool_m);
        errno = EAGAIN;
        return -1;
    }

    if (!(j = malloc(sizeof(*j)))) {
        pthread_mutex_unlock(&p->pool_m);
        return -1;
    }
    j->func = func;
    j->arg = arg;
    j->next = NULL;
    j->p = p;
    j->q = q;
    if (q) {
        pthread_mutex_lock(&q->result_m);
        j->serial = q->curr_serial++;
        q->pending++;
        pthread_mutex_unlock(&q->result_m);
    } else {
        j->serial = 0;
    }

    // Check if queue is full
    if (nonblock == 0)
        while (p->njobs >= p->qsize)
            pthread_cond_wait(&p->full_c, &p->pool_m);

    p->njobs++;

//  if (q->curr_serial % 100 == 0)
//      fprintf(stderr, "p->njobs = %d  p->qsize = %d\n", p->njobs, p->qsize);

    if (p->tail) {
        p->tail->next = j;
        p->tail = j;
    } else {
        p->head = p->tail = j;
    }

#ifdef DEBUG
    fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
#endif

    // Let a worker know we have data.
#ifdef IN_ORDER
    // Keep the incoming queue at 1 per running thread, so there is always
    // something waiting when they end their current task.  If we go above
    // this, signal to start more threads (if available).  This has the
    // effect of concentrating jobs on fewer cores when we are I/O bound,
    // which in turn benefits systems with auto CPU frequency scaling.
    if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting)
        pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
#else
    pthread_cond_signal(&p->pending_c);
#endif

    pthread_mutex_unlock(&p->pool_m);

    return 0;
}

/*
 * Flushes the pool, but doesn't exit. This simply drains the queue and
 * ensures all worker threads have finished their current task.
 *
 * Returns 0 on success;
 *        -1 on failure
 */
int t_pool_flush(t_pool *p) {
    int i;

#ifdef DEBUG
    fprintf(stderr, "Flushing pool %p\n", p);
#endif

    // Drains the queue
    pthread_mutex_lock(&p->pool_m);

    // Wake up everything for the final sprint!
    for (i = 0; i < p->tsize; i++)
        if (p->t_stack[i])
            pthread_cond_signal(&p->t[i].pending_c);

    while (p->njobs || p->nwaiting != p->tsize)
        pthread_cond_wait(&p->empty_c, &p->pool_m);

    pthread_mutex_unlock(&p->pool_m);

#ifdef DEBUG
    fprintf(stderr, "Flush complete for pool %p, njobs=%d, nwaiting=%d\n",
            p, p->njobs, p->nwaiting);
#endif

    return 0;
}
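/*
 * Illustrative sketch only (hypothetical THREAD_POOL_EXAMPLES guard, not
 * part of the library): using t_pool_dispatch2() with nonblock == +1.
 * When the input queue is full it returns -1 with errno set to EAGAIN,
 * giving the caller a chance to reap finished results before retrying.
 * The sketch assumes all outstanding jobs report to this queue (so a
 * result will eventually arrive) and reuses the hypothetical
 * example_square_job sketched earlier.
 */
#ifdef THREAD_POOL_EXAMPLES
static int example_nonblocking_dispatch(t_pool *p, t_results_queue *q,
                                        void *arg) {
    t_pool_result *r;

    while (t_pool_dispatch2(p, q, example_square_job, arg, 1) != 0) {
        if (errno != EAGAIN)
            return -1;                // genuine failure, e.g. malloc

        // Input queue full: wait for one of our results, drain any others,
        // then retry the dispatch.
        if ((r = t_pool_next_result_wait(q)))
            t_pool_delete_result(r, 1);
        while ((r = t_pool_next_result(q)))
            t_pool_delete_result(r, 1);
    }

    return 0;
}
#endif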
/*
 * Destroys a thread pool. If 'kill' is true the threads are terminated now,
 * otherwise they are joined into the main thread so they will finish their
 * current work load.
 *
 * Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown or
 * t_pool_destroy(p,1) to quickly exit after a fatal error.
 */
void t_pool_destroy(t_pool *p, int kill) {
    int i;

#ifdef DEBUG
    fprintf(stderr, "Destroying pool %p, kill=%d\n", p, kill);
#endif

    /* Send shutdown message to worker threads */
    if (!kill) {
        pthread_mutex_lock(&p->pool_m);
        p->shutdown = 1;

#ifdef DEBUG
        fprintf(stderr, "Sending shutdown request\n");
#endif

#ifdef IN_ORDER
        for (i = 0; i < p->tsize; i++)
            pthread_cond_signal(&p->t[i].pending_c);
#else
        pthread_cond_broadcast(&p->pending_c);
#endif
        pthread_mutex_unlock(&p->pool_m);

#ifdef DEBUG
        fprintf(stderr, "Shutdown complete\n");
#endif
        for (i = 0; i < p->tsize; i++)
            pthread_join(p->t[i].tid, NULL);
    } else {
        for (i = 0; i < p->tsize; i++)
            pthread_kill(p->t[i].tid, SIGINT);
    }

    pthread_mutex_destroy(&p->pool_m);
    pthread_cond_destroy(&p->empty_c);
    pthread_cond_destroy(&p->full_c);
#ifdef IN_ORDER
    for (i = 0; i < p->tsize; i++)
        pthread_cond_destroy(&p->t[i].pending_c);
#else
    pthread_cond_destroy(&p->pending_c);
#endif

#ifdef DEBUG_TIME
    fprintf(stderr, "Total time=%f\n", p->total_time / 1000000.0);
    fprintf(stderr, "Wait  time=%f\n", p->wait_time  / 1000000.0);
    fprintf(stderr, "%d%% utilisation\n",
            (int)(100 - ((100.0 * p->wait_time) / p->total_time + 0.5)));
    for (i = 0; i < p->tsize; i++)
        fprintf(stderr, "%d: Wait time=%f\n", i,
                p->t[i].wait_time / 1000000.0);
#endif

    if (p->t_stack)
        free(p->t_stack);

    free(p->t);
    free(p);

#ifdef DEBUG
    fprintf(stderr, "Destroyed pool %p\n", p);
#endif
}

/*-----------------------------------------------------------------------------
 * Test app.
 */
#ifdef TEST_MAIN

#include <stdio.h>
#include <math.h>
#include <unistd.h>

void *doit(void *arg) {
    int i, k, x = 0;
    int job = *(int *)arg;
    int *res;

    printf("Worker: execute job %d\n", job);

    usleep(random() % 1000000); // to coerce job completion out of order

    if (0) {
        for (k = 0; k < 100; k++) {
            for (i = 0; i < 100000; i++) {
                x++;
                x += x * sin(i);
                x += x * cos(x);
            }
        }
        x *= 100;
        x += job;
    } else {
        x = job*job;
    }

    printf("Worker: job %d terminating, x=%d\n", job, x);

    free(arg);

    res = malloc(sizeof(*res));
    *res = x;

    return res;
}

#define NTHREADS 8

int main(int argc, char **argv) {
    t_pool *p = t_pool_init(NTHREADS*2, NTHREADS);
    t_results_queue *q = t_results_queue_init();
    int i;
    t_pool_result *r;

    // Dispatch jobs
    for (i = 0; i < 20; i++) {
        int *ip = malloc(sizeof(*ip));
        *ip = i;
        printf("Submitting %d\n", i);
        t_pool_dispatch(p, q, doit, ip);

        // Check for results
        if ((r = t_pool_next_result(q))) {
            printf("RESULT: %d\n", *(int *)r->data);
            t_pool_delete_result(r, 1);
        }
    }

    t_pool_flush(p);

    while ((r = t_pool_next_result(q))) {
        printf("RESULT: %d\n", *(int *)r->data);
        t_pool_delete_result(r, 1);
    }

    t_pool_destroy(p, 0);
    t_results_queue_destroy(q);

    return 0;
}
#endif
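/*
 * One possible way to build and run the test above; the compiler name,
 * include path and flags are assumptions, not part of the distributed
 * build system:
 *
 *     cc -DTEST_MAIN -I<path-to-io_lib-headers> thread_pool.c -lpthread -lm
 *     ./a.out
 *
 * Expect the "Worker: ..." lines to interleave across threads while the
 * RESULT lines still appear in submission order, printing 0*0 through
 * 19*19 thanks to the serial-number ordering of the results queue.
 */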