Sunday, January 13, 2019

Java ForkJoinPool with an example

ForkJoinPool was introduced with the release of Java 7 to solve a very particular set of problems that tend to be hard to solve with any other thread pool implementation.

The class is designed to work with divide-and-conquer algorithms: those where a task can be recursively broken into smaller sub-tasks.

It looks just like any other thread pool, e.g., the ThreadPoolExecutor class: it implements the Executor and ExecutorService interfaces. It uses an unbounded list of tasks that are run by the configured number of worker threads or, by default, by as many worker threads as there are available CPUs.
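As a small sketch of that point, the pool can be created with an explicit parallelism level and then used through the plain ExecutorService interface. The class name `PoolBasics`, the helper method, and the parallelism value of 4 are illustrative choices, not something from the original example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;

public class PoolBasics {

  // Submits a plain Callable through the ExecutorService interface and waits for it.
  static int runThroughExecutorService(ExecutorService executor) {
    Callable<Integer> task = () -> 6 * 7;
    try {
      return executor.submit(task).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // Explicit parallelism: 4 worker threads (an arbitrary value for illustration).
    ForkJoinPool pool = new ForkJoinPool(4);
    try {
      System.out.println("Configured parallelism: " + pool.getParallelism());
      System.out.println("Result: " + runThroughExecutorService(pool));
    } finally {
      pool.shutdown();
    }

    // The no-arg constructor derives the parallelism from the available processors.
    ForkJoinPool defaultPool = new ForkJoinPool();
    System.out.println("Default parallelism: " + defaultPool.getParallelism()
        + ", available processors: " + Runtime.getRuntime().availableProcessors());
    defaultPool.shutdown();
  }
}
```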

Example: Parallel Merge Sort


Consider sorting an array of 1 million elements. There are three main sub-tasks:
  • Sort the first half of the array
  • Sort the second half of the array
  • Merge the two sorted sub-arrays
The base case is reached when it is faster to sort the sub-array directly, for example with insertion sort, than to split it further (let's assume that happens at about 10 elements). With that cutoff, a 1-million-element array ends up with on the order of 100,000 tasks to sort the leaf arrays, about half as many tasks to merge those sorted leaves, half as many again to merge the next level of sub-arrays, and so on.
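The number of tasks can be checked by counting the nodes of the recursion tree. This is a minimal sketch: the class `TaskCount` and its method are hypothetical helpers, and the split rule `last - first <= cutoff` with a midpoint split is an assumption about how the halving is done.

```java
public class TaskCount {

  // Counts the leaf (sort) tasks produced by halving [first, last]
  // until a range satisfies last - first <= cutoff.
  static long countLeafTasks(int first, int last, int cutoff) {
    if (last - first <= cutoff) {
      return 1;
    }
    int mid = (first + last) >>> 1;
    return countLeafTasks(first, mid, cutoff) + countLeafTasks(mid + 1, last, cutoff);
  }

  public static void main(String[] args) {
    long leaves = countLeafTasks(0, 1_000_000 - 1, 10);
    // Every split is followed by exactly one merge, and a binary tree
    // with n leaves has n - 1 internal nodes.
    long merges = leaves - 1;
    System.out.println("Leaf sort tasks: " + leaves);
    System.out.println("Merge tasks:     " + merges);
  }
}
```

For 1 million elements and a cutoff of 10, this reports 131,072 leaf tasks and 131,071 merge tasks, half of which (65,536) merge the sorted leaves directly.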

The most important point to notice is that none of the tasks can complete until the tasks they have spawned have also completed. This is where ForkJoinPool comes in very handy. Of course it's doable with a ThreadPoolExecutor, but not as efficiently as with ForkJoinPool, and the implementation is much more complex.

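In a ThreadPoolExecutor, a parent task must wait for its child tasks to complete, but a thread cannot add another task to the queue and then wait for it to complete: once a thread is waiting, it can't be used to run one of the sub-tasks. ForkJoinPool, by contrast, lets a thread that is waiting on a sub-task run other pending tasks in the meantime.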
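The difference is easiest to see with a single worker thread. The sketch below is only an illustration under stated assumptions: the class and method names are made up, and the 200 ms timeout exists only so the demo terminates instead of hanging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ParentWaitsForChild {

  // The parent submits a child to the same single-thread executor and waits for it.
  // The only worker is busy running the parent, so the child never starts.
  static boolean childFinishesOnThreadPool() {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    try {
      Future<Boolean> parent = executor.submit(() -> {
        Future<Integer> child = executor.submit(() -> 42);
        try {
          child.get(200, TimeUnit.MILLISECONDS);
          return true;
        } catch (TimeoutException e) {
          return false; // the child was starved
        }
      });
      return parent.get();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdownNow();
    }
  }

  // The same shape on a ForkJoinPool with one worker: join() lets the
  // worker run the forked child itself instead of blocking.
  static class Parent extends RecursiveTask<Integer> {
    @Override
    protected Integer compute() {
      RecursiveTask<Integer> child = new RecursiveTask<Integer>() {
        @Override
        protected Integer compute() {
          return 42;
        }
      };
      child.fork();
      return child.join();
    }
  }

  static int childResultOnForkJoinPool() {
    ForkJoinPool pool = new ForkJoinPool(1);
    try {
      return pool.invoke(new Parent());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("Child finished on single-thread executor: " + childFinishesOnThreadPool());
    System.out.println("Child result on single-worker ForkJoinPool: " + childResultOnForkJoinPool());
  }
}
```

With more threads the ThreadPoolExecutor version gets further, but a deep enough recursion can still leave every thread blocked waiting on children that have no thread left to run on.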

bla bla bla ... show me the code.

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
import lombok.AllArgsConstructor;

@AllArgsConstructor
public class ForkJoinPoolSample {

  private static int[] elements;

  @AllArgsConstructor
  private static class MergeSortTask extends RecursiveTask<Integer> {
    private int first;
    private int last;

    @Override
    protected Integer compute() {
      int len = 0;

      if (last-first <= 10) {
        Arrays.sort(elements, first, last+1); 
        len = (last-first+1);
      } else {
        /* Sort the two halves: fork the left one, sort the right one in this thread */
        int mid = (first+last) >>> 1;
        MergeSortTask leftSubtask = new MergeSortTask(first, mid);
        leftSubtask.fork();
        MergeSortTask rightSubtask = new MergeSortTask(mid+1, last);

        len += rightSubtask.compute();
        len += leftSubtask.join();

        /* Merge the two sorted halves; invoke() runs the merge and waits for its result */
        new MergeTask(first, mid, last).invoke();
      }

      return len;
    }
  }

  @AllArgsConstructor
  private static class MergeTask extends RecursiveTask<Integer> {
    private int first;
    private int mid;
    private int last;

    @Override
    protected Integer compute() {
      int[] tmp = new int[last - first + 1];
      int left = first, right = mid + 1, indx = 0;

      while (left <= mid && right <= last) {
        if (elements[left] <= elements[right]) {
          tmp[indx++] = elements[left++];
        } else {
          tmp[indx++] = elements[right++];
        }
      }

      while (left <= mid) {
        tmp[indx++] = elements[left++];
      }

      while (right <= last) {
        tmp[indx++] = elements[right++];
      }

      for (indx = 0; indx < tmp.length; indx++) {
        elements[first + indx] = tmp[indx];
      }

      return tmp.length;
    }
  }

  private static void createRandomInts() {
    elements = new int[100000];
    final ThreadLocalRandom random = ThreadLocalRandom.current();

    IntStream.range(0, 100000)
        .forEach(i -> elements[i] = random.nextInt());
  }

  public static void main(String[] args) {
    createRandomInts();

    long before = System.currentTimeMillis();
    int n = new ForkJoinPool().invoke(new MergeSortTask(0, elements.length-1));
    long after = System.currentTimeMillis();

    System.out.println("Sorted " + n + " Elements in " + (after-before) + " ms.");

    boolean sorted = IntStream.range(0, elements.length-1)
        .allMatch(i -> elements[i] <= elements[i+1]);

    assertThat(sorted, is(true));
  }
}


From the doc:

fork(): Arranges to asynchronously execute this task in the pool the current task is running in
join(): Returns the result of the computation when it is done

Those methods use a series of internal, per-thread queues to manipulate the tasks and switch threads from executing one task to executing another. Of course all of that is transparent to the developer.
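The queues themselves are hidden, but the pool exposes a few counters that make their effect visible. The following is a rough sketch with hypothetical names (`StealCountDemo`, `RangeSum`) and an arbitrary threshold; it sums a range of integers and then prints the pool's steal count, which is only an estimate and varies from run to run.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class StealCountDemo {

  // Sums the integers in [from, to) by splitting the range in half.
  static class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10_000; // arbitrary cutoff for the sketch
    private final int from;
    private final int to;

    RangeSum(int from, int to) {
      this.from = from;
      this.to = to;
    }

    @Override
    protected Long compute() {
      if (to - from <= THRESHOLD) {
        long sum = 0;
        for (int i = from; i < to; i++) {
          sum += i;
        }
        return sum;
      }
      int mid = (from + to) >>> 1;
      RangeSum left = new RangeSum(from, mid);
      left.fork();                                  // queued for asynchronous execution
      long right = new RangeSum(mid, to).compute(); // computed in the current thread
      return right + left.join();                   // result of the forked half
    }
  }

  static long sum(ForkJoinPool pool, int n) {
    return pool.invoke(new RangeSum(0, n));
  }

  public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();
    System.out.println("Sum: " + sum(pool, 10_000_000));
    // An estimate of how many tasks were taken over by a different worker.
    System.out.println("Steals: " + pool.getStealCount());
    pool.shutdown();
  }
}
```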


Saturday, January 5, 2019

Java Weak References

Reusing objects is important in Java, but it can also cause memory and performance issues if the objects to be reused can't be freed by the GC. Sometimes we want to keep reusing objects only as long as they are still referenced elsewhere, or as long as they have a good chance of being used in the future.

Weak references let us achieve this in a more GC-friendly way, without largely affecting the memory and performance of our application. In today's post I will go through the differences between strong and weak references in Java and their effect on the garbage collector.

Strong References

 

Strong references are the default kind of reference in Java. A strongly referenced object is not eligible for garbage collection, in either a minor or a full GC, until no strong reference to it remains.

Every object reference in Java is strong unless explicitly specified otherwise, as described in the sections below.


StringBuilder builder = new StringBuilder();

Weak References


Weak references are not the default and must be created explicitly. This type of reference maintains a `WEAK` link to an object: if the object is `not strongly referenced anymore` but is still `weakly referenced`, it is eligible for GC and can be collected.

StringBuilder builder = new StringBuilder();
WeakReference<StringBuilder> weakBuilder = new WeakReference<>(builder);
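To watch this happen, the strong reference can be dropped and a collection requested. This is a sketch rather than a guaranteed outcome: `System.gc()` is only a hint to the JVM, and the class name, helper method, retry count and sleep time are all illustrative assumptions.

```java
import java.lang.ref.WeakReference;

public class WeakReferenceDemo {

  // Requests a GC a few times and reports whether the weak reference was cleared.
  // System.gc() is only a hint, so a "false" result is possible on some JVMs.
  static boolean clearedAfterGc(WeakReference<?> ref) {
    for (int i = 0; i < 10 && ref.get() != null; i++) {
      System.gc();
      try {
        Thread.sleep(50);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    return ref.get() == null;
  }

  public static void main(String[] args) {
    StringBuilder builder = new StringBuilder("payload");
    WeakReference<StringBuilder> weakBuilder = new WeakReference<>(builder);

    // While the strong reference exists, get() returns the referent.
    System.out.println("Before: " + weakBuilder.get());

    builder = null; // drop the only strong reference
    System.out.println("Cleared after GC: " + clearedAfterGc(weakBuilder));
  }
}
```

Callers should always check the result of `get()` for null, since the referent can disappear between any two calls.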

Soft References

 

This type of reference is also a kind of weak reference, but one that keeps its referent in memory for longer: it tends to survive GC cycles until memory really needs to be reclaimed (all soft references are guaranteed to be cleared before the JVM throws an OutOfMemoryError). Soft references are essentially one large, least recently used (LRU) object pool (cache).

The JVM keeps track of the last access to each soft reference and uses it to decide whether the referent is eligible for GC. This is controlled by the JVM flag `-XX:SoftRefLRUPolicyMSPerMB`, whose default value of 1000 means one second of lifetime per free megabyte in the heap. For example, with 100 MB free, a softly reachable object that has not been accessed for more than about 100 seconds becomes eligible for collection.

StringBuilder builder = new StringBuilder();
SoftReference<StringBuilder> softBuilder = new SoftReference<>(builder);
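The cache idea can be sketched in a few lines. `SoftCache` and its loader function are hypothetical names for this illustration, not a JDK class, and the sketch deliberately leaves out removing entries whose referent has been cleared.

```java
import java.lang.ref.SoftReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class SoftCache<K, V> {

  private final Map<K, SoftReference<V>> entries = new ConcurrentHashMap<>();
  private final Function<K, V> loader;

  public SoftCache(Function<K, V> loader) {
    this.loader = loader;
  }

  // Returns the cached value, reloading it if the GC has cleared the soft reference.
  public V get(K key) {
    SoftReference<V> ref = entries.get(key);
    V value = (ref == null) ? null : ref.get();
    if (value == null) {
      value = loader.apply(key);
      entries.put(key, new SoftReference<>(value));
    }
    return value;
  }

  public static void main(String[] args) {
    SoftCache<Integer, byte[]> cache = new SoftCache<>(size -> new byte[size]);
    byte[] first = cache.get(1024);
    byte[] second = cache.get(1024);
    // The same instance comes back as long as the value was not reclaimed in between.
    System.out.println("Same instance: " + (first == second));
  }
}
```

A production cache would also need to evict the stale `SoftReference` entries themselves, for instance by registering them with a `ReferenceQueue`.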

Phantom References

 

Phantom references let us perform cleanup without implementing the `Object.finalize` method, which can have negative implications for the application: it is not deterministic, it can make the object reachable again, and it can hurt performance.

Phantom references differ from weak and soft references in that their `get()` method always returns null, so the referent cannot be made reachable again through the reference. The reference is enqueued only once the GC has determined that the object is phantom reachable, which is the signal to release any resources the object had acquired.

The `PhantomReference` constructor accepts two parameters: the referent object and a `ReferenceQueue`, onto which the reference is enqueued when its referent is ready to be cleaned up.

ReferenceQueue<Object> q = new ReferenceQueue<>();
PhantomReference<Object> pr = new PhantomReference<>(object, q);
// Later, at another point (remove() blocks until a reference is enqueued)
Reference<?> r = q.remove();
// Now release any resources associated with the collected object
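A fuller, runnable version of the same flow might look like the sketch below. The class name, the helper method, and the retry and timeout values are assumptions made for the illustration, and because `System.gc()` is only a hint, the reference is not guaranteed to be enqueued within the waiting period.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;

public class PhantomReferenceDemo {

  // Waits for up to roughly one second for a reference to be enqueued,
  // nudging the GC before each attempt. Returns null if nothing arrived.
  static Reference<?> awaitEnqueued(ReferenceQueue<Object> queue) {
    try {
      for (int i = 0; i < 10; i++) {
        System.gc(); // a hint only
        Reference<?> ref = queue.remove(100);
        if (ref != null) {
          return ref;
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return null;
  }

  public static void main(String[] args) {
    ReferenceQueue<Object> queue = new ReferenceQueue<>();
    Object resourceOwner = new Object();
    PhantomReference<Object> phantom = new PhantomReference<>(resourceOwner, queue);

    // get() on a phantom reference always returns null.
    System.out.println("get() result: " + phantom.get());

    resourceOwner = null; // drop the only strong reference
    Reference<?> enqueued = awaitEnqueued(queue);
    if (enqueued == phantom) {
      System.out.println("Referent collected; release its resources here.");
    } else {
      System.out.println("Not enqueued yet; GC timing is not guaranteed.");
    }
  }
}
```

In real code the `PhantomReference` object itself must stay strongly reachable (for example in a set) until it has been processed, otherwise it may be collected before it is ever enqueued.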

Quick Summary (From: Java Performance - The Definitive Guide)

  • Indefinite (soft, weak, phantom) references alter the ordinary lifecycle of Java objects, allowing them to be reused in ways that may be more GC-friendly.
  • Weak references should be used when an application is interested in an object only if that object is strongly referenced elsewhere in the application.
  • Soft references hold onto objects for (possibly) long periods of time, providing a simple GC-friendly LRU cache.
  • Indefinite references consume their own memory and hold onto the memory of other objects for long periods of time; they should be used sparingly.