5
\$\begingroup\$

I am trying to write a multi-threaded program that implements the Producer/Consumer model. Specifically, I want one Producer that reads lines from a file and puts them in a BlockingQueue, and multiple Consumers that retrieve the lines from the BlockingQueue, do some processing, and store the results in a new file.

Please give me some feedback on what I should consider to achieve high performance. I've spent weeks reading about concurrency and synchronization because I don't want to miss anything, but I am looking for some external feedback, specifically:

  • Which BlockingQueue implementation should I use for better performance? I thought I couldn't use a fixed-size BlockingQueue because we don't know how many lines the file has. Or should I use one anyway, even though the Producer will be blocked whenever the queue is full?

  • If f() is the method that the consumers use to process the file lines, and knowing that I am using a BlockingQueue, should I synchronize f()? If yes, isn't that going to hurt my application, because the other Consumers will have to wait for the lock to be released?

Here is my code:

class Producer implements Runnable {

    private String location;
    private BlockingQueue<String> blockingQueue;
    private float numline=0;
    protected transient BufferedReader bufferedReader;
    protected transient BufferedWriter bufferedWriter;

    public Producer (String location, BlockingQueue<String> blockingQueue) {
        this.location=location;
        this.blockingQueue=blockingQueue;
        try {
            bufferedReader = new BufferedReader(new FileReader(location));
            // Create the file where the processed lines will be stored
            createCluster();
        } catch (FileNotFoundException e1) {
            e1.printStackTrace();
        }
    }

    @Override
    public void run() {
        String line=null;
        try {
            while ((line = bufferedReader.readLine()) != null) {
                // Count the read lines
                numline++;
                blockingQueue.put(line);
            }
        } catch (IOException e) {
            System.out.println("Problem reading the log file!");
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void createCluster () {
        try {
            String clusterName=location+".csv";
            bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
            bufferedWriter.write("\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

And this is the Consumer, where multiple threads will take results from the BlockingQueue and do some processing (f()), and then store the results in a new file:

class Consumer implements Runnable {

    private String location;
    private BlockingQueue<String> blockingQueue;
    protected transient BufferedWriter bufferedWriter;
    private String clusterName;

    public Consumer (String location, BlockingQueue<String> blockingQueue) {
        this.blockingQueue=blockingQueue;
        this.location=location;
        clusterName=location+".csv";
    }

    @Override
    public void run() {
        while (true) {
            try {
                //Retrieve the lines
                String line = blockingQueue.take();
                // Call result=f(line)
                // TO DO
                //
                //bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
                //BufferedWriter.write(result+ "\n");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

And the code in my main class, which uses 1 producer and 3 consumers:

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

Producer readingThread = new Producer(location, queue);
new Thread(readingThread).start();

Consumer normalizers = new Consumer(location,queue);
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 3; i++) {
    executor.submit(normalizers);
}
System.out.println("Stopped");
executor.shutdown();

Finally, this post really confused me. It suggests that having the consumers store their results in a file will slow down the whole process. That might be a problem, because I want performance and speed.

\$\endgroup\$

    1 Answer

    3
    \$\begingroup\$

    First of all, using a fixed-size blocking queue is totally OK. Yes, your producer might eventually have to wait for space to become available in the queue, but that is exactly the intended behavior: if the consumers are "fast enough" to keep the queue basically empty, your producer can go full speed. If the consumers run at maximum capacity and still cannot keep up with the queue, it is better to wait for them than to risk running out of memory.
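    To make the blocking behavior concrete, here is a minimal sketch (the class name `BoundedQueueDemo`, the 1 ms delay, and the sizes are all made up for illustration). A deliberately slow consumer drains a bounded `ArrayBlockingQueue` while the producer records the largest queue size it sees. `put()` simply waits whenever the queue is full, so the backlog never grows beyond the capacity, however long the input is.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {

    /**
     * Pushes `items` strings through a queue of the given capacity while a
     * deliberately slow consumer drains it. Returns the largest queue size
     * the producer observed right after a put().
     */
    public static int maxObservedSize(int capacity, int items) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.take();
                    TimeUnit.MILLISECONDS.sleep(1); // simulate slow processing
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        int max = 0;
        try {
            for (int i = 0; i < items; i++) {
                queue.put("line " + i); // blocks while the queue is full
                max = Math.max(max, queue.size());
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max;
    }

    public static void main(String[] args) {
        // The reported maximum can never exceed the capacity of 10.
        System.out.println("max queue size: " + maxObservedSize(10, 200));
    }
}
```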

    Regarding speed: the choice of queue implementation does not matter. As long as I/O is involved, the process is so slow from a CPU perspective that you will never measure a significant difference between queue implementations.

    Regarding the complete process: as mentioned in the answer to the post you referred to, using multiple threads to write to a single file is a bad idea and simply asking for trouble. Thus, if your f() function is CPU-intensive and benefits from multiple processors, you should instead send the outputs to a single consumer through a second queue.

         Producer (reads file)
                  |
              [Queue 1]
                  |
                  V
         multiple consumers (in memory processing)
                  |
              [Queue 2]
                  |
                  V
         Single Consumer (write output file)

    That single consumer should keep the file open until the process is finished to benefit the most from I/O-buffering. Note however, that in such a setup it might actually be a challenge to signal the end of the process. It may prove difficult to "know" when the input file has been fully read AND the queue is empty AND all processing units are idle.
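    As a rough sketch of that last stage (the class name `SingleWriter` and the `END` marker are hypothetical, not something from the question), the writer could open the output once, drain the second queue, and close the file only when it sees an end marker. How and when that marker gets sent is exactly the signalling challenge mentioned above.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.BlockingQueue;

public class SingleWriter implements Runnable {

    // Hypothetical end marker. It is compared by identity (==), so a real
    // result with the same text cannot be mistaken for it.
    public static final String END = new String("END");

    private final Path target;
    private final BlockingQueue<String> results;

    public SingleWriter(Path target, BlockingQueue<String> results) {
        this.target = target;
        this.results = results;
    }

    @Override
    public void run() {
        // The file is opened once; try-with-resources flushes and closes it
        // when the loop ends, so the writer's buffer is used for every line.
        try (BufferedWriter out = Files.newBufferedWriter(target)) {
            String result;
            while ((result = results.take()) != END) {
                out.write(result);
                out.newLine();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

    Compare this with the commented-out code in the question, which would create a new `BufferedWriter` for every single line.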

    One general note: you mention performance several times, and this is a common beginner's mistake, so please search for "premature optimization is the root of all evil" (Donald Knuth's words; there are plenty of discussions of it online). Basically: make it work first, measure, check your algorithms, and only as a last resort introduce complicated machinery like multithreading. Do you actually have a performance problem? Could it be caused by a sub-optimal algorithm in f()? (Code Review might help here ;-)) If not: stick to single threading, or maybe use a parallel stream. Do not do this for some imagined problem without measurements to back up your view.
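    For comparison, a parallel-stream version might look roughly like the sketch below. The class name `ParallelLines` is made up, and `f` here is only a placeholder for the real processing. Note that this variant collects all results in memory before writing them, which is an assumption that only holds for inputs of moderate size.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ParallelLines {

    // Placeholder for the question's f(); replace with the real processing.
    static String f(String line) {
        return line.trim().toUpperCase();
    }

    public static void process(Path input, Path output) {
        // Files.lines reads lazily; the stream must be closed, hence try-with-resources.
        try (Stream<String> lines = Files.lines(input)) {
            List<String> results = lines
                    .parallel()               // f() runs on the common fork/join pool
                    .map(ParallelLines::f)
                    .collect(Collectors.toList()); // keeps the input order
            Files.write(output, results);     // a single thread writes the file
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    There are no queues, no explicit threads, and no end-of-input signalling to get wrong, which makes it a reasonable baseline to measure before building anything more elaborate.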

    \$\endgroup\$
    4
    • \$\begingroup\$Sorry for the late reply; multi-threading is hard to program, especially for a beginner like me. Thanks for your answer! Regarding your remark that it is hard for the single consumer on the second queue to "know" when the consumers have finished consuming the first queue: I encountered a similar problem with the first queue and the Producer. To solve it, I used a poison pill that the Producer puts on the first queue to let the Consumers know that it has finished producing. Is this also a solution for the second scenario?\$\endgroup\$
      – U. User
      Commented Jun 12, 2018 at 14:04
    • \$\begingroup\$Not really. As you cannot know which consumer will grab the poison pill or end-marker (or even any number of end-markers) you might just end up passing the end-markers through consumer 1 while consumers 2 and 3 are still busy with their respective tasks.\$\endgroup\$
      – mtj
      Commented Jun 12, 2018 at 20:06
    • \$\begingroup\$So what do you propose to solve this issue?\$\endgroup\$
      – U. User
      Commented Jun 12, 2018 at 20:35
    • 1
      \$\begingroup\$Hmm... I guess I misunderstood the "poison pill"... If it means that a consumer terminates after receiving this message, then yes, it should indeed work. Each consumer should send an "I terminated" message to queue 2, and once the single output consumer has received all of these, it can terminate as well.\$\endgroup\$
      – mtj
      Commented Jun 13, 2018 at 5:21
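
    The scheme from the last comment could be sketched roughly as follows. Everything named here is an assumption for illustration: the class `PoisonPillPipeline`, the `POISON` marker, the placeholder `f`, and the in-memory list that stands in for the output file. The producer puts one pill per worker on queue 1, each worker forwards a pill to queue 2 when it terminates, and the writer stops once it has counted as many pills as there are workers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoisonPillPipeline {

    // Hypothetical end marker, compared by identity so no real line can match it.
    private static final String POISON = new String("<<EOF>>");

    // Placeholder for the question's f().
    static String f(String line) {
        return line.toUpperCase();
    }

    public static List<String> run(List<String> input, int workers) {
        BlockingQueue<String> q1 = new ArrayBlockingQueue<>(100);
        BlockingQueue<String> q2 = new ArrayBlockingQueue<>(100);
        // Stands in for the output file; only the writer thread touches it.
        List<String> output = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();

        threads.add(new Thread(() -> {              // producer
            try {
                for (String line : input) q1.put(line);
                for (int i = 0; i < workers; i++) q1.put(POISON); // one pill per worker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        for (int i = 0; i < workers; i++) {
            threads.add(new Thread(() -> {          // processing consumers
                try {
                    String line;
                    while ((line = q1.take()) != POISON) q2.put(f(line));
                    q2.put(POISON);                 // the "I terminated" message
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        threads.add(new Thread(() -> {              // single output consumer
            try {
                int finished = 0;
                while (finished < workers) {
                    String result = q2.take();
                    if (result == POISON) finished++;
                    else output.add(result);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        threads.forEach(Thread::start);
        try {
            for (Thread t : threads) t.join();      // join makes `output` safe to read
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return output;
    }
}
```

    With more than one worker the results reach queue 2 in whatever order the workers finish, so the output order generally differs from the input order. If the order matters, each line would need to carry its line number along.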
