`
laserdance
  • 浏览: 90372 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

日志统计平台

阅读更多
这是一个商用的项目,是给XX证券用的.其实现的功能是,将各个服务器上的日志文件下载(通过FTP或SFTP的方式),然后按照给定的日志格式分析并导入到DB中存储起来,然后再用WEB界面实现一个和用户交互的查询.
日志一天只要导入一次就可以的,从该平台的稳定性来考虑,用到了数据库的存储过程,用程序也是可以实现的,不过用数据库更稳定一些.所以该平台采用了sqlserver2005的存储过程.
WEB界面展示的数据是用图形显示的,所以引用了大名鼎鼎的jfreechart来作图表展现.

1 日志文件的存储格式(采用CSV无列头,以comma分隔,\n\r结束为一条日志记录的方式)
2 日志文件的下载(将ftp或sftp的参数配置在xml文件),然后写成定时任务(由于日志是按天生成的,所以定时任务也是一天执行一次)
3 日志下载完成以后就是解析了,然后就将数据批量导入到数据库中存储起来.
4 写数据库存储过程,将原始数据处理
5 写web界面的查询,数据显示以图表展现(JFreechart)

以上就是分的模块.
日志文件格式还包括日志文件名,但为了灵活使用,我们将日志文件名在配置文件中实现,假设我们有三种格式的日志文件要下载--下载日志,交易日志,行情日志.然后我们可以在配置文件中配置日志的名字,以及日志名字中的日期的格式,以及日志文件内容的字段,见一代码片段:
<?xml version="1.0" encoding="gbk"?>
<dataconfig>
	<task name="quote" csvReader="这里填写该任务的读取实现类" excutetime="02:00:00">
		<!-- 批量导入数据库的SQL语句及一次批量插入最多的记录条数-->
		<sql
			insert="insert into quotelogs (visittime,ipaddress,pageviews) values(?,?,?)"
			 maxCommitNumber="100"/>
		<table name="quotelogs">
			<column name="visittime" type="datetime" format="yyyyMMdd HHmmss" csvindex="0" />
			<column name="ipaddress" type="string" csvindex="1" />
			<column name="pageviews" type="int" csvindex="2" />
		</table>
		<!-- fileMaxSize是以M为单位的,此处200是指200M即 200*1024*1024 -->
		<datasource server="192.168.10.21" port="21" username="ftptest"
			password="ftptest!@#" fileMaxSize="200"
			fileName="accessquote.log.{yyyyMMdd}" initPath="/home/ftptest/" />
		<!-- 存放下载日志的路径-->
		<filePath path="F:/Logs/" />
	</task>
	<task>......</task>
         <task>......</task>
	</dataconfig>

我详细解释一下以上配置文件中每一个细节:
task name是给任务取的名字,用来区分多个任务的,csvreader是读取该文件的类,executetime是该定时任务执行的时间
sql这段是批量导入数据库时用到的insert sql语句,maxCommitNumber是批量导入时的最大commit数,table中的csvindex就是指明该字段在表中的位置.遇到日期这样的我们要规定一下日期的表现形式,如yyyyMMddHHmmss.这个格式其实是指明了日志文件中该字段的日期表现形式,而导入数据库时用的是timestamp的形式.这里要注意的是该表的字段和下面table中的都是一致的,这里所有的顺序还和日志文件中内容相一致,这就实现了最大化的定制性(customize),假设另一个日志文件内容为downloadtime,ipaddress,mobilefac,filename,哪我们就可以将以上内容分别替换为
		<sql
			insert="insert into downloadlogs (downloadtime,ipaddress,mobilefac,filename) values (?,?,?,?)"
			 maxCommitNumber="100" />
		<table name="downloadlogs">
			<column name="downloadtime" type="datetime" format="yyyyMMddHHmmss" csvindex="0" />
			<column name="ipaddress" type="string" csvindex="1" />
			<column name="mobilefac" type="string" csvindex="2" />
			<column name="filename" type="string" csvindex="3" />
		</table>

datasource就是配置的ftp或sftp的连接,端口及路径的参数了,如果有多个ftp下载,照上面一datasource copy一个,并列即可,我们的处理xml的xmlManager会将多个ftp下载源读出来的.
filepath就是将远程日志下载以后存放的路径,如果有多个ftp下载我们可以用ftp的ip作为文件名的前缀然后再加文件名来区分不同ftp来源的日志文件.

看,就这一小段配置文件就把我们上面所说的定时任务,日志文件格式,数据库字段,批量导入时的最大commit数全部解决了.
这里面我们平台在运行过程中发生的问题就是多线程读取日志文件到库中时,产生了一些问题,后来更正了.由于每个日志文件的规则不同,所以我们实现的csvreader就不同,所以我们提供csvreader,具体实现由您自己扩展.该平台中由于只有交易日志和其他两个日志规则不同,所以我们只实现了两个reader.
如有疑问,请在后面留言.
下面我们来看定时任务.
public class SchedulerEngine {
	Logger logger = LoggerFactory.getLogger(SchedulerEngine.class);
	private Document doc;
	// 定时执行调度类,来定时调度
	private ScheduledExecutorService schuledExecutor;
	// 所有定时任务的Future
	private Map<String, Future<?>> futureMap = new Hashtable<String, Future<?>>();
	/** path配置文件的路径 . */
	final static String confpath = "/conf/dataconfig.xml";
	public void init() {
		try {
			URL url = getClass().getResource(confpath);
//URLDecoder.decode不懂的查API
			FileInputStream fis = new FileInputStream(URLDecoder.decode(url
					.getFile(), "UTF-8"));
			byte[] buf = new byte[fis.available()];
			fis.read(buf);
			String strXml = new String(buf, "GBK");
			doc = XmlManager.createDomByString(strXml);
		} catch (Exception e) {
			logger.error(e.toString(), e);
		} 
	}
	public void start() throws Exception {
		try {
			schuledExecutor = Executors.newScheduledThreadPool(3);
			NodeList tnl = XmlManager.queryList("/dataconfig/task", doc);
			for (int i = 0; i < tnl.getLength(); i++) {
				Element ele = (Element) tnl.item(i);
				String name = ele.getAttribute("name");
				String schtime = ele.getAttribute("excutetime");
				long curTime = System.currentTimeMillis();
				DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
				String tmpstr = df.format(new Date(curTime));
				String tmpstr2 = tmpstr.substring(0, 11) + schtime;
				long executeTime = df.parse(tmpstr2).getTime();
				long ONEDAY = 24 * 60 * 60 * 1000;
				// 启动定时task任务
				long delay = executeTime - curTime < 0 ? (executeTime - curTime + ONEDAY)
						: (executeTime - curTime);
//				delay = 0;
				// 调度服务监控代码
				ImportTask task = new ImportTask(ele);
				Future<?> future = schuledExecutor.scheduleAtFixedRate(task,
						delay, ONEDAY, TimeUnit.MILLISECONDS);
				futureMap.put(name, future);
			}
		} catch (Exception ex) {
			logger.error(ex.toString(), ex);
			throw new Exception(ex);
		}
	}
	public void reload() throws Exception {
		stop();
		start();
	}
	public void stop() {
		schuledExecutor.shutdown();
		for (Future<?> fu : futureMap.values()) {
			fu.cancel(true);
		}
		futureMap.clear();
	}

}

start里的遍历就是查找有几个task,如果有多个就会循环运行,下篇文章我们来讲ImportTask,即批导入的任务
3
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics