Please review my design and implementation and suggest any possible optimisations in terms of performance (time or space complexity), or a better approach to the design or implementation. Any review comments are welcome so that I can learn to code and design better.
GitHub link: https://github.com/lalitkumar-kulkarni15/Consumer-Producer-case-study
Problem statement:
Consumer-Producer-case-study
# Case Study

## Provided

Together with this document we provide a src directory containing source code, which is the basis for the task.

## Expectation

Implement the task as described below. You can start whenever you like and send us the result once you are done. Add documentation like you would in a real project. Bring the code to a production standard (not more, not less). Do not spend more than 2 hours on this exercise.

## User Story

There is a producer (Producer) of price updates for stocks. This producer will generate constant price updates for a fixed number of stocks. The producer should not block; every price update should be consumed as quickly as possible.

Furthermore, there is a load handler (LoadHandler) which will consume the price updates of the producer. In the current implementation the load handler just passes the update on to a consumer. This should be changed.

The consumer (Consumer) will receive the price updates from the load handler. (The current implementation will just print out all price updates for convenience's sake.) The consumer represents a legacy system that cannot consume more than a certain number of price updates per second. Otherwise it would fall over.

## Task

The task of this exercise is to extend the LoadHandler to limit the updates per second to the consumer to a certain given number (MAX_PRICE_UPDATES). In order to achieve this, it is allowed to drop price updates, since otherwise the application would run out of memory if it kept all of them. It is important that, if a price update is sent to the consumer, it is the most recent price.

## Result

- Fork the project
- Implement your solution
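To make the "most recent price per stock" requirement concrete, here is a minimal sketch of the conflation idea. It is not the code under review: `LatestPriceBuffer`, `put` and `drain` are hypothetical names, and it assumes that keeping one entry per stock name is acceptable, so memory is bounded by the number of distinct stocks rather than by the number of updates.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not in the repository): keeps only the latest price per stock. */
final class LatestPriceBuffer {

    // Re-inserting an existing key replaces the value but keeps the original position.
    private final Map<String, Double> latest = new LinkedHashMap<>();

    /** Stores a price, overwriting any older price held for the same stock. */
    synchronized void put(String stock, double price) {
        latest.put(stock, price);
    }

    /** Removes and returns at most {@code max} entries, oldest-inserted stock first. */
    synchronized List<Map.Entry<String, Double>> drain(int max) {
        List<Map.Entry<String, Double>> batch = new ArrayList<>();
        Iterator<Map.Entry<String, Double>> it = latest.entrySet().iterator();
        while (it.hasNext() && batch.size() < max) {
            // Copy the entry before removing it from the backing map.
            batch.add(new AbstractMap.SimpleImmutableEntry<>(it.next()));
            it.remove();
        }
        return batch;
    }
}
```

With such a buffer, the producer side would call `put` for every update, and a timer would call `drain(MAX_PRICE_UPDATES)` once per second. Stocks that do not fit into one batch stay in the buffer for the next one.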
Below are the classes:
1) Producer.java

    package com.exercise.producer;

    import com.exercise.model.PriceUpdate;
    import com.exercise.producer.Producer;
    import com.exercise.regulator.LoadHandler;

    public class Producer extends Thread {

        private LoadHandler loadHandler;

        public Producer(LoadHandler loadHandler) {
            this.loadHandler = loadHandler;
        }

        @Override
        public void run() {
            System.out.println("Run inside the producer is called.");
            try {
                generateUpdates();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void generateUpdates() throws InterruptedException{
            for (int i = 1; i < 10000000; i++) {
                System.out.println("Stock set start");
                Thread.sleep(5000);
                System.out.println("-----------------------");
                loadHandler.receive(new PriceUpdate("Apple", 97.85));
                loadHandler.receive(new PriceUpdate("Google", 160.71));
                loadHandler.receive(new PriceUpdate("Facebook", 91.66));
                loadHandler.receive(new PriceUpdate("Google", 160.73));
                loadHandler.receive(new PriceUpdate("Facebook", 91.71));
                loadHandler.receive(new PriceUpdate("Google", 160.76));
                loadHandler.receive(new PriceUpdate("Apple", 97.85));
                loadHandler.receive(new PriceUpdate("Google", 160.71));
                loadHandler.receive(new PriceUpdate("Facebook", 91.63));
                System.out.println("-----------------------");
                System.out.println("Stock set over");
            }
        }
    }

2) Consumer.java

    package com.excercise.consumer;

    import java.util.List;

    import com.exercise.model.PriceUpdate;

    /**
     * Please do not change the Consumer.
     *
     */
    public class Consumer {

        public void send(List<PriceUpdate> priceUpdates) {
            System.out.println("List of price updates received at consumer class is size : "+priceUpdates.size());
            priceUpdates.forEach(System.out::println);
        }
    }

3) PriceUpdate.java

    package com.exercise.model;

    import java.time.LocalDateTime;

    public class PriceUpdate {

        private final String companyName;
        private final double price;
        private LocalDateTime localDateTime;

        public PriceUpdate(String companyName, double price) {
            this.companyName = companyName;
            this.price = price;
            this.localDateTime = LocalDateTime.now();
        }

        public String getCompanyName() {
            return this.companyName;
        }

        public double getPrice() {
            return this.price;
        }

        public LocalDateTime getLocalDateTime() {
            return localDateTime;
        }

        public void setLocalDateTime(LocalDateTime localDateTime) {
            this.localDateTime = localDateTime;
        }

        @Override
        public String toString() {
            return companyName + " - " + price +" - "+localDateTime;
        }

        @Override
        public boolean equals(Object obj) {
            if(null==obj) {
                return false;
            } else if(null != obj && obj instanceof PriceUpdate) {
                final PriceUpdate priceUpdate = (PriceUpdate) obj;
                if(null!=priceUpdate
                        && priceUpdate.getCompanyName().equalsIgnoreCase(this.getCompanyName())
                        && priceUpdate.getPrice()==(this.getPrice())
                        && (priceUpdate.getLocalDateTime().equals(this.getLocalDateTime()))) {
                    System.out.println("Equals returning true");
                    return true;
                }
            }
            System.out.println("Equals returning false");
            return false;
        }

        @Override
        public int hashCode() {
            int hash = this.companyName.hashCode()
                    * Double.valueOf(this.price).hashCode()
                    * this.localDateTime.hashCode();
            return hash;
        }
    }

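One point I am unsure about in the class above: `equals` compares company names ignoring case, while `hashCode` hashes the name case-sensitively, so two objects that are equal could end up with different hash codes. As a minimal sketch of one way to keep the two methods consistent, the hypothetical class below (`PriceUpdateSketch` is an illustrative name, not part of the exercise) normalises the name once and uses that same key in both methods. It deliberately leaves the timestamp out of the comparison, which is an assumption and may not suit every use.

```java
import java.util.Locale;
import java.util.Objects;

/** Hypothetical variant of PriceUpdate, shown only to illustrate the equals/hashCode contract. */
final class PriceUpdateSketch {

    private final String companyName;
    private final String nameKey; // lower-cased once, used by both equals and hashCode
    private final double price;

    PriceUpdateSketch(String companyName, double price) {
        this.companyName = Objects.requireNonNull(companyName, "companyName");
        this.nameKey = companyName.toLowerCase(Locale.ROOT);
        this.price = price;
    }

    String getCompanyName() {
        return companyName;
    }

    double getPrice() {
        return price;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof PriceUpdateSketch)) {
            return false; // also covers null
        }
        PriceUpdateSketch other = (PriceUpdateSketch) obj;
        // Double.compare treats NaN as equal to itself, unlike ==.
        return nameKey.equals(other.nameKey) && Double.compare(price, other.price) == 0;
    }

    @Override
    public int hashCode() {
        // Built from exactly the fields that equals compares.
        return Objects.hash(nameKey, price);
    }
}
```

Because both methods read the same normalised key, `new PriceUpdateSketch("Apple", 97.85)` and `new PriceUpdateSketch("APPLE", 97.85)` are equal and hash to the same value.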
4) LoadHandler.java

    package com.exercise.regulator;

    import java.util.LinkedList;
    import java.util.Queue;

    import com.excercise.consumer.Consumer;
    import com.exercise.model.PriceUpdate;

    public class LoadHandler {

        private static final int MAX_PRICE_UPDATES = 100;

        private final Consumer consumer;

        public LoadHandler (Consumer consumer) {
            this.consumer = consumer;
            Scheduler scheduler = new Scheduler(consumer);
            scheduler.startConsumerFeedJobThread();
        }

        private static Queue<PriceUpdate> priceUpdateQueue = new LinkedList<>();

        public static Queue<PriceUpdate> getPriceUpdateQueue() {
            return priceUpdateQueue;
        }

        public static void setPriceUpdateQueue(Queue<PriceUpdate> priceUpdateQueue) {
            LoadHandler.priceUpdateQueue = priceUpdateQueue;
        }

        public void receive(PriceUpdate priceUpdate) {
            if(null!=priceUpdate) {
                if(priceUpdateQueue.size()<MAX_PRICE_UPDATES) {
                    priceUpdateQueue.add(priceUpdate);
                    System.out.println("Stock price added successfully.");
                } else {
                    priceUpdateQueue.poll();
                    System.out.println("Stock price polled successfully.");
                    priceUpdateQueue.add(priceUpdate);
                    System.out.println("Stock price added after poll successfully.");
                }
            }
        }
    }

5) RemoveOlderStcksPredicate.java

    package com.exercise.regulator;

    import java.time.LocalDateTime;
    import java.util.function.Predicate;

    import com.exercise.model.PriceUpdate;

    public class RemoveOlderStcksPredicate {

        public static Predicate<PriceUpdate> isStockEqual(LocalDateTime localDateTime){
            System.out.println("Inside is stock equal localdateTime is ::"+localDateTime);
            return p->p.getLocalDateTime().isBefore(localDateTime);
        }
    }

6) StockPredicate.java

    package com.exercise.regulator;

    import java.util.HashSet;
    import java.util.Queue;
    import java.util.function.Predicate;

    import com.exercise.model.PriceUpdate;

    public class StockPredicate {

        public static Predicate<PriceUpdate> isStockEqual(Queue<PriceUpdate> stocksSentToConsumerList){
            return new HashSet<>(stocksSentToConsumerList)::contains;
        }
    }

7) Scheduler.java

    package com.exercise.regulator;

    import java.time.LocalDateTime;
    import java.util.Deque;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;

    import com.excercise.consumer.Consumer;
    import com.exercise.model.PriceUpdate;

    public class Scheduler {

        private Consumer consumer;

        private static Deque<PriceUpdate> stocksSentToConsumerList = new LinkedList<>();

        private static LocalDateTime lastSentDateTime;

        public static void setLastSentDateTime(LocalDateTime lastSentDateTime) {
            Scheduler.lastSentDateTime = lastSentDateTime;
        }

        public Scheduler(final Consumer consumer) {
            this.consumer = consumer;
        }

        public void startConsumerFeedJobThread() {
            final Runnable stockReguRunnable = getRunnableForstockRegu();
            final Thread stockRegulatorThread = new Thread(stockReguRunnable);
            stockRegulatorThread.start();
        }

        private Runnable getRunnableForstockRegu() {
            final Runnable runnable = () -> {
                try {
                    sendRegulatedStcksToCnsmr();
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
            };
            return runnable;
        }

        private void sendRegulatedStcksToCnsmr() throws InterruptedException {
            System.out.println("----Starting the scheduler for fetch in scheduler----");
            while (true) {
                askThreadToSleep();
                System.out.println("Got the stock price collection from main queue");
                Queue<PriceUpdate> priceUpdateQueue = LoadHandler.getPriceUpdateQueue();
                System.out.println("Price update queue size after fetching ::"+priceUpdateQueue.size());
                List<PriceUpdate> priceUpdateQueueCopy = new LinkedList<>(priceUpdateQueue);
                System.out.println("Copied the stock collection into new queue");
                System.out.println("Going to check for already sent stock prices");
                System.out.println("-----Printing stocks inside stocksSentToConsumerList------");
                stocksSentToConsumerList.forEach(System.out::println);
                System.out.println("-----------------------------------------------------------");
                System.out.println("-----Printing stocks inside priceUpdateQueueCopy------");
                priceUpdateQueueCopy.forEach(System.out::println);
                System.out.println("-----------------------------------------------------------");
                if(stocksSentToConsumerList.size()>0) {
                    priceUpdateQueueCopy.removeIf((StockPredicate.isStockEqual(stocksSentToConsumerList).or(RemoveOlderStcksPredicate.isStockEqual(lastSentDateTime))));
                } else{
                    priceUpdateQueueCopy.removeIf((StockPredicate.isStockEqual(stocksSentToConsumerList)));
                }
                System.out.println("-----Printing stocks inside priceUpdateQueueCopy after filtering------");
                priceUpdateQueueCopy.forEach(System.out::println);
                System.out.println("-----------------------------------------------------------");
                System.out.println("Got filtered stock list with size ::"+priceUpdateQueueCopy.size());
                this.consumer.send(priceUpdateQueueCopy);
                if(null!=priceUpdateQueueCopy && priceUpdateQueueCopy.size()>0) {
                    savePrevConsumdStcks(priceUpdateQueueCopy);
                }
            }
        }

        private void askThreadToSleep() throws InterruptedException {
            System.out.println("----Scheduler sleeping for 1 sec----");
            Thread.sleep(1000);
            System.out.println("----Scheduler woke up after 1 sec----");
        }

        private void savePrevConsumdStcks(final List<PriceUpdate> priceUpdateListToSend) {
            System.out.println("Clearing the stock sent to consumer list before adding the price update list");
            stocksSentToConsumerList.clear();
            stocksSentToConsumerList.addAll(priceUpdateListToSend);
            setLastSentDateTime(stocksSentToConsumerList.peekLast().getLocalDateTime());
            System.out.println("Added the stock price sent list to the collection for next cycle comparison.");
            System.out.println("Last sent timestamp is :"+lastSentDateTime);
        }
    }

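The scheduler above drives its loop with `Thread.sleep(1000)` on a hand-made thread. For comparison, here is a minimal sketch of the same "run something once per period" idea using the JDK's `ScheduledExecutorService`. `PeriodicSender`, `start` and `stop` are hypothetical names, and the `Runnable` passed in stands for whatever builds and sends the batch; none of this is in the repository.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper (not in the repository) that runs a task at a fixed rate. */
final class PeriodicSender {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    /** Runs {@code sendBatch} every {@code periodMillis} milliseconds until stop() is called. */
    void start(Runnable sendBatch, long periodMillis) {
        executor.scheduleAtFixedRate(() -> {
            try {
                sendBatch.run();
            } catch (RuntimeException e) {
                // If the task threw, scheduleAtFixedRate would suppress all later runs,
                // so the failure is reported here and the schedule is kept alive.
                e.printStackTrace();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops the timer thread; a task that is currently running gets interrupted. */
    void stop() {
        executor.shutdownNow();
    }
}
```

A possible use would be `sender.start(() -> consumer.send(nextBatch()), 1000)`, where `nextBatch()` is an assumed method that returns at most MAX_PRICE_UPDATES entries.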
8) Exercise.java (main class)

    package com.exercise.init;

    import com.excercise.consumer.Consumer;
    import com.exercise.producer.Producer;
    import com.exercise.regulator.LoadHandler;

    /**
     * Scenario: There is a producer (Producer) of price updates for stocks.
     * This producer will generate constant price updates for a fixed number of stocks.
     * The producer should not block, every price update should be consumed as quickly as possible.
     *
     * Furthermore there is a load handler (LoadHandler) which will consume the price updates of the producer.
     * In the current implementation the load handler will just pass on the update to a consumer. This should be changed.
     *
     * The consumer (Consumer) will receive the price updates from the load handler.
     * (The current implementation will just print out all price updates for convenience sake.)
     * The consumer should represent a legacy system that cannot consume more than a certain number of price updates per second.
     * Otherwise it will fall over.
     *
     * The task of this exercise is to extend the LoadHandler to limit the updates per second to the consumer to a certain given number (MAX_PRICE_UPDATES).
     * In order to achieve this, it is allowed to drop price updates, since otherwise the application will run out of memory, if the application will keep all of them.
     * It is important that, if a price update is sent to the consumer, it has to be the most recent price.
     *
     * Example:
     *
     * Updates arrive in this order from the producer:
     *
     * Apple - 97.85
     * Google - 160.71
     * Facebook - 91.66
     * Google - 160.73
     *
     * The load handler has received all updates and is going to send them out to the consumer like this now:
     *
     * Apple - 97.85
     * Google - 160.73
     * Facebook - 91.66
     *
     * So the first Google update (160.71) has been dropped.
     *
     * In order to limit the number of updates per second to the consumer,
     * it will be necessary to write some sort of scheduler/timer.
     * It is acceptable to send the updates as bulk once per second.
     * Ideally the load should be spread out into smaller chunks during that second.
     *
     * Please consider that the number of stocks might be bigger than the number of allowed updates per second to the consumer.
     * Make sure that the application will not run out of memory, even if the number of stocks or updates per second might be bigger than MAX_PRICE_UPDATES.
     *
     * Please implement the <b>hashCode</b> and <b>equals</b> in PriceUpdate,
     * since those methods might be relevant for the task.
     *
     * It is fine to create additional classes and tests.
     *
     * You can use all features of Java 8 as well as any additional library as long as it is open source and will be provided with the solution.
     *
     */
    public class Exercise {

        public static void main(String[] args) {
            Consumer consumer = new Consumer ();
            LoadHandler loadHandler = new LoadHandler(consumer);
            Producer producer = new Producer(loadHandler);
            producer.run();
        }
    }

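The Javadoc above says the load should ideally be spread into smaller chunks during the second. My implementation does not do that. As a minimal sketch of what the splitting step might look like, the hypothetical `Chunker.split` helper below (not part of the repository) cuts a batch into consecutive slices of a given size. How the slices are then timed is left open.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not in the repository) that cuts a batch into smaller slices. */
final class Chunker {

    private Chunker() {
        // static helper, no instances
    }

    /** Splits {@code batch} into consecutive chunks of at most {@code chunkSize} elements. */
    static <T> List<List<T>> split(List<T> batch, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < batch.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, batch.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(batch.subList(from, to)));
        }
        return chunks;
    }
}
```

For example, with MAX_PRICE_UPDATES = 100 and an assumed chunk size of 10, a full batch becomes ten slices, which could be sent roughly every 100 ms so that the total per second stays within the limit.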