Nick Stephen's blog
Friday November 10, 2006

JMX: Building a JMX Notification Hub

Introduction

JMX Notifications allow MBeans to send asynchronous messages and are typically used to indicate a state change, a received event, or a problem.

However, clients must subscribe individually to each MBean whose notifications they want to receive. When there are many such MBeans, or when the MBeans may be created dynamically, this can pose a problem for the client.

This article shows how to build a JMX Notification Hub MBean which provides a single place for clients to subscribe to receive notifications from many MBeans.

JMX 1.2 Notifications - background

The JMX 1.2 Notification framework requires a client interested in a particular notification to subscribe to the MBean that publishes it, identifying the MBean by its ObjectName and optionally supplying a NotificationFilter. The MBean is responsible for emitting the notification to all interested clients; this is typically implemented with the help of the NotificationBroadcasterSupport class.
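To make the subscription model concrete, here's a minimal sketch of a client subscribing to one MBean by its ObjectName, with a NotificationFilter. The class name, the "sketch:" ObjectName and the notification types are all made up for illustration, and it uses a lambda for the listener, so it assumes Java 8 or later:

```java
import java.lang.management.ManagementFactory;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.MBeanServer;
import javax.management.NotificationFilterSupport;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.timer.Timer;

/** Hypothetical demo class; the "sketch:" names are illustrative only. */
class DirectSubscriptionSketch {

    /**
     * Registers one Timer MBean, subscribes to it by ObjectName with a
     * type filter, and reports whether a "sketch.tick" notification arrived
     * in time without any "sketch.noise" notification getting through.
     */
    static boolean receivesOnlyTicks(long timeoutMillis) {
        try {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            ObjectName timerName = new ObjectName("sketch:type=Timer,name=direct");

            // One timer emitting two notification types every 200 ms
            Timer timer = new Timer();
            Date soon = new Date(System.currentTimeMillis() + 100);
            timer.addNotification("sketch.tick", "wanted", null, soon, 200);
            timer.addNotification("sketch.noise", "unwanted", null, soon, 200);
            mbs.registerMBean(timer, timerName);

            // The filter enables notification types by prefix
            NotificationFilterSupport filter = new NotificationFilterSupport();
            filter.enableType("sketch.tick");

            CountDownLatch tick = new CountDownLatch(1);
            AtomicBoolean noise = new AtomicBoolean(false);
            NotificationListener listener = (notification, handback) -> {
                if ("sketch.tick".equals(notification.getType())) {
                    tick.countDown();
                } else {
                    noise.set(true);
                }
            };

            // The subscription names exactly one MBean
            mbs.addNotificationListener(timerName, listener, filter, null);
            timer.start();
            try {
                return tick.await(timeoutMillis, TimeUnit.MILLISECONDS)
                        && !noise.get();
            } finally {
                timer.stop(); // cancels the timer's background thread
                mbs.removeNotificationListener(timerName, listener);
                mbs.unregisterMBean(timerName);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("only ticks received: " + receivesOnlyTicks(5000));
    }
}
```

Note that the call to addNotificationListener() has to name the one MBean it wants to hear from; a client with a second timer would need a second call.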

Whilst there is a publisher and a subscriber, this doesn't really follow the publish/subscribe paradigm. The publisher and the subscriber need to know about each other, whereas in a true publish/subscribe paradigm they are dissociated from each other. This leads to several limitations in the existing implementation, amongst which are:

  1. The client must subscribe to each and every MBean it is interested in, even if there are thousands of them.
  2. The client cannot pre-subscribe to an MBean before it is created, so must listen for creation notifications and deal with asynchronous subscription.
  3. The MBean emitting the notification must maintain state for each client interested in receiving the notification, which otherwise it would not need to maintain.
  4. If the client disconnects and reconnects, it must resubscribe to all MBeans, and may have lost notifications during the interval.

In fact, it's not quite true to say that the MBean must always maintain state for all clients. JMX Connector implementations typically add only one subscriber to an MBean, and the connector implementation takes charge of the fan-out delivery of notifications to all interested remote clients. However, local MBean clients are still registered directly with the MBean itself. Also, the JMX Remote API still doesn't let clients subscribe to an ObjectName pattern, just to individual ObjectNames, as per the local API.
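The second limitation in the list above is worth seeing in code. Since a client can't pre-subscribe, it has to listen to the MBeanServerDelegate and react to each registration as it happens. The following is only a sketch: the class name, the watch() helper and the "sketch:" ObjectNames are hypothetical, and the Timer MBeans are used purely as something convenient to register:

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.management.MBeanServer;
import javax.management.MBeanServerDelegate;
import javax.management.MBeanServerNotification;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.timer.Timer;

/** Hypothetical demo class; the "sketch:" names are illustrative only. */
class RegistrationWatcherSketch {

    /**
     * Listens to the MBeanServerDelegate, registers howMany MBeans, and
     * returns the names of those whose registration matched the pattern.
     */
    static List<ObjectName> watch(String patternText, int howMany) {
        try {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            ObjectName pattern = new ObjectName(patternText);
            List<ObjectName> wanted = new CopyOnWriteArrayList<>();

            NotificationListener onRegistration = (notification, handback) -> {
                if (!MBeanServerNotification.REGISTRATION_NOTIFICATION
                        .equals(notification.getType())) {
                    return; // ignore unregistrations
                }
                ObjectName created =
                        ((MBeanServerNotification) notification).getMBeanName();
                if (pattern.apply(created)) {
                    // A real client would now have to call
                    // mbs.addNotificationListener(created, ...) itself.
                    wanted.add(created);
                }
            };

            mbs.addNotificationListener(MBeanServerDelegate.DELEGATE_NAME,
                    onRegistration, null, null);
            try {
                for (int i = 1; i <= howMany; i++) {
                    ObjectName name =
                            new ObjectName("sketch:type=Late,name=" + i);
                    mbs.registerMBean(new Timer(), name); // never started
                    mbs.unregisterMBean(name);
                }
            } finally {
                mbs.removeNotificationListener(
                        MBeanServerDelegate.DELEGATE_NAME, onRegistration);
            }
            return wanted;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(watch("sketch:type=Late,*", 3));
    }
}
```

Every client that cares about dynamically created MBeans ends up carrying this kind of bookkeeping around, which is exactly what a hub can do once on everybody's behalf.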

Building a JMX 1.2 Notification Hub

What I'd really like to be able to do is have a single place for clients to subscribe to notifications from multiple MBeans, and to be able to filter which notifications they want to receive based upon an ObjectName pattern and a NotificationFilter.
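One way the "ObjectName pattern plus NotificationFilter" idea could look is a NotificationFilter that checks the notification's source against a pattern and then defers to an ordinary filter. This is a sketch of my own rather than part of the hub shown in this article: ObjectNamePatternFilter and its accepts() helper are hypothetical names, and it assumes the notification source is an ObjectName, as it is for notifications delivered through the MBeanServer:

```java
import javax.management.MalformedObjectNameException;
import javax.management.Notification;
import javax.management.NotificationFilter;
import javax.management.NotificationFilterSupport;
import javax.management.ObjectName;

/**
 * Hypothetical filter: accepts a notification only if its source is an
 * ObjectName matching the pattern and the optional delegate filter
 * also accepts it.
 */
class ObjectNamePatternFilter implements NotificationFilter {

    private static final long serialVersionUID = 1L;

    private final ObjectName pattern;
    private final NotificationFilter delegate; // may be null

    ObjectNamePatternFilter(ObjectName pattern, NotificationFilter delegate) {
        this.pattern = pattern;
        this.delegate = delegate;
    }

    @Override
    public boolean isNotificationEnabled(Notification notification) {
        Object source = notification.getSource();
        if (!(source instanceof ObjectName)) {
            return false; // source was not rewritten to an ObjectName
        }
        if (!pattern.apply((ObjectName) source)) {
            return false; // source does not match the pattern
        }
        return delegate == null
                || delegate.isNotificationEnabled(notification);
    }

    /** Demo helper: pattern match combined with a "TICK" type prefix. */
    static boolean accepts(String pattern, String source, String type) {
        try {
            NotificationFilterSupport types = new NotificationFilterSupport();
            types.enableType("TICK");
            NotificationFilter filter = new ObjectNamePatternFilter(
                    new ObjectName(pattern), types);
            return filter.isNotificationEnabled(
                    new Notification(type, new ObjectName(source), 1L));
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        String timers = "example:type=TimerMBean,*";
        System.out.println(accepts(timers,
                "example:type=TimerMBean,name=1", "TICK1"));
        System.out.println(accepts(timers,
                "JMImplementation:type=MBeanServerDelegate", "TICK1"));
    }
}
```

A client would pass such a filter as the third argument of addNotificationListener() when subscribing to the hub. Bear in mind that an MBeanServerNotification has the MBeanServerDelegate as its source, so matching on the MBean being registered would need getMBeanName() instead.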

It's actually quite easy to build a simple one of these... all you need to do is write a NotificationHubMBean that is itself a NotificationEmitter, and which sits there listening to every other MBean and forwarding the Notifications.

Here's the basic MBean interface for our NotificationHub. We don't actually need anything at all in this interface; what's important for the MBean is that the implementation class declares itself as implementing NotificationEmitter:

public interface NotificationHubMBean {

    /**
     * accessor
     * @return number of notifications hub has received
     */
    int getNotificationCount();

} 

 

And here's the implementation of the NotificationHubMBean. All it does is subscribe to every other MBean in sight and forward their notifications.

import java.util.Set;
import javax.management.MBeanRegistration;
import javax.management.MBeanServer;
import javax.management.MBeanServerNotification;
import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationListener;
import javax.management.ObjectName;

/**
 * A NotificationHubMBean implementation
 * that subscribes to all MBeans and forwards
 * their Notifications
 */
public class NotificationHub
        extends NotificationBroadcasterSupport
        implements NotificationHubMBean, MBeanRegistration,
        NotificationEmitter, NotificationListener {

    private MBeanServer mbs;
    private ObjectName myObjectName;

    /** private count of number of notifications we receive */
    private int notificationCount;

    /** Creates a new instance of NotificationHub */
    public NotificationHub() {
    }

    /**
     * accessor
     * @return number of notifications hub has received
     */
    public int getNotificationCount() {
        return notificationCount;
    }

    /* MBeanRegistration Interface implementation */

    /**
     * Implementation private. Does a queryNames() and tries to
     * subscribe to every MBean.
     * @param mbs
     * @param name
     * @throws java.lang.Exception
     * @return the ObjectName for this MBean
     */
    public ObjectName preRegister(MBeanServer mbs, ObjectName name)
        throws Exception {

        this.mbs = mbs;
        this.myObjectName = name;

        // Subscribe to all existing ObjectNames that are broadcasters

        Set<ObjectName> objectNameSet = 
            mbs.queryNames(new ObjectName("*:*"), null);

        for (ObjectName on : objectNameSet) {

            // We don't subscribe to any other NotificationHubs
            // as this could lead to an infinite loop!
            try {
                if (!mbs.isInstanceOf(on, this.getClass().getName())) {
                    mbs.addNotificationListener(on, this, null, null);
                }
            } catch (Exception e) {
                // Ignore exception as the current MBean in the list
                // might not be a notification broadcaster.
            }
        }
        return name;
    }

    /**
     * implementation private.
     */
    public void postRegister(Boolean registrationDone) {
    }

    /**
     * Implementation private.
     * Unregister our listener from all MBeans
     */
    public void preDeregister() throws Exception {
        // Unsubscribe to all existing ObjectNames that are broadcasters

        Set<ObjectName> objectNameSet =
            mbs.queryNames(new ObjectName("*:*"), null);

        for (ObjectName on : objectNameSet) {

            // Try and unsubscribe from all mbeans
            try {
                mbs.removeNotificationListener(on, this);
            } catch (Exception e) {
                // Ignore exception as the current MBean in the list
                // might not be a notification broadcaster.
            }
        }
    }

    /**
     * implementation private
     */
    public void postDeregister() {
    }

    //--------------------------------
    // NOTIFICATION LISTENER INTERFACE
    //--------------------------------
    /**
     * Receives notifications and forwards to all listeners.
     * It also manages notifications coming from the MBeanServerDelegate and
     * subscribes to new mbeans based upon REGISTRATION_NOTIFICATION events
     */
    public void handleNotification(Notification notification,
                                   Object handback) {
        // If new MBeans are created then put listeners on them.
 
        String type = notification.getType();

        notificationCount++;

        if (type != null &&
            type.equals(MBeanServerNotification.REGISTRATION_NOTIFICATION)) {

            try {
                ObjectName on =
                    ((MBeanServerNotification)notification).getMBeanName();

                // Avoid loops by not subscribing to other Hubs.
                if (!mbs.isInstanceOf(on,
                        this.getClass().getName())) {
                    mbs.addNotificationListener(on,
                            this, null, null);
                }
            } catch (Exception e) {
                // Ignore exception as the new created MBean might
                // not be a notification broadcaster.
            }
        }
        // Deliver all notifications to anyone subscribed to us
        sendNotification(notification);
    }
}


 

An example program

In order to exercise this MBean, we can write a little application which registers a NotificationHubMBean, registers itself as a listener to the NotificationHubMBean, then registers a few TimerMBeans with different tick intervals:


import java.lang.management.ManagementFactory;
import java.util.Date;
import javax.management.MBeanServer;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.timer.Timer;

/**
 * NotificationHub example application
 */
public class NotificationHubExample implements NotificationListener {

    public NotificationHubExample() {
        try {
            ObjectName on;

            on = new ObjectName("example:type=NotificationHubMBean");

            NotificationHubMBean nh = new NotificationHub();

            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();

            mbs.registerMBean(nh, on);

            // Now add ourselves as a listener to the hub
            mbs.addNotificationListener(on, this, null, null);

            // Create a few timer mbeans that send regular notifications
            for (int i = 1 ; i < 4; i++) {
                on = new ObjectName("example:type=TimerMBean,name="+i);
                Timer timer = new Timer();
                timer.addNotification("TICK"+i, "tick from "+i,
                        null,
                        new Date(),
                        1000 * i);
                mbs.registerMBean(timer, on);
                timer.start();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /* from NotificationListener interface */
    public void handleNotification(Notification notification, Object handback) {
        System.out.println("Got notification : "+notification);
    }

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {

        NotificationHubExample me = new NotificationHubExample();

        // wait for a keypress and exit
        try {
            int c = System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        // Don't bother killing Timer threads, just exit
        System.exit(0);
    }
}


 
The output we get looks like the following. Note that we're picking up the REGISTRATION_NOTIFICATION Notifications from the MBeanServerDelegateMBean, indicating that each TimerMBean is being registered, and also the notifications from each of the timers:
Got notification : javax.management.MBeanServerNotification[source=JMImplementation:type=MBeanServerDelegate][type=JMX.mbean.registered][message=]
Got notification : javax.management.MBeanServerNotification[source=JMImplementation:type=MBeanServerDelegate][type=JMX.mbean.registered][message=]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=1][type=TICK1][message=tick from 1]
Got notification : javax.management.MBeanServerNotification[source=JMImplementation:type=MBeanServerDelegate][type=JMX.mbean.registered][message=]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=2][type=TICK2][message=tick from 2]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=1][type=TICK1][message=tick from 1]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=1][type=TICK1][message=tick from 1]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=2][type=TICK2][message=tick from 2]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=3][type=TICK3][message=tick from 3]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=1][type=TICK1][message=tick from 1]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=1][type=TICK1][message=tick from 1]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=2][type=TICK2][message=tick from 2]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=1][type=TICK1][message=tick from 1]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=3][type=TICK3][message=tick from 3]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=1][type=TICK1][message=tick from 1]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=2][type=TICK2][message=tick from 2]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=1][type=TICK1][message=tick from 1]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=1][type=TICK1][message=tick from 1]
Got notification : javax.management.timer.TimerNotification[source=example:type=TimerMBean,name=2][type=TICK2][message=tick from 2] 
 
Another way of seeing what is going on is to use JConsole to visualize the MBeans in the MBeanServer, and to subscribe to the Notifications coming from the NotificationHubMBean, in which case you could get output looking like this:

[Screenshot: JConsole showing the Notifications received from the NotificationHubMBean]

Limitations of this implementation

There are a few things missing from this simplistic implementation of a NotificationHubMBean, in particular:

JMX 2.0's Event Service

You might wonder why I mentioned "JMX 1.2" in the above. The answer is that JMX 2.0 is planning to deliver a new Event service which, whilst maintaining backward compatibility with the existing Notification functionality, will remove the need for such things as this NotificationHubMBean. For now and the near future, though, this is still a valuable tool in one's toolbox.

( Nov 10 2006, 04:41:35 PM CET )
