二、实现
在集群中的服务器启动的时候,需要单独启动一个线程,来定时发送Heartbeat消息。下面是一个简单的实现。
//在这里为了简单起见,采用了ActiveMQ。在实际的应用中应通过JNDI查找相应的ConnectionFactory和Topic。
public void sentHeartBeat() throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
final Topic topic = new ActiveMQTopic("heartBeatTopic");
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MapMessage message = session.createMapMessage();
message.setString("hostname", "172.20.0.1");
message.setInt("port", 8080);
message.setString("application", "TestApplication");
message.setInt("instance", 1);
//可以设置一些其他的信息,例如当天交易数量,或最近一小时访问次数等。
...
//启动后台线程来定时发送HeartBeat消息
new Thread() {
public void run() {
while(true){
try {
session.createProducer(topic).send(message);
//等待10分钟
Thread.sleep(1000 * 60 * 10);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}.start();
}
public void sentHeartBeat() throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
final Topic topic = new ActiveMQTopic("heartBeatTopic");
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MapMessage message = session.createMapMessage();
message.setString("hostname", "172.20.0.1");
message.setInt("port", 8080);
message.setString("application", "TestApplication");
message.setInt("instance", 1);
//可以设置一些其他的信息,例如当天交易数量,或最近一小时访问次数等。
...
//启动后台线程来定时发送HeartBeat消息
new Thread() {
public void run() {
while(true){
try {
session.createProducer(topic).send(message);
//等待10分钟
Thread.sleep(1000 * 60 * 10);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}.start();
}
在监控服务器端,需要根据接收到的消息判断某一台服务器的运行状态。下面是一个简单的实现。
public class ServerMonitor {
public class ServerStatus {
public String hostname;
public int port;
public String application;
public int instance;
public long lastHeartBeatTime;
public boolean isActive() {
//如果最近一次HeartBeat时间距离现在大于20分钟,则断定该服务器已经当机。
return lastHeartBeatTime + 1000 * 60 * 20 > System.currentTimeMillis();
}
}
//用来保存所有服务器的状态
Map<String, ServerStatus> statuses = new HashMap<String, ServerStatus>();
public void setUpMonitor() throws JMSException {
//创建JMS连接
ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Topic topic = new ActiveMQTopic("heartBeatTopic");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建消息接收者
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
try {
MapMessage message = (MapMessage) m;
String key = message.getString("application") + message.getInt("instance");
ServerStatus serverStatus = statuses.get(key);
//更新服务器的状态
if (serverStatus == null) {
serverStatus = new ServerStatus();
serverStatus.application = message.getString("application");
serverStatus.instance = message.getInt("instance");
serverStatus.hostname = message.getString("hostname");
serverStatus.port = message.getInt("port");
statuses.put(key, serverStatus);
} else {
serverStatus.lastHeartBeatTime = System.currentTimeMillis();
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
public class ServerStatus {
public String hostname;
public int port;
public String application;
public int instance;
public long lastHeartBeatTime;
public boolean isActive() {
//如果最近一次HeartBeat时间距离现在大于20分钟,则断定该服务器已经当机。
return lastHeartBeatTime + 1000 * 60 * 20 > System.currentTimeMillis();
}
}
//用来保存所有服务器的状态
Map<String, ServerStatus> statuses = new HashMap<String, ServerStatus>();
public void setUpMonitor() throws JMSException {
//创建JMS连接
ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Topic topic = new ActiveMQTopic("heartBeatTopic");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建消息接收者
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
try {
MapMessage message = (MapMessage) m;
String key = message.getString("application") + message.getInt("instance");
ServerStatus serverStatus = statuses.get(key);
//更新服务器的状态
if (serverStatus == null) {
serverStatus = new ServerStatus();
serverStatus.application = message.getString("application");
serverStatus.instance = message.getInt("instance");
serverStatus.hostname = message.getString("hostname");
serverStatus.port = message.getInt("port");
statuses.put(key, serverStatus);
} else {
serverStatus.lastHeartBeatTime = System.currentTimeMillis();
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
接下来,我们可以为ServerMonitor创建一个Web页面或者其他的UI,来定时刷新显示statuses中的值,从而很容易得知集群中的所有服务器的运行状态。在这里就不再详细阐述。
这种监控方式最大的优点是配置简单,只要求集群中的服务器和监控服务器指向同一个消息队列即可,所以可以随时向集群中添加和删除服务器,监控服务器不需要做任何的修改。另外,需要注意的是这里使用的是Topic,而不是Queue,原因是,如果需要启动多个Monitor,Topic会将Heartbeat消息发送到每个Monitor,而Queue只会发送到其中的一个Monitor。