2
\$\begingroup\$

I've implemented a multithreaded merge sort using Java's ForkJoin framework, and I wanted to gather feedback on its correctness, efficiency, and scalability.

Here's my implementation:

import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; public class CustomRecursiveTaskForArraysSorting extends RecursiveTask<List<Integer>> { private static final int THRESHOLD = 2; private final List<Integer> workload; public CustomRecursiveTaskForArraysSorting(List<Integer> workload) { if (workload == null) { throw new IllegalArgumentException("Workload cannot be null"); } this.workload = workload; } @Override protected List<Integer> compute() { if (workload.size() > THRESHOLD) { return ForkJoinTask.invokeAll(createSubtasks(workload)) .stream() .map(ForkJoinTask::join) .reduce(this::merge) .orElseThrow(); } else { return processing(workload); } } private List<CustomRecursiveTaskForArraysSorting> createSubtasks(List<Integer> workload) { List<CustomRecursiveTaskForArraysSorting> subtasks = new ArrayList<>(); List<Integer> firstHalf = workload.subList(0, workload.size() / 2); List<Integer> secondHalf = workload.subList(workload.size() / 2, workload.size()); subtasks.add(new CustomRecursiveTaskForArraysSorting(new ArrayList<>(firstHalf))); subtasks.add(new CustomRecursiveTaskForArraysSorting(new ArrayList<>(secondHalf))); return subtasks; } private List<Integer> processing(List<Integer> workload) { Collections.sort(workload); return workload; } private List<Integer> merge(List<Integer> a, List<Integer> b) { List<Integer> merged = new ArrayList<>(); int i = 0, j = 0; while (i < a.size() && j < b.size()) { if (a.get(i) < b.get(j)) { merged.add(a.get(i++)); } else { merged.add(b.get(j++)); } } merged.addAll(a.subList(i, a.size())); merged.addAll(b.subList(j, b.size())); return merged; } } 

Key Features:

  1. I use RecursiveTask to divide the array into smaller parts recursively until the threshold size is met, after which the smaller arrays are sorted directly
  2. Merge functionality combines two sorted subarrays into a single sorted array
  3. Small arrays are sorted using Collections.sort()

Questions:

  1. Are there any edge cases or performance bottlenecks that I might have overlooked?
  2. Could this implementation be optimized further for larger datasets or higher concurrency?
  3. How does this compare to manual thread management in terms of performance and readability?

I appreciate any constructive feedback and suggestions to improve the implementation.

\$\endgroup\$

    1 Answer 1

    3
    \$\begingroup\$

    Summary

    If your goal was just to create a parallelized piece of code that works correctly using Fork/Join framework, then you succeeded (with one caveat).

    But if you had greater ambitions, like creating a well performing paralleled merge sort implementation, you should have tried harder.

    And what I found puzzling is the fact that you decided to sort the lists with the size below the threshold using JDK's built-in Timsort. If you're fine with employing built-in sorting, then you can make use of Arrays.parrallelSort() (a parallelized Timsort implementation) instead of inventing your own thing.

    That can be done in the following way:

    var sorted = source.parallelStream().sorted().toList(); 

    In the parallel stream shown above, operation sorted() will dump all elements into a new array and then will delegate the sorting to Arrays.parrallelSort()

    Also worth to point out, you should be dealing with a huge dataset to warrant parallel sorting (concurrency is not free). The current implementation of parallelSort() will resort to sequential sorting if the number of elements is less or equal to 8192.

    If you allow me to use a metaphor, you essentially hired a highly skilled professional and a novice for a job, telling them that the novice would do all the heavy lifting while the top-notch worker handed him tools and swept dirt from the floor.

    In the code you presented, JDK's highly adaptive Timsort is applied for sorting the smallest sublists (of size 2 and 1).

    And regardless of how you're dialing with these tiny sublists, by itself, it's not a great idea performancewise to create a new asynchronous task just to reorder 2 elements or to find out that a sublist has only one element (hence already sorted). The overhead of concurrent execution is a high price to pay for such trivial operations.

    Also, there's a lot of unnecessary data copies and object creation happening during each partitioning, which can be eliminated.


    One note concerning terminology: you should not conflate arrays and Lists. If you do this in an interview, it will cost you points. A List implementation is not necessarily array-backed, Lists as well as other Collections have behavior, while Java arrays don't, arrays cannot grow, arrays behave differently in regard to inheritance (Java arrays are covariant). Definitely explore this topic, if you don't know this already.

    Bug

    The behavior of the presented code will vary depending on the size of the provided list.

    If the given list has the size of 3 or more elements (above the threshold), your code will produce a new sorted list, leaving the source intact. Which should be the expected behavior. You can even provide an unmodifiable list and everything will be fine.

    But if the number of elements is 2, all of a sudden the initial list gets modified. Try passing new ArrayList<>(List.of(1, 0)) and see its ordering updated to 0, 1. And if the initial list is unmodifiable, execution will blow up with an UnsupportedOperationException, which should not happen.

    Memory consumption

    The problem described above can be solved by making a defensive copy of the list to sort in the constructor of the recursive task. But that will result in making extra data copies while constructing subtasks.

    A better alternative is to introduce a method responsible for parallel sorting which will be exposed to the end user, and hide the implementation, i.e. your fork/join task class (for instance, you can define it as package private, that's how JDK hides many of it's classes).

    That would allow to make a copy of the given list, so that RecursiveTask can work that copy instead of making copies at each partitioning step.

    • Eliminating sublist copies

    The presented solution creates a sublist copy for each subtask:

    List<Integer> firstHalf = workload.subList(0, workload.size() / 2); List<Integer> secondHalf = workload.subList(workload.size() / 2, workload.size()); subtasks.add(new CustomRecursiveTaskForArraysSorting(new ArrayList<>(firstHalf))); subtasks.add(new CustomRecursiveTaskForArraysSorting(new ArrayList<>(secondHalf))); 

    Naming is also an issue, which I'll address later

    Instead of spawning these new copies, we can directly work with the copy of the initial list (provided while creating the root task, as I described above). I.e. no need to wrap each sublist with a new ArrayList.

    One might wonder if the modifications made by a thread reordering a sublist would be visible to a thread merging sublists? Here's a quote from the documentation of ForkJoinTask.fork():

    Subsequent modifications to the state of this task or any data it operates on are not necessarily consistently observable by any thread other than the one executing it unless preceded by a call to join or related methods, or a call to isDone returning true

    emphasis added

    I.e. when join() is used for accessing the data of an asynchronously executed subtask, visibility is guaranteed with no additional synchronization.

    • Unnecessary object creation

    During each partitioning step in the compute() method, you're constructing a new Stream to process two elements. In this case, this one-liner obscures the underlying business intent rather than improves readability.

    Compare these two methods

    @Override protected List<Integer> compute() { if (workload.size() > THRESHOLD) { return ForkJoinTask.invokeAll(createSubtasks(workload)) .stream() .map(ForkJoinTask::join) .reduce(this::merge) .orElseThrow(); } else { return processing(workload); } } private List<CustomRecursiveTaskForArraysSorting> createSubtasks(List<Integer> workload) { List<CustomRecursiveTaskForArraysSorting> subtasks = new ArrayList<>(); List<Integer> firstHalf = workload.subList(0, workload.size() / 2); List<Integer> secondHalf = workload.subList(workload.size() / 2, workload.size()); subtasks.add(new CustomRecursiveTaskForArraysSorting(new ArrayList<>(firstHalf))); subtasks.add(new CustomRecursiveTaskForArraysSorting(new ArrayList<>(secondHalf))); return subtasks; } 

    with this refactored equivalent

    @Override protected List<Integer> compute() { if (workload.size() <= THRESHOLD) { return sort(workload); } var left = workload.subList(0, workload.size() / 2); var right = workload.subList(workload.size() / 2, workload.size()); var task1 = new SortingTask(left).fork(); var task2 = new SortingTask(right).fork(); return merge(task1.join(), task2.join()); } 

    It's quite easy to follow, and we've eliminated the creation of dozens of tiny streams and auxiliary 2-element lists.

    • Avoid resizing while merging

    Never use no-args constructor to instantiate an ArrayList or other array-backed Collection when you know the final capacity in advance, because it will lead to resizing with allocating new arrays and copying the elements over:

    private List<Integer> merge(List<Integer> a, List<Integer> b) { List<Integer> merged = new ArrayList<>(); 

    To avoid that, you should provide the required capacity upfront to allocate just enough space to store all the elements: new ArrayList<>(a.size() + b.size()).

    • Treating sublists below the threshold

    Merge sort is not efficient on small-sized lists.

    You might consider raising TRESHOLD to 32 and applying insertion sort to lists that are below the threshold.

    Naming

    Class name is extremely verbose, in the snippet shown above I've shortened it to SortingTask.

    Method name processing() is a bit strange (why using a verb in the continuous form?) and doesn't communicate the purpose of the method to the code reader. It's quite faceless.

    Generics

    There's no advantage in narrowing the specialization of this class only to sorting Lists of Integer. You can make it more versatile by introducing a constructor expecting a parameter of type Comparator (in addition to a constructor that doesn't require a comparator). When a comparator wasn't provided, the elements of the List should be treated as implementing Comparable.

    \$\endgroup\$

      Start asking to get answers

      Find the answer to your question by asking.

      Ask question

      Explore related questions

      See similar questions with these tags.