Sunday, June 16, 2013

ForkJoin - a quick exploration .. long overdue

ForkJoin has been available to us Java folks since Java 7, and even longer if you consider the JSR 166 packages. I only recently found the time to explore this API.

Having written about Phasers a couple of years ago and still not having found a use for them in production, I was not too eager to explore another "thread-pool" (just kidding - where would we be today without the j.u.c classes).

Anyway, I downloaded the latest JDK 8 pre-release (b93), changed my IntelliJ 12 language level to Java 8 with lambdas and ran some simple tests.

Mind you, the JavaDocs for ForkJoin and related classes are quite elaborate and expect you to set aside some time to go through them in detail... which you can probably postpone if you read this post.

ForkJoin is recommended as a thread-pool when your main task has to divide itself into many smaller tasks, usually recursively. In such scenarios the number of child tasks is usually not known upfront. Technically, the work-stealing aspect of ForkJoin (idle worker threads take queued tasks from busy ones) and the claim that it scales well when faced with a large number of tasks make it a good fit for such workloads.

Essentially, there are 3 ways in which you can write jobs/tasks to run in a ForkJoinPool - RecursiveAction (no result), RecursiveTask (returns a result) and the new JDK 8 CountedCompleter.

RecursiveAction is fairly simple. It embodies the logic to work on the root of your computation problem, and it splits its work into smaller sub-tasks recursively. It is very similar to a binary search, except that both halves get searched, each spawned off as a sub-task recursively. The computation for this tree of tasks completes when all the leaf nodes have been processed.
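To make the "binary search, but both halves" analogy concrete, here is a minimal sketch that is not from the original post: a RecursiveAction that collects every index holding a target value. The class name FindAllIndices, the leaf size of 4 and the shared result queue are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example: unlike a binary search, BOTH halves are searched,
// each one as its own sub-task.
class FindAllIndices extends RecursiveAction {
    private static final int THRESHOLD = 4;   // illustrative leaf size
    private final int[] data;
    private final int lo, hi;                 // half-open range [lo, hi)
    private final int target;
    private final Queue<Integer> hits;        // shared, thread-safe result sink

    FindAllIndices(int[] data, int lo, int hi, int target, Queue<Integer> hits) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
        this.target = target;
        this.hits = hits;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            // Leaf: scan sequentially.
            for (int i = lo; i < hi; i++) {
                if (data[i] == target) hits.add(i);
            }
            return;
        }
        int mid = (lo + hi) >>> 1;
        // invokeAll forks the sub-tasks and returns only when both have finished.
        invokeAll(new FindAllIndices(data, lo, mid, target, hits),
                  new FindAllIndices(data, mid, hi, target, hits));
    }

    static List<Integer> find(int[] data, int target) {
        Queue<Integer> hits = new ConcurrentLinkedQueue<>();
        ForkJoinPool.commonPool().invoke(new FindAllIndices(data, 0, data.length, target, hits));
        List<Integer> sorted = new ArrayList<>(hits);
        Collections.sort(sorted);   // leaves finish in a nondeterministic order
        return sorted;
    }

    public static void main(String[] args) {
        int[] data = {3, 7, 1, 7, 9, 0, 7, 2, 5, 7};
        System.out.println(find(data, 7)); // prints [1, 3, 6, 9]
    }
}
```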

I can think of a simplified but realistic use case where you'd want to do a mix of sync and async, parallel sub-tasks:

  1. Receive purchase order request from client
  2. Convert request payload (JSON, XML) to Java object
  3. Make synchronous authorization check with LDAP
  4. Make some async requests
    1. Make async request to inventory service to check and reserve stock
    2. Make async request to shipment service and find closest free shipment date to requested destination
    3. Make async request to fetch similar/recommended items to offer package deals
  5. Consolidate results of async requests
  6. Generate response JSON
You could do steps 2, 3 and 6 in a regular ThreadPoolExecutor (TPE). If you need to accommodate priority purchase order processing, you could easily do it by combining a PriorityBlockingQueue with the right TPE constructor.
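A minimal sketch of the priority idea, assuming a hypothetical PurchaseOrderJob that is both Runnable and Comparable (lower number = more urgent). One catch worth knowing: jobs have to go in through execute(), because submit() wraps them in a FutureTask that PriorityBlockingQueue cannot compare.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityOrders {
    static List<String> runDemo() {
        List<String> processed = Collections.synchronizedList(new ArrayList<String>());
        // One worker makes the queue ordering easy to observe. PriorityBlockingQueue is
        // unbounded, so the pool never grows beyond its core size anyway.
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>());

        // Keep the only worker busy so the orders below pile up in the queue. As the very
        // first task it is handed straight to the new worker thread instead of being
        // queued, which is why it does not need to be Comparable.
        CountDownLatch gate = new CountDownLatch(1);
        tpe.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // execute(), not submit(): submit() would wrap each job in a non-Comparable
        // FutureTask and the queue would throw ClassCastException.
        tpe.execute(new PurchaseOrderJob("regular", 5, processed));
        tpe.execute(new PurchaseOrderJob("bulk", 9, processed));
        tpe.execute(new PurchaseOrderJob("priority", 1, processed));

        gate.countDown();   // release the worker; it now drains the queue by priority
        tpe.shutdown();     // jobs already queued still run
        try {
            tpe.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [priority, regular, bulk]
    }
}

// Hypothetical purchase-order job: a lower `priority` value is processed first.
class PurchaseOrderJob implements Runnable, Comparable<PurchaseOrderJob> {
    private final String orderId;
    private final int priority;
    private final List<String> processed;

    PurchaseOrderJob(String orderId, int priority, List<String> processed) {
        this.orderId = orderId;
        this.priority = priority;
        this.processed = processed;
    }

    @Override
    public void run() {
        processed.add(orderId);   // stand-in for steps 2, 3 and 6
    }

    @Override
    public int compareTo(PurchaseOrderJob other) {
        return Integer.compare(priority, other.priority);
    }
}
```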

In fact, there are many implementations of BlockingQueue, for example LinkedTransferQueue and SynchronousQueue, which could be useful in some special cases. The Exchanger is another such nugget in the j.u.c package. Apparently CompletableFuture is ideal for such cases (like Scala's Promise and Google Guava's ListenableFuture), but I was surprised to see there were no examples in its JavaDoc.
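Since the JavaDoc has no examples, here is a minimal sketch of how steps 4 and 5 might look with CompletableFuture. The three service methods are hypothetical stand-ins returning canned values, and the dedicated 3-thread executor is my own choice, made because real service calls would block on I/O.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OrderFanOut {
    // Hypothetical stand-ins for the inventory, shipment and recommendation services.
    static boolean reserveStock(String item) { return true; }
    static String closestShipDate(String destination) { return "2013-06-20"; }
    static List<String> recommendations(String item) { return Arrays.asList("case", "charger"); }

    static String process(String item, String destination) {
        // Blocking calls get their own pool instead of the common ForkJoinPool that
        // supplyAsync(...) would use by default.
        ExecutorService io = Executors.newFixedThreadPool(3);
        try {
            // Step 4: fire all three requests concurrently.
            CompletableFuture<Boolean> stock =
                    CompletableFuture.supplyAsync(() -> reserveStock(item), io);
            CompletableFuture<String> shipDate =
                    CompletableFuture.supplyAsync(() -> closestShipDate(destination), io);
            CompletableFuture<List<String>> extras =
                    CompletableFuture.supplyAsync(() -> recommendations(item), io);

            // Step 5: consolidate once all three have completed.
            return CompletableFuture.allOf(stock, shipDate, extras)
                    .thenApply(ignored -> "stock=" + stock.join()
                            + " ship=" + shipDate.join()
                            + " extras=" + extras.join())
                    .join();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(process("phone", "Berlin"));
        // prints stock=true ship=2013-06-20 extras=[case, charger]
    }
}
```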

(Ok, this is turning out to be a longer post than I had expected. Not a quick exploration after all.)

Going back to our example, the 3 asynchronous operations in step 4 could be modeled as sub-tasks of step 4. In reality, though, the JavaDoc for ForkJoinPool says that ForkJoinTasks should ideally not block on external resources like I/O. It calls this "unmanaged synchronization", as it involves waiting for resources outside the fork-join system. For such cases the ManagedBlocker interface is recommended, although to me it looks like it was added only as an afterthought.
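A minimal sketch of wrapping a blocking call in a ManagedBlocker, assuming a hypothetical SlowLookup in which Thread.sleep stands in for real I/O. ForkJoinPool.managedBlock tells the pool that the current worker is about to block, so the pool may activate a spare thread in the meantime.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// A task that needs the result of a (hypothetical) blocking lookup.
class LookupTask extends RecursiveTask<String> {
    private final String key;

    LookupTask(String key) { this.key = key; }

    @Override
    protected String compute() {
        SlowLookup lookup = new SlowLookup(key);
        try {
            // Lets the pool compensate (e.g. with a spare thread) while this worker waits.
            ForkJoinPool.managedBlock(lookup);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        return lookup.get();
    }

    static String lookup(String key) {
        return ForkJoinPool.commonPool().invoke(new LookupTask(key));
    }

    public static void main(String[] args) {
        System.out.println(lookup("sku-42")); // prints value-of-sku-42
    }
}

// Hypothetical blocking call wrapped as a ManagedBlocker; Thread.sleep stands in for I/O.
class SlowLookup implements ForkJoinPool.ManagedBlocker {
    private final String key;
    private volatile String result;

    SlowLookup(String key) { this.key = key; }

    @Override
    public boolean block() throws InterruptedException {
        if (result == null) {
            Thread.sleep(50);               // simulated network latency
            result = "value-of-" + key;
        }
        return true;                        // no further blocking is necessary
    }

    @Override
    public boolean isReleasable() {
        return result != null;              // true once the value is available
    }

    String get() { return result; }
}
```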

So, sadly, the seemingly real-world example above might not be a good case for ForkJoin, which means the ideal use case is one that involves recursive decomposition and pure computation - a.k.a. in-memory map-reduce.
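For a taste of what in-memory map-reduce looks like in FJ, here is a minimal RecursiveTask sketch (the class name SumOfSquares and the leaf size of 1,000 are illustrative assumptions). The "map" squares each element, the "reduce" adds partial sums, and each split forks one half while computing the other in the current thread.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// In-memory "map-reduce": map each element to its square, reduce by summing.
class SumOfSquares extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;   // illustrative leaf size
    private final int[] data;
    private final int lo, hi;                     // half-open range [lo, hi)

    SumOfSquares(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += (long) data[i] * data[i];   // map + local reduce
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumOfSquares left = new SumOfSquares(data, lo, mid);
        SumOfSquares right = new SumOfSquares(data, mid, hi);
        left.fork();                      // left half becomes available for stealing
        long rightSum = right.compute();  // right half runs in this thread
        return left.join() + rightSum;    // combine the partial results
    }

    static long sumOfSquares(int[] data) {
        return ForkJoinPool.commonPool().invoke(new SumOfSquares(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        System.out.println(sumOfSquares(data)); // prints 333383335000
    }
}
```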

So, we make our way back to the overly geeky merge-sort example used in the JavaDocs. In my case, I decided to dispense with the sorting part and simplified the problem even further - purely for illustration purposes.

In my examples, I use ForkJoin to recursively split and list the numbers from "start" to "end". At each step, if the start-to-end range is larger than 5, the task splits that range into 2 equal halves and forks them off as sub-tasks. Otherwise the task is at the leaf level and just adds the numbers from start to end, in a for-loop, to a queue that is passed around to all tasks.

The first test is a naive implementation of RecursiveAction that just keeps forking sub-tasks down to the leaf level. The thread that created the root task calls "invoke()" and expects to wait for the whole tree of computations to complete. But since each level spawns its 2 sub-tasks asynchronously ("fork()") and does not wait for them to complete, the rest of the tree completes asynchronously. As a result, the caller thread in the "main()" method returns from "invoke()" prematurely, as soon as the root task itself is done. So this recursive task is almost what we wanted, but not entirely.
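A minimal sketch of such a naive task, assuming an inclusive range, a split whenever end - start exceeds 5, and the common pool (the class name NaiveListTask is hypothetical). Because nothing keeps a handle on the forked children, all the caller can do after "invoke()" is poll the shared queue.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

// Naive version: forks sub-tasks all the way down but never joins them.
class NaiveListTask extends RecursiveAction {
    private static final int THRESHOLD = 5;
    private final int start, end;          // inclusive range
    private final Queue<Integer> out;      // shared by all tasks

    NaiveListTask(int start, int end, Queue<Integer> out) {
        this.start = start;
        this.end = end;
        this.out = out;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            for (int i = start; i <= end; i++) out.add(i);   // leaf level
            return;
        }
        int mid = (start + end) >>> 1;
        // fork() only schedules each half and returns immediately, so this compute()
        // finishes while its children may still be queued or running.
        new NaiveListTask(start, mid, out).fork();
        new NaiveListTask(mid + 1, end, out).fork();
    }

    static int[] runDemo(int n) {
        Queue<Integer> out = new ConcurrentLinkedQueue<>();
        ForkJoinPool.commonPool().invoke(new NaiveListTask(1, n, out));
        // invoke() returns once the ROOT task is done, not once its sub-tree is.
        int seenAfterInvoke = out.size();
        // Nothing tracks the forked children, so polling is all that is left.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (out.size() < n && System.nanoTime() < deadline) {
            Thread.yield();
        }
        return new int[] { seenAfterInvoke, out.size() };
    }

    public static void main(String[] args) {
        int[] counts = runDemo(100);
        // The first count varies between runs and may well be below 100.
        System.out.println("after invoke: " + counts[0] + ", after polling: " + counts[1]);
    }
}
```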

Since the naive approach of just forking did not suffice, we make a small change: the parent task waits for its children to complete by calling the "join()" method on each of them.
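The same sketch with the fix applied, again assuming an inclusive range and a split threshold of 5 (JoiningListTask is a hypothetical name). Since every parent now joins its children, the root's "invoke()" only returns after every leaf has run.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Same splitting as the naive version, but each parent joins its children.
class JoiningListTask extends RecursiveAction {
    private static final int THRESHOLD = 5;
    private final int start, end;          // inclusive range
    private final Queue<Integer> out;

    JoiningListTask(int start, int end, Queue<Integer> out) {
        this.start = start;
        this.end = end;
        this.out = out;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            for (int i = start; i <= end; i++) out.add(i);   // leaf level
            return;
        }
        int mid = (start + end) >>> 1;
        JoiningListTask left = new JoiningListTask(start, mid, out);
        JoiningListTask right = new JoiningListTask(mid + 1, end, out);
        left.fork();
        right.fork();
        // join() returns only after that child's whole sub-tree is done. Joining in the
        // reverse order of forking lets this thread take `right` back and run it itself
        // if no other thread has stolen it yet.
        right.join();
        left.join();
    }

    static List<Integer> list(int start, int end) {
        Queue<Integer> out = new ConcurrentLinkedQueue<>();
        ForkJoinPool.commonPool().invoke(new JoiningListTask(start, end, out));
        // Every leaf has run by now; only the arrival order of the numbers varies.
        List<Integer> result = new ArrayList<>(out);
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        System.out.println("listed " + list(1, 100).size() + " numbers"); // prints listed 100 numbers
    }
}
```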

An even better approach is to allow each task to fork away sub-tasks without having to "join()" on them, because a thread that waits in "join()" may not be "stealing" work from other threads and making progress, which is what it should be doing. What we need is a way for the sub-tasks to notify the parent task that they have completed. We can let this bubble up all the way and register a listener at the root.

For the listener we will even use the fancy Lambda feature and something from the new java.util.function package. In fact, completion listeners can be registered at any level, for example to print to the console that a certain % of the tree is complete. There are 2 versions of this. The first simply sub-classes CountedCompleter to let the completions bubble up and eventually notify the blocked calling thread in "main()".
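A minimal sketch of that first version, assuming the same inclusive range and threshold of 5 (CompleterListTask is a hypothetical name). Each parent sets its pending count to 2 before forking and never joins. The root's "invoke()" is what keeps "main()" blocked until the last completion bubbles up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;

// No task ever waits in join(): completions bubble up through the pending counts.
class CompleterListTask extends CountedCompleter<Void> {
    private static final int THRESHOLD = 5;
    private final int start, end;          // inclusive range
    private final Queue<Integer> out;

    CompleterListTask(CountedCompleter<?> parent, int start, int end, Queue<Integer> out) {
        super(parent);                     // the parent is told when this task completes
        this.start = start;
        this.end = end;
        this.out = out;
    }

    @Override
    public void compute() {
        if (end - start > THRESHOLD) {
            int mid = (start + end) >>> 1;
            // Set the pending count BEFORE forking so a fast child cannot complete us early.
            setPendingCount(2);
            new CompleterListTask(this, start, mid, out).fork();
            new CompleterListTask(this, mid + 1, end, out).fork();
        } else {
            for (int i = start; i <= end; i++) out.add(i);   // leaf level
        }
        // If the pending count is non-zero, decrement it; otherwise complete this task and
        // apply the same check to the parent. The root therefore completes last.
        tryComplete();
    }

    static List<Integer> list(int start, int end) {
        Queue<Integer> out = new ConcurrentLinkedQueue<>();
        // invoke() blocks the caller until the ROOT completes, i.e. until every leaf has.
        ForkJoinPool.commonPool().invoke(new CompleterListTask(null, start, end, out));
        List<Integer> result = new ArrayList<>(out);
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        System.out.println("listed " + list(1, 100).size() + " numbers"); // prints listed 100 numbers
    }
}
```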

The second is a more sophisticated implementation using Lambdas.
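A sketch of the lambda variant, assuming a java.util.function.Consumer as the root listener and a CompletableFuture to hand the result back to "main()". The names ListenerDemo and ListingCompleter are hypothetical. The root is fired off with execute(), so "main()" waits only on the future that the listener completes.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;

class ListenerDemo {
    static int listAndWait(int n) {
        Queue<Integer> out = new ConcurrentLinkedQueue<>();
        CompletableFuture<Integer> done = new CompletableFuture<>();
        // The listener is a plain lambda; it runs on whichever thread completes the root.
        Consumer<Queue<Integer>> onDone = q -> done.complete(q.size());
        ForkJoinPool.commonPool().execute(new ListingCompleter(null, 1, n, out, onDone));
        return done.join();   // block only until the listener fires
    }

    public static void main(String[] args) {
        System.out.println("listed " + listAndWait(100) + " numbers"); // prints listed 100 numbers
    }
}

class ListingCompleter extends CountedCompleter<Void> {
    private static final int THRESHOLD = 5;
    private final int start, end;                    // inclusive range
    private final Queue<Integer> out;
    private final Consumer<Queue<Integer>> onDone;   // non-null only at the root

    ListingCompleter(CountedCompleter<?> parent, int start, int end,
                     Queue<Integer> out, Consumer<Queue<Integer>> onDone) {
        super(parent);
        this.start = start;
        this.end = end;
        this.out = out;
        this.onDone = onDone;
    }

    @Override
    public void compute() {
        if (end - start > THRESHOLD) {
            int mid = (start + end) >>> 1;
            setPendingCount(2);
            new ListingCompleter(this, start, mid, out, null).fork();
            new ListingCompleter(this, mid + 1, end, out, null).fork();
        } else {
            for (int i = start; i <= end; i++) out.add(i);   // leaf level
        }
        tryComplete();
    }

    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        // Runs once this task's whole sub-tree is done; only the root carries a listener,
        // but any level could register one (e.g. for progress reporting).
        if (onDone != null) onDone.accept(out);
    }
}
```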

An even more sophisticated example wraps the fork-join pool in a CompletionService, submits 5 tasks and then picks up the results as they complete.
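A minimal sketch of that idea, assuming 5 hypothetical summing jobs. ExecutorCompletionService accepts any Executor, including a ForkJoinPool, and take() hands back futures in completion order rather than submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ForkJoinPool;

class CompletionServiceDemo {
    // Hypothetical job: sum of 1..n.
    static int sumTo(int n) {
        int s = 0;
        for (int i = 1; i <= n; i++) s += i;
        return s;
    }

    static List<Integer> resultsInCompletionOrder() {
        ForkJoinPool pool = new ForkJoinPool();
        // ForkJoinPool is an ExecutorService, so it can back an ExecutorCompletionService.
        CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
        try {
            for (int t = 1; t <= 5; t++) {
                final int n = t * 100;
                cs.submit(() -> sumTo(n));
            }
            List<Integer> results = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                // take() blocks until the next task finishes, whichever one that is.
                results.add(cs.take().get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The order of the printed sums can differ between runs.
        System.out.println(resultsInCompletionOrder());
    }
}
```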

There are a few other things worth reading up on. I skipped the use of RecursiveTask. I also skipped the different methods for "stealing" tasks. The RecursiveAction JavaDoc even has an example that keeps track of spawned sub-tasks and then follows that chain, trying to complete them itself if another thread has not already done so. I did not venture into this bit because I'm not yet sure whether it is worth doing instead of letting the FJ framework do the scheduling internally.
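A sketch of that chain-following idea, loosely modeled on the RecursiveAction JavaDoc example rather than copied from it. The class name ChainedSum and the leaf size are my own assumptions. Each task forks right-hand halves into a linked chain, then walks it: a piece nobody stole is taken back with tryUnfork() and run locally, anything else is joined.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Sums an array, remembering the right-hand halves it forks in a linked chain.
class ChainedSum extends RecursiveAction {
    private static final int THRESHOLD = 1_000;   // illustrative leaf size
    private final long[] array;
    private final int lo, hi;                      // half-open range [lo, hi)
    private final ChainedSum next;                 // previously forked sibling, or null
    private long result;

    ChainedSum(long[] array, int lo, int hi, ChainedSum next) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
        this.next = next;
    }

    private static long leafSum(long[] a, int lo, int hi) {
        long s = 0;
        for (int i = lo; i < hi; i++) s += a[i];
        return s;
    }

    @Override
    protected void compute() {
        int l = lo, h = hi;
        ChainedSum right = null;
        // Keep halving: fork the right half, record it in the chain, go on with the left half.
        while (h - l > THRESHOLD) {
            int mid = (l + h) >>> 1;
            right = new ChainedSum(array, mid, h, right);
            right.fork();
            h = mid;
        }
        long sum = leafSum(array, l, h);   // the left-most piece runs in this thread
        // The chain starts with the most recently forked task, the one most likely to
        // still sit on top of this thread's deque where tryUnfork() can take it back.
        while (right != null) {
            if (right.tryUnfork()) {
                right.invoke();            // nobody stole it: run it here
            } else {
                right.join();              // stolen: wait for the thief to finish
            }
            sum += right.result;
            right = right.next;
        }
        result = sum;
    }

    static long sum(long[] array) {
        ChainedSum root = new ChainedSum(array, 0, array.length, null);
        ForkJoinPool.commonPool().invoke(root);
        return root.result;
    }

    public static void main(String[] args) {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        System.out.println(sum(data)); // prints 5000050000
    }
}
```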

Without studying the source code, I can only guess that manually keeping track of spawned sub-tasks and then trying to unfork them helps complete that sub-tree of tasks quickly. If we were to just dump sub-tasks into a simple shared queue, as with the ThreadPoolExecutor, they would get mixed up with sub-tasks from other threads in the pool. The current sub-tree might then not complete on time, because the sub-tasks it depends on are somewhere at the back of the queue. This is where FJ shines, in addition to its scalability.

One thing we do lose with FJ is task priorities: unlike a TPE with a PriorityBlockingQueue, FJ tasks have no priority, so you might end up using multiple FJ pools.
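A minimal sketch of the multiple-pools workaround, assuming just two hypothetical tiers. Note that this gives separation rather than true priority: urgent work never queues behind background work, but the OS still schedules the threads of both pools.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

class TwoTierPools {
    // Hypothetical split: more threads for urgent work, a single thread for background work.
    static final ForkJoinPool URGENT =
            new ForkJoinPool(Math.max(2, Runtime.getRuntime().availableProcessors()));
    static final ForkJoinPool BACKGROUND = new ForkJoinPool(1);

    static <T> ForkJoinTask<T> submit(boolean urgent, ForkJoinTask<T> task) {
        return (urgent ? URGENT : BACKGROUND).submit(task);
    }

    static long sumTo(long n) {
        long s = 0;
        for (long i = 1; i <= n; i++) s += i;
        return s;
    }

    static long[] runDemo() {
        Callable<Long> priorityOrder = () -> sumTo(1_000);       // stand-in for a priority order
        Callable<Long> nightlyReport = () -> sumTo(1_000_000);   // stand-in for background work
        ForkJoinTask<Long> urgent = submit(true, ForkJoinTask.adapt(priorityOrder));
        ForkJoinTask<Long> background = submit(false, ForkJoinTask.adapt(nightlyReport));
        return new long[] { urgent.join(), background.join() };
    }

    public static void main(String[] args) {
        long[] r = runDemo();
        System.out.println(r[0] + " " + r[1]); // prints 500500 500000500000
    }
}
```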

Until next time!