I wrote this code for a take-home task and received some negative feedback. While I really appreciate the review, I disagree with the claims that I used auto inconsistently and that the algorithm is not optimal (I know it's not optimal, but compared to the code they expected, which runs about 5 times slower than mine, that is really hard to accept).
There are two big problems I see (reviewing my code again):
- I did not handle the case of more than 1024 chunks, which is the usual maximum number of open files on Linux. I was aware of that and mentioned it when I submitted the code instead of handling it in the code, since the system limit can be increased (e.g. with ulimit -n).
- There is a bug when storing strings with vectors (see the sketch after this list). With GCC, a vector resizes by allocating a new block of twice the current capacity and copying the existing elements to the new location, so we can use more memory than we expected. Interestingly, no one pointed this out.
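A minimal sketch of that growth behaviour (the doubling factor is specific to GCC's libstdc++; other implementations may grow by a different factor):

{{{
#include <iostream>
#include <string>
#include <vector>

int main() {
  std::vector<std::string> v;
  std::size_t cap = v.capacity();
  for (int i = 0; i < 1000; ++i) {
    v.push_back("line " + std::to_string(i));
    if (v.capacity() != cap) {
      cap = v.capacity();
      // each reallocation doubles the capacity on libstdc++, so right after
      // a growth the vector owns roughly 2x the memory its size suggests
      std::cout << "size " << v.size() << " -> capacity " << cap << '\n';
    }
  }
}
}}}

Reserving the expected number of lines up front, or budgeting against capacity() rather than the accumulated string sizes, would keep the memory bound honest.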
I would appreciate any comments, so I know where I should improve and can do better next time. Below is the reviewer's feedback:
- His application could not run with 1.6 GB; this error might come from the limit on reading files simultaneously. I think he did not test with a huge file and could not avoid the I/O problem.
{{{
./main ~/PycharmProjects/serp_improvement/data_restore/ch_result_20.csv out_ch_result_20.csv 100000
Sort chunks in async mode
Intermediate files 1021 are missing!
Considering remove all temp files with "rm -r *.tmp"
}}}
- His algorithm is not good enough:
  - To read the data line by line, he used the seek function to back up to the latest line to avoid exceeding the limit. This method is quite complex.
  - He merges all chunk files into the output file at the same time. This method faces I/O blocking; on my machine, the OS allows reading only 1024 files simultaneously.
- His code is quite complex and hard to read: for example, sometimes he used auto, sometimes not.
My code:
{{{
#include <iostream>
#include <string>
#include <vector>
#include <fstream>
#include <future>
#include <memory>
#include <algorithm>
#include <locale>
#include <codecvt>
#include <deque>
#include <cstdio>
#include <queue>

// we will use async read & write, so make the streams shareable
typedef std::shared_ptr<std::ifstream> istream_pointer;
typedef std::shared_ptr<std::ofstream> ostream_pointer;

// string comparison, sorted by < by default
// uncomment the line below to sort with std::locale
// #define ENABLE_LOCALE
#ifdef ENABLE_LOCALE
bool strComp(const std::string& a, const std::string& b) {
  // en rules work quite well for Latin characters
  static std::locale comp("en_US.UTF-8");
  return comp(a, b);
}
#else
bool strComp(const std::string& a, const std::string& b) { return a < b; }
#endif

// read no more than `limit` bytes from a stream
std::vector<std::string> readFromStream(const istream_pointer& fin,
                                        size_t limit) {
  std::vector<std::string> ret;
  std::string line;
  size_t sz = 0;              // total bytes of contents
  size_t len = fin->tellg();  // for back track
  while (std::getline(*fin, line)) {
    if (sz + line.size() > limit) {
      // put the line back to the stream
      fin->seekg(len, std::ios_base::beg);
      break;
    }
    sz += line.size();
    len += line.size() + 1;  // +1 for newline
    ret.push_back(std::move(line));
  }
  // assume copy elision
  return ret;
}

// write a vector of strings to a stream line by line
int writeToStream(const ostream_pointer& fout,
                  const std::vector<std::string>& chunks) {
  for (auto& v : chunks) {
    *fout << v << '\n';
  }
  return 1;
}

// split the file into chunks and sort each chunk
size_t sortChunks(const istream_pointer& fin, size_t nbytes) {
  // pre-read some lines
  std::vector<std::string> cur;
  const size_t sz = nbytes;
  readFromStream(fin, sz).swap(cur);
  size_t n_chunks = 0;
  while (cur.size() > 0) {
    // sort the current chunk
    std::sort(cur.begin(), cur.end(), strComp);
    // write the chunk to fout
    ostream_pointer fout =
        std::make_shared<std::ofstream>(std::to_string(n_chunks++) + ".tmp");
    writeToStream(fout, cur);
    // read a new chunk
    cur.clear();
    readFromStream(fin, sz).swap(cur);
  }
  return n_chunks;
}

// split the file into chunks and sort each chunk - async read
size_t sortChunksAsync(const istream_pointer& fin, size_t nbytes) {
  // pre-read some lines
  std::vector<std::string> cur;
  const size_t sz = nbytes / 2;
  readFromStream(fin, sz).swap(cur);
  size_t n_chunks = 0;
  while (cur.size() > 0) {
    // async point: read the next chunk from the stream while processing the
    // current chunk
    std::future<std::vector<std::string>> nxt = std::async(
        std::launch::async, readFromStream, fin, sz);  // non-blocking
    // sort the current chunk
    std::sort(cur.begin(), cur.end(), strComp);
    // write the chunk to fout
    ostream_pointer fout =
        std::make_shared<std::ofstream>(std::to_string(n_chunks++) + ".tmp");
    writeToStream(fout, cur);
    // wait until reading nxt is done
    nxt.wait();
    // async point: swap cur with next
    nxt.get().swap(cur);
  }
  return n_chunks;
}

// we will use a priority queue to merge k buffers, but the actual strings
// should not be pushed to the queue. Instead, we use vector iterators (which
// are just pointers). We also need the identity of the buffer to handle
// reaching end()
typedef std::pair<std::vector<std::string>::iterator, size_t> vstring_iterator;

// Merge K streams and write to fout
void kWayMerge(const std::vector<istream_pointer>& streams,
               const ostream_pointer& fout, size_t nbytes) {
  const size_t n_chunks = streams.size();
  std::cout << "Merging " << n_chunks << " streams\n";
  // max size of chunks
  const size_t sz = nbytes / (n_chunks + 1);
  // buffers to store the sorted chunks
  std::vector<std::vector<std::string>> bufs(n_chunks);
  // fill the buffers with some lines
  for (size_t i = 0; i < n_chunks; ++i) {
    readFromStream(streams[i], sz).swap(bufs[i]);
  }
  // output buffer
  std::vector<std::string> ret;
  // comparator for the priority queue
  auto comp = [](const vstring_iterator& it0, const vstring_iterator& it1) {
    return strComp(*it1.first, *it0.first);  // min heap
  };
  std::priority_queue<vstring_iterator, std::vector<vstring_iterator>,
                      decltype(comp)> pq(comp);
  // push the beginning of each buffer to pq
  for (size_t i = 0; i < n_chunks; ++i) {
    if (bufs[i].size() > 0) {
      pq.push({bufs[i].begin(), i});
    } else {
      streams[i]->close();  // empty stream
    }
  }
  size_t sz2 = 0;  // keep track of the size of the output buffer
  // now run until we have nothing to push to pq
  while (!pq.empty()) {
    auto vit = pq.top();
    auto it = vit.first;   // current iterator
    auto id = vit.second;  // id of the buffer
    pq.pop();
    // std::cout << *it << std::endl;
    if (sz2 + it->size() > sz) {
      writeToStream(fout, ret);
      sz2 = 0;
      ret.clear();
    }
    sz2 += it->size();
    auto nxt = it + 1;  // next string in bufs[id]
    ret.push_back(std::move(*it));
    if (nxt == bufs[id].end()) {
      // reached the end of buffer id, refill it
      bufs[id].clear();
      readFromStream(streams[id], sz).swap(bufs[id]);
      if (bufs[id].size() > 0) {
        nxt = bufs[id].begin();
        pq.push({nxt, id});
      } else {
        // if buf is empty, the stream has ended
        streams[id]->close();
      }
    } else {
      // if not, just push to the queue
      pq.push({nxt, id});
    }
  }
  // last write
  writeToStream(fout, ret);
  return;
}

// Merge K streams and write to fout - async read
void kWayMergeAsync(const std::vector<istream_pointer>& streams,
                    const ostream_pointer& fout, size_t nbytes) {
  const size_t n_chunks = streams.size();
  std::cout << "Merging " << n_chunks << " streams\n";
  // max size of chunks
  const size_t sz = nbytes / n_chunks;
  // we only use half of the limit per buffer because of the async read
  const size_t bz = sz / 2;
  // buffers to store the strings of the sorted chunks
  std::vector<std::vector<std::string>> bufs(n_chunks);
  // next buffers
  std::vector<std::future<std::vector<std::string>>> nxt_bufs(n_chunks);
  // fill the buffers with some lines
  for (size_t i = 0; i < n_chunks; ++i) {
    readFromStream(streams[i], bz).swap(bufs[i]);
  }
  // prefetch the next buffers
  for (size_t i = 0; i < n_chunks; ++i) {
    nxt_bufs[i] =
        std::async(std::launch::async, readFromStream, streams[i], bz);
  }
  // merged buffer; it is empty here, the async call just gives us a valid
  // future to wait on
  std::vector<std::string> ret;
  std::future<int> pret =
      std::async(std::launch::async, writeToStream, fout, std::move(ret));
  // comparator for the priority queue
  auto comp = [](const vstring_iterator& it0, const vstring_iterator& it1) {
    return strComp(*it1.first, *it0.first);  // min heap
  };
  std::priority_queue<vstring_iterator, std::vector<vstring_iterator>,
                      decltype(comp)> pq(comp);
  // push the beginning of each buffer to pq
  for (size_t i = 0; i < n_chunks; ++i) {
    if (bufs[i].size() > 0) {
      pq.push({bufs[i].begin(), i});
    } else {
      streams[i]->close();  // empty stream
    }
  }
  size_t sz2 = 0;  // keep track of the size of the merged buffer
  // now run until we have nothing to push to pq
  while (!pq.empty()) {
    auto vit = pq.top();
    auto it = vit.first;   // current iterator
    auto id = vit.second;  // id of the buffer
    pq.pop();
    // std::cout << *it << std::endl;
    if (sz2 + it->size() > bz) {
      pret.wait();
      pret =
          std::async(std::launch::async, writeToStream, fout, std::move(ret));
      ret.clear();  // the moved-from vector is in an unspecified state
      sz2 = 0;
    }
    sz2 += it->size();
    auto nxt = it + 1;  // next string in bufs[id]
    ret.push_back(std::move(*it));
    if (nxt == bufs[id].end()) {
      // reached the end of buffer id
      // wait for the next buffer - expected no wait
      nxt_bufs[id].wait();
      // swap the contents of the current buffer with the next buffer
      nxt_bufs[id].get().swap(bufs[id]);
      if (bufs[id].size() > 0) {
        nxt = bufs[id].begin();
        pq.push({nxt, id});
        // prefetch the next bufs[id]
        nxt_bufs[id] =
            std::async(std::launch::async, readFromStream, streams[id], bz);
      } else {
        // if buf is empty, the stream has ended
        streams[id]->close();
      }
    } else {
      // if not, just push to the queue
      pq.push({nxt, id});
    }
  }
  // last write
  pret.wait();
  writeToStream(fout, ret);
  return;
  // what if we used k threads to push to a priority queue and one thread to
  // pop from the queue?
}

void cleanTmpFiles(size_t chunks) {
  for (size_t i = 0; i < chunks; ++i) {
    std::remove((std::to_string(i) + ".tmp").c_str());
  }
}

// our external sort function
void externalSort(const char* input_file, const char* output_file,
                  size_t limits, int async) {
  // read the input file
  istream_pointer fin = std::make_shared<std::ifstream>(input_file);
  if (!fin->is_open()) {
    throw std::logic_error("Input file is missing");
  }
  // sort the stream
  size_t n_chunks = 0;
  if (async & 1) {
    std::cout << "Sort chunks in async mode\n";
    n_chunks = sortChunksAsync(fin, limits);
  } else {
    n_chunks = sortChunks(fin, limits);
  }
  fin->close();
  // read the temporary files
  std::vector<istream_pointer> streams(n_chunks);
  for (size_t i = 0; i < n_chunks; ++i) {
    istream_pointer isptr =
        std::make_shared<std::ifstream>(std::to_string(i) + ".tmp");
    if (!isptr->is_open()) {
      cleanTmpFiles(n_chunks);
      throw std::logic_error("Intermediate files " + std::to_string(i) +
                             " are missing!");
    }
    streams[i] = std::move(isptr);
  }
  // stream to the output file
  ostream_pointer fout = std::make_shared<std::ofstream>(output_file);
  // merge the streams
  if (async & 2) {
    std::cout << "Merge chunks in async mode\n";
    kWayMergeAsync(streams, fout, limits);
  } else {
    kWayMerge(streams, fout, limits);
  }
  fout->close();
  // clean the tmp files
  cleanTmpFiles(n_chunks);
  std::cout << "Done!\n";
}

// our main application
int main(int argc, char* argv[]) {
  const char* input_file = (argc > 1) ? argv[1] : "input.txt";
  const char* output_file = (argc > 2) ? argv[2] : "output.txt";
  size_t limits = (argc > 3) ? std::stoull(argv[3]) : 6000;
  int async = (argc > 4) ? std::stoi(argv[4]) : 3;
  try {
    externalSort(input_file, output_file, limits, async);
  }
  // we should clean the tmp files here, but we don't know how many there are,
  // so remind the users to do it
  catch (const std::exception& e) {
    std::cerr << e.what() << "\n";
    std::cerr << "Considering remove all temp files with \"rm -r *.tmp\"\n";
    return -1;
  } catch (...) {
    std::cerr << "Exception caught during sorting!\nConsidering remove all temp "
                 "files with \"rm -r *.tmp\"";
    return -1;
  }
  return 0;
}
}}}
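For reference, this is how I build and run it (assuming the source is saved as main.cpp; -pthread is needed because of std::async):

{{{
g++ -std=c++11 -O2 -pthread main.cpp -o main
# arguments: input file, output file, memory limit in bytes,
# async mode (bit 0: async chunk sorting, bit 1: async merging)
./main input.txt output.txt 100000 3
}}}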
UPDATE: I will summarise the bugs here and make a better version in my spare time.

Bugs found by user673679:

- Formatting is hard to read. 2-space indentation is fine for me, but I think it may be hard for others to read. I used clang-format, so it may have made things worse.
- Bugs related to the minimum size of memory; we should also handle the case where there are more chunks than the allowed number of open files. I think it can be done with incremental merging (merging 512 files at a time, for example); see the first sketch after this list.
- The code is over-complicated, especially when handling the async I/O. He suggests async_read and async_write classes that hold both the current and the next buffer and hide all the internal async detail; see the second sketch after this list. I think this is a very nice idea.
- I should use runtime_error instead of logic_error.
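A minimal sketch of the incremental-merging idea, under stated assumptions: mergeFiles is a hypothetical helper (e.g. a thin wrapper that opens the named inputs and calls kWayMerge above), and there is at least one chunk:

{{{
#include <algorithm>
#include <string>
#include <vector>

// hypothetical helper: k-way merge the named input files into the named
// output file, e.g. by opening streams and calling kWayMerge from above
void mergeFiles(const std::vector<std::string>& inputs,
                const std::string& output);

// merge the sorted runs 0.tmp .. (n_chunks-1).tmp in passes, opening at most
// max_open files per merge, and return the name of the fully merged file
// (assumes n_chunks >= 1)
std::string mergeInPasses(size_t n_chunks, size_t max_open) {
  std::vector<std::string> runs;
  for (size_t i = 0; i < n_chunks; ++i)
    runs.push_back(std::to_string(i) + ".tmp");
  size_t pass = 0;
  while (runs.size() > 1) {
    std::vector<std::string> next;
    for (size_t i = 0; i < runs.size(); i += max_open) {
      const size_t end = std::min(runs.size(), i + max_open);
      // each group of at most max_open runs becomes one longer run
      std::vector<std::string> group(runs.begin() + i, runs.begin() + end);
      std::string out = "pass" + std::to_string(pass) + "_" +
                        std::to_string(next.size()) + ".tmp";
      mergeFiles(group, out);
      next.push_back(out);
    }
    runs.swap(next);
    ++pass;
  }
  return runs.front();
}
}}}

With max_open = 512, each pass divides the number of runs by 512, so even a huge number of chunks is merged in a few passes without ever exceeding the open-file limit.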
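And my reading of the async_read suggestion (a sketch only; an analogous async_write would mirror it on the output side). It reuses istream_pointer and readFromStream from the code above and hides the double-buffering behind two member functions:

{{{
// owns the current buffer plus a future for the next one, so the merge
// loop never touches std::async directly
class async_read {
 public:
  async_read(istream_pointer stream, size_t limit)
      : stream_(std::move(stream)),
        limit_(limit),
        current_(readFromStream(stream_, limit_)) {
    prefetch();  // start reading the next chunk immediately
  }

  // the buffer that is currently safe to merge from
  std::vector<std::string>& current() { return current_; }

  // swap in the prefetched buffer and start the next read;
  // returns false once the stream is exhausted
  bool advance() {
    current_ = next_.get();
    if (current_.empty()) return false;
    prefetch();
    return true;
  }

 private:
  void prefetch() {
    next_ = std::async(std::launch::async, readFromStream, stream_, limit_);
  }

  istream_pointer stream_;
  size_t limit_;
  std::vector<std::string> current_;
  std::future<std::vector<std::string>> next_;
};
}}}

kWayMergeAsync would then hold one async_read per chunk and simply call advance() whenever a buffer runs dry, which removes most of the future bookkeeping from the merge loop.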