技术开发 频道

JMS与集群的监控

  二、实现

  在集群中的服务器启动的时候,需要单独启动一个线程,来定时发送Heartbeat消息。下面是一个简单的实现。

//在这里为了简单起见,采用了ActiveMQ。在实际的应用中应通过JNDI查找相应的ConnectionFactory和Topic。
public void sentHeartBeat() throws Exception {
    ConnectionFactory factory
= new ActiveMQConnectionFactory("vm://localhost");
    Connection connection
= factory.createConnection();
    connection.start();
    final Topic topic
= new ActiveMQTopic("heartBeatTopic");

    final Session session
= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final MapMessage message
= session.createMapMessage();
    message.setString(
"hostname", "172.20.0.1");
    message.setInt(
"port", 8080);
    message.setString(
"application", "TestApplication");
    message.setInt(
"instance", 1);

    
//可以设置一些其他的信息,例如当天交易数量,或最近一小时访问次数等。
    ...
    
//启动后台线程来定时发送HeartBeat消息
    
new Thread() {
        
public void run() {
            
while(true){
                try {
                    session.createProducer(topic).send(message);
                    
                    
//等待10分钟
                    Thread.sleep(
1000 * 60 * 10);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }.start();
}

  在监控服务器端,需要根据接收到的消息判断某一台服务器的运行状态。下面是一个简单的实现。

public class ServerMonitor {
    
public class ServerStatus {
        
public String hostname;
        
public int port;
        
public String application;
        
public int instance;
        
public long lastHeartBeatTime;

        
public boolean isActive() {
            
//如果最近一次HeartBeat时间距离现在大于20分钟,则断定该服务器已经当机。
            return lastHeartBeatTime
+ 1000 * 60 * 20 > System.currentTimeMillis();
        }
    }

    
//用来保存所有服务器的状态
    Map
<String, ServerStatus> statuses = new HashMap<String, ServerStatus>();

    
public void setUpMonitor() throws JMSException {
        
//创建JMS连接
        ConnectionFactory factory
= new ActiveMQConnectionFactory("vm://localhost");
        Connection connection
= factory.createConnection();
        connection.start();
        Topic topic
= new ActiveMQTopic("heartBeatTopic");
        Session session
= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        
//创建消息接收者
        MessageConsumer consumer
= session.createConsumer(topic);
        consumer.setMessageListener(
new MessageListener() {
            
public void onMessage(Message m) {
                try {                        
                    MapMessage message
= (MapMessage) m;

                    
String key = message.getString("application") + message.getInt("instance");
                    ServerStatus serverStatus
= statuses.get(key);
                    
//更新服务器的状态
                    
if (serverStatus == null) {
                        serverStatus
= new ServerStatus();
                        serverStatus.application
= message.getString("application");
                        serverStatus.instance
= message.getInt("instance");
                        serverStatus.hostname
= message.getString("hostname");
                        serverStatus.port
= message.getInt("port");
                        statuses.put(key, serverStatus);
                    }
else {
                        serverStatus.lastHeartBeatTime
= System.currentTimeMillis();
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

    }
}

  接下来,我们可以为ServerMonitor创建一个Web页面或者其他的UI,来定时刷新显示statuses中的值,从而很容易得知集群中的所有服务器的运行状态。在这里就不再详细阐述。

  这种监控方式最大的优点是配置简单,只要求集群中的服务器和监控服务器指向同一个消息队列即可,所以可以随时向集群中添加和删除服务器,监控服务器不需要做任何的修改。另外,需要注意的是这里使用的是Topic,而不是Queue,原因是,如果需要启动多个Monitor,Topic会将Heartbeat消息发送到每个Monitor,而Queue只会发送到其中的一个Monitor。

0
相关文章