Java 7 : New I/O features (Asynchronous operations, multicasting, random access) with JSR 203 (NIO.2)

As I said in another post, Java 7 will bring a new API for accessing the file system, but NIO.2 also includes several other new features that I have not covered yet.

I'll cover them in this post. JSR 203 (also known as NIO.2) adds several new classes that improve I/O code.

In this post I cover the following features :

  • SeekableByteChannel : A random access channel
  • MulticastChannel : A channel that allows IP multicasting
  • NetworkChannel : The new super interface for the network-oriented channels
  • Asynchronous I/O API : The new API to perform I/O operations asynchronously.

SeekableByteChannel

First of all, Java 7 includes a new ByteChannel, the SeekableByteChannel. This channel maintains a current position, and you can read and write from that position, which gives you random access to the underlying entity. Keep in mind that the position is state shared by everything using the channel : threads that work at different offsets should each open their own channel, or use the positional read/write methods of FileChannel, which take the offset as an argument.

SeekableByteChannel channel1 = Files.newByteChannel(Paths.get("Path to file")); //READ is the default when no option is given
SeekableByteChannel channel2 = Files.newByteChannel(Paths.get("Path to file"), StandardOpenOption.READ, StandardOpenOption.WRITE); //READ and WRITE

You can use these methods to query and change the position and size of the channel :

  • long position() : Returns the current position
  • long size() : Returns the current size of the entity this channel is connected to, for example the size of the file the channel is open on
  • position(long newPosition) : Moves the current position to the given one
  • truncate(long size) : Truncates the entity to the given size.

The position(long) and truncate(long) methods return the channel itself, which allows chained invocations.

FileChannel now implements this new interface, so every FileChannel gives you random access.
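To make the position and truncate methods more concrete, here is a minimal sketch that patches a file in place. It assumes the released Java 7 API (Files.newByteChannel); the class name SeekableChannelSketch, the method name and the temporary file are only there for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class SeekableChannelSketch {

    // Overwrites part of a temporary file in place, truncates it and returns the final content.
    static String patchAndTruncate() {
        try {
            Path file = Files.createTempFile("seekable", ".txt"); // hypothetical scratch file
            try {
                Files.write(file, "Hello, World!".getBytes(StandardCharsets.US_ASCII));

                try (SeekableByteChannel channel = Files.newByteChannel(
                        file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                    // position(long) returns the channel, so the write can be chained:
                    // jump to offset 7 and overwrite "World" with "NIO.2"
                    channel.position(7).write(ByteBuffer.wrap("NIO.2".getBytes(StandardCharsets.US_ASCII)));

                    // Drop the trailing "!" : the file shrinks from 13 to 12 bytes
                    channel.truncate(12);

                    // Go back to the start and read the whole file
                    ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());
                    channel.position(0);
                    while (buffer.hasRemaining() && channel.read(buffer) > 0) {
                        // keep reading until the buffer is full or the end of the file is reached
                    }
                    return new String(buffer.array(), StandardCharsets.US_ASCII);
                }
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(patchAndTruncate());
    }
}
```

The file is opened with both READ and WRITE because the same channel is used to patch the content and to read it back.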

You can of course read a file with that channel :

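import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

//try-with-resources closes the channel even if reading fails
try (SeekableByteChannel channel = Files.newByteChannel(Paths.get("Path to file"), StandardOpenOption.READ)) {
    ByteBuffer buffer = ByteBuffer.allocate(4096);

    System.out.println("File size: " + channel.size());

    while (channel.read(buffer) > 0) {
        buffer.flip(); //Switch the buffer from filling to draining

        System.out.print(new String(buffer.array(), 0, buffer.remaining()));

        buffer.clear(); //Make the whole buffer available for the next read

        System.out.println("Current position : " + channel.position());
    }
} catch (IOException e) {
    System.out.println("Exception when reading : " + e.getMessage());
    e.printStackTrace();
}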

MulticastChannel

This new interface enables Internet Protocol (IP) multicasting, so you can send IP datagrams to a group and receive the datagrams sent to it. The multicast implementations are bound directly to the native multicast facility. This interface is implemented by DatagramChannel, and in the NIO.2 drafts also by AsynchronousDatagramChannel, which did not make it into the released Java 7 API.

A simple example taken from the Javadoc that opens a DatagramChannel and joins a multicast group :

NetworkInterface networkInterface = NetworkInterface.getByName("hme0");

DatagramChannel dc = DatagramChannel.open(StandardProtocolFamily.INET)
         .setOption(StandardSocketOptions.SO_REUSEADDR, true)
         .bind(new InetSocketAddress(5000))
         .setOption(StandardSocketOptions.IP_MULTICAST_IF, networkInterface);

InetAddress group = InetAddress.getByName("225.4.5.6");

MembershipKey key = dc.join(group, networkInterface);

With that, you can use your DatagramChannel like any other DatagramChannel you used in the past : once it has joined the group, it receives the datagrams sent to the group on that interface, and a datagram sent to the group address reaches every member of the group.

NetworkChannel

All the network-oriented channels now implement the new NetworkChannel interface. With it, you can easily bind the channel's socket and set and query socket options. Furthermore, the socket options are now extensible, so you can use operating-system-specific options, which can be interesting for high-performance servers.
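As a small illustration of binding and option handling through NetworkChannel only, here is a sketch that assumes the released Java 7 names (StandardSocketOptions). The class name NetworkChannelSketch and the choice of ServerSocketChannel as the concrete channel are mine; any other network channel would do.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.NetworkChannel;
import java.nio.channels.ServerSocketChannel;

final class NetworkChannelSketch {

    // Binds a channel through the NetworkChannel interface and returns the SO_REUSEADDR value read back.
    static boolean bindWithReuseAddress() {
        // Only NetworkChannel methods are used below, whatever the concrete channel type is
        try (NetworkChannel channel = ServerSocketChannel.open()) {
            channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);

            // Port 0 asks the operating system for any free port on the loopback interface
            channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            System.out.println("Bound to : " + channel.getLocalAddress());
            System.out.println("Supported options : " + channel.supportedOptions());

            return channel.getOption(StandardSocketOptions.SO_REUSEADDR);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("SO_REUSEADDR enabled : " + bindWithReuseAddress());
    }
}
```

The set returned by supportedOptions() depends on the channel type and the platform, which is where the operating-system-specific options would show up.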

Asynchronous I/O

After that little introduction, we come to the main new feature : the new Asynchronous I/O API. Its name says it all : it enables asynchronous I/O operations. The new channels provide asynchronous operations for both sockets and files.

All these operations are non-blocking : the call returns immediately and the I/O proceeds in the background. You can still block on any asynchronous channel when you need to, by waiting on the Future that the operation returns.

All the asynchronous I/O operations have one of two forms :

  • The first one returns a java.util.concurrent.Future that represents the pending result. You can use that Future to wait for the I/O operation to finish.
  • The second one takes a CompletionHandler. That handler is invoked when the operation has completed or failed, like a callback.

So here are the examples of the two forms :

The first form, using Future :

AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("Path to file"));
int capacity = 4096; //Size of the read buffer, in bytes
ByteBuffer buffer = ByteBuffer.allocate(capacity);
Future<Integer> result = channel.read(buffer, 100); //Read up to capacity bytes from the file, starting at position 100
boolean done = result.isDone(); //Indicates whether the operation has already completed

You can also wait for completion :

int bytesRead = result.get();

Or wait with a timeout :

int bytesRead = result.get(10, TimeUnit.SECONDS); //Wait at most 10 seconds on the result
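Put together, the Future form could look like the following sketch. The class FutureReadSketch, its method and the temporary file are assumptions made for the example, and the exception handling is kept deliberately simple.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureReadSketch {

    // Writes content to a temporary file, then reads up to length bytes starting at position.
    static String readSlice(String content, long position, int length) {
        try {
            Path file = Files.createTempFile("async-future", ".txt"); // hypothetical scratch file
            try {
                Files.write(file, content.getBytes(StandardCharsets.US_ASCII));

                try (AsynchronousFileChannel channel =
                         AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
                    ByteBuffer buffer = ByteBuffer.allocate(length);

                    // The call returns immediately; the read itself runs in the background
                    Future<Integer> pending = channel.read(buffer, position);

                    // Block for at most 10 seconds; the result is -1 if position is past the end of the file
                    int bytesRead = pending.get(10, TimeUnit.SECONDS);

                    return new String(buffer.array(), 0, Math.max(bytesRead, 0), StandardCharsets.US_ASCII);
                }
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readSlice("0123456789", 3, 4));
    }
}
```

Between the call to read() and the call to get(), the calling thread is free to do other work, which is the whole point of this form.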

The second form, using CompletionHandler :

channel.read(buffer, 100, null, new CompletionHandler<Integer, Object>() {
    @Override
    public void completed(Integer result, Object attachment) {
        //Process the result : the number of bytes read, or -1 at the end of the file
    }

    @Override
    public void failed(Throwable exception, Object attachment) {
        //React to the failure
    }
});

As you can see, you can give an attachment to the operation. This attachment is passed to the CompletionHandler when the operation ends. You can pass null as the attachment without any problem, but you can also pass anything you want, like the connection for an AsynchronousSocketChannel or the ByteBuffer for our read :

channel.read(buffer, 100, buffer, new CompletionHandler<Integer, ByteBuffer>() {
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        //Process the result : attachment is the buffer that was filled
    }

    @Override
    public void failed(Throwable exception, ByteBuffer attachment) {
        //React to the failure
    }
});

In the API as released in Java 7, the CompletionHandler form returns void rather than a Future, so each call uses one form or the other; the two cannot be merged on a single operation.
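Here is a complete sketch of the CompletionHandler form with the buffer as attachment. It assumes the released Java 7 signatures; the class HandlerReadSketch, the temporary file and the CountDownLatch used to wait for the callback are illustration choices, not something the API requires.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class HandlerReadSketch {

    // Writes content to a temporary file and reads it back through a CompletionHandler.
    static String readWithHandler(String content) {
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<String> text = new AtomicReference<>();
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        try {
            Path file = Files.createTempFile("async-handler", ".txt"); // hypothetical scratch file
            try {
                Files.write(file, content.getBytes(StandardCharsets.US_ASCII));

                try (AsynchronousFileChannel channel =
                         AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
                    ByteBuffer buffer = ByteBuffer.allocate(content.length());

                    // The buffer travels as the attachment, so the handler receives it as a parameter
                    channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer bytesRead, ByteBuffer attachment) {
                            attachment.flip(); // switch from filling to draining
                            text.set(StandardCharsets.US_ASCII.decode(attachment).toString());
                            done.countDown();
                        }

                        @Override
                        public void failed(Throwable exception, ByteBuffer attachment) {
                            failure.set(exception);
                            done.countDown();
                        }
                    });

                    // Wait for the callback before the channel is closed
                    if (!done.await(10, TimeUnit.SECONDS)) {
                        throw new IllegalStateException("The read did not complete in time");
                    }
                }
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
        if (failure.get() != null) {
            throw new IllegalStateException(failure.get());
        }
        return text.get();
    }

    public static void main(String[] args) {
        System.out.println(readWithHandler("callback"));
    }
}
```

The handler runs on a thread of the channel's group, not on the calling thread, which is why the result is handed back through an AtomicReference here.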

Here are the asynchronous channels in NIO.2 :

  • AsynchronousFileChannel : An asynchronous channel for reading from and writing to a file. This channel has no global position, so each read/write operation needs a position to operate on. Different threads can access different parts of the file concurrently. You have to specify the options (READ, WRITE, but not APPEND) when you open this channel.
  • AsynchronousSocketChannel : A simple asynchronous channel to a Socket. The connect, read/write and scatter/gather methods are all asynchronous. The read/write methods support timeouts.
  • AsynchronousServerSocketChannel : An asynchronous channel to a ServerSocket. The accept() method is asynchronous and the CompletionHandler is called when a connection has been accepted. The result of an accept is an AsynchronousSocketChannel.
  • AsynchronousDatagramChannel : A channel to a datagram-oriented socket. The read/write (connected) and receive/send (unconnected) methods are asynchronous. This class was part of the NIO.2 drafts but is not in the released Java 7 API.
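To show the socket channels working together, here is a sketch that sends a message from an AsynchronousSocketChannel to an AsynchronousServerSocketChannel over the loopback interface, using the Future form everywhere. The class name AsyncSocketSketch and the 10 second timeouts are arbitrary choices for the example.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AsyncSocketSketch {

    // Sends message from a client channel to an accepted server-side channel and returns what was received.
    static String sendOverLoopback(String message) {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
             AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {

            // Port 0 lets the operating system pick a free port
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // accept() and connect() both return immediately with a Future
            Future<AsynchronousSocketChannel> accepted = server.accept();
            client.connect(server.getLocalAddress()).get(10, TimeUnit.SECONDS);

            try (AsynchronousSocketChannel peer = accepted.get(10, TimeUnit.SECONDS)) {
                byte[] payload = message.getBytes(StandardCharsets.UTF_8);

                // A write may be partial, so loop until the buffer is drained
                ByteBuffer out = ByteBuffer.wrap(payload);
                while (out.hasRemaining()) {
                    client.write(out).get(10, TimeUnit.SECONDS);
                }

                // Read on the accepted side until the whole payload has arrived or the stream ends
                ByteBuffer in = ByteBuffer.allocate(payload.length);
                while (in.hasRemaining() && peer.read(in).get(10, TimeUnit.SECONDS) >= 0) {
                    // keep reading
                }
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendOverLoopback("ping"));
    }
}
```

A real server would rather use the CompletionHandler form of accept() and call accept() again from the handler, so that it keeps accepting connections without dedicating a thread to waiting.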

Groups

When you use asynchronous channels, there are of course threads that invoke the completion handlers. These threads are bound to an AsynchronousChannelGroup. This group contains a thread pool and encapsulates the resources shared by all the threads working for the channels. You can create these groups from a thread pool. An AsynchronousFileChannel can be created with its own group by passing an ExecutorService as an argument to the open() method. The other channels take an AsynchronousChannelGroup in their open() method; if you don't give a group or you pass null, the default group is used. The channel is said to be owned by the group, so if the group is shut down with shutdownNow(), its channels are closed too.

You can create a group with a ThreadFactory :

ThreadFactory myThreadFactory = Executors.defaultThreadFactory();
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(25, myThreadFactory);

Or with an ExecutorService :

ExecutorService service = Executors.newFixedThreadPool(25);

AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(service);

And you can easily use it :

AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(channelGroup);

You can shut down the group using the shutdown() method. After that, you cannot create more channels in this group, and the group actually terminates once all its channels are closed, the running completion handlers have finished and the resources are released.
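The shutdown sequence described above can be observed with a short sketch. The class name ChannelGroupSketch, the pool size of 2 and the 10 second wait are assumptions made for the example.

```java
import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ShutdownChannelGroupException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ChannelGroupSketch {

    // Returns true if the group refused a new channel after shutdown() and then terminated
    // once its last open channel was closed.
    static boolean shutdownLifecycle() {
        try {
            AsynchronousChannelGroup group =
                AsynchronousChannelGroup.withFixedThreadPool(2, Executors.defaultThreadFactory());

            // This channel is owned by the group
            AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);

            // Orderly shutdown : existing channels stay usable, new ones are refused
            group.shutdown();

            boolean rejected = false;
            try {
                AsynchronousSocketChannel.open(group);
            } catch (ShutdownChannelGroupException expected) {
                rejected = true;
            }

            // The group can only terminate once its last channel is closed
            channel.close();
            boolean terminated = group.awaitTermination(10, TimeUnit.SECONDS);

            return rejected && terminated;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Lifecycle as expected : " + shutdownLifecycle());
    }
}
```

If you need to stop everything at once, shutdownNow() closes the channels of the group for you instead of waiting for them to be closed.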

Pay attention to one point when you use any type of pool with CompletionHandlers : DO NOT USE blocking or long-running operations inside a CompletionHandler. If every thread in the pool is blocked, the entire application can stall. And if you use a custom or cached thread pool, the queue can grow without bound and cause an OutOfMemoryError.
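One possible way to follow that rule, sketched here under my own assumptions, is to let the handler return quickly and hand the slow part to a separate worker pool. The class HandlerOffloadSketch, the thread named "worker" and the sleep that stands in for a blocking call are all hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class HandlerOffloadSketch {

    // Returns the name of the thread that ran the slow work started from a CompletionHandler.
    static String threadThatRanSlowWork() {
        // Separate pool for slow work, so the threads of the channel group stay free for I/O
        final ExecutorService workers = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable task) {
                return new Thread(task, "worker");
            }
        });
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<String> threadName = new AtomicReference<>();
        try {
            Path file = Files.createTempFile("async-offload", ".bin"); // hypothetical scratch file
            try {
                Files.write(file, new byte[] {1, 2, 3});
                try (AsynchronousFileChannel channel =
                         AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
                    channel.read(ByteBuffer.allocate(3), 0, null, new CompletionHandler<Integer, Void>() {
                        @Override
                        public void completed(Integer bytesRead, Void attachment) {
                            // Return quickly : only queue the slow part
                            workers.execute(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        Thread.sleep(50); // stands in for a blocking call
                                    } catch (InterruptedException e) {
                                        Thread.currentThread().interrupt();
                                    }
                                    threadName.set(Thread.currentThread().getName());
                                    done.countDown();
                                }
                            });
                        }

                        @Override
                        public void failed(Throwable exception, Void attachment) {
                            done.countDown();
                        }
                    });
                    done.await(10, TimeUnit.SECONDS);
                }
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
        return threadName.get();
    }

    public static void main(String[] args) {
        System.out.println("Slow work ran on : " + threadThatRanSlowWork());
    }
}
```

A bounded worker pool is the safer choice here, since an unbounded one only moves the queue growth problem somewhere else.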

I think (and I hope) I've covered the main news in this new Asynchronous I/O API. This is not simple stuff, of course, and it will not be used by everyone, but it can be useful in several cases and it's a good thing that Java will have this kind of I/O. I apologize if I made some errors in my code or my explanations; it is not an easy subject and I tried to explain it for everyone.

You can find more information in the overview of Asynchronous I/O at JavaOne 2009, by Alan Bateman and Jean-François Arcand.
