August 9, 2012

Introduction to Fork Join Framework

After years of hardware evolution, we are finally in the era of multicore processors, which have found their place in our day-to-day devices. Multicore processors can perform multiple tasks in parallel on separate cores. This has pushed programmers and developers to think about parallel (multicore) programming and about getting the maximum benefit out of the hardware's potential.




This Java programming tutorial is an introduction to Java's Fork Join Framework, added in Java 7, which makes effective use of the multicore potential of the hardware and improves thread performance by implementing the Work Stealing algorithm. To make the concepts easier to understand, instead of jumping directly to the Fork Join Framework, we will start with basic Java multithreading concepts and techniques. Then we will move on to the Java Executor Framework, where we will see the concepts and a simple example of Executor, ExecutorService, thread pools, Callable, and Future objects. Finally, we will take a detailed look at the Fork Join Framework with the help of example code. By the end of the tutorial, we will be able to understand comparisons such as Fork Join Framework vs Executor.



The Older Approach

From the very beginning, Java has had built-in support for concurrency in the form of threads. Programmers could create their own threads and develop programs that run concurrently. Below is an example of plain Java multithreading.

 new Thread (new Runnable(){
     public void run(){
         //Tasks to be performed on this thread
     }
 }).start();

While doing so, developers had to take care of creating threads, managing thread pools, managing thread life cycles, and handling inter-thread communication. Although the language provides methods to manage the thread life cycle, and methods like wait, notify, and notifyAll for coordinating threads, it was really hard to write error-free code.

Many times, threads got stuck while waiting for locks to be released. Let's consider a scenario in which a Provider is waiting because the Consumer's queue is full, while the Consumer is waiting for the Provider to push its next update. These kinds of problems were very difficult to debug and fix.

It was not easy to monitor the operations of a single thread, or to end it after a certain timeout or when an exception occurred.

Also, because operations on variables are not atomic, programs produced unexpected results when multiple threads shared the same variables. A common solution to this problem was synchronized code. It was again painful to balance the amount of synchronized and unsynchronized code: synchronized code does not use concurrency to its full strength, and limiting concurrency (using synchronized blocks) affects performance.
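To make the shared-variable problem concrete, here is a minimal sketch of a counter that several plain threads update through synchronized methods. The class and method names (SynchronizedCounterDemo, runDemo) are hypothetical and chosen only for illustration. Without the synchronized keyword, some increments could be lost, because count++ is a read followed by a write.

```java
// Hypothetical demo class; the names are illustrative and not part of any library.
class SynchronizedCounterDemo {
    private int count = 0;

    // Only one thread at a time may run a synchronized method on a given
    // instance, so the read-modify-write of count cannot interleave.
    synchronized void increment() {
        count++;
    }

    synchronized int get() {
        return count;
    }

    // Starts the given number of threads, lets each one call increment()
    // perThread times, waits for all of them, and returns the final count.
    static int runDemo(final int threads, final int perThread) {
        final SynchronizedCounterDemo counter = new SynchronizedCounterDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    for (int j = 0; j < perThread; j++) {
                        counter.increment();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait until this worker has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        // 4 threads x 10000 increments each
        System.out.println(runDemo(4, 10000));
    }
}
```

The price of this correctness is that the threads take turns inside increment(), which is exactly the loss of concurrency described above.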


The Concurrency Package:

Then, in Java 5, came the famous concurrency package (java.util.concurrent), which was later enhanced in Java SE 6 and Java SE 7.

It adds atomic wrapper classes for primitives. To explain this better, let's consider the operation ‘x++’ on an integer ‘x’. This operation involves two tasks: one is to get the value of ‘x’, and the other is to set the new value to ‘x+1’. Such operations are error-prone in a multithreaded environment, because the thread performing the operation may be suspended between the get and the set, and in the meantime another thread may update ‘x’ to something else.

To overcome such problems, the concurrency package provides atomic wrapper classes such as AtomicInteger and AtomicLong. These classes have atomic methods like getAndIncrement, incrementAndGet, and getAndDecrement.
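As a minimal sketch of how such an atomic class can be used, the following example lets several threads increment one shared AtomicInteger without any synchronized block. The class and method names (AtomicCounterDemo, runDemo) are hypothetical; AtomicInteger and its methods come from java.util.concurrent.atomic.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative only.
class AtomicCounterDemo {

    // Starts the given number of threads, lets each one increment the shared
    // counter perThread times, and returns the final value.
    static int runDemo(final int threads, final int perThread) {
        final AtomicInteger counter = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    for (int j = 0; j < perThread; j++) {
                        // The get and the set happen as one indivisible step
                        counter.incrementAndGet();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        AtomicInteger x = new AtomicInteger(5);
        System.out.println(x.getAndIncrement()); // returns the old value, then adds 1
        System.out.println(x.incrementAndGet()); // adds 1, then returns the new value
        System.out.println(runDemo(4, 10000));
    }
}
```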

Apart from this, the concurrency package introduces the Executor Framework, which abstracts away the underlying complexity of thread programming and provides many easy-to-use methods. Executors can wrap Runnable instances and manage their life cycles and the pooling of threads.

Until then, threads could not return computation results to the main thread, and we used workarounds like shared objects. Now, with the addition of Callable, tasks are able to return results.

Callable is similar to Runnable in that both are designed so that their instances are executed by another thread. When a Callable is handed to an executor, its result is delivered through a Future object, which represents the ‘future’ data. A Future can be checked asynchronously to see whether its task has finished, and retrieving the result reports any exception the task has thrown.
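The relationship between Callable and Future can be sketched in a few lines. In this minimal example, which uses a hypothetical class name (CallableFutureDemo) and a trivial computation, a single Callable is submitted to an ExecutorService and its result is read back through the returned Future.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative only.
class CallableFutureDemo {

    // Runs a small computation on a background thread and returns its result.
    static int computeInBackground() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // submit() returns immediately with a Future for the pending result
            Future<Integer> future = executor.submit(new Callable<Integer>() {
                public Integer call() {
                    return 6 * 7;
                }
            });
            // get() blocks until the Callable has finished
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            // The Callable itself threw an exception; it is available as the cause
            throw new IllegalStateException("Task failed", e.getCause());
        } finally {
            executor.shutdown(); // let the pool thread terminate
        }
    }

    public static void main(String[] args) {
        System.out.println(computeInBackground());
    }
}
```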

Let's have a look at a simple use of Executor and Callable. We have a list of students with their scores in three subjects. We need to calculate and print the average score of each student.

Without using threads, we can simply iterate over the list of students and calculate their average scores one after the other. But now we are interested in doing it concurrently. The program below creates a separate task for each student; each task calculates and returns the average score of its student.


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorInJava {
 public static void main(String[] arg) {
  // Prepare list of 'Callable' students
  List<Student> students = new ArrayList<Student>();
  students.add(new Student("Bob", 66, 80, 95));
  students.add(new Student("Tom", 94, 82, 72));
  students.add(new Student("Joy", 88, 85, 99));
  students.add(new Student("Mills", 82, 75, 89));

  // Create Executor service with 3 threads in a pool
  ExecutorService executor = Executors.newFixedThreadPool(3);
  // Ask executor to invoke all of the operations
  List<Future<Float>> results = null;
  try {
   results = executor.invokeAll(students);
  } catch (InterruptedException e1) {
   e1.printStackTrace();
   return;
  } finally {
   // Release the pool threads so that the JVM can exit
   executor.shutdown();
  }

  // Print the results
  for (Future<Float> future : results) {
   try {
    System.out.println(future.get());
   } catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
   }
  }
 }
}

class Student implements Callable<Float> {
 String name;
 Integer subject1;
 Integer subject2;
 Integer subject3;

 public Student(String name, Integer subject1, Integer subject2,
   Integer subject3) {
  super();
  this.name = name;
  this.subject1 = subject1;
  this.subject2 = subject2;
  this.subject3 = subject3;
 }

 @Override
 public Float call() throws Exception {
  return (subject1 + subject2 + subject3) / 3.0f;
 }
}


Did you see how the concurrency package made the multithreaded program look so simple? We passed the collection of Callable students to the invokeAll method. The ExecutorService simply distributes the Callable tasks among the threads residing in a pool of size 3.

The invokeAll method returns a list of Future objects. Each Future can be tested individually to check whether its task has finished or was cancelled, and its get method reports any exception thrown by the task. A Future also lets us cancel a task that has not completed yet, whereas cancelling plain old threads was a tedious job. Note that invokeAll itself blocks until all the tasks have completed. To start working on results as soon as they are ready, the tasks can be submitted individually with the submit method, which returns a Future immediately.
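The status and cancellation methods of Future can be sketched as follows. This is a minimal illustration with hypothetical names (FutureStatusDemo and its two methods). It relies on two documented behaviours: every Future returned by invokeAll is already done, and a Future returned by submit can be cancelled while its task is still running.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative only.
class FutureStatusDemo {

    // Returns true if every Future handed back by invokeAll is already done.
    static boolean allDoneAfterInvokeAll() {
        List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
        for (int i = 1; i <= 3; i++) {
            final int value = i;
            tasks.add(new Callable<Integer>() {
                public Integer call() {
                    return value * value;
                }
            });
        }
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            boolean allDone = true;
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                allDone = allDone && future.isDone();
            }
            return allDone;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown();
        }
    }

    // Submits a long-running task, cancels it, and reports whether the
    // Future now says that it was cancelled.
    static boolean cancelLongRunningTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(new Callable<String>() {
                public String call() throws Exception {
                    Thread.sleep(60000); // simulate slow work
                    return "finished";
                }
            });
            future.cancel(true); // true = interrupt the worker thread if it is running
            return future.isCancelled();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(allDoneAfterInvokeAll());
        System.out.println(cancelLongRunningTask());
    }
}
```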

Executors are a big step forward from plain old threads because they ease the management of concurrent tasks. They fit naturally with the ‘Divide-and-Conquer’ approach, which is also referred to here as ‘Map-and-Reduce’. In this approach, a bigger task is divided into smaller subtasks, the subtasks are executed concurrently, and finally the results of the subtasks are combined to get the final outcome. Identifying the parallel subtasks and dividing the task is called mapping, and combining the results of the subtasks into a final outcome is called reducing.

Let's modify the above problem. Now we want to calculate the overall average of the subject1 scores for the entire classroom. Normally, we would iterate through the list of students, calculate the total of the subject1 scores of all students, and then divide it by the number of students (i.e. the size of the collection).

But the ‘Map-and-Reduce’ approach gives us another option. Calculating the average for the entire classroom is a big task. Let's think of dividing it into multiple individual subtasks. While iterating through the list of students, we will form chunks of 5 students each. That means that for every 5 students we will create a separate Callable and assign it the marks of those 5 students.

Finally, for a classroom of 100 students we will have 20 tasks, each calculating the average of its own chunk of students. We can simply iterate over the resulting collection of Future objects, add the averages, and divide the total by the number of chunks (20 in our case). This works because all the chunks are of the same size. For a sufficiently large workload, the Map-and-Reduce approach can boost performance compared with a single-threaded model.
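The chunking idea can be sketched roughly as follows. This is only an illustration under stated assumptions: the class name ChunkedAverageDemo, the helper sampleScores, the pool size of 4, and the made-up scores 1 to 100 are all hypothetical, and the code requires the number of scores to be a multiple of the chunk size so that averaging the chunk averages gives the overall average.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names and sample data are illustrative only.
class ChunkedAverageDemo {
    static final int CHUNK_SIZE = 5;

    // Map: one Callable per chunk of 5 scores. Reduce: average the chunk averages.
    static double average(final List<Integer> scores) {
        if (scores.isEmpty() || scores.size() % CHUNK_SIZE != 0) {
            throw new IllegalArgumentException("Size must be a positive multiple of " + CHUNK_SIZE);
        }
        List<Callable<Double>> tasks = new ArrayList<Callable<Double>>();
        for (int start = 0; start < scores.size(); start += CHUNK_SIZE) {
            final List<Integer> chunk = scores.subList(start, start + CHUNK_SIZE);
            tasks.add(new Callable<Double>() {
                public Double call() {
                    int sum = 0;
                    for (int score : chunk) {
                        sum += score;
                    }
                    return sum / (double) CHUNK_SIZE;
                }
            });
        }
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            double total = 0;
            for (Future<Double> future : executor.invokeAll(tasks)) {
                total += future.get();
            }
            return total / tasks.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A chunk task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    // Made-up subject1 scores for 100 students: 1, 2, ..., 100.
    static List<Integer> sampleScores() {
        List<Integer> scores = new ArrayList<Integer>();
        for (int i = 1; i <= 100; i++) {
            scores.add(i);
        }
        return scores;
    }

    public static void main(String[] args) {
        System.out.println(average(sampleScores()));
    }
}
```

Returning chunk sums instead of chunk averages would remove the equal-size requirement, at the cost of departing slightly from the description above.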

The problem with Executor is related to parallelism. When a Callable waits for the result of another Callable, it is put in a waiting state, and its thread wastes the opportunity to take over another Callable task that is waiting in the queue. To overcome this issue, Java 7 introduced the Fork and Join framework. Let's have a look at it in detail.

 

Fork And Join:

The newly added ForkJoinPool executor is dedicated to running instances of ForkJoinTask. Such tasks can create subtasks and wait for the subtasks to complete. The major difference with the ForkJoinPool executor is that it can redistribute tasks among its internal pool of threads by stealing jobs: when a task is waiting for another task to complete and there are other pending jobs to be executed, the waiting thread picks up one of them. This kind of algorithm is called the Work Stealing algorithm.

With this algorithm, ideally no worker thread sits idle. Idle workers steal work from the workers that are busy.

The ForkJoinPool is a specialized implementation of ExecutorService that implements the Work Stealing algorithm. It is able to execute problems modeled as ForkJoinTask instances. ForkJoinTask has two implementations, RecursiveTask and RecursiveAction. They are similar, except that a RecursiveAction cannot return anything, while a RecursiveTask can return an object of a specified type.
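Since the demo later in this article uses RecursiveTask, here is a minimal sketch of the other variant, RecursiveAction, which changes data in place instead of returning a result. The class name DoubleValuesAction, the helper doubleAll, and the threshold of 4 are hypothetical choices for illustration. The invokeAll method used here is inherited from ForkJoinTask; it runs both subtasks and returns when both have completed.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical demo class: doubles every element of an array in place.
class DoubleValuesAction extends RecursiveAction {
    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 4; // illustrative cut-off for splitting

    private final int[] values;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleValuesAction(int[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: do the work directly
            for (int i = from; i < to; i++) {
                values[i] *= 2;
            }
            return;
        }
        // Otherwise split the range in two and run both halves
        int middle = (from + to) >>> 1;
        invokeAll(new DoubleValuesAction(values, from, middle),
                  new DoubleValuesAction(values, middle, to));
    }

    // Convenience helper: runs the action on a pool and returns the same array.
    static int[] doubleAll(int[] values) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleValuesAction(values, 0, values.length));
        } finally {
            pool.shutdown();
        }
        return values;
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        System.out.println(Arrays.toString(doubleAll(data)));
    }
}
```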

Two important methods defined in ForkJoinTask are ‘fork’ and ‘join’.

The fork method schedules a ForkJoinTask for asynchronous execution, and allows a new ForkJoinTask to be launched from an existing one.

The join method allows a ForkJoinTask to wait for the completion of another ForkJoinTask.

Before reaching the end of this article, we will see the Fork and Join framework in action. For the purpose of this demo, I have picked the famous Fibonacci series.

 
Index      0    1    2    3    4    5
Element    0    1    1    2    3    5

Above is an example of a Fibonacci series of 6 elements. As we are all familiar with it, let's jump directly into a simple, single-threaded example, which generates and prints the first 25 elements of the Fibonacci series.

import java.util.ArrayList;
import java.util.List;

public class FibonnacciSeries {
 public static void main(String[] arg) {
  int size = 25;
  List<Integer> fibinacciSeries = new ArrayList<>();
  for (int index = 0; index < size; index++) {
   fibinacciSeries.add(FibonnacciGenerator.generate(index));
  }
  dumpList(fibinacciSeries);
 }

 public static void dumpList(List list) {
  int index = 0;
  for (Object object : list) {
   System.out.printf("%d\t%d\n", index++, object);
  }
 }
}

class FibonnacciGenerator {
 public static Integer generate(Integer index) {
  if (index == 0) {
   return 0;
  }
  if (index < 2) {
   return 1;
  }
  Integer result = generate(index - 1) + generate(index - 2);
  return result;
 }
}

When we run this program, we get the output below:
0     0
1     1
2     1
3     2
4     3
5     5
6     8
7     13
8     21
9     34
10    55
11    89
12    144
13    233
14    377
15    610
16    987
17    1597
18    2584
19    4181
20    6765
21    10946
22    17711
23    28657
24    46368


In this example we have not created any separate thread, and the program goes through a long series of recursive calls. The performance hit would be more visible if we increased the number of elements. Now let's see how to solve the same problem with the help of the Fork and Join framework.

import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class FibonacciSeries_ForkJoin {
 public static void main(String[] arg) {
  int size = 25;
  Long startTime = Calendar.getInstance().getTimeInMillis();
  final ForkJoinPool pool = new ForkJoinPool();
  List<Integer> fibonacciSeries = new ArrayList<>();
  for (int index = 0; index < size; index++) {
   FibonacciSeriesGeneratorTask task = new FibonacciSeriesGeneratorTask(
     index);
   fibonacciSeries.add(pool.invoke(task));
  }
  Long endTime = Calendar.getInstance().getTimeInMillis();
  System.out.println(endTime - startTime);
  dumpList(fibonacciSeries);
 }

 public static void dumpList(List<Integer> list) {
  int index = 0;
  for (Integer value : list) {
   System.out.printf("%d\t%d\n", index++, value);
  }
 }
}

// RecursiveTask<Integer> declares that compute() and join() return an Integer
class FibonacciSeriesGeneratorTask extends RecursiveTask<Integer> {
 private static final long serialVersionUID = 1L;
 private Integer index = 0;

 public FibonacciSeriesGeneratorTask(Integer index) {
  super();
  this.index = index;
 }

 @Override
 protected Integer compute() {
  if (index == 0) {
   return 0;
  }
  if (index < 2) {
   return 1;
  }
  final FibonacciSeriesGeneratorTask worker1 = new FibonacciSeriesGeneratorTask(index - 1);
  worker1.fork();

  final FibonacciSeriesGeneratorTask worker2 = new FibonacciSeriesGeneratorTask(index - 2);
  return worker2.compute() + worker1.join();
 }
}

Not surprisingly, the result is exactly the same as in the previous example. The difference is that this time we have divided the work among multiple worker threads, which run concurrently to compute the Fibonacci series.

We have created the ForkJoinPool with the help of a default constructor. Many developers habitually create ForkJoinPool by passing the number of available processors.

new ForkJoinPool(Runtime.getRuntime().availableProcessors());

But this is not required, as the default constructor of ForkJoinPool sets the parallelism to the number of available processors.
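This can be checked with a small sketch that compares the parallelism reported by both kinds of pool. The class and method names (PoolParallelismDemo, defaultMatchesProcessorCount) are hypothetical; getParallelism is a method of ForkJoinPool.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class; the names are illustrative only.
class PoolParallelismDemo {

    // Returns true if a default pool reports the same parallelism as a pool
    // created explicitly with the number of available processors.
    static boolean defaultMatchesProcessorCount() {
        ForkJoinPool byDefault = new ForkJoinPool();
        ForkJoinPool explicit = new ForkJoinPool(
                Runtime.getRuntime().availableProcessors());
        try {
            return byDefault.getParallelism() == explicit.getParallelism();
        } finally {
            byDefault.shutdown();
            explicit.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(defaultMatchesProcessorCount());
    }
}
```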

Then, for each index position, we create a new instance of FibonacciSeriesGeneratorTask and pass it to the pool's ‘invoke’ method.

FibonacciSeriesGeneratorTask is an implementation of RecursiveTask. Note: we have not used RecursiveAction, because it cannot return anything. We wanted to return the result of the computation, and hence we have used RecursiveTask.

FibonacciSeriesGeneratorTask implements the compute method, which creates two further instances of FibonacciSeriesGeneratorTask, forks the first one, and computes the second one directly in the current thread. The ‘join’ method then makes the current task wait until the forked task returns its result.