Java 7: More concurrency

Java 7 (Dolphin) brings some concurrency and collections updates from JSR166y, an extension of Doug Lea's JSR166.

In this post, we'll look at the most important additions:

  • Fork/Join Framework
  • TransferQueue
  • ThreadLocalRandom

Fork/Join Framework

The most important improvement is a new Fork/Join framework. Fork/Join is basically the parallel version of the divide-and-conquer approach to solving problems. Here is the typical form of such algorithms (taken from Doug Lea):

Result solve(Problem problem) {
    if (problem is small)
        directly solve problem
    else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}

Java 7 provides a new class, ForkJoinPool, to run ForkJoinTask instances. A ForkJoinTask is much lighter than a thread: a large number of tasks can be hosted by a much smaller number of threads. Two implementations of ForkJoinTask are provided:

  • RecursiveAction: a recursive ForkJoinTask that returns no result
  • RecursiveTask<V>: a recursive ForkJoinTask that returns a result of type V

Of course, you can also extend the ForkJoinTask class directly, but the two recursive classes are enough in almost all cases.

From within a ForkJoinTask, you can run other tasks (fork them and wait for their completion) using the invokeAll methods.
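
To make the "many tasks, few threads" idea more concrete, here is a minimal sketch. The names ManyTasksDemo, CountingTask and runTasks are made up for this illustration; only ForkJoinPool, RecursiveAction and invokeAll come from the framework. It assumes a pool created with a small target parallelism and submits many tiny tasks to it through invokeAll:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK
class ManyTasksDemo {
    // Hypothetical leaf task: it only records that it ran
    static class CountingTask extends RecursiveAction {
        private final AtomicInteger counter;

        CountingTask(AtomicInteger counter) {
            this.counter = counter;
        }

        @Override
        protected void compute() {
            counter.incrementAndGet();
        }
    }

    // Runs taskCount small tasks on a pool with the given target parallelism
    // and returns how many of them were executed
    static int runTasks(final int taskCount, int parallelism) {
        final AtomicInteger counter = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            pool.invoke(new RecursiveAction() {
                @Override
                protected void compute() {
                    List<CountingTask> subtasks = new ArrayList<CountingTask>();
                    for (int i = 0; i < taskCount; i++) {
                        subtasks.add(new CountingTask(counter));
                    }
                    // Forks every subtask and returns once all of them are done
                    invokeAll(subtasks);
                }
            });
        } finally {
            pool.shutdown();
        }
        return counter.get();
    }
}
```

Calling ManyTasksDemo.runTasks(1000, 2) executes a thousand tasks on a pool that targets only two worker threads, which would not be reasonable with one thread per task.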

Now that we have covered the main concepts of this framework, let's look at a small example (taken directly from the Javadoc of build 87). We'll use divide and conquer to increment all the elements of an array. To know whether the problem is small enough to solve directly, we'll use a threshold representing the number of elements that we can increment directly. If there are more elements than the threshold, we fork two subtasks; otherwise, we increment the elements of the array directly. Here is our task:

import java.util.concurrent.RecursiveAction;

public class IncrementTask extends RecursiveAction {
   private final long[] array;
   private final int low;
   private final int high;

   private static final int THRESHOLD = 5000;

   public IncrementTask(long[] array, int low, int high) {
      super();

      this.array = array;
      this.low = low;
      this.high = high;
   }

   @Override
   protected void compute() {
      if (high - low < THRESHOLD) {
         // Small enough: increment the range [low, high) directly
         for (int i = low; i < high; ++i) {
            array[i]++;
         }
      } else {
         // Too large: split the range in two halves and run both as subtasks
         int mid = (low + high) >>> 1;

         invokeAll(new IncrementTask(array, low, mid), new IncrementTask(array, mid, high));
      }
   }
}

You can then run it on an array using a ForkJoinPool:

RecursiveAction mainTask = new IncrementTask(anArray, 0, anArray.length);
ForkJoinPool mainPool = new ForkJoinPool();
mainPool.invoke(mainTask);

All the elements of the array will be incremented. Depending on the size of the array and on the threshold, the problem will be divided into several subproblems, and all these tasks will be managed by the ForkJoinPool.

You can also write tasks that return a result. For example, we can compute the sum of all the elements of an array:

import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
   private final long[] array;
   private final int low;
   private final int high;

   private static final int THRESHOLD = 5000;

   public SumTask(long[] array, int low, int high) {
      super();

      this.array = array;
      this.low = low;
      this.high = high;
   }

   @Override
   protected Long compute() {
      if (high - low < THRESHOLD) {
         long sum = 0;

         for (int i = low; i < high; ++i) {
            sum += array[i];
         }

         return sum;
      } else {
         int mid = (low + high) >>> 1;

         // Declared as SumTask so that the protected compute() is accessible here
         SumTask left = new SumTask(array, low, mid);
         SumTask right = new SumTask(array, mid, high);

         // Run the right half asynchronously and compute the left half in this thread
         right.fork();

         return left.compute() + right.join();
      }
   }
}

And you can use it like this:

RecursiveTask<Long> sumTask = new SumTask(anArray, 0, anArray.length);
ForkJoinPool mainPool = new ForkJoinPool();
Long sum = mainPool.invoke(sumTask);

I think it's a clean way to solve big problems with divide-and-conquer.

You can also imagine other ways to divide the problem. For example, a task could process the first THRESHOLD elements itself and create a new task to process the remaining elements on the right. This creates fewer tasks, but whether it is better depends on the context and on the problem. In practice, you'll normally have more complex problems, but if you can find a way to divide them, you can use this new framework and end up with very clean code.
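
As an illustration of that alternative split, here is one possible sketch for the sum example. The class name LeftChunkSumTask and the helper method sum are made up for this post, and this is only one way to write it: each task sums at most THRESHOLD elements on the left itself and forks a single task for everything that remains on the right.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of the sum task, not taken from the Javadoc
class LeftChunkSumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 5000;

    private final long[] array;
    private final int low;
    private final int high;

    LeftChunkSumTask(long[] array, int low, int high) {
        this.array = array;
        this.low = low;
        this.high = high;
    }

    @Override
    protected Long compute() {
        // End (exclusive) of the chunk that this task sums itself
        int end = (high - low <= THRESHOLD) ? high : low + THRESHOLD;

        // Fork the remainder first so that another worker can pick it up
        LeftChunkSumTask rest = null;
        if (end < high) {
            rest = new LeftChunkSumTask(array, end, high);
            rest.fork();
        }

        long sum = 0;
        for (int i = low; i < end; ++i) {
            sum += array[i];
        }

        // Add the result of the forked remainder, if there was one
        return rest == null ? sum : sum + rest.join();
    }

    // Convenience helper: sums a whole array with a temporary pool
    static long sum(long[] array) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new LeftChunkSumTask(array, 0, array.length));
        } finally {
            pool.shutdown();
        }
    }
}
```

With this split, an array of N elements produces roughly N / THRESHOLD tasks, but they form a chain instead of a balanced tree, so it is worth measuring before preferring it to the split-in-half version.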

TransferQueue

TransferQueue is an interesting new collection: a blocking queue designed for producer/consumer scenarios. With this kind of queue, a producer can wait until a consumer receives the element by using the new transfer(E) method, or enqueue it without waiting for receipt, as with a normal queue, by using put(E). The tryTransfer methods hand an element over only if a consumer is already waiting, or becomes available within a given timeout. Nothing changes on the consumer side: you still call take() to get an element, waiting for one if necessary. You can also get an estimate of the number of waiting consumers with getWaitingConsumerCount().

The implementation to use is LinkedTransferQueue, an unbounded queue based on linked nodes. Elements are ordered FIFO (first in, first out). Here are some of the methods you can use with this new collection:

TransferQueue<String> transfer = new LinkedTransferQueue<String>();

transfer.transfer("Hello"); // Waits for a consumer (may throw InterruptedException)

if (transfer.tryTransfer("World")) { // Doesn't wait for a consumer
    // The element has been transferred to a consumer
} else {
    // There was no waiting consumer. The element has not been enqueued.
}

// Waits up to 5 seconds for a consumer (may throw InterruptedException)
boolean transferred = transfer.tryTransfer("Goodbye", 5, TimeUnit.SECONDS);

while (transfer.hasWaitingConsumer()) {
    // There is at least one consumer waiting for a transfer
}

This is an interesting addition too, useful for example for message passing.
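
To see the hand-off with a real consumer, here is a small self-contained sketch. The class TransferDemo and its two methods are invented for this example; only the TransferQueue calls are part of the API. It starts one consumer thread that calls take(), then transfers a message to it from the calling thread:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

// Hypothetical demo class, not part of the JDK
class TransferDemo {
    // Hands one message to a consumer thread and returns what the consumer received
    static String handOff(String message) {
        final TransferQueue<String> queue = new LinkedTransferQueue<String>();
        final String[] received = new String[1];

        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // Blocks until a producer provides an element
                    received[0] = queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();

        try {
            // Returns only once the consumer has taken the message
            queue.transfer(message);
            // Waiting for the consumer thread makes its write to received[0] visible
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during hand-off", e);
        }
        return received[0];
    }

    // Without a waiting consumer, the untimed tryTransfer gives up immediately
    static boolean tryWithoutConsumer() {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();
        return queue.tryTransfer("World");
    }
}
```

Here handOff("Hello") returns the same string once the consumer has received it, while tryWithoutConsumer() returns false because nobody is waiting on the queue.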

ThreadLocalRandom

A really simple but useful enhancement is the addition of the ThreadLocalRandom class. This class is a random number generator bound to the current thread. This means that if you use it from two different threads, each thread gets its own generator. The generator is initialized with an internally generated seed that you cannot modify (setSeed() throws an UnsupportedOperationException).

You can use this class like this:

long l = ThreadLocalRandom.current().nextLong(22L);

If you always use this form, you have the guarantee that the random generator will never be shared between two threads. Moreover, this new class provides methods to generate bounded numbers. For example, to generate a pseudo-random number between 10 (inclusive) and 33 (exclusive), you can write:

int i = ThreadLocalRandom.current().nextInt(10, 33);

This is a small improvement, but a really useful one, I think.
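
Here is a short sketch of both points: the bounded generation used from several threads, and the locked seed. The class RandomDemo and its methods are made up for this example. Each worker calls ThreadLocalRandom.current() itself, so it works with its own generator:

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the JDK
class RandomDemo {
    // Starts several threads that each draw count values in [10, 33)
    // and reports whether every drawn value was inside that range
    static boolean allInRangeOnThreads(int threads, final int count) {
        final AtomicBoolean ok = new AtomicBoolean(true);
        Thread[] workers = new Thread[threads];

        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < count; i++) {
                        // current() is called on the worker thread itself
                        int value = ThreadLocalRandom.current().nextInt(10, 33);
                        if (value < 10 || value >= 33) {
                            ok.set(false);
                        }
                    }
                }
            });
            workers[t].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return ok.get();
    }

    // Returns true if setSeed() is rejected, as described above
    static boolean seedIsLocked() {
        try {
            ThreadLocalRandom.current().setSeed(42L);
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }
}
```

The important detail is that current() is never stored in a field shared between threads: every thread asks for its generator at the point of use.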

So here we are. I've covered the main concurrency features added in Java 7. I hope you found this interesting and that discovering these features will help you with concurrent programming in Java 7.
