#ifndef __FASTX_PARSER__ #define __FASTX_PARSER__ #include "fcntl.h" #include "unistd.h" #include #include #include #include #include #include #include "kseq++.hpp" #include "concurrentqueue.h" #ifndef __FASTX_PARSER_PRECXX14_MAKE_UNIQUE__ #define __FASTX_PARSER_PRECXX14_MAKE_UNIQUE__ #if __cplusplus >= 201402L #include using std::make_unique; #else #include #include #include #include template struct _Unique_if { using _Single_object = std::unique_ptr; }; template struct _Unique_if { using _Unknown_bound = std::unique_ptr; }; template struct _Unique_if { using _Known_bound = void; }; template typename _Unique_if::_Single_object make_unique(Args&&... args) { return std::unique_ptr(new T(std::forward(args)...)); } template typename _Unique_if::_Unknown_bound make_unique(size_t n) { using U = typename std::remove_extent::type; return std::unique_ptr(new U[n]()); } template typename _Unique_if::_Known_bound make_unique(Args&&...) = delete; #endif // C++11 #endif //__FASTX_PARSER_PRECXX14_MAKE_UNIQUE__ namespace fastx_parser { using ReadSeq = klibpp::KSeq; using ReadQual = klibpp::KSeq; // The ReadPair and ReadQualPair are obviously // redundant. But, having them as separate types // here would allow us to say something at compile // time about if we expect to be able to look // at qualities etc. Think more about if we // really want to keep both of these. struct ReadPair { klibpp::KSeq first; klibpp::KSeq second; }; struct ReadQualPair { klibpp::KSeq first; klibpp::KSeq second; }; template class ReadChunk { public: ReadChunk(size_t want) : group_(want), want_(want), have_(want) {} inline void have(size_t num) { have_ = num; } inline size_t size() { return have_; } inline size_t want() const { return want_; } T& operator[](size_t i) { return group_[i]; } typename std::vector::iterator begin() { return group_.begin(); } typename std::vector::iterator end() { return group_.begin() + have_; } private: std::vector group_; size_t want_; size_t have_; }; template class ReadGroup { public: ReadGroup(moodycamel::ProducerToken&& pt, moodycamel::ConsumerToken&& ct) : pt_(std::move(pt)), ct_(std::move(ct)) {} moodycamel::ConsumerToken& consumerToken() { return ct_; } moodycamel::ProducerToken& producerToken() { return pt_; } // get a reference to the chunk this ReadGroup owns std::unique_ptr>& chunkPtr() { return chunk_; } // get a *moveable* reference to the chunk this ReadGroup owns std::unique_ptr>&& takeChunkPtr() { return std::move(chunk_); } inline void have(size_t num) { chunk_->have(num); } inline size_t size() { return chunk_->size(); } inline size_t want() const { return chunk_->want(); } T& operator[](size_t i) { return (*chunk_)[i]; } typename std::vector::iterator begin() { return chunk_->begin(); } typename std::vector::iterator end() { return chunk_->begin() + chunk_->size(); } void setChunkEmpty() { chunk_.release(); } bool empty() const { return chunk_.get() == nullptr; } private: std::unique_ptr> chunk_{nullptr}; moodycamel::ProducerToken pt_; moodycamel::ConsumerToken ct_; }; template class FastxParser { public: FastxParser(std::vector files, uint32_t numConsumers, uint32_t numParsers = 1, uint32_t chunkSize = 1000); FastxParser(std::vector files, std::vector files2, uint32_t numConsumers, uint32_t numParsers = 1, uint32_t chunkSize = 1000); ~FastxParser(); bool start(); bool stop(); ReadGroup getReadGroup(); bool refill(ReadGroup& rg); void finishedWithGroup(ReadGroup& s); private: moodycamel::ProducerToken getProducerToken_(); moodycamel::ConsumerToken getConsumerToken_(); std::vector inputStreams_; std::vector inputStreams2_; uint32_t numParsers_; std::atomic numParsing_; // NOTE: Would like to use std::future here instead, but that // solution doesn't seem to work. It's unclear exactly why // see (https://twitter.com/nomad421/status/917748383321817088) std::vector> parsingThreads_; // holds the results of the parsing threads, which is simply equal to // the return value of kseq_read() for the last call to that function. // A value < -1 signifies some sort of error. std::vector threadResults_; size_t blockSize_; moodycamel::ConcurrentQueue>> readQueue_, seqContainerQueue_; // holds the indices of files (file-pairs) to be processed moodycamel::ConcurrentQueue workQueue_; std::vector> produceReads_; std::vector> consumeContainers_; bool isActive_{false}; }; } #endif // __FASTX_PARSER__