Background:
The program reads 1000000 lines in the file. Every four line will be parsed into an object in a vector. If it has 2 objects with the same name, it will drop 1 object and increment one of the field in another object. After that, the vector is required to sort out. Finally, it writes to output file.
The main code
#include <cmath> #include <algorithm> #include <fstream> #include <iostream> #include <map> #include <mutex> #include <queue> #include <thread> #include <vector> std::queue<FASTQentry> shared_queue; bool queue_closed = false; std::mutex shared_queue_mutex; int main(int argc, char* argv[]) { std::ifstream input_file; std::ofstream output_file; std::size_t total_reads = 0; std::vector<FASTQentry> fqs; // parse input fastq file // fqs = parse_fastq_file(input_file, total_reads); std::thread readerThread(parse_fastq_file_mutli, std::ref(input_file), std::ref(total_reads)); std::thread processThread(count_frequencies_and_merge_qualities_multi, std::ref(fqs)); readerThread.join(); processThread.join(); sort(fqs.begin(), fqs.end(), sort_by_freq_quality); for (auto& out_entry : fqs) { output_file << out_entry; } return 0; }
Class
class FASTQentry { public: FASTQentry() = default; FASTQentry(const std::string& hdr, const std::string& seq, const std::string& plus, const std::vector<double>& probs) { header = hdr; sequence = seq; this->plus = plus; probas = probs; freq = 1; } std::string header; std::string sequence; std::string plus; std::vector<double> probas; int freq; friend std::ostream& operator<<(std::ostream& o, const FASTQentry& fq_entry); friend std::istream& operator>>(std::istream& is, FASTQentry& fq_entry); };
Parse File Function
void parse_fastq_file_mutli(std::ifstream& infile, std::size_t& total_reads) { while (!infile.eof()) { FASTQentry fq; fq.freq = 1; infile >> fq; { std::lock_guard<std::mutex> lock(shared_queue_mutex); shared_queue.push(fq); } ++total_reads; } { std::lock_guard<std::mutex> lock(shared_queue_mutex); queue_closed = true; } } std::istream& operator>>(std::istream& is, FASTQentry& fq_entry) { is.ignore(std::numeric_limits<std::streamsize>::max(), '\n'); std::getline(is, fq_entry.sequence); is.ignore(std::numeric_limits<std::streamsize>::max(), '\n'); std::string quals; std::getline(is, quals); fq_entry.probas = qualities_to_probabilities(quals); return is; } std::vector<double> qualities_to_probabilities(const std::string& quals) { std::vector<double> result; result.reserve(quals.size()); for (auto& q : quals) { result.push_back(std::pow(10, (q - 33) / -10)); } return result; }
Processing Function
void count_frequencies_and_merge_qualities_multi(std::vector<FASTQentry>& fqs) { std::map<std::string, int> read2fqs; bool is_closed; bool empty_entry; int counter = 1; // Check if the file has been closed or if there are no more entries { std::lock_guard<std::mutex> lock(shared_queue_mutex); is_closed = queue_closed; empty_entry = shared_queue.empty(); } while (!is_closed || !empty_entry) { if (!empty_entry) { FASTQentry fq; { std::lock_guard<std::mutex> lock(shared_queue_mutex); fq = shared_queue.front(); shared_queue.pop(); } if (read2fqs.count(fq.sequence) > 0) { int index = read2fqs[fq.sequence] - 1; fqs[index].freq += 1; for (int i = 0; i < fqs[index].probas.size(); ++i) { fqs[index].probas[i] += fq.probas[i]; } } else { read2fqs[fq.sequence] = counter; fq.header = "@seq_" + std::to_string(counter) + "_x"; fq.plus = "+"; fqs.push_back(fq); counter += 1; } } // Check if the file has been closed or if there are no more entries { std::lock_guard<std::mutex> lock(shared_queue_mutex); is_closed = queue_closed; empty_entry = shared_queue.empty(); } } }
Sorting Function
bool sort_by_freq_quality(FASTQentry fq1, FASTQentry fq2) { if (fq1.freq != fq2.freq) { return fq1.freq > fq2.freq; } else { // cumulative sum of quality auto s1 = 0.; auto s2 = 0.; for (auto i = 0ul; i < fq1.probas.size(); ++i) { s1 += fq1.probas[i]; } for (auto i = 0ul; i < fq2.probas.size(); ++i) { s2 += fq2.probas[i]; } s1 /= fq1.freq; s2 /= fq2.freq; if (s1 != s2){ return s1 > s2; } return fq1.sequence > fq2.sequence; } }
Write to File
std::string probabilities_to_qualities(const std::vector<double>& probas, int freq) { std::string result; result.reserve(probas.size()); for (auto& p : probas) { result.push_back(-10 * std::log10(p / freq) + 33); } return result; }
I convert the code from single thread to this but I can't see much improvement. What is the bottleneck of these code?
Thanks!