`
aol_aog
  • 浏览: 16766 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

ITM事件直接接收并解析

阅读更多

之前在实施一个监控项目时,客户因为买了IBM的小机,当前就赠送了TIVOLI的系统监控软件一套,客户也在他们的生产环境中部署了ITM的监控,由于没有购买IBM的netcool,无法集中管理告警事件,请要求我们直接把ITM的告警接受过来处理,当前我研究了一个ITM与netcool的接口eif,发现它是通过socket实现的,并且数据是文件格式,所以当时就开发了一个程序从ITM中直接接收事件,好东东要分享嘛,发给大家参考,有用的可以拿过去使用。

TECSocketServer.java,程序的主方法类,启动本地端口接收ITM的告警事件。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;

/**
* TECSocketServer采集TEMS转发过来的事件信息。
*
* @author James Gao
* @since iMon 2.0 2011-11-6
*/
public class TECSocketServer extends Thread {

/**
* 绑定本地网络端口,接收TEC发送过来的事件。
*/
private ServerSocket ss;

/**
* 是否运行。
*/
boolean runFlag = false;

/**
* 缓存数据的队列的数据结构.
*/
LinkedBlockingQueue queue;
/**
* 日志记录。
*/
static final Logger logger = Logger
.getLogger(TECSocketServer.class);

TECEventParser eventParser;

private TECEventCachedHandler handler;
private int port;

/**
* 默认事件缓存1000000条。
*/
private static final int EVNET_CACHE_SIZE = 1000000;

public TECSocketServer(int port, int eventCacheSize) {
// netcoolEventHandler = new TECEventCachedHandler();
this.eventParser = new TECEventParser(",");
this.port = port;

if (eventCacheSize <= 1000) {
this.queue = new LinkedBlockingQueue(EVNET_CACHE_SIZE);
} else {
this.queue = new LinkedBlockingQueue(eventCacheSize);

}
}

/**
* 需要在线程里启动绑定本地端口的服务,这样便于在后台运行。
*/
public void run() {
try {

ss = new ServerSocket(port);

while (runFlag) {
// 得到客户端连接
Socket socket = ss.accept();
socket.setKeepAlive(true);
// 启动接收服务
CollectorWork work = new CollectorWork(socket);
work.start();

}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
if (ss != null) {
try {
ss.close();
} catch (IOException e) {

}
}
}

}

/**
* 启动采集服务
*
* @param eventParser事件解析
*/
public void startCollectServer() {
runFlag = true;

handler = new TECEventCachedHandler(this, eventParser);
handler.setDaemon(true);
handler.start();
logger.info("启动TEC事件处理线程完毕。");

// 绑定本地网络端口,开始接收TEMS转发过来的事件。
this.start();
logger.info("TEC接收TEMS转发事件服务启动完毕。");
}

public void stopCollectServer() {
runFlag = false;
try {
ss.close();
} catch (Exception ex) {
logger.error(ex.getMessage(), ex);
}

try {
handler.interrupt();
} catch (Exception ex) {
}
logger.info("TEC接收TEMS转发事件服务关闭,释放绑定端口。");
}

/**
* 事件接收线程
*
*/
class CollectorWork extends Thread {
private BufferedReader in;
private Socket socket;

public CollectorWork() {

}

public CollectorWork(Socket socket) {
this.socket = socket;
}

public void run() {

try {

// 15分钟内无事件则关闭客户端
socket.setSoTimeout(15 * 60 * 1000);
logger.info("*******************************************************************");
logger.info("A TEMS client come in:" + socket);
logger.info("*******************************************************************");
StringBuffer sb = new StringBuffer();
// String event = null;
in = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
String line = "";
boolean endflag = false;
while (runFlag && (line = in.readLine()) != null) {
line = line.trim();
System.out.println(line);
System.out.println();
if (line.equals(""))
continue;

// tems转发过来的每条事件以Start开头
if (line.startsWith("<START>")&&line.endsWith("END")) {
endflag = true;

sb = new StringBuffer();
// sb.append(line).append(";");
sb.append(line);

}
// // SocketGateway转发过来的每条事件以End开头
// if (line.indexOf("end") == 0) {
// endflag = true;
//
// }
// if (!endflag) {
// if (event != null) {
// sb.append(line);
// }
// }

if (endflag) {
logger.info("**********receives data==*************"
+ sb.toString());
// 放入缓存队件中。
queue.put(sb.toString());
}

}
} catch (Exception e) {
logger.info(e.getMessage(), e);
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
logger.info(e.getMessage(), e);
}
try {
if (socket != null) {
socket.close();
}
} catch (IOException e) {
logger.info(e.getMessage(), e);
}
}
}
}


public static void main(String[] args){
TECSocketServer server = new TECSocketServer(5529,100000);
server.startCollectServer();
}
}


TECEventCachedHandler.java

import java.util.HashMap;

/**
*
* 针对TEC发送过来的事件进行缓存队列,并从缓存队列中取出并解析使事件成为标准化事件后进行处理。
*
* @author James Gao,create on 2010-2-9
* @version v1.0
*/
class TECEventCachedHandler extends Thread {
/**
*
*/
private final TECSocketServer tecSocketServer;
private TECEventParser parseHandler;

public TECEventCachedHandler(TECSocketServer tecSocketServer, TECEventParser parseHandler) {
this.tecSocketServer = tecSocketServer;
this.parseHandler = parseHandler;
}

public void run() {

while (this.tecSocketServer.runFlag) {

String data = null;
try {
// 从缓存队列中取出进行处理。
data = (String) this.tecSocketServer.queue.take();

if (TECSocketServer.logger.isDebugEnabled()) {
TECSocketServer.logger.debug("Take a event data from CacheQueue,data=["
+ data.toString() + "]");
}

HashMap eventMap = parseHandler
.parserStr( data);
if (TECSocketServer.logger.isDebugEnabled()) {
TECSocketServer.logger.debug("Process completely.");
}
} catch (InterruptedException ex) {
TECSocketServer.logger.warn("Read data from cache queue error ,cause : "
+ ex.getMessage(), ex);
} catch (Exception ex) {
TECSocketServer.logger.warn("Process event data fail, data=[" + data
+ "],cause by: " + ex.getMessage(), ex);
}
}
}

}

TECEventParser.java主要用来解析文本为数组。

import java.util.HashMap;

import org.apache.log4j.Logger;

/**
* 事件解析处理
*
* @author James Gao, 2011-11-6
* @since SOP iMon 2.0
*/
public class TECEventParser {

private static final Logger logger = Logger
.getLogger(TECEventParser.class);

private String separator;

public TECEventParser(String separator) {
this.separator = separator;
}

/**
* 解析组合字符串
*
* @param initStr
*/
HashMap mapForData = new HashMap();
public HashMap parserStr(String initStr) {
long t1 = System.currentTimeMillis();
String event = initStr;
HashMap mapForValue = new HashMap();
if (initStr != null && !initStr.equals("")) {
for(int a,b=0,c,d,i=0;i<initStr.length();i=d ){

a = initStr.indexOf("='");
b = initStr.indexOf("';");
c = initStr.indexOf(";");
String x = initStr.substring(c+1, b+1);
System.out.println(x);
if(!"".equals(x)&&(x!=null)){
int e = x.indexOf("='");
String key = x.substring(0, e);
System.out.println(key);
String firstValue = x.substring(e+2, x.length());
String lastValue = firstValue.substring(0, firstValue.length()-1);
if(lastValue.equals("")){
lastValue ="no value";
}
System.out.println(lastValue);
if(key!=null&&!"".equals(key)&&key.equals("source")){
mapForValue.put(key, lastValue);
}
else if(key!=null&&!"".equals(key)&&key.equals("sub_source")){
mapForValue.put(key, lastValue);
}
else if(key!=null&&!"".equals(key)&&key.equals("severity")){
mapForValue.put(key, lastValue);
}
else if(key!=null&&!"".equals(key)&&key.equals("cms_hostname")){
mapForValue.put(key, lastValue);
}
else if(key!=null&&!"".equals(key)&&key.equals("situation_name")){
mapForValue.put(key, lastValue);
}
else if(key!=null&&!"".equals(key)&&key.equals("situation_fullname")){
mapForValue.put(key, lastValue);
}
else if(key!=null&&!"".equals(key)&&key.equals("situation_displayitem")){
mapForValue.put(key, lastValue);
}
else if(key!=null&&!"".equals(key)&&key.equals("situation_origin")){
mapForValue.put(key, lastValue);
}
else if(key!=null&&!"".equals(key)&&key.equals("situation_time")){
mapForValue.put(key, lastValue);
}
else if(key!=null&&!"".equals(key)&&key.equals("situation_group")){
mapForValue.put(key, lastValue);
}
else if(key!=null&&!"".equals(key)&&key.equals("situation_status")){
mapForValue.put(key, lastValue);
}
else if(key!=null&&!"".equals(key)&&key.equals("origin")){
mapForValue.put(key, lastValue);
}
else if(key!=null&&!"".equals(key)&&key.equals("hostname")){
mapForValue.put(key, lastValue);
}
else if(key!=null&&!"".equals(key)&&key.equals("master_reset_flag")){
mapForValue.put(key, lastValue);
}
else if(key!=null&&!"".equals(key)&&key.equals("integration_type")){
mapForValue.put(key, lastValue);
}
else{
mapForValue.put(key, lastValue);
}



}
System.out.println("~~~~~~~~~~");
String x1 = initStr.substring(a+2, b);
// System.out.println(x1);
String x2 = initStr.substring(b+1, initStr.length());
if(x2.indexOf("END")==1){
break;
}
c = x2.indexOf("';");
String x3 = x2.substring(1, c+1);
initStr =x2;
d =(initStr.length())-b;

}
} else {
logger.info("接收到的原始数据 initStr is NULL");
}
long t4 = System.currentTimeMillis();
logger.info("解析告警" + event + "耗时:" + (t4 - t1) + "ms");
return mapForValue;
}


}


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics