5
\$\begingroup\$

I wrote this code for a home task, and received some negative feedback. Though I really appreciated the reviewer, but I disagree when he said I use auto disorderly, and the algorithm is not optimal (I know it's not optimal, but comparing to the code that they expected which runs about 5 times slower than mine, it's really hard to accept).

There are two big problems I see (when reviewing again my code):

  • I did not handle the case when there are more than 1024 chunks, which usually is the number of max open files in Linux. I'm aware of that and I mentioned it when I sent the code instead of handling it in my code, as we can increase the limit of the system.
  • There is a bug when storing strings with vectors. When we use GCC, the vector will resize by allocating new 2x current memory and copy new elements to the new location, so we can use more memory than we expected. Interestingly, no one points out this.

I would appreciate any comments to know where should I improve and get better next time. Below is from the reviewer:

  • His application could not run with 1.6GB, this error might come from the limitation for reading file simultaneously. I think he did not check with huge file and he could not avoid the I/O problem. {{{ ./main ~/PycharmProjects/serp_improvement/data_restore/ch_result_20.csv out_ch_result_20.csv 100000 Sort chunks in async mode Intermediate files 1021 are missing! Considering remove all temp files with "rm -r *.tmp" }}}
  • His algorithm is not good enough:
  • To get the data by line, he used the seek function to back the latest line for avoiding limitation exceeding. This method is quite complex.
  • He merges all chunks files to the output file at the same time. This method faces I/O blocking, on my machine, OS allows reading 1024 files simultaneously.
  • His coding is quite complex, hard to read: for example, sometimes, he used auto, sometimes no.

My code:

#include <iostream> #include <string> #include <vector> #include <fstream> #include <future> #include <memory> #include <algorithm> #include <locale> #include <codecvt> #include <deque> #include <cstdio> #include <queue> // we will use async read & write, so make the stream shareable typedef std::shared_ptr<std::ifstream> istream_pointer; typedef std::shared_ptr<std::ofstream> ostream_pointer; // string comparison, sort by < by default // uncomment bellow line to sort with std::locale // #define ENABLE_LOCALE #ifdef ENABLE_LOCALE bool strComp(const std::string& a, const std::string& b) { // en rules works quite well for latin characters static std::locale comp("en_US.UTF-8"); return comp(a, b); } #else bool strComp(const std::string& a, const std::string& b) { return a < b; } #endif // read from a stream no more than `limit` bytes std::vector<std::string> readFromStream(const istream_pointer& fin, size_t limit) { std::vector<std::string> ret; std::string line; size_t sz = 0; // total bytes of contents size_t len = fin->tellg(); // for back track while (std::getline(*fin, line)) { if (sz + line.size() > limit) { // put line back to stream fin->seekg(len, std::ios_base::beg); break; } sz += line.size(); len += line.size() + 1; // +1 for newline ret.push_back(std::move(line)); } // assume copy elision return ret; } // write a vector of string to a stream line by line int writeToStream(const ostream_pointer& fout, const std::vector<std::string>& chunks) { for (auto& v : chunks) { *fout << v << '\n'; } return 1; } // split file to chunks and sort each chunks size_t sortChunks(const istream_pointer& fin, size_t nbytes) { // pre-read some lines std::vector<std::string> cur; const size_t sz = nbytes; readFromStream(fin, sz).swap(cur); size_t n_chunks = 0; while (cur.size() > 0) { // sort current chunk std::sort(cur.begin(), cur.end(), strComp); // write the chunk to fout ostream_pointer fout = std::make_shared<std::ofstream>(std::to_string(n_chunks++) + ".tmp"); writeToStream(fout, cur); // read new chunks cur.clear(); readFromStream(fin, sz).swap(cur); } return n_chunks; } // split file to chunks and sort each chunks - async read size_t sortChunksAsync(const istream_pointer& fin, size_t nbytes) { // pre-read some lines std::vector<std::string> cur; const size_t sz = nbytes / 2; readFromStream(fin, sz).swap(cur); int n_chunks = 0; while (cur.size() > 0) { // async point: read next chunk from stream async while process current // chunk std::future<std::vector<std::string>> nxt = std::async( std::launch::async, readFromStream, fin, sz); // non-blocking // sort current chunk std::sort(cur.begin(), cur.end(), strComp); // write the chunk to fout ostream_pointer fout = std::make_shared<std::ofstream>(std::to_string(n_chunks++) + ".tmp"); writeToStream(fout, cur); // wait for reading nxt done nxt.wait(); // async point: swap cur with next nxt.get().swap(cur); } return n_chunks; } // we will use priority queue to merge k buffers, but the actual strings should // not be pushed to the queue. In stead, we use vector iterator (which is just // pointer). We also need the identity of the buffer to handle when we reach // end() typedef std::pair<std::vector<std::string>::iterator, size_t> vstring_iterator; // Merge K streams and write to fout void kWayMerge(const std::vector<istream_pointer>& streams, const ostream_pointer& fout, size_t nbytes) { const size_t n_chunks = streams.size(); std::cout << "Merging " << n_chunks << " streams\n"; // max size of chunks const size_t sz = nbytes / (n_chunks + 1); // buffer to store sorted chunks std::vector<std::vector<std::string>> bufs(n_chunks); // fill the buffer some lines for (size_t i = 0; i < n_chunks; ++i) { readFromStream(streams[i], sz).swap(bufs[i]); } // output buffers std::vector<std::string> ret; // comparator for priority queue auto comp = [](const vstring_iterator& it0, const vstring_iterator& it1) { return strComp(*it1.first, *it0.first); // min heap }; std::priority_queue<vstring_iterator, std::vector<vstring_iterator>, decltype(comp)> pq(comp); // push the begining of each buffer to pq for (size_t i = 0; i < n_chunks; ++i) { if (bufs[i].size() > 0) { pq.push({bufs[i].begin(), i}); } else { streams[i]->close(); // empty stream } } size_t sz2 = 0; // keep track the size of output buffer // now run untill we have nothing to push to pq while (!pq.empty()) { auto vit = pq.top(); auto it = vit.first; //current iterator auto id = vit.second; // id of the buffer pq.pop(); // std::cout << *it << std::endl; if (sz2 + it->size() > sz) { writeToStream(fout, ret); sz2 = 0; ret.clear(); } sz2 += it->size(); auto nxt = it + 1; // next string in bufs[id] ret.push_back(move(*it)); if (nxt == bufs[id].end()) { // reach end of buffer id bufs[id].clear(); readFromStream(streams[id], sz).swap(bufs[id]); if (bufs[id].size() > 0) { nxt = bufs[id].begin(); pq.push({nxt, id}); } else { // if buf is empty, streams is ended streams[id]->close(); } } else { // if not, just push to queue pq.push({nxt, id}); } } // last write writeToStream(fout, ret); return; } // Merge K streams and write to fout - async read void kWayMergeAsync(const std::vector<istream_pointer>& streams, const ostream_pointer& fout, size_t nbytes) { const size_t n_chunks = streams.size(); std::cout << "Merging " << n_chunks << " streams\n"; // max size of chunks const size_t sz = nbytes / n_chunks; // we only use half limit size of buffer for async read const size_t bz = sz / 2; // buffer to store strings in sorted chunks std::vector<std::vector<std::string>> bufs(n_chunks); // next buffers std::vector<std::future<std::vector<std::string>>> nxt_bufs(n_chunks); // fill the buffer some line for (size_t i = 0; i < n_chunks; ++i) { readFromStream(streams[i], bz).swap(bufs[i]); } // prefetch some next buffer for (size_t i = 0; i < n_chunks; ++i) { nxt_bufs[i] = std::async(readFromStream, streams[i], bz); } // mereged buffers std::vector<std::string> ret; std::future<int> pret = std::async(std::launch::async,writeToStream,fout,std::move(ret)); // comparator for priority queue auto comp = [](vstring_iterator& it0, vstring_iterator& it1) { return strComp(*it1.first, *it0.first); // min heap }; std::priority_queue<vstring_iterator, std::vector<vstring_iterator>, decltype(comp)> pq(comp); // push the begining of each buffer to pq for (size_t i = 0; i < n_chunks; ++i) { if (bufs[i].size() > 0) { pq.push({bufs[i].begin(), i}); } else { streams[i]->close(); // empty stream } } size_t sz2 = 0; // keep track the size of merged buffer // now run until we have nothing to push to pq while (!pq.empty()) { auto vit = pq.top(); auto it = vit.first; //current iterator auto id = vit.second; // id of the buffer pq.pop(); // std::cout << *it << std::endl; if (sz2 + it->size() > bz) { pret.wait(); pret = std::async(std::launch::async,writeToStream,fout,std::move(ret)); sz2 = 0; } sz2 += it->size(); auto nxt = it + 1; // next string in bufs[id] ret.push_back(move(*it)); if (nxt == bufs[id].end()) { // reach end of buffer id // wait for next buffer - expected no wait nxt_bufs[id].wait(); // swap contents of current buffer with next buffer nxt_bufs[id].get().swap(bufs[id]); if (bufs[id].size() > 0) { nxt = bufs[id].begin(); pq.push({nxt, id}); // prefetch next bufs[id] nxt_bufs[id] = std::async(std::launch::async, readFromStream, streams[id], bz); } else { // if buf is empty, streams is ended streams[id]->close(); } } else { // if not, just push to queue pq.push({nxt, id}); } } // last write pret.wait(); writeToStream(fout, ret); return; // what if using k thread to push to a priority queue and one thread to from // the queue? } void cleanTmpFiles(size_t chunks) { for (size_t i = 0; i < chunks; ++i) { std::remove((std::to_string(i) + ".tmp").c_str()); } } // Our extenal sort funtion void externalSort(const char* input_file, const char* output_file, size_t limits, int async) { // read input file istream_pointer fin = std::make_shared<std::ifstream>(input_file); if (!fin->is_open()) { throw std::logic_error("Input file is missing"); } // sort the stream size_t n_chunks = 0; if (async & 1) { std::cout << "Sort chunks in async mode\n"; n_chunks = sortChunksAsync(fin, limits); } else { n_chunks = sortChunks(fin, limits); } fin->close(); // read temporary file std::vector<istream_pointer> streams(n_chunks); for (size_t i = 0; i < n_chunks; ++i) { istream_pointer isptr = std::make_shared<std::ifstream>(std::to_string(i) + ".tmp"); if (!isptr->is_open()) { cleanTmpFiles(n_chunks); throw std::logic_error("Itermediate files " + std::to_string(i) + " are missing!"); } streams[i] = std::move(isptr); } // stream to output file ostream_pointer fout = std::make_shared<std::ofstream>(output_file); // merge the streams if (async & 2) { std::cout << "Merge chunks in async mode\n"; kWayMergeAsync(streams, fout, limits); } else { kWayMerge(streams, fout, limits); } fout->close(); // clean tmp file cleanTmpFiles(n_chunks); std::cout << "Done!\n"; } // out main application int main(int argc, char* argv[]) { const char* input_file = (argc > 1) ? argv[1] : "input.txt"; const char* output_file = (argc > 2) ? argv[2] : "output.txt"; size_t limits = (argc > 3) ? std::stoull(argv[3]) : 6000; int async = (argc > 4) ? std::stoi(argv[4]) : 3; try { externalSort(input_file, output_file, limits, async); } // should clean tmp files, but don't know how many files, so let remind users // doing it catch (const std::exception& e) { std::cerr << e.what() << "\n"; std::cerr << "Considering remove all temp files with \"rm -r *.tmp\"\n"; return -1; } catch (...) { std::cerr << "Exception caught during sorting!\nConsidering remove all temp " "files with \"rm -r *.tmp\""; return -1; } return 0; } 

UPDATE I will summarise the bugs here, and make the better version in my spare time.

Bugs found from user673679:

  • Formatting is hard to read. 2 spaces indent is good for me, but I think it may be hard for others to read. I used clang-format, so it may make thing worst.
  • Bugs related to the minimum size of memory; we should handle the case when we have more than allowed opened files. I think it can be done with incremental merging (merge 512 files at a time for example).
  • The code is over-complicated, especially when handling async IO. He suggests an async_read and async_write class that hold both current and next buffer and hide all internal async detail. I think this is a very nice idea.
  • I should use runtime_error instead of logic_error.
\$\endgroup\$

    1 Answer 1

    3
    \$\begingroup\$

    Formatting:

    • Please don't use two spaces for indent - it's really hard to visually match indentation levels. Either use 4 spaces, or an actual tab character (so that someone with bad eyes like me can set the spacing to whatever they need without having to reformat the code).

    • Some vertical spacing would also help readability.


    Bugs:

    I used a ~4MB random word list for input, and rapidly found the following:

    • Windows (or more specifically the C runtime library on Windows) limits the number of open files to 512 by default. We can increase this to 8192 with _setmaxstdio, but this may not be sufficient either. We probably need an algorithm that explicitly works with a specified file limit.

    • If we fail to open a temporary file (e.g. because we hit the file limit), cleanTmpFiles is called. However, this will not clean up any temporary files that we already have open (i.e. all the files up to the file limit).

    • If the line length is greater than the byte limit, we silently produce nonsense output. With the random word list and the default 6000 byte limit, we get limits that are too small to be practically used (sz = 3, bz = 1 in kWayMergeAsync). We should either emit an error if this happens, or accept lines longer than the limit when necessary.

    • writeToStream writes an empty line at the end of each temporary file. We should either avoid this, or change readFromStream to ignore empty lines.

    • The sorted output contains some duplicate words where the input has no duplicates (I think it's a problem with the read function backtracking, though I haven't tracked it down exactly).


    Buffering:

    The OS does its own buffering for file I/O. I'm not convinced there's a benefit to doing manual buffering on top of that in the merge stage of the algorithm. (If we have strict memory limits, we can call setbuf / pubsetbuf to supply our own buffers).

    I'd suggest starting with an unbuffered version, and examining how that performs before adding any extra complexity.


    Complexity:

    I agree with the reviewers that the code is over-complicated. I think mostly this is due to logic and state that should be hidden (in another function or object) being "inlined" into the higher-level algorithm. e.g. kWayMergeAsync could look something like:

    void kWayMergeAsync(const std::vector<istream_pointer> &streams, const ostream_pointer &fout, size_t nbytes) { // ... declare limits as before ... // class async_buffered_reader: // holds a pair of buffers (current and future) // hides the complexity of buffering and async reading std::vector<async_buffered_reader> bufs; for (size_t i = 0; i != n_chunks; ++i) bufs.emplace_back(streams[i], bz); // ... declare pq as before ... for (size_t i = 0; i != n_chunks; ++i) pq.push({ bufs.next(), i }); // next() does everything we need in terms of waiting, swapping buffers, launching async reads, etc. // class async_buffered_writer: // holds the output buffer // hides the complexity of buffering and async writing async_buffered_writer abw_out(fout, sz); while (!pq.empty()) { auto [nxt, i] = pq.top(); pq.pop(); abw_out.push(std::move(nxt)); // push() does buffering / writing as necessary. if (!bufs[i].is_complete()) pq.push({ bufs.next(), i }); // again, next() does everything we need :) } // abw_out destructor will ensure we finish writing... so we don't need to do anything here! } 

    Quite a rough sketch, but all the complexity that doesn't matter to the algorithm is hidden away in the async_buffered_reader and async_buffered_writer classes.

    We could even implement other reader and writer classes (e.g. without buffering or with different async implementations) with the same interface. And, if we make kWayMerge a template function, we could then switch to those types without changing the merge algorithm at all!


    Algorithm:

    Beyond the bugs mentioned, I don't think there's too much wrong with the algorithm.

    I/O from disk is really slow compared to sorting, so minimizing the amount of reading and writing to disk (which your algorithm does) is a good thing.

    However, it also means that there's not much point in the async I/O (at least with the current implementation). With a fixed memory limit, we always have to wait for a chunk to be fully read or written. An HDD / SSD can only read or write one thing at a time (at least over SATA).

    So improvements we could make would probably focus on:

    • Working with specific hardware (do we have an M.2 SSD allowing multiple operations at once? do we have 8 drives to work with, or just one?).

    • Allowing the user to specify multiple drives / locations for the temporary files.

    • Creating one thread per location for I/O, instead of using std::async.

    • Profiling the code (e.g. using minitrace and chrome://tracing), to optimize it for a specific scenario.


    Misc.:

    • std::logic_error is the wrong exception type to use here (std::runtime_error is probably what we want).

    • We don't need to wrap the file streams in std::shared_ptr.

    • writeToStream returns int for some reason? It could also use a const& when iterating.

    \$\endgroup\$

      Start asking to get answers

      Find the answer to your question by asking.

      Ask question

      Explore related questions

      See similar questions with these tags.