
I am developing a plugin for a system. The plugin catches certain events in that system, wraps them, and pushes them out to another system for analysis. The overhead introduced by the plugin must be as low as possible, because the main system is blocked until the plugin finishes its work. What I have so far: I catch the event and push it into a BlockingQueue; another thread listens on the queue and, when an item arrives, tries to push it out of the system.

    private StatsManager statsManager;

    @Override
    public void start() {
        statsManager = new StatsManager();
    }

    // catch event
    public void taskFinished() {
        statsManager.pushData(Context.getData());
    }

The StatsManager:

    private BlockingQueue<MyItem> blockingQueue;

    public StatsManager() {
        blockingQueue = new ArrayBlockingQueue<MyItem>(32768);
        SenderThread sender = new SenderThread(blockingQueue);
        new Thread(sender).start();
    }

    public void pushData(ItemFromContext ctxItem) {
        try {
            MyItem item = CollectInterestingData(ctxItem);
            blockingQueue.put(item);
        } catch (InterruptedException e) {
            logger.error(e.toString());
            // Is the thread interrupted anyway?
            Thread.currentThread().interrupt();
        }
    }

    private class SenderThread implements Runnable {

        private BlockingQueue<MyItem> blockingQueue;

        public SenderThread(BlockingQueue<MyItem> queue) {
            blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    MyItem item = blockingQueue.take();
                    postItem(item);
                } catch (InterruptedException e) {
                    logger.error(e.toString());
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

On the other end, a REST service is waiting. Once it receives an item, it pushes it into a BlockingQueue and returns. Another thread listens on that queue and, once an item arrives, writes it to the database.
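Roughly, the receiving end does something like this (a simplified sketch; the JAX-RS annotations, path, and the incomingQueue name are only illustrative, not my actual code):

    @POST
    @Path("/stats")
    @Consumes(MediaType.APPLICATION_JSON)
    public Response receive(MyItem item) {
        // Hand the item off to the writer thread and return immediately,
        // so the sender is never blocked on the database.
        if (!incomingQueue.offer(item)) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
        }
        return Response.ok().build();
    }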

What problems do I not foresee? One thing that seems obvious is that I am not handling the situation where the other end fails to receive.

I am not very experienced with this kind of stuff. Does it make sense at all? All comments will be warmly welcomed.


EDIT 1

I read from the BlockingQueue docs:

put(E e) Inserts the specified element at the tail of this queue, waiting for space to become available if the queue is full.

Do I understand correctly that, should the queue be full, the thread will simply hang until space becomes available in the queue, hence blocking the whole system's execution? This is, of course, the worst thing that could happen to me. I am going to use offer for now, unless somebody suggests anything better.

offer(E e) Inserts the specified element at the tail of this queue if it is possible to do so immediately without exceeding the queue's capacity, returning true upon success and false if this queue is full.

So the balance here would be whether I risk losing some data or I risk blocking the whole system.
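For illustration, what I have in mind with offer (the droppedItems counter is just an idea for noticing losses, not something I already have):

    public void pushData(ItemFromContext ctxItem) {
        MyItem item = CollectInterestingData(ctxItem);
        // offer() returns immediately: false means the queue is full and the item is lost.
        if (!blockingQueue.offer(item)) {
            droppedItems.incrementAndGet();   // e.g. an AtomicLong, for alerting later
            logger.error("Stats queue full, dropping item");
        }
    }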

Also, would it make sense to implement the whole StatsManager as a Runnable and use a ThreadPoolExecutor to spin off wrapping and sending tasks? Could that be more efficient? Would this be thread-safe?

If it's relevant, I am expecting roughly 80000-100000 items per 24 hours.


    2 Answers

     private BlockingQueue<MyItem> blockingQueue; 

    Make the fields final and initialize them inline, if possible.

     blockingQueue = new ArrayBlockingQueue<MyItem>(32768); 

    Usually you should define a constant. IMHO it's OK to omit one if the value appears only once and only in a field initializer (where it is clearly visible anyway).
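    For example (the constant name is just a suggestion), combined with the previous point about final fields initialized inline:

        private static final int QUEUE_CAPACITY = 32768;
        private final BlockingQueue<MyItem> blockingQueue =
                new ArrayBlockingQueue<MyItem>(QUEUE_CAPACITY);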

        try {
            MyItem item = CollectInterestingData(ctxItem);
            blockingQueue.put(item);
        } catch (InterruptedException e) {
            logger.error(e.toString());
            // Is the thread interrupted anyway?
            Thread.currentThread().interrupt();
        }

    The comment makes no sense to me.

    Re-interrupting a thread in an endless loop is rather pointless. All you get is an exception on the next call. This considerably slows down all the following operations and fills your logs for no gain at all.

    Not interrupting, i.e., logging and swallowing the exception, would be better. But interrupts are the only way to terminate a thread, so in the catch block you should test whether termination is really desired and then conditionally exit the loop.
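    A minimal sketch of such a loop, reusing the names from the question (the assumption being that an interrupt is the shutdown signal):

        @Override
        public void run() {
            // Keep sending until interrupted; the interrupt is the stop signal.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    MyItem item = blockingQueue.take();
                    postItem(item);
                } catch (InterruptedException e) {
                    // Restore the flag for any outer code and leave the loop.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }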

    If it's relevant, I am expecting roughly 80000-100000 items per 24 hours.

    If this were that many per second, you'd have a serious problem. Per minute should be doable (assuming the items are small).

    So the balance here would be whether I risk losing some data or I risk blocking the whole system.

    True. Measure whether you have enough memory to hold out long enough for someone to come and fix the problem. Make sure someone gets alerted before a problem happens; for example, test whether the queue is more than half full.
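    A periodic check along these lines would do (the threshold, the interval, and the QUEUE_CAPACITY constant from above are just an illustration):

        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
        monitor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                int used = QUEUE_CAPACITY - blockingQueue.remainingCapacity();
                if (used > QUEUE_CAPACITY / 2) {
                    // Hook this into whatever alerting is already available.
                    logger.error("Stats queue is more than half full: " + used + " of " + QUEUE_CAPACITY);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);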

    Also, would it make sense to implement the whole StatsManager as Runnable and use ThreadPoolExecutor to spin off wrapping and sending tasks?

    Only if you want to parallelize the communication and are prepared to deal with out-of-order arrivals at the other end.
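    If you do want that, a minimal sketch could look like the following (the pool size is an arbitrary assumption; note that postItem calls may now overlap and complete out of order):

        private final ExecutorService senderPool = Executors.newFixedThreadPool(4);

        public void pushData(final ItemFromContext ctxItem) {
            senderPool.submit(new Runnable() {
                @Override
                public void run() {
                    // Both wrapping and sending happen off the caller's thread.
                    postItem(CollectInterestingData(ctxItem));
                }
            });
        }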

    Could that be more efficient?

    Not more efficient in terms of CPU usage, but maybe in terms of overall throughput. Before you optimize, ask yourself if you need it. How long does one item take? Multiply it by the number of items per day and look at what you get.
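    For a rough sense of scale: 100,000 items per 24 hours is 100000 / 86400 ≈ 1.2 items per second on average, so even a few hundred milliseconds per item would still keep a single sender thread ahead of the incoming rate.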

    Would this be thread-safe?

    Yes, unless your definition of safety includes in-order arrival at the other end (which, AFAIK, is normally not guaranteed anyway). BlockingQueue and the executors are designed for exactly such tasks.

    • "I like your answer. I even learnt a few things from it." – SSC (Nov 29, 2014)
    • "How would you monitor whether the SenderThread is alive? Would that mean creating another, say, TimerTask to check if the thread is alive and create a new thread if it's dead?" – Erki M. (Dec 8, 2014)
    • "@ErkiM. Possibly. You could instead make the thread never terminate, or have it report its death, by catching Throwable in it. Or just Exception, as something like an OutOfMemoryError had better terminate it. Note that watching for termination and creating another thread suffers from the same problems." (Dec 8, 2014)

    A few of my comments and suggestions, to the best of my limited knowledge:

    blockingQueue = new ArrayBlockingQueue<MyItem>(32768); 

    It's always better to use a named constant than a bare magic number:

    blockingQueue = new ArrayBlockingQueue<MyItem>(Short.MAX_VALUE); 

    As per Java Docs:

    put(E e) Inserts the specified element into this queue, waiting if necessary for space to become available.

    So this can possibly block if there is no more capacity:

    blockingQueue.put(item); 

    So, use

    blockingQueue.offer(item); 

    instead.

    What problems do I not foresee? One thing that seems obvious is that I am not handling the situation where the other end fails to receive.

    Yes, the other end might fail, so you can implement a timeout on the response and, if it doesn't arrive, push your item back into the queue. BlockingQueue is already thread-safe, so you don't need a ThreadPoolExecutor just to make it thread-safe. As far as efficiency is concerned, I am not very sure; maybe someone else can provide a suggestion about that.
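    A rough sketch of that idea (statsUrl, serialize, and the timeout values are assumptions, and your actual HTTP client may differ):

        private void sendWithTimeout(MyItem item) {
            try {
                HttpURLConnection conn = (HttpURLConnection) statsUrl.openConnection();
                conn.setConnectTimeout(2000);   // fail fast if the service is unreachable
                conn.setReadTimeout(5000);      // don't wait forever for a response
                conn.setRequestMethod("POST");
                conn.setDoOutput(true);
                conn.getOutputStream().write(serialize(item));
                if (conn.getResponseCode() >= 300) {
                    requeue(item);
                }
            } catch (IOException e) {           // covers SocketTimeoutException too
                logger.error(e.toString());
                requeue(item);
            }
        }

        private void requeue(MyItem item) {
            // offer() instead of put(): never block the sender thread; log a lost item instead.
            if (!blockingQueue.offer(item)) {
                logger.error("Queue full, item dropped after failed send");
            }
        }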

    • "I guess you mean Short.MAX_VALUE." (Nov 29, 2014)
    • "Yes, Integer.MAX_VALUE would be too much; from what I understand, it would use around 2 GB of memory even if the queue is empty. I have been sweating hard to figure out the optimal capacity of that queue (because the plugin and the main system run inside one Java process and share memory). There are actually 2 queues in the system, both 32k long, and I hope they have capacity for more or less 24 h." – Erki M. (Nov 29, 2014)
    • "I updated the answer from Integer.MAX_VALUE to Short.MAX_VALUE." – SSC (Nov 29, 2014)
    • "@ErkiM. You said you expect up to 100k items per 24 h, so if you want to be able to buffer for 24 h in case the external system goes down, then you'd need to make your buffer that long." – ChrisWue (Nov 29, 2014)
    • "@ChrisWue, there are actually more queues than one (I have different types of items, each with their own queue). Calculations show that they should have room for roughly 24 h." – Erki M. (Dec 2, 2014)
