When processing large files (say, parsing them or computing something from their contents), we want to be able to use multiple processes to do the job faster.
In my case I wanted to count the frequency of appearance of features in a LibSVM-formatted file, similar to a word count, which is a typical parallel-processing example.
Example input:
```
1 4:22 6:22 7:44 8:12312
1 4:44 7:44
0 1:33 9:0.44
-1 1:55 4:0 8:12132
```
We want to count how many times each feature index, i.e. the value before the ':', appears. Here, feature 4 appears 3 times, feature 7 appears 2 times, etc.
Expected output:
```
[(4, 3), (7, 2), (8, 2), (1, 2), (6, 1), (9, 1)]
```
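For comparison, this is a minimal single-process sketch of the same count (the function name and the `data.libsvm` path are placeholders, not part of my solution):

```python
from collections import Counter


def count_features_serial(path):
    """Count how often each feature index (the value before ':') appears."""
    counter = Counter()
    with open(path) as f:
        for line in f:
            for pair in line.split()[1:]:  # skip the class label
                counter[int(pair.split(":")[0])] += 1
    # Sort by decreasing frequency
    return sorted(counter.items(), key=lambda kv: kv[1], reverse=True)


print(count_features_serial("data.libsvm"))
```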
Here's my solution for Python 3:
```python
import argparse
import multiprocessing as mp
import os
from operator import itemgetter
from collections import Counter
import functools
import json


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", action='store_true', default=False)
    parser.add_argument("--no-stdout", action='store_true', default=False)
    parser.add_argument("--cores", type=int, default=None)
    return parser.parse_args()


def parse_libsvm_line(line: str) -> list:
    """
    Parses a line in a LibSVM file to return the indexes of non-zero features
    :param line: A line in LibSVM format: "1 5:22 7:44 99:0.88"
    :return: A list of ints, one for each index appearing in the line
    """
    features = line.split()[1:]  # Get rid of the class value
    indexes = [int(pair.split(":")[0]) for pair in features]
    return indexes


def process_wrapper(arg_tuple):
    """
    Applies the process function to every line in a chunk of a file,
    to determine the frequency of features in the chunk.
    :param arg_tuple: A tuple that contains:
        line_process_fun, filename, chunk_start, chunk_size
    :return: A counter object that counts the frequency of each feature in the chunk
    """
    line_process_fun, filename, chunk_start, chunk_size = arg_tuple
    counter = Counter()
    with open(filename) as f:
        f.seek(chunk_start)
        lines = f.read(chunk_size).splitlines()
        for line in lines:
            indexes = line_process_fun(line)
            for index in indexes:
                counter[index] += 1
    return counter


def chunkify(fname, size=1024*1024):
    """
    Creates a generator that indicates how to chunk a file into parts.
    :param fname: The name of the file to be chunked
    :param size: The size of each chunk, in bytes.
    :return: A generator of (chunk_start, chunk_size) tuples for the file.
    """
    file_end = os.path.getsize(fname)
    with open(fname, 'r') as f:
        chunk_end = f.tell()
        while True:
            chunk_start = chunk_end
            f.seek(f.tell() + size, os.SEEK_SET)
            f.readline()
            chunk_end = f.tell()
            yield chunk_start, chunk_end - chunk_start
            if chunk_end > file_end:
                break


if __name__ == '__main__':
    args = parse_args()

    pool = mp.Pool(args.cores)
    jobs = []

    # Create one job argument tuple for each chunk of the file
    for chunk_start, chunk_size in chunkify(args.input):
        jobs.append((parse_libsvm_line, args.input, chunk_start, chunk_size))

    # Process chunks in parallel. The result is a list of Counter objects
    res_list = pool.map(process_wrapper, jobs)

    # Aggregate the chunk dictionaries and sort by decreasing value
    aggregated_count = sorted(
        functools.reduce(lambda a, b: a + b, res_list).items(),
        key=itemgetter(1),
        reverse=True)

    # Print the result
    if not args.no_stdout:
        print(aggregated_count)

    # Write the result to a file as json (sorted list of [index, count] lists)
    if args.output:
        with open(args.input + "_frequencies.json", 'w') as out:
            json.dump(aggregated_count, out)

    # Close the pool workers
    pool.close()
```
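Assuming the script is saved as `count_features.py` (the script and input file names here are placeholders), it would be invoked along these lines:

```
python count_features.py --input data.libsvm --cores 4 --output
```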
My questions are:
- Is it possible to do this in a single pass in parallel? Right now I'm using one pass to determine the chunks, then another for the processing.
- Is there a more efficient way to chunk text files? Right now I'm using chunks of a constant byte size.