I am trying to write a multithreaded program that implements the Producer/Consumer model. I want to use one Producer, which reads lines from a file and puts them in a BlockingQueue, and multiple Consumers, which retrieve the lines from the BlockingQueue, do some processing, and store the results in a new file.
Please give me some feedback on what I should consider to achieve high performance. I've spent weeks reading about concurrency and synchronization because I don't want to miss anything, but I am looking for some external feedback, specifically:
1. What type of BlockingQueue implementation should I use for better performance? I can't use a fixed-size BlockingQueue because we don't know how many lines the file has. Or should I use one anyway, even though the Producer will block when the queue is full?
2. If f() is the method that the consumers use to process the file lines, and given that I am using a BlockingQueue, should I synchronize f()? If yes, isn't that going to affect the performance of my application, because the other Consumers will have to wait for the lock to be released?
Here is my code:
    class Producer implements Runnable {
        private String location;
        private BlockingQueue<String> blockingQueue;

        private float numline=0;

        protected transient BufferedReader bufferedReader;
        protected transient BufferedWriter bufferedWriter;

        public Producer (String location, BlockingQueue<String> blockingQueue) {
            this.location=location;
            this.blockingQueue=blockingQueue;

            try {
                bufferedReader = new BufferedReader(new FileReader(location));

                // Create the file where the processed lines will be stored
                createCluster();

            } catch (FileNotFoundException e1) {
                e1.printStackTrace();
            }
        }

        @Override
        public void run() {
            String line=null;
            try {
                while ((line = bufferedReader.readLine()) != null) {
                    // Count the read lines
                    numline++;
                    blockingQueue.put(line);
                }
            } catch (IOException e) {
                System.out.println("Problem reading the log file!");
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void createCluster () {
            try {
                String clusterName=location+".csv";
                bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
                bufferedWriter.write("\n");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
And this is the Consumer, where multiple threads will take lines from the BlockingQueue, do some processing (f()), and then store the results in a new file:
    class Consumer implements Runnable {
        private String location;
        private BlockingQueue<String> blockingQueue;

        protected transient BufferedWriter bufferedWriter;

        private String clusterName;

        public Consumer (String location, BlockingQueue<String> blockingQueue) {
            this.blockingQueue=blockingQueue;
            this.location=location;
            clusterName=location+".csv";
        }

        @Override
        public void run() {
            while (true) {
                try {
                    // Retrieve the lines
                    String line = blockingQueue.take();

                    // Call result=f(line)
                    // TO DO
                    //

                    //bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
                    //bufferedWriter.write(result + "\n");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
And the code in my main class, which uses 1 producer and 3 consumers:
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

    Producer readingThread = new Producer(location, queue);
    new Thread(readingThread).start();

    Consumer normalizers = new Consumer(location,queue);
    ExecutorService executor = Executors.newFixedThreadPool(3);
    for (int i = 1; i <= 3; i++) {
        executor.submit(normalizers);
    }
    System.out.println("Stopped");
    executor.shutdown();
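To make my first question concrete, here is a minimal sketch of how I understand a fixed-size queue would behave when the number of lines is unknown: put() simply blocks while the queue is full, so the capacity does not have to match the file length. The class name BoundedQueueDemo, the run() helper, and the POISON end-of-input marker are hypothetical and not part of my code above; counting lines stands in for f(). I am not sure this is the right approach, which is why I am asking.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedQueueDemo {
    // Hypothetical end-of-input marker (an assumption, not in the original code).
    // A distinct String instance, compared by identity, so no real line can match it.
    private static final String POISON = new String("<<EOF>>");

    /** Pushes all lines through a bounded queue and returns how many the consumers handled. */
    public static int run(List<String> lines, int capacity, int consumers) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger processed = new AtomicInteger();

        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String line = queue.take();      // blocks while the queue is empty
                        if (line == POISON) {
                            break;                       // identity comparison on purpose
                        }
                        processed.incrementAndGet();     // stand-in for f(line)
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }

        try {
            for (String line : lines) {
                queue.put(line);                         // blocks while the queue is full
            }
            for (int i = 0; i < consumers; i++) {
                queue.put(POISON);                       // one marker per consumer
            }
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        // 1000 lines through a queue that holds only 10 at a time.
        List<String> lines = Collections.nCopies(1000, "some log line");
        System.out.println(run(lines, 10, 3));           // prints 1000
    }
}
```

In this sketch the producer is only ever paused, never deadlocked, as long as at least one consumer keeps taking lines.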
Finally, this post really confused me. It suggests that having the consumers store the results in a file will slow the process down. That could be a problem, because I want performance and speed.
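One alternative I have been considering, so that the consumers never touch the file themselves, is to have them put their results on a second queue and let a single writer thread do all the writing. Below is a minimal sketch of that idea under some assumptions: SingleWriterDemo, run(), POISON, and the upper-casing f() are all hypothetical placeholders, and a StringWriter stands in for the FileWriter so the example is self-contained. I don't know whether this is actually faster in practice.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class SingleWriterDemo {
    // Hypothetical end-of-input marker, compared by identity.
    private static final String POISON = new String("<<EOF>>");

    // Placeholder for the real processing; touches no shared state.
    static String f(String line) {
        return line.toUpperCase();
    }

    /** Processes lines with several consumers and one writer; returns everything written. */
    public static String run(List<String> lines, int consumers) {
        BlockingQueue<String> input = new ArrayBlockingQueue<>(100);
        BlockingQueue<String> results = new ArrayBlockingQueue<>(100);
        StringWriter sink = new StringWriter();          // stands in for a FileWriter

        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String line = input.take();
                        if (line == POISON) {
                            results.put(POISON);         // tell the writer this consumer is done
                            break;
                        }
                        results.put(f(line));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }

        // The only thread that writes, so the writer itself needs no locking.
        Thread writer = new Thread(() -> {
            try (BufferedWriter out = new BufferedWriter(sink)) {
                int finished = 0;
                while (finished < consumers) {
                    String result = results.take();
                    if (result == POISON) {
                        finished++;
                        continue;
                    }
                    out.write(result);
                    out.write('\n');
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();

        try {
            for (String line : lines) {
                input.put(line);
            }
            for (int i = 0; i < consumers; i++) {
                input.put(POISON);
            }
            for (Thread worker : workers) {
                worker.join();
            }
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sink.toString();
    }

    public static void main(String[] args) {
        // Output order depends on thread scheduling, so it is not fixed.
        System.out.print(run(Arrays.asList("first", "second", "third"), 3));
    }
}
```

Note that the results can be written in a different order from the input lines, since the consumers run concurrently.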