|
|
Using Project OpenDMK APIs To Send Traps From Multiple Threads |
I recently received a question from a member of the Java community
asking for advice on how to implement a trap-sending service
for a highly multi-threaded application.
I thought I would quickly write a small example to show that the
SnmpAdaptorServer provided by Project OpenDMK could be safely used to send traps in
multi-threaded environments. But to my great surprise,
my test failed.
A quick investigation of the code revealed the bug: the
SnmpAdaptorServer will
let you send traps when it is OFFLINE.
However, in that case,
it will open the trap socket on demand, and close it after the trap
has been sent. In a highly multi-threaded environment this can
result in the socket being closed by one thread while the other
threads are still using it. In other words, when the
adapter is OFFLINE, sending traps is not thread safe.
This is what I call a bug.
Fortunately, the workaround is simple:
- either encapsulate the adapter in an object that will use a
java.util.concurrent.Lock to ensure that only one thread can
call xxxxTrap() at a time,
- or - probably safer, start the adapter before using it.
This is how it should be in the majority of the cases anyway - since
traps are usually sent by SNMP agents, and SNMP agents usually
have MIBs (and hence an SnmpAdaptorServer which needs
to be started).
Anyway - starting the adapter is my advice - and here below is
my little test case. You'll need to have jdmkrt.jar
from Project OpenDMK
in your classpath to run it.
Cheers,
-- daniel
package sendtrapmultithread;
import com.sun.jdmk.tasks.DaemonTaskServer;
import com.sun.management.comm.SnmpAdaptorServer;
import com.sun.management.comm.SnmpV3AdaptorServer;
import com.sun.management.snmp.SnmpOid;
import com.sun.management.snmp.SnmpPdu;
import com.sun.management.snmp.SnmpPduPacket;
import com.sun.management.snmp.SnmpPduRequest;
import com.sun.management.snmp.SnmpPduTrap;
import com.sun.management.snmp.SnmpScopedPduRequest;
import com.sun.management.snmp.SnmpStatusException;
import com.sun.management.snmp.SnmpTimeticks;
import com.sun.management.snmp.SnmpVarBindList;
import com.sun.management.snmp.SnmpEventReportDispatcher;
import com.sun.management.snmp.manager.SnmpPeer;
import com.sun.management.snmp.manager.SnmpSession;
import com.sun.management.snmp.manager.SnmpTrapListener;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
/**
 * Test case that sends SNMP v2 traps from several threads through a single
 * shared {@link SnmpAdaptorServer} and checks that a local trap receiver
 * gets every trap that was sent.
 *
 * <p>The adaptor is started before any trap is sent (see {@link #launch()}).
 * Each sender thread executes {@link #run()}.
 *
 * @author dfuchs
 */
public class MultiThreadTrapTest implements Runnable {
    // Not used by the code below; kept so the class keeps its logger handle.
    private static final Logger LOG =
        Logger.getLogger(MultiThreadTrapTest.class.getName());

    /** Destination of every trap sent by this test. */
    private final SnmpPeer peer;
    /** The single adaptor shared by all sender threads. */
    private final SnmpAdaptorServer server;
    /** Makes all sender threads start sending at the same time. */
    private final CyclicBarrier barrier;
    /** Counted down once by each sender thread when it terminates. */
    private final CountDownLatch latch;

    /** Total number of traps the sender threads send between them. */
    public final int TRAP_COUNT_LIMIT;
    /** Number of sender threads started by {@link #launch()}. */
    public final int THREAD_COUNT;

    /** Number of traps successfully sent; written only under this object's lock. */
    private volatile int trapSent;
    /** Number of send slots handed out so far; guarded by this object's lock. */
    private int trapsClaimed;

    /**
     * Records one successfully sent trap.
     *
     * @return the updated number of traps sent
     */
    private synchronized int incr() {
        return ++trapSent;
    }

    /**
     * Atomically reserves the right to send one more trap.
     *
     * <p>Checking the counter and incrementing it under the same lock is what
     * guarantees that the threads never send more than
     * {@code TRAP_COUNT_LIMIT} traps in total.
     *
     * @return {@code true} if the caller may send a trap, {@code false} if
     *         the limit has been reached
     */
    private synchronized boolean claimSlot() {
        if (trapsClaimed >= TRAP_COUNT_LIMIT) {
            return false;
        }
        trapsClaimed++;
        return true;
    }

    /**
     * Gives back a slot obtained from {@link #claimSlot()} whose trap could
     * not be sent, so that another thread can use it.
     */
    private synchronized void releaseSlot() {
        trapsClaimed--;
    }

    /**
     * Returns the number of traps successfully sent so far.
     *
     * @return the number of traps sent
     */
    public int getSentCount() {
        return trapSent;
    }

    /**
     * Creates a test that will send 1000 traps to {@code peer} using
     * 20 threads.
     *
     * @param peer the destination of the traps
     * @throws IllegalStateException if the SNMP adaptor cannot be created
     */
    public MultiThreadTrapTest(SnmpPeer peer) {
        final SnmpV3AdaptorServer srv;
        try {
            // NOTE(review): arguments are passed through unchanged; see the
            // SnmpV3AdaptorServer documentation for their exact meaning.
            srv = new SnmpV3AdaptorServer(false, 0, null);
        } catch (Exception x) {
            throw new IllegalStateException("Failed to initialize", x);
        }
        server = srv;
        TRAP_COUNT_LIMIT = 1000;
        THREAD_COUNT = 20;
        this.peer = peer;
        barrier = new CyclicBarrier(THREAD_COUNT, new Runnable() {
            public void run() {
                System.out.println("5, 4, 3, 2, 1, Go!");
            }
        });
        latch = new CountDownLatch(THREAD_COUNT);
    }

    /**
     * Starts the adaptor if it is not active yet, waits up to one second for
     * it to reach the ONLINE state, then starts the sender threads.
     */
    public void launch() {
        if (!server.isActive()) {
            server.start();
            // ONLINE is a constant: reference it through the type rather
            // than through the instance.
            server.waitState(SnmpAdaptorServer.ONLINE, 1000);
        }
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(this, "Sender[" + i + "]").start();
        }
    }

    /**
     * Waits until every sender thread has terminated, then stops the adaptor
     * if it is still active. The adaptor is stopped even if the wait is
     * interrupted.
     *
     * @throws InterruptedException if the calling thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        try {
            latch.await();
        } finally {
            if (server.isActive()) {
                server.stop();
            }
        }
    }

    /**
     * Sends one SNMP v2 trap to {@code peer} through the shared adaptor and
     * counts it as sent.
     *
     * @param peer the destination of the trap
     * @throws IOException if thrown by the adaptor while sending
     * @throws SnmpStatusException if thrown by the adaptor while sending
     */
    private void sendTrap(SnmpPeer peer) throws IOException, SnmpStatusException {
        server.snmpV2Trap(peer, new SnmpOid("1.2.3.4.0.1"),
                          new SnmpVarBindList(),
                          new SnmpTimeticks(server.getSysUpTime()));
        // Only count the trap once the adaptor call has returned normally.
        incr();
    }

    /**
     * Body of a sender thread: waits on the barrier until all sender threads
     * are ready, then sends traps until the shared limit is reached or a
     * send fails. Always counts the latch down on exit.
     */
    public void run() {
        try {
            barrier.await();
            // Reserve a slot before sending so that the limit is enforced
            // across all threads, not merely observed by each of them.
            while (claimSlot()) {
                try {
                    sendTrap(peer);
                } catch (Exception x) {
                    // The trap was not sent: hand the slot back.
                    releaseSlot();
                    System.err.println(
                        Thread.currentThread().getName() +
                        ": failed to send trap: " + x);
                    x.printStackTrace();
                    return;
                }
            }
        } catch (Exception x) {
            System.err.println(Thread.currentThread().getName() + ": failed: " + x);
        } finally {
            latch.countDown();
        }
    }

    /**
     * Removes up to {@code count} traps from {@code trapQueue}, giving up as
     * soon as no trap arrives within one second or the calling thread is
     * interrupted.
     *
     * @param trapQueue the queue filled by the trap listener
     * @param count the number of traps to wait for
     * @return the number of traps actually removed from the queue
     */
    private static int waitForTraps(final BlockingQueue<SnmpPdu> trapQueue,
                                    final int count) {
        int receivedCount = 0;
        System.out.println("Expecting " + count + " traps...");
        while (receivedCount < count) {
            SnmpPdu pdu = null;
            try {
                pdu = trapQueue.poll(1, TimeUnit.SECONDS);
            } catch (InterruptedException x) {
                System.err.println("Failed to get trap: " + x);
                // Preserve the interrupt for the caller.
                Thread.currentThread().interrupt();
            }
            if (pdu == null) {
                // Timed out or interrupted: report what we have.
                return receivedCount;
            }
            receivedCount++;
        }
        return receivedCount;
    }

    /**
     * Runs the test: starts a local trap receiver, sends the traps from
     * multiple threads, and checks that none was lost.
     *
     * @param args optional; {@code args[0]} is the trap port
     *        (defaults to 6789)
     * @throws Exception if the test cannot be set up, or fails
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length == 0) args = new String[] { "6789" };
        final int trapPort = Integer.parseInt(args[0]);

        final DaemonTaskServer srv = new DaemonTaskServer();
        srv.start();
        final SnmpEventReportDispatcher trapReceiver =
            new SnmpEventReportDispatcher(trapPort,
                                          null, srv, null);
        final BlockingQueue<SnmpPdu> trapQueue =
            new ArrayBlockingQueue<SnmpPdu>(1000);

        // Every received trap, whatever its SNMP version, goes to trapQueue.
        final SnmpTrapListener handler = new SnmpTrapListener() {
            public void enqueue(SnmpPdu aTrapPDU) {
                try {
                    if (!trapQueue.offer(aTrapPDU, 10, TimeUnit.MILLISECONDS))
                        System.out.println("Can't insert trap in the queue: timeout expired");
                } catch (InterruptedException x) {
                    System.out.println("Can't insert trap in the queue: " + x);
                    // Preserve the interrupt for the receiver thread.
                    Thread.currentThread().interrupt();
                }
            }
            public void processSnmpTrapV1(SnmpPduTrap aTrapPDU) {
                enqueue(aTrapPDU);
            }
            public void processSnmpTrapV2(SnmpPduRequest aTrapPDU) {
                enqueue(aTrapPDU);
            }
            public void processSnmpTrapV3(SnmpScopedPduRequest aTrapPDU) {
                enqueue(aTrapPDU);
            }
        };
        trapReceiver.addTrapListener(handler);

        final Thread receiverThread = new Thread(trapReceiver, "TrapReceiver");
        receiverThread.setDaemon(true);
        receiverThread.setPriority(Thread.MAX_PRIORITY);
        receiverThread.start();

        final SnmpPeer peer = new SnmpPeer(null, trapPort);
        final MultiThreadTrapTest test = new MultiThreadTrapTest(peer);
        test.launch();

        int receivedCount = waitForTraps(trapQueue, test.TRAP_COUNT_LIMIT);
        if (receivedCount < test.TRAP_COUNT_LIMIT) {
            System.err.println("Test failed: timeout exceeded: " +
                    receivedCount + " traps received, " +
                    test.getSentCount() + " traps sent");
            throw new RuntimeException("Test failed");
        }

        test.await();

        // Drain whatever is still expected now that all senders are done.
        if (receivedCount < test.getSentCount()) {
            receivedCount += waitForTraps(trapQueue,
                    test.getSentCount() - receivedCount);
        }
        if ((receivedCount + trapQueue.size()) < test.getSentCount()) {
            System.err.println("Test failed: bad received count: " +
                    (receivedCount + trapQueue.size()) +
                    " traps received, " +
                    test.getSentCount() + " traps sent");
            throw new RuntimeException("Test failed");
        }
        System.out.println("Success! " +
                    receivedCount + " traps received, " +
                    test.getSentCount() + " traps sent");
    }
}
|
Tags:
monitoring
opendmk
opensource
snmp
traps
Posted by dfuchs
( Oct 05 2007, 06:17:57 PM CEST )
Permalink
|