Sun Java(tm) System Message Queue
v4.1

com.sun.messaging.jmq.io
Class Packet

java.lang.Object
  extended by com.sun.messaging.jmq.io.Packet

public class Packet
extends Object

Encapsulation of an iMQ packet. A packet consists of three parts: 1) A fixed sized header (size HEADER_SIZE bytes) 2) A variable sized header (for holding strings, and optional packet fields) 3) A serialized java.util.Hashtable (for holding message properties) 4) the packet body (a bag of bytes) This class directly manages #1, uses PacketVariableHeader to manage #2, and uses PacketPayload to manage #3 and #4. This class also employs the use of a buffer pool to manager reuse of direct ByteBuffers. Direct ByteBuffers are an order of magnitude more expensive to allocate than normal ByteBuffers, so keeping a pool of them around is a big win (which is not typically the case for memory allocated off of the Java heap).


Field Summary
protected  ArrayList allocatedBuffers
           
protected static ByteBufferPool bbPool
           
protected  int bitFlags
           
protected  ByteBuffer bodyBuf
           
protected  boolean bufferDirty
           
protected  int bytesWritten
           
protected  long consumerID
           
static boolean DEBUG
           
static int DEFAULT_POOL_BLOCKSIZE
           
static int DEFAULT_POOL_SIZE
           
protected  byte encryption
           
protected  long expiration
           
protected  ByteBuffer fixedBuf
           
protected  boolean genSequenceNumber
           
protected  boolean genTimestamp
           
static int HEADER_SIZE
           
protected  int headerBytesRead
           
protected  int magic
           
static int MAGIC
           
protected static long maxPacketSize
           
protected  int nBufs
           
protected  PacketPayload packetPayload
           
protected  int packetSize
           
protected  short packetType
           
protected  PacketVariableHeader packetVariableHeader
           
protected  byte priority
           
protected  ByteBuffer propBuf
           
protected  int propertyOffset
           
protected  int propertySize
           
protected  ByteBuffer[] readBufs
           
protected  int[] readBufsLimits
           
protected  boolean readInProgress
           
protected  int ropBytesRead
           
protected static int sequenceNumber
           
protected static long SIZE_LOWER_BOUND
           
protected static long SIZE_UPPER_BOUND
           
protected  SysMessageID sysMessageID
           
protected  long transactionID
           
protected  boolean useDirect
           
protected  ByteBuffer varBuf
           
protected  short version
           
static short VERSION1
           
static short VERSION2
           
static short VERSION3
           
protected  boolean versionMismatch
           
protected  ByteBuffer[] writeBufs
           
protected  boolean writeInProgress
           
 
Constructor Summary
Packet()
          Constructs an empty packet that will use direct buffers
Packet(boolean useDirect)
          Construct a packet indicating whether or not to use direct ByteBuffers.
 
Method Summary
 void destroy()
          Destroy a packet and put all direct buffers back into the buffer pool.
 void dump(PrintStream os)
           
 void dump(PrintStream os, String prefix)
          Dump the contents of the packet in human readable form to the specified OutputStream.
static void dumpBody(PrintStream os, int pType, InputStream is, int bodySize, Hashtable props)
          Dump the body of the packet.
static void dumpBufs(ByteBuffer[] bufs)
           
 String dumpPacketString()
           
 String dumpPacketString(String prefix)
          Return a string containing the contents of the packet in a human readable form.
 void fill(Packet sourcePacket)
          Fill this packet with the contents of sourcePacket.
 void generateSequenceNumber(boolean generate)
          Disable (and enable) sequence number generation.
 void generateTimestamp(boolean generate)
          Disable (and enable) timestamp generation.
static ByteBufferPool getBufferPool()
           
 boolean getConsumerFlow()
           
 long getConsumerID()
           
 String getCorrelationID()
           
 String getDestination()
           
 String getDestinationClass()
           
 int getEncryption()
           
 long getExpiration()
           
 boolean getFlag(int flag)
           
 boolean getFlowPaused()
           
protected  ByteBuffer getHeaderBytes()
           
 boolean getIndempontent()
           
 byte[] getIP()
           
 String getIPString()
           
 boolean getIsLast()
           
 boolean getIsQueue()
           
 boolean getIsTransacted()
           
 int getMagic()
           
static long getMaxPacketSize()
           
 ByteBuffer getMessageBody()
          Return the message body as a ByteBuffer.
 int getMessageBodySize()
          Return the size of the message body in bytes
 InputStream getMessageBodyStream()
          Return an InputStream that contains the contents of the message body.
 String getMessageID()
          Get the MessageID for the packet.
 String getMessageType()
           
protected  PacketPayload getPacketPayload()
           
 int getPacketSize()
           
 int getPacketType()
           
protected  PacketVariableHeader getPacketVariableHeader()
           
 boolean getPersistent()
           
 int getPort()
           
 int getPriority()
           
 long getProducerID()
           
 Hashtable getProperties()
          Return the property hashtable for this packet.
 int getPropertyOffset()
           
 int getPropertySize()
           
 boolean getRedelivered()
           
 String getReplyTo()
           
 String getReplyToClass()
           
 boolean getSelectorsProcessed()
           
 boolean getSendAcknowledge()
           
 int getSequence()
           
static long getSizeLowerBound()
           
 SysMessageID getSysMessageID()
          Get the system message ID.
 long getTimestamp()
           
 long getTransactionID()
           
 int getVersion()
           
 String headerToString()
          Return a string representation of the data in the fixed header portion of the packet.
static long myChannelRead(ScatteringByteChannel channel, ByteBuffer[] bufs, int offset, int length)
           
static long myChannelWrite(GatheringByteChannel channel, ByteBuffer[] bufs, int offset, int length)
          myChannelRead and myChannelWrite are used to work-around an nio bug.
 void readPacket(InputStream is)
          Read packet from an InputStream.
 boolean readPacket(ScatteringByteChannel channel, boolean block)
          Read a packet from a ReadableByteChannel.
 void reset()
          Reset packet to initial values.
static boolean setBufferPool(ByteBufferPool p)
          Set the ByteBufferPool that will be used to facilitate reuse of direct ByteBuffers.
 void setConsumerFlow(boolean b)
           
 void setConsumerID(long n)
           
 void setCorrelationID(String s)
           
 void setDestination(String s)
           
 void setDestinationClass(String s)
           
 void setEncryption(int e)
           
 void setExpiration(long e)
           
 void setFlag(int flag, boolean on)
           
 void setFlowPaused(boolean b)
           
 void setIndempontent(boolean b)
           
 void setIP(byte[] ip)
           
 void setIP(byte[] ip, byte[] mac)
           
 void setIsLast(boolean b)
           
 void setIsQueue(boolean b)
           
 void setIsTransacted(boolean b)
           
static long setMaxPacketSize(long n)
          Set the max packet size that should be read.
 void setMessageBody(byte[] body)
          Set the message body.
 void setMessageBody(byte[] body, int off, int len)
          Set the message body.
 void setMessageBody(ByteBuffer body)
          Set the message body.
 void setMessageType(String s)
           
 void setPacketType(int pType)
          Set the packet type.
 void setPersistent(boolean b)
           
 void setPort(int p)
           
 void setPriority(int p)
           
 void setProducerID(long n)
           
 void setProperties(Hashtable props)
          Set the message properties.
 void setRedelivered(boolean b)
           
 void setReplyTo(String s)
           
 void setReplyToClass(String s)
           
 void setSelectorsProcessed(boolean b)
           
 void setSendAcknowledge(boolean b)
           
 void setSequence(int n)
           
static void setSizeLowerBound(long n)
           
 void setTimestamp(long t)
           
 void setTransactionID(long n)
           
 void setVersion(int n)
           
 String toString()
          Return a unique string that identifies the packet
 void updateSequenceNumber()
          Update the sequence number on the packet.
 void updateTimestamp()
          Update the timestamp on the packet.
 boolean writePacket(GatheringByteChannel channel, boolean block)
          Write a packet to a WritableByteChannel.
 void writePacket(OutputStream os)
          Write the packet to an OutputStream.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

DEBUG

public static boolean DEBUG

MAGIC

public static final int MAGIC
See Also:
Constant Field Values

VERSION1

public static final short VERSION1
See Also:
Constant Field Values

VERSION2

public static final short VERSION2
See Also:
Constant Field Values

VERSION3

public static final short VERSION3
See Also:
Constant Field Values

HEADER_SIZE

public static final int HEADER_SIZE
See Also:
Constant Field Values

DEFAULT_POOL_SIZE

public static final int DEFAULT_POOL_SIZE
See Also:
Constant Field Values

DEFAULT_POOL_BLOCKSIZE

public static final int DEFAULT_POOL_BLOCKSIZE
See Also:
Constant Field Values

sequenceNumber

protected static int sequenceNumber

bbPool

protected static ByteBufferPool bbPool

SIZE_LOWER_BOUND

protected static long SIZE_LOWER_BOUND

SIZE_UPPER_BOUND

protected static long SIZE_UPPER_BOUND

maxPacketSize

protected static long maxPacketSize

genTimestamp

protected boolean genTimestamp

genSequenceNumber

protected boolean genSequenceNumber

fixedBuf

protected ByteBuffer fixedBuf

bufferDirty

protected boolean bufferDirty

useDirect

protected boolean useDirect

varBuf

protected ByteBuffer varBuf

propBuf

protected ByteBuffer propBuf

bodyBuf

protected ByteBuffer bodyBuf

writeBufs

protected ByteBuffer[] writeBufs

readBufs

protected ByteBuffer[] readBufs

readBufsLimits

protected int[] readBufsLimits

nBufs

protected int nBufs

allocatedBuffers

protected ArrayList allocatedBuffers

version

protected short version

magic

protected int magic

packetType

protected short packetType

packetSize

protected int packetSize

expiration

protected long expiration

propertyOffset

protected int propertyOffset

propertySize

protected int propertySize

encryption

protected byte encryption

transactionID

protected long transactionID

priority

protected byte priority

bitFlags

protected int bitFlags

consumerID

protected long consumerID

sysMessageID

protected SysMessageID sysMessageID

packetVariableHeader

protected PacketVariableHeader packetVariableHeader

packetPayload

protected PacketPayload packetPayload

readInProgress

protected boolean readInProgress

writeInProgress

protected boolean writeInProgress

versionMismatch

protected boolean versionMismatch

headerBytesRead

protected int headerBytesRead

ropBytesRead

protected int ropBytesRead

bytesWritten

protected int bytesWritten
Constructor Detail

Packet

public Packet()
Constructs an empty packet that will use direct buffers


Packet

public Packet(boolean useDirect)
Construct a packet indicating whether or not to use direct ByteBuffers. If you plan on reading and writing packets using the methods that take a channel as a parameter then you should specify useDirect=true. If you plan on reading and writing packets using the methods that take an IO Stream as a parameter then you should specify useDirect = false. Default is to use direct buffers.

Method Detail

setBufferPool

public static boolean setBufferPool(ByteBufferPool p)
Set the ByteBufferPool that will be used to facilitate reuse of direct ByteBuffers. This should be called before the first Packet is constructed. If it is not called then a pool is is created when the first packet is constructed using default parameters. If this method is called after the first Packet is constructed it returns "false" and the pool is not changed. Note that the pool is only used if the packet is using direct ByteBuffers.


setMaxPacketSize

public static long setMaxPacketSize(long n)
Set the max packet size that should be read. If a packet is larger than this the packet read methods will skip the bytes and throw an exception. There is a lower bound on the smallest value you can set the max packet size to. So the actual value set may be different than the value passed. This method returns the actual value set.


getMaxPacketSize

public static long getMaxPacketSize()

setSizeLowerBound

public static void setSizeLowerBound(long n)

getSizeLowerBound

public static long getSizeLowerBound()

getBufferPool

public static ByteBufferPool getBufferPool()

fill

public void fill(Packet sourcePacket)
          throws IOException
Fill this packet with the contents of sourcePacket. The fixed and variable headers are copied. The payload (properties and body) are shared. This method turns off timestamp and sequence number generation since we assume you want the copied header contents to remain unchanged.

Throws:
IOException

getFlag

public boolean getFlag(int flag)

getVersion

public int getVersion()

getMagic

public int getMagic()

getPacketType

public int getPacketType()

getPacketSize

public int getPacketSize()

getTransactionID

public long getTransactionID()

getProducerID

public long getProducerID()

getTimestamp

public long getTimestamp()

getExpiration

public long getExpiration()

getPort

public int getPort()

getIPString

public String getIPString()

getIP

public byte[] getIP()

getSequence

public int getSequence()

getPropertyOffset

public int getPropertyOffset()

getPropertySize

public int getPropertySize()

getEncryption

public int getEncryption()

getPriority

public int getPriority()

getConsumerID

public long getConsumerID()

getPersistent

public boolean getPersistent()

getRedelivered

public boolean getRedelivered()

getIsQueue

public boolean getIsQueue()

getSelectorsProcessed

public boolean getSelectorsProcessed()

getSendAcknowledge

public boolean getSendAcknowledge()

getIsLast

public boolean getIsLast()

getFlowPaused

public boolean getFlowPaused()

getIsTransacted

public boolean getIsTransacted()

getConsumerFlow

public boolean getConsumerFlow()

getIndempontent

public boolean getIndempontent()

getMessageID

public String getMessageID()
Get the MessageID for the packet. If the client has set a MessageID then that is what is returned. Otherwise the system message ID is returned (see getSysMessageID())

Returns:
The packet's MessageID

getSysMessageID

public SysMessageID getSysMessageID()
Get the system message ID. Note that this is not the JMS MessageID set by the client. Rather this is a system-wide unique message ID generated from the timestamp, sequence number, port number and IP address of the packet.

WARNING! This returns a references to the Packet's SysMessageID not a copy.

Returns:
The packet's system MessageID

getDestination

public String getDestination()

getDestinationClass

public String getDestinationClass()

getCorrelationID

public String getCorrelationID()

getReplyTo

public String getReplyTo()

getReplyToClass

public String getReplyToClass()

getMessageType

public String getMessageType()

getProperties

public Hashtable getProperties()
                        throws IOException,
                               ClassNotFoundException
Return the property hashtable for this packet. WARNING! This method emphasizes performance over safety. The HashTable object returned is a reference to the HashTable object in the object -- it is NOT a copy. Modifying the contents of the HashTable will have non-deterministic results so don't do it!

Throws:
IOException
ClassNotFoundException

getMessageBodySize

public int getMessageBodySize()
Return the size of the message body in bytes

Returns:
Size of message body in bytes

getMessageBody

public ByteBuffer getMessageBody()
Return the message body as a ByteBuffer.


getMessageBodyStream

public InputStream getMessageBodyStream()
Return an InputStream that contains the contents of the message body.

Returns:
An InputStream from which the message body can be read from. Or null if no message body.

reset

public void reset()
Reset packet to initial values. This does not free buffers just in case we can reused them.


setPacketType

public void setPacketType(int pType)
Set the packet type.

Parameters:
pType - The type of packet

setTimestamp

public void setTimestamp(long t)

setExpiration

public void setExpiration(long e)

setPort

public void setPort(int p)

setIP

public void setIP(byte[] ip)

setIP

public void setIP(byte[] ip,
                  byte[] mac)

setSequence

public void setSequence(int n)

setVersion

public void setVersion(int n)

setTransactionID

public void setTransactionID(long n)

setProducerID

public void setProducerID(long n)

setEncryption

public void setEncryption(int e)

setPriority

public void setPriority(int p)

setConsumerID

public void setConsumerID(long n)

setPersistent

public void setPersistent(boolean b)

setRedelivered

public void setRedelivered(boolean b)

setIsQueue

public void setIsQueue(boolean b)

setSelectorsProcessed

public void setSelectorsProcessed(boolean b)

setSendAcknowledge

public void setSendAcknowledge(boolean b)

setIsLast

public void setIsLast(boolean b)

setFlowPaused

public void setFlowPaused(boolean b)

setIsTransacted

public void setIsTransacted(boolean b)

setConsumerFlow

public void setConsumerFlow(boolean b)

setIndempontent

public void setIndempontent(boolean b)

setFlag

public void setFlag(int flag,
                    boolean on)

setDestination

public void setDestination(String s)

setDestinationClass

public void setDestinationClass(String s)

setCorrelationID

public void setCorrelationID(String s)

setReplyTo

public void setReplyTo(String s)

setReplyToClass

public void setReplyToClass(String s)

setMessageType

public void setMessageType(String s)

setProperties

public void setProperties(Hashtable props)
Set the message properties. WARNING! The Hashtable is NOT copied.

Parameters:
props - The message properties.

setMessageBody

public void setMessageBody(ByteBuffer body)
Set the message body. 'body' is sliced to derive the message body so be careful what the buffers position is! Note: If you allocate a direct ByteBuffer and pass it here you will get better performance than passing a byte[]

Parameters:
body - The message body.

setMessageBody

public void setMessageBody(byte[] body)
Set the message body. WARNING! The byte array is NOT copied.

Parameters:
body - The message body.

setMessageBody

public void setMessageBody(byte[] body,
                           int off,
                           int len)
Set the message body. Specify offset and length of where to take data from buffer. WARNING! The byte array is NOT copied.

Parameters:
body - The message body.
off - Offset into body that data starts
len - Size of message body

getHeaderBytes

protected ByteBuffer getHeaderBytes()

getPacketPayload

protected PacketPayload getPacketPayload()

getPacketVariableHeader

protected PacketVariableHeader getPacketVariableHeader()

generateSequenceNumber

public void generateSequenceNumber(boolean generate)
Disable (and enable) sequence number generation. The JMQ packet specification defines a "sequence number" field that is defined to be an increasing sequence number. By default Packet will automatically increment the sequence number and set it on the packet every time writePacket() is called. The sequence number is a class variable so all packets in a VM share the same sequence.

Parameters:
generate - true to have the packet automatically generate sequence numbers for you, false to not. Default is "true".

generateTimestamp

public void generateTimestamp(boolean generate)
Disable (and enable) timestamp generation. The JMQ packet specification specifies a "timestamp" field that is defined to be the time the packet was sent. By default ReadWritePacket will automatically generate a timestamp and set it on the packet every time writePacket() is called.

Parameters:
generate - true to have the packet automatically generate a timestamp on each packet sent, false to not. Default is "true".

updateTimestamp

public void updateTimestamp()
Update the timestamp on the packet. If you do this you should call generateTimestamp(false) before writing the packet, otherwise the timestamp will be overwritten when writePacket() is called.


updateSequenceNumber

public void updateSequenceNumber()
Update the sequence number on the packet. If you do this you should call generateSequenceNumber(false) before writing the packet, otherwise the sequence number will be overwritten when writePacket() is called.


readPacket

public boolean readPacket(ScatteringByteChannel channel,
                          boolean block)
                   throws IOException
Read a packet from a ReadableByteChannel. This method may be used for blocking and non-blocking I/O. If you are using blocking I/O then you should specify "true" for 'block'. If you are using non-blocking then specify "false". Returns "true" if a complete packet has been read, else "false". If this method returns false then you may call it again (as many times as necessary) to try and complete the read. If 'block' is true, then the method will block until a complete packet is read. If the packet being read is larger than the limit set by setMaxPacketSize() then the packet bytes are skipped over and this method will throw a BigPacketException. At that point the packet is partially constructed, and only the header fields in the fixed portion of the packet are valid.

Throws:
IOException

writePacket

public boolean writePacket(GatheringByteChannel channel,
                           boolean block)
                    throws IOException
Write a packet to a WritableByteChannel. This method may be used for blocking and non-blocking I/O. If you are using blocking I/O then you should specify "true" for 'block'. If you are using non-blocking then specify "false". Returns "true" if a complete packet has been written, else "false". If this method returns false then you may call it again (as many times as necessary) to try and complete the write. If 'block' is true, then the method will block until a complete packet is written.

Throws:
IOException

toString

public String toString()
Return a unique string that identifies the packet

Overrides:
toString in class Object

dumpPacketString

public String dumpPacketString(String prefix)
Return a string containing the contents of the packet in a human readable form.

Parameters:
prefix - String to prefix string with. By default "********"

dumpPacketString

public String dumpPacketString()

dumpBody

public static void dumpBody(PrintStream os,
                            int pType,
                            InputStream is,
                            int bodySize,
                            Hashtable props)
Dump the body of the packet. This is static so that it can be used by the old packet code as well (that is still used by the client)


dump

public void dump(PrintStream os)

headerToString

public String headerToString()
Return a string representation of the data in the fixed header portion of the packet.


dump

public void dump(PrintStream os,
                 String prefix)
Dump the contents of the packet in human readable form to the specified OutputStream.

Parameters:
os - OutputStream to write packet contents to
prefix - String prefix to print before the packet. If null "********" is used.

readPacket

public void readPacket(InputStream is)
                throws IOException,
                       EOFException,
                       StreamCorruptedException,
                       IllegalArgumentException
Read packet from an InputStream. This method reads one packet from the InputStream and sets the state of this object to reflect the packet read. This method is not synchronized and should only be called by non-concurrent code. If we read a packet with a bad magic number (ie it looks like bogus data), we give up and throw a StreamCorruptedException. If we read a packet that does not match our packet version we attempt to swallow the packet and then throw an IllegalArgumentException. If this method throws an OutOfMemoryError, the caller can try to free memory and call retryReadPacket(). If the packet being read is larger than the limit set by setMaxPacketSize() then the packet bytes are skipped over and this method will throw a BigPacketException. At that point the packet is partially constructed, and only the header fields in the fixed portion of the packet are valid.

Parameters:
is - the InputStream to read the packet from
Throws:
IOException
EOFException
StreamCorruptedException
IllegalArgumentException

writePacket

public void writePacket(OutputStream os)
                 throws IOException
Write the packet to an OutputStream. Blocking.

Parameters:
os - The OutputStream to write the packet to
Throws:
IOException

destroy

public void destroy()
Destroy a packet and put all direct buffers back into the buffer pool. WARNING! Once destroy is called the packet may not be used again. Once destroy is called all buffers ever returned by the packet are invalid!


dumpBufs

public static void dumpBufs(ByteBuffer[] bufs)

myChannelWrite

public static long myChannelWrite(GatheringByteChannel channel,
                                  ByteBuffer[] bufs,
                                  int offset,
                                  int length)
                           throws IOException
myChannelRead and myChannelWrite are used to work-around an nio bug. Nio scattering/gathering IO methods leak a ton of memory, so we kludge our own versions. See iMQ bug 4629634 for more info

Throws:
IOException

myChannelRead

public static long myChannelRead(ScatteringByteChannel channel,
                                 ByteBuffer[] bufs,
                                 int offset,
                                 int length)
                          throws IOException
Throws:
IOException

Sun Java(tm) System Message Queue
v4.1

Copyright 2006 Sun Microsystems, Inc. 4150 Network Circle,
Santa Clara, California, 95054, U.S.A. All Rights Reserved.