
When processing large files (say for parsing or performing computations based on the contents), we want to be able to use multiple processes to perform the job faster.

In my case I wanted to count the frequency of appearance of features in a LibSVM-formatted file, similar to a word count, which is a typical parallel-processing example.

Example input:

1 4:22 6:22 7:44 8:12312
1 4:44 7:44
0 1:33 9:0.44
-1 1:55 4:0 8:12132

We want to count how many times each feature index, i.e. the value before the ':', appears. Here feature 4 appears 3 times, feature 7 appears 2 times etc.

Expected output:

[(4, 3), (7, 2), (8, 2), (1, 2), (6, 1), (9, 1)] 

Here's my solution for Python 3:

import argparse
import multiprocessing as mp
import os
from operator import itemgetter
from collections import Counter
import functools
import json


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", action='store_true', default=False)
    parser.add_argument("--no-stdout", action='store_true', default=False)
    parser.add_argument("--cores", type=int, default=None)
    return parser.parse_args()


def parse_libsvm_line(line: str) -> list:
    """
    Parses a line in a LibSVM file to return the indexes of non-zero features
    :param line: A line in LibSVM format: "1 5:22 7:44 99:0.88"
    :return: A list of ints, one for each index appearing in the line
    """
    features = line.split()[1:]  # Get rid of the class value
    indexes = [int(pair.split(":")[0]) for pair in features]
    return indexes


def process_wrapper(arg_tuple):
    """
    Applies the process function to every line in a chunk of a file,
    to determine the frequency of features in the chunk.
    :param arg_tuple: A tuple that contains: line_process_fun, filename,
                      chunk_start, chunk_size
    :return: A counter object that counts the frequency of each feature
             in the chunk
    """
    line_process_fun, filename, chunk_start, chunk_size = arg_tuple
    counter = Counter()
    with open(filename) as f:
        f.seek(chunk_start)
        lines = f.read(chunk_size).splitlines()
        for line in lines:
            indexes = line_process_fun(line)
            for index in indexes:
                counter[index] += 1

    return counter


def chunkify(fname, size=1024*1024):
    """
    Creates a generator that indicates how to chunk a file into parts.
    :param fname: The name of the file to be chunked
    :param size: The size of each chunk, in bytes.
    :return: A generator of (chunk_start, chunk_size) tuples for the file.
    """
    file_end = os.path.getsize(fname)
    with open(fname, 'r') as f:
        chunk_end = f.tell()
        while True:
            chunk_start = chunk_end
            f.seek(f.tell() + size, os.SEEK_SET)
            f.readline()
            chunk_end = f.tell()
            yield chunk_start, chunk_end - chunk_start
            if chunk_end > file_end:
                break


if __name__ == '__main__':
    args = parse_args()

    pool = mp.Pool(args.cores)
    jobs = []

    # Create one job argument tuple for each chunk of the file
    for chunk_start, chunk_size in chunkify(args.input):
        jobs.append((parse_libsvm_line, args.input, chunk_start, chunk_size))

    # Process chunks in parallel. The result is a list of Counter objects
    res_list = pool.map(process_wrapper, jobs)

    # Aggregate the chunk dictionaries and sort by decreasing value
    aggregated_count = sorted(
        functools.reduce(lambda a, b: a + b, res_list).items(),
        key=itemgetter(1),
        reverse=True)

    # Print the result
    if not args.no_stdout:
        print(aggregated_count)

    # Write the result to a file as json (sorted list of [index, count] lists)
    if args.output:
        with open(args.input + "_frequencies.json", 'w') as out:
            json.dump(aggregated_count, out)

    # Close the pool workers
    pool.close()
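
For reference, an example invocation (the script file name count_features.py is just a placeholder, not part of the original code; all flags are defined in parse_args above):

python count_features.py --input data.libsvm --output --cores 4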

My questions are:

  • Is it possible to do this in a single pass in parallel? Now I'm using one pass to determine the chunks, then one more for the processing.
  • Is there a more efficient way to chunk text files? Now I'm using chunks of constant byte size.

    1 Answer

    A few comments, unfortunately not on the multiprocessing part.

    • parser.add_argument("--output", action='store_true', default=False) is exactly the same as parser.add_argument("--output", action='store_true'); the 'store_true' action already makes sure that the value is False if the flag is not set.

    • I like to give my argument parsing functions an optional argument, so def parse_args(args=None) and later use return parser.parse_args(args). This allows you to test the function interactively by passing a list of strings and checking that the parsing works as expected. When args is None, argparse falls back to parsing sys.argv, so the behaviour is unchanged.
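
      A minimal sketch of that pattern, reusing the argument definitions from the posted code (the file name in the test call is just a placeholder):

      import argparse

      def parse_args(args=None):
          parser = argparse.ArgumentParser()
          parser.add_argument("--input", required=True)
          parser.add_argument("--output", action='store_true')
          parser.add_argument("--no-stdout", action='store_true')
          parser.add_argument("--cores", type=int, default=None)
          return parser.parse_args(args)

      # Interactive test with a list of strings instead of the real sys.argv:
      parse_args(["--input", "data.libsvm", "--cores", "4"])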

    • Python 3 has advanced tuple unpacking, so you could do _, *features = line.split() instead of features = line.split()[1:]. Whether or not that is better is debatable, but it is good to know that this feature exists.
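
      For example, with a made-up input line:

      line = "1 4:22 6:22 7:44"
      _, *features = line.split()
      # features == ['4:22', '6:22', '7:44']; the class value is discarded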

    • While "indexes" is a valid plural of "index", if it is used in the mathematical sense, you should probably use "indices".

    • Counter objects have a nice update method. It can take another Counter (or in fact any mapping), in which case it behaves like the normal dict.update, except that counts are added instead of replaced. But it can also take an iterable, in which case it consumes that iterable just like it does when creating the object (by counting the occurrences of each element). So

      indexes = line_process_fun(line)
      for index in indexes:
          counter[index] += 1

      Could just be

      counter.update(line_process_fun(line)) 
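
      To illustrate the two modes of update, a small standalone example:

      from collections import Counter

      c = Counter()
      c.update([4, 6, 7, 4])    # an iterable: counts each occurrence
      c.update({4: 2, 8: 1})    # a mapping: adds the given counts
      # c == Counter({4: 4, 6: 1, 7: 1, 8: 1})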
    • Indeed, that whole function could be greatly simplified by using map and itertools.chain:

      from itertools import chain

      def process_wrapper(arg_tuple):
          """
          Applies the process function to every line in a chunk of a file,
          to determine the frequency of features in the chunk.
          :param arg_tuple: A tuple that contains: line_process_fun, filename,
                            chunk_start, chunk_size
          :return: A counter object that counts the frequency of each feature
                   in the chunk
          """
          line_process_fun, filename, chunk_start, chunk_size = arg_tuple
          with open(filename) as f:
              f.seek(chunk_start)
              lines = f.read(chunk_size).splitlines()
          return Counter(chain.from_iterable(map(line_process_fun, lines)))
    • Right now you manually have to unpack line_process_fun, filename, chunk_start, chunk_size = arg_tuple, but if you used Pool.starmap instead of Pool.map, you could make the signature def process_wrapper(line_process_fun, filename, chunk_start, chunk_size).
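
      A minimal sketch of that change, reusing the Counter/chain version from the previous point (the jobs list is built exactly as before):

      def process_wrapper(line_process_fun, filename, chunk_start, chunk_size):
          with open(filename) as f:
              f.seek(chunk_start)
              lines = f.read(chunk_size).splitlines()
          return Counter(chain.from_iterable(map(line_process_fun, lines)))

      # Pool.starmap unpacks each jobs tuple into the four parameters
      res_list = pool.starmap(process_wrapper, jobs)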

    • Counter objects support not only updating, but also summing two instances. In this case, quite intuitively, all counts are added. They also have a most_common method, which returns (value, count) tuples sorted from most to least common, which is exactly what your reduce and sort does. And finally, sum takes an optional second argument specifying the start value:

      res_list = pool.map(process_wrapper, jobs)
      aggregated_count = sum(res_list, Counter()).most_common()

      Make sure to test that this does not slow down the processing, but even if it does, it sure is easier to understand. For the small example given, it is slightly slower on my machine.
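
      A tiny standalone example of Counter addition and most_common:

      from collections import Counter

      a = Counter({4: 2, 7: 1})
      b = Counter({4: 1, 8: 2})
      (a + b).most_common()    # [(4, 3), (8, 2), (7, 1)]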

    • multiprocessing.Pool can also be used as a context manager to ensure it is closed after the processing. This would introduce another level of indentation, though.
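
      A sketch of what the main section could look like with the context manager (only the relevant lines shown):

      with mp.Pool(args.cores) as pool:
          res_list = pool.map(process_wrapper, jobs)
      # the pool is cleaned up automatically when the block exits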
    • Thanks for the review @Graipher, I like the use of map and chain. For the aggregation I had tried to use sum, but didn't know it takes a second argument, that's cool. I thought about most_common(), not sure why I went with the sort approach in the end.
      – Bar
      Commented Oct 19, 2018 at 14:48
    • @Bar You're welcome :-). The second argument of sum is indeed not used very often, but it can come in handy. I also added a point about using Pool.starmap instead of Pool.map for some more readability.
      – Graipher
      Commented Oct 19, 2018 at 14:55
