#include "FastxParser.hpp" #include "FastxParserThreadUtils.hpp" #include "fcntl.h" #include "unistd.h" #include #include #include #include #include #include #include #include #include #include namespace fastx_parser { template FastxParser::FastxParser(std::vector files, uint32_t numConsumers, uint32_t numParsers, uint32_t chunkSize) : FastxParser(files, {}, numConsumers, numParsers, chunkSize) {} template FastxParser::FastxParser(std::vector files, std::vector files2, uint32_t numConsumers, uint32_t numParsers, uint32_t chunkSize) : inputStreams_(files), inputStreams2_(files2), numParsing_(0), blockSize_(chunkSize) { if (numParsers > files.size()) { std::cerr << "Can't make user of more parsing threads than file (pairs); " "setting # of parsing threads to " << files.size(); numParsers = files.size(); } numParsers_ = numParsers; // nobody is parsing yet numParsing_ = 0; readQueue_ = moodycamel::ConcurrentQueue>>( 4 * numConsumers, numParsers, 0); seqContainerQueue_ = moodycamel::ConcurrentQueue>>( 4 * numConsumers, 1 + numConsumers, 0); workQueue_ = moodycamel::ConcurrentQueue(numParsers_); // push all file ids on the queue for (size_t i = 0; i < files.size(); ++i) { workQueue_.enqueue(i); } // every parsing thread gets a consumer token for the seqContainerQueue // and a producer token for the readQueue. for (size_t i = 0; i < numParsers_; ++i) { consumeContainers_.emplace_back( new moodycamel::ConsumerToken(seqContainerQueue_)); produceReads_.emplace_back(new moodycamel::ProducerToken(readQueue_)); } // enqueue the appropriate number of read chunks so that we can start // filling them once the parser has been started. moodycamel::ProducerToken produceContainer(seqContainerQueue_); for (size_t i = 0; i < 4 * numConsumers; ++i) { auto chunk = make_unique>(blockSize_); seqContainerQueue_.enqueue(produceContainer, std::move(chunk)); } } template ReadGroup FastxParser::getReadGroup() { return ReadGroup(getProducerToken_(), getConsumerToken_()); } template moodycamel::ProducerToken FastxParser::getProducerToken_() { return moodycamel::ProducerToken(seqContainerQueue_); } template moodycamel::ConsumerToken FastxParser::getConsumerToken_() { return moodycamel::ConsumerToken(readQueue_); } template FastxParser::~FastxParser() { if (isActive_ or numParsing_ > 0) { // Think about if this is too noisy --- but the user really shouldn't do this. std::cerr << "\n\nEncountered FastxParser destructor while parser was still marked active (or while parsing threads were still active). " << "Be sure to call stop() before letting FastxParser leave scope!\n"; try { stop(); } catch (const std::exception& e) { // Should exiting here be a user-definable behavior? // What is the right mechanism for that. std::cerr << "\n\nParser encountered exception : " << e.what() << "\n"; std::exit(-1); } } // Otherwise, we are good to go (i.e., destruct) } template bool FastxParser::stop() { bool ret{false}; if (isActive_) { for (auto& t : parsingThreads_) { t->join(); } isActive_ = false; for (auto& res : threadResults_) { if (res == -3) { throw std::range_error("Error reading from the FASTA/Q stream. Make sure the file is valid."); } else if (res < -1) { std::stringstream ss; ss << "Error reading from the FASTA/Q stream. Minimum return code for left and right read was (" << res << "). Make sure the file is valid."; throw std::range_error(ss.str()); } } ret = true; } else { // Is this being too loud? Again, if this triggers, the user has violated the API. 
std::cerr << "stop() was called on a FastxParser that was not marked active. Did you remember " << "to call start() on this parser?\n"; } return ret; } template int parse_reads( std::vector& inputStreams, std::atomic& numParsing, moodycamel::ConsumerToken* cCont, moodycamel::ProducerToken* pRead, moodycamel::ConcurrentQueue& workQueue, moodycamel::ConcurrentQueue>>& seqContainerQueue_, moodycamel::ConcurrentQueue>>& readQueue_) { using namespace klibpp; using fastx_parser::thread_utils::MIN_BACKOFF_ITERS; auto curMaxDelay = MIN_BACKOFF_ITERS; T* s; uint32_t fn{0}; while (workQueue.try_dequeue(fn)) { auto& file = inputStreams[fn]; std::unique_ptr> local; while (!seqContainerQueue_.try_dequeue(*cCont, local)) { fastx_parser::thread_utils::backoffOrYield(curMaxDelay); // Think of a way to do this that wouldn't be loud (or would allow a user-definable logging mechanism) // std::cerr << "couldn't dequeue read chunk\n"; } size_t numObtained{local->size()}; // open the file and init the parser gzFile fp = gzopen(file.c_str(), "r"); // The number of reads we have in the local vector size_t numWaiting{0}; auto seq = make_kstream(fp, gzread, mode::in); s = &((*local)[numWaiting]); while ( seq >> *s ) { //ksv >= 0 numWaiting++; // If we've filled the local vector, then dump to the concurrent queue if (numWaiting == numObtained) { curMaxDelay = MIN_BACKOFF_ITERS; while (!readQueue_.try_enqueue(std::move(local))) { fastx_parser::thread_utils::backoffOrYield(curMaxDelay); } numWaiting = 0; numObtained = 0; // And get more empty reads curMaxDelay = MIN_BACKOFF_ITERS; while (!seqContainerQueue_.try_dequeue(*cCont, local)) { fastx_parser::thread_utils::backoffOrYield(curMaxDelay); } numObtained = local->size(); } s = &((*local)[numWaiting]); } // if we had an error in the stream if (seq.err()) { --numParsing; return -3; } else if (seq.tqs()) { // if we had a quality string of the wrong length // tqs == truncated quality string --numParsing; return -2; } // If we hit the end of the file and have any reads in our local buffer // then dump them here. 
template <typename T>
int parse_read_pairs(
    std::vector<std::string>& inputStreams,
    std::vector<std::string>& inputStreams2, std::atomic<uint32_t>& numParsing,
    moodycamel::ConsumerToken* cCont, moodycamel::ProducerToken* pRead,
    moodycamel::ConcurrentQueue<uint32_t>& workQueue,
    moodycamel::ConcurrentQueue<std::unique_ptr<ReadChunk<T>>>&
        seqContainerQueue_,
    moodycamel::ConcurrentQueue<std::unique_ptr<ReadChunk<T>>>& readQueue_) {
  using namespace klibpp;
  using fastx_parser::thread_utils::MIN_BACKOFF_ITERS;
  size_t curMaxDelay = MIN_BACKOFF_ITERS;
  T* s;
  uint32_t fn{0};
  while (workQueue.try_dequeue(fn)) {
    auto& file = inputStreams[fn];
    auto& file2 = inputStreams2[fn];
    std::unique_ptr<ReadChunk<T>> local;
    while (!seqContainerQueue_.try_dequeue(*cCont, local)) {
      fastx_parser::thread_utils::backoffOrYield(curMaxDelay);
      // NOTE: think of a way to do this that wouldn't be loud (or that would
      // allow a user-definable logging mechanism)
      // std::cerr << "couldn't dequeue read chunk\n";
    }
    size_t numObtained{local->size()};
    // open the files and init the parsers
    gzFile fp = gzopen(file.c_str(), "r");
    gzFile fp2 = gzopen(file2.c_str(), "r");

    // The number of read pairs we have in the local vector
    size_t numWaiting{0};

    auto seq = make_kstream(fp, gzread, mode::in);
    auto seq2 = make_kstream(fp2, gzread, mode::in);
    s = &((*local)[numWaiting]);
    while ((seq >> s->first) and (seq2 >> s->second)) {
      numWaiting++;
      // If we've filled the local vector, then dump to the concurrent queue
      if (numWaiting == numObtained) {
        curMaxDelay = MIN_BACKOFF_ITERS;
        while (!readQueue_.try_enqueue(std::move(local))) {
          fastx_parser::thread_utils::backoffOrYield(curMaxDelay);
        }
        numWaiting = 0;
        numObtained = 0;
        // And get more empty reads
        curMaxDelay = MIN_BACKOFF_ITERS;
        while (!seqContainerQueue_.try_dequeue(*cCont, local)) {
          fastx_parser::thread_utils::backoffOrYield(curMaxDelay);
        }
        numObtained = local->size();
      }
      s = &((*local)[numWaiting]);
    }

    // if we had an error in either stream
    if (seq.err() or seq2.err()) {
      --numParsing;
      return -3;
    } else if (seq.tqs() or seq2.tqs()) {
      // if we had a quality string of the wrong length
      // (tqs == truncated quality string)
      --numParsing;
      return -2;
    }

    // If we hit the end of the file and have any read pairs in our local
    // buffer, then dump them here.
    if (numWaiting > 0) {
      local->have(numWaiting);
      curMaxDelay = MIN_BACKOFF_ITERS;
      while (!readQueue_.try_enqueue(*pRead, std::move(local))) {
        fastx_parser::thread_utils::backoffOrYield(curMaxDelay);
      }
      numWaiting = 0;
    } else if (numObtained > 0) {
      // otherwise, return the unused (empty) chunk to the container queue
      curMaxDelay = MIN_BACKOFF_ITERS;
      while (!seqContainerQueue_.try_enqueue(std::move(local))) {
        fastx_parser::thread_utils::backoffOrYield(curMaxDelay);
      }
    }
    // destroy the parsers and close the files
    gzclose(fp);
    gzclose(fp2);
  }
  --numParsing;
  return 0;
}
template <> bool FastxParser<ReadSeq>::start() {
  if (numParsing_ == 0) {
    isActive_ = true;
    threadResults_.resize(numParsers_);
    std::fill(threadResults_.begin(), threadResults_.end(), 0);

    for (size_t i = 0; i < numParsers_; ++i) {
      ++numParsing_;
      parsingThreads_.emplace_back(new std::thread([this, i]() {
        this->threadResults_[i] = parse_reads(
            this->inputStreams_, this->numParsing_,
            this->consumeContainers_[i].get(), this->produceReads_[i].get(),
            this->workQueue_, this->seqContainerQueue_, this->readQueue_);
      }));
    }
    return true;
  } else {
    return false;
  }
}

template <> bool FastxParser<ReadPair>::start() {
  if (numParsing_ == 0) {
    isActive_ = true;
    // Some basic checking to ensure the read files look "sane".
    if (inputStreams_.size() != inputStreams2_.size()) {
      throw std::invalid_argument("There should be the same number "
                                  "of files for the left and right reads");
    }
    for (size_t i = 0; i < inputStreams_.size(); ++i) {
      auto& s1 = inputStreams_[i];
      auto& s2 = inputStreams2_[i];
      if (s1 == s2) {
        throw std::invalid_argument("You provided the same file " + s1 +
                                    " as both a left and right file");
      }
    }

    threadResults_.resize(numParsers_);
    std::fill(threadResults_.begin(), threadResults_.end(), 0);

    for (size_t i = 0; i < numParsers_; ++i) {
      ++numParsing_;
      parsingThreads_.emplace_back(new std::thread([this, i]() {
        this->threadResults_[i] = parse_read_pairs(
            this->inputStreams_, this->inputStreams2_, this->numParsing_,
            this->consumeContainers_[i].get(), this->produceReads_[i].get(),
            this->workQueue_, this->seqContainerQueue_, this->readQueue_);
      }));
    }
    return true;
  } else {
    return false;
  }
}

template <> bool FastxParser<ReadQualPair>::start() {
  if (numParsing_ == 0) {
    isActive_ = true;
    // Some basic checking to ensure the read files look "sane".
    if (inputStreams_.size() != inputStreams2_.size()) {
      throw std::invalid_argument("There should be the same number "
                                  "of files for the left and right reads");
    }
    for (size_t i = 0; i < inputStreams_.size(); ++i) {
      auto& s1 = inputStreams_[i];
      auto& s2 = inputStreams2_[i];
      if (s1 == s2) {
        throw std::invalid_argument("You provided the same file " + s1 +
                                    " as both a left and right file");
      }
    }

    threadResults_.resize(numParsers_);
    std::fill(threadResults_.begin(), threadResults_.end(), 0);

    for (size_t i = 0; i < numParsers_; ++i) {
      ++numParsing_;
      parsingThreads_.emplace_back(new std::thread([this, i]() {
        this->threadResults_[i] = parse_read_pairs(
            this->inputStreams_, this->inputStreams2_, this->numParsing_,
            this->consumeContainers_[i].get(), this->produceReads_[i].get(),
            this->workQueue_, this->seqContainerQueue_, this->readQueue_);
      }));
    }
    return true;
  } else {
    return false;
  }
}
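// ---------------------------------------------------------------------------
// Paired-end usage sketch (illustrative only; file names are placeholders,
// and ReadPair is assumed to be the paired record type declared in
// FastxParser.hpp). The left and right file lists must have the same length,
// and corresponding entries must name different files, or start() will throw.
//
//   std::vector<std::string> left{"reads_1.fq.gz"};
//   std::vector<std::string> right{"reads_2.fq.gz"};
//   fastx_parser::FastxParser<fastx_parser::ReadPair> parser(
//       left, right, /*numConsumers=*/4, /*numParsers=*/1);
//   parser.start();
//   auto rg = parser.getReadGroup();
//   while (parser.refill(rg)) {
//     for (auto& rp : rg) {
//       // rp.first and rp.second hold the two mates of a pair
//     }
//   }
//   parser.stop();
// ---------------------------------------------------------------------------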
template <typename T> bool FastxParser<T>::refill(ReadGroup<T>& seqs) {
  finishedWithGroup(seqs);
  auto curMaxDelay = fastx_parser::thread_utils::MIN_BACKOFF_ITERS;
  // While any parser thread is still working, wait (with backoff) for a
  // filled chunk to become available.
  while (numParsing_ > 0) {
    if (readQueue_.try_dequeue(seqs.consumerToken(), seqs.chunkPtr())) {
      return true;
    }
    fastx_parser::thread_utils::backoffOrYield(curMaxDelay);
  }
  // All parsers have finished; drain whatever remains in the queue.
  return readQueue_.try_dequeue(seqs.consumerToken(), seqs.chunkPtr());
}

template <typename T> void FastxParser<T>::finishedWithGroup(ReadGroup<T>& s) {
  // If this read group is holding a valid chunk, then give it back
  if (!s.empty()) {
    seqContainerQueue_.enqueue(s.producerToken(), std::move(s.takeChunkPtr()));
    s.setChunkEmpty();
  }
}

template class FastxParser<ReadSeq>;
template class FastxParser<ReadPair>;
// template class FastxParser<ReadQual>;
template class FastxParser<ReadQualPair>;
}