Monday May 25, 2009

Well, it's that time of year again; JavaOne has come around so quickly. This year I'll be co-presenting BOF-5087: All Things I/O with JDK™ Release 7 with Alan.

I'm happy to have the opportunity to tell you about what we've been up to in the networking area, as well as give an update on the state of SCTP in the JDK. I've also been prototyping a cool way of leveraging SCTP multi-homing without having to change your app. This BOF is a great opportunity to ask questions and discuss any aspects of file or networking I/O. Please come along.

Wednesday May 20, 2009

Providing support for Stream Control Transmission Protocol (SCTP) in Java has been approved as one of the features for JDK7, and the work of defining the API and reference implementation was done through the SCTP OpenJDK project. This work was integrated into JDK7 Milestone 3 and is available in all future promotions.

Brief Introduction to SCTP

The Stream Control Transmission Protocol (SCTP) is a reliable, message-oriented transport protocol existing at an equivalent level with UDP (User Datagram Protocol) and TCP (Transmission Control Protocol). SCTP is session oriented and an association between the endpoints must be established before any data can be transmitted.

SCTP has direct support for multi-homing, meaning that an endpoint may be represented by more than one address and each address may be used for sending and receiving data, thus providing network redundancy. The connection between two endpoints is referred to as an association between those endpoints. Endpoints can exchange a list of addresses during association setup. One address is designated as the primary address; this is the default address that the peer will use for sending data. A single port number is used across the entire address list at an endpoint for a specific session.
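
As a rough illustration of the primary-address idea, here is a small sketch in plain Java. It does not use SCTP at all: the class MultiHomingModel, the method chooseDestination and the addresses are made up for this example, and the "first reachable alternative" rule is a simplifying assumption rather than a description of what an SCTP stack actually does.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/* Illustrative model only: hypothetical names, not part of any SCTP API. */
class MultiHomingModel {

    /*
     * Returns the primary address while it is reachable, otherwise the first
     * reachable address from the peer's list, or null if none is reachable.
     */
    static String chooseDestination(List<String> peerAddresses,
                                    String primary,
                                    Set<String> reachable) {
        if (reachable.contains(primary)) {
            return primary;
        }
        for (String address : peerAddresses) {
            if (reachable.contains(address)) {
                return address;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        /* addresses exchanged at association setup (documentation ranges) */
        List<String> peer = Arrays.asList("192.0.2.10", "198.51.100.10");
        Set<String> allUp = new HashSet<>(peer);
        Set<String> primaryDown = new HashSet<>(Arrays.asList("198.51.100.10"));

        System.out.println(chooseDestination(peer, "192.0.2.10", allUp));
        System.out.println(chooseDestination(peer, "192.0.2.10", primaryDown));
    }
}
```

With both addresses up the sketch picks the primary; with the primary marked unreachable it falls back to the second address, which is the redundancy that multi-homing is meant to provide.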

SCTP is message based. I/O operations operate upon messages and message boundaries are preserved. Each association may support multiple independent logical streams. Each stream represents a sequence of messages within a single association, and streams are independent of one another: stream identifiers and sequence numbers are included in the data packet so that messages can be sequenced on a per-stream basis.
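
To make the per-stream sequencing concrete, here is a small self-contained sketch in plain Java. It does not touch SCTP: the class StreamReorderDemo and its receive method are invented for illustration. It models a receiver that holds a message back only until the earlier messages on the same stream have arrived, so a gap on one stream does not delay the other.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/* Illustrative model only: not the SCTP stack and not part of com.sun.nio.sctp. */
class StreamReorderDemo {
    /* next sequence number expected on each stream */
    private final Map<Integer, Integer> nextSeq = new HashMap<>();
    /* messages that arrived early, per stream, keyed by sequence number */
    private final Map<Integer, TreeMap<Integer, String>> pending = new HashMap<>();

    /* Accepts one message and returns whatever can now be delivered, in order. */
    List<String> receive(int stream, int seq, String payload) {
        TreeMap<Integer, String> queue =
            pending.computeIfAbsent(stream, s -> new TreeMap<>());
        queue.put(seq, payload);

        List<String> deliverable = new ArrayList<>();
        int expected = nextSeq.getOrDefault(stream, 0);
        while (queue.containsKey(expected)) {
            deliverable.add(stream + ":" + queue.remove(expected));
            expected++;
        }
        nextSeq.put(stream, expected);
        return deliverable;
    }

    public static void main(String[] args) {
        StreamReorderDemo receiver = new StreamReorderDemo();
        /* message 0 on stream 0 is late; stream 1 is not held up by it */
        System.out.println(receiver.receive(0, 1, "b"));   // prints []
        System.out.println(receiver.receive(1, 0, "x"));   // prints [1:x]
        System.out.println(receiver.receive(0, 0, "a"));   // prints [0:a, 0:b]
    }
}
```

The message on stream 1 is delivered straight away even though stream 0 is still waiting for its first message, which is the behaviour the "head-of-line blocking" item in the feature list refers to.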

Key Features of SCTP

  • Message framing

  • Reliable transport service

  • Session-oriented

  • Ordered and unordered message delivery

  • Multi-Homing

    • Association between exactly two endpoints

    • Each endpoint may be represented by multiple IP addresses

    • Provides failover and redundancy

  • Multi-Streaming

    • Data partitioned into multiple streams

    • Independent sequenced delivery

  • Eliminates head-of-line blocking

Support for SCTP in JDK7

The Java API is based on the NIO channels framework so that applications requiring SCTP can take advantage of non-blocking multiplexed I/O. A new package com.sun.nio.sctp was defined to hold the new classes/interfaces. The package name is com.sun.nio.sctp rather than something like java.nio.channels.sctp. This distinction means that the API and implementation are fully supported and publicly accessible, but not part of the Java SE platform. Once there is more experience in the industry with SCTP, a standard API can be defined. The main classes within this package are the three new channel types. These new channels can be split into two logical groups.

  1. The first logical group has similar semantics to TCP, SctpChannel and SctpServerChannel. An SctpChannel can control only a single association, that is, sending and receiving data to and from a single endpoint. SctpServerChannel listens and accepts new associations initiated on its socket address.

  2. The second logical group consists of just SctpMultiChannel. Instances of this channel type can control multiple associations, therefore can send and receive data to and from many different endpoints.

The SCTP stack is event driven and applications can receive notifications of certain SCTP events. These events are most useful with SctpMultiChannel: because it can control multiple associations, you need to keep track of the status of each of them. For example, AssociationChangeNotification lets you know when new associations are started or terminated. If the association supports dynamic address configuration then PeerAddressChangeNotification lets you know about IP addresses that have been added or removed from the peer endpoint. MessageInfo allows you to provide ancillary data for the message either being sent or received.

Multi-Streaming Example

This example demonstrates the multi-streaming feature of SCTP. The server implements a type of daytime protocol. It sends the current date and time formatted in US English on one stream and in French on another.

Error handling is omitted to make the code more readable.

Multilingual DayTime Server   The source for DaytimeServer can be obtained here
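import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.SctpChannel;
import com.sun.nio.sctp.SctpServerChannel;

public class DaytimeServer {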
    static int SERVER_PORT = 3456;
    static int US_STREAM = 0;
    static int FR_STREAM = 1;

    static SimpleDateFormat USformatter = new SimpleDateFormat(
                                "h:mm:ss a EEE d MMM yy, zzzz", Locale.US);
    static SimpleDateFormat FRformatter = new SimpleDateFormat(
                                "h:mm:ss a EEE d MMM yy, zzzz", Locale.FRENCH);

    public static void main(String[] args) throws IOException {
        SctpServerChannel ssc = SctpServerChannel.open();
        InetSocketAddress serverAddr = new InetSocketAddress(SERVER_PORT);
        ssc.bind(serverAddr);

        ByteBuffer buf = ByteBuffer.allocateDirect(60);
        CharBuffer cbuf = CharBuffer.allocate(60);
        Charset charset = Charset.forName("ISO-8859-1");
        CharsetEncoder encoder = charset.newEncoder();

        while (true) {
            SctpChannel sc = ssc.accept();

            /* get the current date */
            Date today = new Date();
            cbuf.put(USformatter.format(today)).flip();
            encoder.encode(cbuf, buf, true);
            buf.flip();

            /* send the message on the US stream */
            MessageInfo messageInfo = MessageInfo.createOutgoing(null,
                                                                 US_STREAM);
            sc.send(buf, messageInfo);

            /* update the buffer with French format */
            cbuf.clear();
            cbuf.put(FRformatter.format(today)).flip();
            buf.clear();
            encoder.encode(cbuf, buf, true);
            buf.flip();

            /* send the message on the French stream */
            messageInfo.streamNumber(FR_STREAM);
            sc.send(buf, messageInfo);

            cbuf.clear();
            buf.clear();

            sc.close();
        }
    }
}

Multilingual DayTime Client   The source for DaytimeClient can be obtained here
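import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import com.sun.nio.sctp.AbstractNotificationHandler;
import com.sun.nio.sctp.AssociationChangeNotification;
import com.sun.nio.sctp.HandlerResult;
import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.SctpChannel;
import com.sun.nio.sctp.ShutdownNotification;
import static com.sun.nio.sctp.AssociationChangeNotification.AssocChangeEvent.COMM_UP;

public class DaytimeClient {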
    static int SERVER_PORT = 3456;
    static int US_STREAM = 0;
    static int FR_STREAM = 1;

    public static void main(String[] args) throws IOException {
        InetSocketAddress serverAddr = new InetSocketAddress("localhost", 
                                                             SERVER_PORT);
        ByteBuffer buf = ByteBuffer.allocateDirect(60);
        Charset charset = Charset.forName("ISO-8859-1");
        CharsetDecoder decoder = charset.newDecoder();

        SctpChannel sc = SctpChannel.open(serverAddr, 0, 0);

        /* handler to keep track of association setup and termination */
        AssociationHandler assocHandler = new AssociationHandler();

        /* expect two messages and two notifications */
        MessageInfo messageInfo = null;
        do {
            messageInfo = sc.receive(buf, System.out, assocHandler);
            buf.flip();

            if (buf.remaining() > 0 &&
                messageInfo.streamNumber() == US_STREAM) {

                System.out.println("(US) " + decoder.decode(buf).toString());
            } else if (buf.remaining() > 0 && 
                       messageInfo.streamNumber() == FR_STREAM) {

                System.out.println("(FR) " +  decoder.decode(buf).toString());
            }
            buf.clear();
        } while (messageInfo != null);

        sc.close();
    }

    static class AssociationHandler
        extends AbstractNotificationHandler<PrintStream>
    {
        public HandlerResult handleNotification(AssociationChangeNotification not,
                                                PrintStream stream) {
            if (not.event().equals(COMM_UP)) {
                int outbound = not.association().maxOutboundStreams();
                int inbound = not.association().maxInboundStreams();
                stream.printf("New association setup with %d outbound streams" +
                              ", and %d inbound streams.\n", outbound, inbound);
            }

            return HandlerResult.CONTINUE;
        }

        public HandlerResult handleNotification(ShutdownNotification not,
                                                PrintStream stream) {
            stream.printf("The association has been shutdown.\n");
            return HandlerResult.RETURN;
        }
    }
}

Sample Output:

>: java DaytimeClient
New association setup with 32 outbound streams, and 32 inbound streams.
(US) 4:00:51 PM Fri 15 May 09, British Summer Time
(FR) 4:00:51 PM ven. 15 mai 09, Heure d'ete britannique
The association has been shutdown.

As well as posting comments or feedback on this blog please feel free to mail the sctp development mailing list.

Friday Sep 07, 2007

Here are some pics of the California Superbike School I took part in at Mondello race track.

Wednesday Feb 15, 2006

HttpURLConnection, as an HTTP 1.1 client, has a built-in keep-alive implementation to handle persistent connections. This has been around in previous releases, but mustang beta brings one significant improvement: the ability to reuse connections where not all of the response body has been read. This issue was at one time in the top 10 of the top 25 Bugs, with 117 votes against it.

We have implemented an asynchronous cleanup of unread response body from the underlying socket. So if the InputStream is closed before all of the response body is read, its underlying TCP connection is put on a queue so that the remaining data can be read and then the connection put into the keep-alive cache. The queue is serviced on demand by a thread, Keep-Alive-SocketCleaner. So if you see this thread you will know what it is doing.

Since different applications have different requirements, and depending on your network speed, you may want to tune this feature. We have made this available through the network properties http.KeepAlive.remainingData and http.KeepAlive.queuedConnections in ../jre/lib/net.properties.

  # HTTP Keep Alive settings. remainingData is the maximum amount of data
  # in kilobytes that will be cleaned off the underlying socket so that it
  # can be reused (default value is 512K), queuedConnections is the maximum
  # number of Keep Alive connections to be on the queue for clean up (default
  # value is 10).
  # http.KeepAlive.remainingData=512
  # http.KeepAlive.queuedConnections=10

We have taken 512K as the default value for the maximum amount of data remaining to be read. That is, if there is <= 512K of unread response body on the InputStream then it will be queued for async cleanup, otherwise the underlying socket will be closed. This value was chosen as an approximate cost of creating a new connection but may vary depending on the connection type; for example, HTTPS connections can take some time to perform the SSL handshake.

http.KeepAlive.queuedConnections is simply there to ensure that the queue does not grow out of all proportion. If you are planning to make use of this feature in your code (or maybe you're using 3rd party code and don't have a choice) you may want to increase this default value.
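
The rule described above can be sketched in a few lines of Java. This is only a model for illustration, not the JDK source: the class KeepAliveCleanupRule and its decide method are hypothetical, a kilobyte is assumed to be 1024 bytes, and closing the socket when the queue is already full is my reading of what the queue limit is for.

```java
/* Illustrative model of the rule described in the text; not the JDK implementation. */
class KeepAliveCleanupRule {
    enum Action { QUEUE_FOR_CLEANUP, CLOSE_SOCKET }

    /* defaults quoted from net.properties */
    static final int DEFAULT_REMAINING_DATA_KB = 512;
    static final int DEFAULT_QUEUED_CONNECTIONS = 10;

    /*
     * unreadBytes     - response body bytes not yet read when the stream is closed
     * queuedNow       - connections already waiting for cleanup
     * remainingDataKb - value of http.KeepAlive.remainingData
     * maxQueued       - value of http.KeepAlive.queuedConnections
     */
    static Action decide(long unreadBytes, int queuedNow,
                         int remainingDataKb, int maxQueued) {
        boolean smallEnough = unreadBytes <= remainingDataKb * 1024L;
        boolean roomOnQueue = queuedNow < maxQueued;   // assumption, see lead-in
        return (smallEnough && roomOnQueue) ? Action.QUEUE_FOR_CLEANUP
                                            : Action.CLOSE_SOCKET;
    }

    public static void main(String[] args) {
        int kb = DEFAULT_REMAINING_DATA_KB;
        int max = DEFAULT_QUEUED_CONNECTIONS;
        System.out.println(decide(100 * 1024L, 0, kb, max));    // small body, empty queue
        System.out.println(decide(600 * 1024L, 0, kb, max));    // too much left to read
        System.out.println(decide(100 * 1024L, max, kb, max));  // queue already full
    }
}
```

Raising remainingData makes more connections eligible for reuse at the cost of reading more discarded data, while raising queuedConnections lets more of them wait for cleanup at once.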

Monday Feb 06, 2006

Based on a peabody contribution by Matthias, File.deleteOnExit was re-implemented in mustang b63 and now uses java level shutdown hooks. Previously there was a native implementation that registered the file names to be deleted at exit in a native linked-list structure and was called during vm exit to do the cleanup. Diagnosing issues where excessive use of deleteOnExit was causing OutOfMemoryErrors was very difficult as the list was held in native heap.

The re-implementation of deleteOnExit using java level shutdown hooks has several benefits:

  • A file name is only added once to the list of files to be deleted, so if deleteOnExit is invoked multiple times on the same File then it will not consume any additional memory. This issue was tracked by 4813777 in the Sun bug database.

  • The new implementation stores the file names in a HashSet in the java heap. This allows for better diagnosability of further issues with deleteOnExit (and I will describe how to do this later).
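
The two points above can be seen in a stripped-down model. This is a sketch of the idea only and the real java.io.DeleteOnExitHook differs in detail; the class DeleteOnExitModel and its methods are hypothetical names chosen for this example.

```java
import java.io.File;
import java.util.HashSet;
import java.util.Set;

/* Simplified, hypothetical model of a java level deleteOnExit; not the JDK source. */
class DeleteOnExitModel {
    /* path names live in the java heap, so they show up in a heap dump */
    private static final Set<String> files = new HashSet<>();
    private static boolean hookRegistered = false;

    /* Registers a path once, however many times it is requested. */
    static synchronized void register(File file) {
        files.add(file.getPath());
        if (!hookRegistered) {
            Runtime.getRuntime().addShutdownHook(
                new Thread(DeleteOnExitModel::deleteAll));
            hookRegistered = true;
        }
    }

    static synchronized int registeredCount() {
        return files.size();
    }

    /* Runs at VM shutdown and removes every registered file. */
    private static synchronized void deleteAll() {
        for (String path : files) {
            new File(path).delete();
        }
    }

    public static void main(String[] args) throws Exception {
        File tmp = File.createTempFile("model", ".tmp");
        register(tmp);
        register(tmp);   // second call adds nothing to the set
        System.out.println("registered paths: " + registeredCount());
    }
}
```

Because the set is keyed on the path name, the second register call does not grow it, and because it is an ordinary object in the java heap, its size can be inspected with the usual heap tools.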

There is still an issue with some code that creates a lot of temporary files and calls deleteOnExit on them to ensure that they will be removed during shutdown. This is currently a limitation of the way deleteOnExit is specified, as you need to ensure that a potentially different file with the same abstract path is removed even if you specifically call delete on the current one. You need to be careful when using deleteOnExit with temporary files in this manner. So what do you do if you think you are running into this issue? Read on...

As I mentioned earlier, in the past it was difficult to figure out if deleteOnExit was causing your OutOfMemoryErrors because it allocated in the native heap and not the java heap, and inspecting the native heap is much more difficult. Here are 2 solutions if you are running with mustang b63 (or later) and think you are experiencing issues caused by deleteOnExit.

  1. Examine the heap dump

    You can use jmap to dump the java heap of a running process or add the -XX:+HeapDumpOnOutOfMemoryError option to your startup arguments. You can see how to do this in Alan's blog. Once you have your dump, run jhat on it. If you are not familiar with jhat then look at Sundar's blog. jhat will read the dump file and start an HTTP server. You will see something like this:

    
        Reading from heap.bin...
        Dump file created Wed Feb 01 13:57:42 GMT 2006
        Snapshot read, resolving...
        Resolving 18956 objects...
        Chasing references, expect 3 dots...
        Eliminating duplicate references...
        Snapshot resolved.
        Started HTTP server on port 7000
        Server is ready.
    

    Now connect to the HTTP server using a browser and follow the links, e.g.

    select All classes including platform
    search for class java.io.DeleteOnExitHook, select it
    From its Static Data Members you will see files (L) : java.util.HashSet@0xXXXXXXXX (XX bytes), select it
    In its Instance data members you will see map (L) : java.util.HashMap@0xXXXXXXXX (XX bytes), select it
    Its Instance data members will look something like this:

      entrySet (L) : 
      keySet (L) : 
      loadFactor (F) : 0.75
      modCount (I) : 5000
      size (I) : 5000
      table (L) : [Ljava.util.HashMap$Entry;@0xbbc19ff8 (32776 bytes)
      threshold (I) : 6144
      values (L) : 
    

    size is the number of files currently marked as deleteOnExit, in this example 5000. You can see the amount of heap space they are consuming by looking at the number of bytes the table instance data member is occupying, in this example 32776 bytes.

    You can also see the files that are currently marked as deleteOnExit by clicking on the table instance data member, which is a HashMap$Entry array. Each HashMap$Entry has a key which is a java.lang.String whose value is the abstract path name.


  2. Write yourself a tool

    This is a little diagnostic tool that allows you to poll the length of the deleteOnExit list. It uses reflection to access the same data members as in the above solution, but automates it so you don't need to know about the internal data structures.

    The new Attach API lets you load an agent into a running vm. The following sample code contains a simple agent that uses reflection to find out the number of files marked deleteOnExit and sends this to the client over a Socket. This allows you to connect to an already running java process (started as usual without any special arguments) and obtain a count of the files marked deleteOnExit. You can use jps to retrieve the process id of the java process that you wish to attach to and then pass it as a command line argument to CountDeleteOnExitFiles.

    
        weetabix : jps -l
        6829 DeleteOnExitTest
        6835 sun.tools.jps.Jps
        
        weetabix : java -cp /j2se/jdk6.0/lib/tools.jar:. CountDeleteOnExitFiles 6829
        Number of files registered to be deleted on exit is 32
        Number of files registered to be deleted on exit is 37
        Number of files registered to be deleted on exit is 42
        ....
        
    Client that attaches to the target vm and loads the Agent:
    CountDeleteOnExitFiles.java
    
    import java.net.*;
    import java.io.*;
    import com.sun.tools.attach.*;
    
    public class CountDeleteOnExitFiles
    {
        public static void main(String[] args) {
            if (args.length != 1 || args[0].equals("-h")) {
                System.err.println("Prints the number of files registered to be deleted on exit in process <pid> every 5 seconds");
                System.err.println("Usage: \n  java CountDeleteOnExitFiles <pid>");
                return;
            }

            try {
                ServerSocket ss = new ServerSocket(0);
                int port = ss.getLocalPort();

                // attach to target VM and load Agent
                VirtualMachine vm = VirtualMachine.attach(args[0]);

                File jarfile = new File("DeleteOnExitAgent.jar");
                if (!jarfile.exists()) {
                    System.err.println("agent not found");
                    return;
                }
                String agent = jarfile.getCanonicalPath();

                vm.loadAgent(agent, Integer.toString(port));
                vm.detach();

                Socket socket = ss.accept();
                ss.close();

                DataInputStream in = new DataInputStream(socket.getInputStream());
                for (;;) {
                    int size = in.readInt();
                    System.out.println("Number of files registered to be deleted on exit is " + size);
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(0);
            }
        }
    }
    
    Agent that is loaded into the target vm and sends the count of files marked deleteOnExit to the client (agent uses reflection to access Sun's internal deleteOnExit implementation and is therefore Sun jdk specific). This Agent needs to be deployed as a jar file in accordance with the "Starting Agents After VM Startup" section of java.lang.instrument. Specifically it needs to be compiled and put in a jar file called DeleteOnExitAgent.jar with the attribute "Agent-Class: DeleteOnExitAgent" in its manifest.
      
    weetabix : javac DeleteOnExitAgent.java
    weetabix : more manifest.mf
    Agent-Class: DeleteOnExitAgent
    weetabix : jar -cvfm DeleteOnExitAgent.jar manifest.mf DeleteOnExitAgent*.class
    added manifest
    adding: DeleteOnExitAgent$1.class(in = 486) (out= 338)(deflated 30%)
    adding: DeleteOnExitAgent.class(in = 2357) (out= 1381)(deflated 41%)
    weetabix : cp DeleteOnExitAgent.jar /j2se/jdk6.0/jre/lib/ext
    
    DeleteOnExitAgent.java
    
    import java.util.*;
    import java.io.*;
    import java.net.*;
    import java.lang.reflect.*;
    
    public class DeleteOnExitAgent
    {
        static int DEFAULT_INTERVAL = 5000;
        Socket socket = null;
    
        // invoked by attach mechanism
        public static void agentmain(String agentArgs) {
            // port number to send data to
            final int port;

            // the port is mandatory: give up if it is missing or not a number
            if (agentArgs != null && !agentArgs.equals("")) {
                try {
                    port = Integer.parseInt(agentArgs);
                } catch (NumberFormatException nfe) {
                    return;
                }
            } else {
                return;
            }

            Thread agentThread = new Thread(new Runnable() {
                public void run() {
                    startAgent(port);
                }
            });
            agentThread.setDaemon(true);
            agentThread.start();
        }
    
        static void startAgent(int port) {
            Socket socket = null;
            DataOutputStream dataOut = null;
            try {
                socket = new Socket("localhost", port);
                dataOut = new DataOutputStream(socket.getOutputStream());

                // DeleteOnExitHook.files is the HashSet of registered path names
                Class<?> deleteOnExitHookClass = Class.forName("java.io.DeleteOnExitHook");
                Field filesField = deleteOnExitHookClass.getDeclaredField("files");
                filesField.setAccessible(true);
                HashSet<?> hashSet = (HashSet<?>) filesField.get(null);

                // HashSet keeps its elements in a private HashMap called "map"
                Class<?> hashSetClass = Class.forName("java.util.HashSet");
                Field mapField = hashSetClass.getDeclaredField("map");
                mapField.setAccessible(true);
                HashMap<?, ?> hashMap = (HashMap<?, ?>) mapField.get(hashSet);

                // HashMap.size is the number of files marked deleteOnExit
                Class<?> hashMapClass = Class.forName("java.util.HashMap");
                Field sizeField = hashMapClass.getDeclaredField("size");
                sizeField.setAccessible(true);

                for (;;) {
                    int size = sizeField.getInt(hashMap);
                    dataOut.writeInt(size);
                    Thread.sleep(DEFAULT_INTERVAL);
                }
            } catch (SocketException se) {
                // the client has gone away: release the connection
                try {
                    if (dataOut != null) { dataOut.close(); }
                    if (socket != null) { socket.close(); }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }