`
wu-yansheng
  • 浏览: 15746 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

java多线程体验3-实际项目应用

阅读更多
多线程发现短信的实例:
1、生产获取处理数据的类:
private class ScanTask extends TimerTask {

		@Override
		public void run() {
			// if there is no capacity, cancel this scan
			if (messageQueue.remainingCapacity() <= 0) {
				return;
			}
			// 取出比剩余容量两倍的信息,为下一分钟的扫描间隙准备,put方法可以挂在queue上
			List<SysMessageSending> candidate = sysMessageSendingDao.findForSending(messageQueue.remainingCapacity()*2);
			//System.out.println("取出" + messageQueue.remainingCapacity()*2);
			for (SysMessageSending msg : candidate ) {
				if (checkAppoint(msg)) {
					try {
						messageQueue.put(msg);
					} catch (InterruptedException e) {
						LOG.warn("putting in message Queue is interrupted.");
					}
				} else {
					// invalid message, delete from the message queue
					SysMessageSendingLog log = new SysMessageSendingLog();
					BeanUtils.copyProperties(msg, log);
					log.setId(null);
					log.setOid(msg.getId()); //copy the msg's ID as orginal ID
					log.setSendStatus(SysMessageSendingLog.SEND_STATUS_FAILED);
					sysMessageSendingLogDao.insert(log);
					sysMessageSendingDao.delete(msg);
				}
			}
			if (needReception) {
				LOG.info("start read receptions.");
				// scan the log table and find the message's whose reception is null
				List<SysMessageSendingLog> receptWanted = sysMessageSendingLogDao.findMessageLogForReception();
				for (SysMessageSendingLog log : receptWanted) {
					MessageChannel channel = messageChannelMap.get(log.getSendType());
					if (channel != null ) {
						String recept = channel.getReception(log.getSid());
						if (recept != null) {
							sysMessageSendingLogDao.updateReception(log.getSid(), recept);
						}
					}
				}
				LOG.info("end read receptions.");
			}
		}
	}


2、消费者,发送短信的线程类:
private class MessageSendingThread extends Thread {
		
		public MessageSendingThread() {
			this.setName("MessageSendService.MessageSendingThread");
		}
		/**
		 * write log and delete original message from queue
		 * @param msg
		 * @param success - true: send succefully, false: send failed and expired
		 */
		private void afterMessageSending(SysMessageSending msg, boolean success, String sid) {
			// write success log
			SysMessageSendingLog log = new SysMessageSendingLog();
			BeanUtils.copyProperties(msg, log);
			log.setId(null);
			log.setSid(sid);
			log.setOid(msg.getId()); //copy the msg's ID as orginal ID
			if (success) {
				log.setRealDate(new Date());
				log.setSendStatus(SysMessageSendingLog.SEND_STATUS_SUCCESS);
			} else {
				log.setSendStatus(SysMessageSendingLog.SEND_STATUS_FAILED);
			}
			sysMessageSendingLogDao.insert(log);
			// delete the message from the queue
			if (msg.getId() != null) {
				// if it's from the database, delete it
				sysMessageSendingDao.delete(msg);
			}
		}
		// do job
		public void run() {
			LOG.debug("SendingThread is started");
			while (true) {
				try {
					SysMessageSending msg = messageQueue.take();
					LOG.debug("a message was taken from the message queue. ready for sending");
					// try to get the correct message channel
					MessageChannel channel = messageChannelMap.get(msg.getSendType());
					if (channel != null ) {
						try {
							if (msg.getId() != null
									&& !sysMessageSendingDao.checkForSending(msg.getId())) {
								// NOTE: just locking the msg and status not synchronized yet!
								continue; // omit this msg, it's has been processed by another thread
							}
							// all the message in the queue take them as ready for sending
							Message message = new Message(msg.getContent());
							String sid = channel.sendMessage(msg.getAddress(), message.getSubject(), message.getText(),msg.getSmsUser());
							try {
								if (transactionManager != null) {
									// 把操作消息队列表和日志表的操作放置在一个事务中
									DefaultTransactionDefinition def = new DefaultTransactionDefinition();
									def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
									TransactionStatus status = transactionManager.getTransaction(def);
									LOG.debug("sending successfully, start transaction  for writing message log");
									try {
										afterMessageSending(msg, true, sid);
									} catch (RuntimeException e) {
										// to prevent DAO exception interfering the message sending Exeption
										LOG.error("dao exception happend when transfering message to message log table.");
										LOG.error("detail error message:" + e.getMessage());
										transactionManager.rollback(status);
									}
									// commit the transaction
									if (!status.isCompleted()) {
										transactionManager.commit(status);
										LOG.debug("commit transaction.");
									}
									LOG.debug("end transaction for message log.");
								} else {
									afterMessageSending(msg, true, sid);
								}
							} catch (Exception e) {
								LOG.error("unknown error happend when transfering message to message log table.");
								e.printStackTrace(); // print detail unknown error
							}
						} catch (Exception e) {
							LOG.warn("error happended when try to send message to:"
									+ msg.getAddress() + " on message channel:" + channel);
							LOG.warn("error message:" + e.getMessage());
							// check if is it expired?
							if ( new Date().after(msg.getExpireDate()) ) {
								// write fail message log
								if (transactionManager != null) {
									// 把操作消息队列表和日志表的操作放置在一个事务中
									DefaultTransactionDefinition def = new DefaultTransactionDefinition();
									def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
									TransactionStatus status = transactionManager.getTransaction(def);
									LOG.debug("send failed and message expired, start transaction for writing message log");
									try {
										afterMessageSending(msg, false, null);
									} catch (RuntimeException re) {
										LOG.error("dao exception happened and prevent operations on message sending tables. rolling back.");
										LOG.error("detail error message:" + e.getMessage());
										transactionManager.rollback(status);
									}
									if (!status.isCompleted()) {
										transactionManager.commit(status);
										LOG.debug("commit transaction.");
									}
									LOG.debug("end transaction for message log.");
								} else {
									afterMessageSending(msg, false, null);
								}
							} else {
								// not expired but failed
								if (msg.getId() == null) {
									LOG.info("message sending not succesful now, persist the message to the queue table.");
									msg.setStatus("0"); // error happend for the first time
									sysMessageSendingDao.insert(msg); // save the message if it's not persisted yet
								} else {
									// increase the error count
									sysMessageSendingDao.addErrorCount(msg);
								}
							}
						}
					} else {
						LOG.warn("ignored this message because cannot find right channel for the send type:" + msg.getSendType());
						LOG.info("set the status to FAILED directly for msg which cann't find right channel.");
						if (msg.getId() == null) {
							msg.setStatus(SysMessageSending.STATUS_FAILED);
							sysMessageSendingDao.insert(msg);
						} else {
							sysMessageSendingDao.changeMessageStatus(msg.getId(), SysMessageSending.STATUS_FAILED);
						}
					}
				} catch (InterruptedException e) {
					LOG.warn("taking on the blocking message queue is interrupted.");
				}
			}
		}
	}

3、短信发送主线程类:
public MessageSendServiceImpl(int bufferSize, int scanInteval, boolean needReception, boolean runThread) {
		
		LOG.debug("initialize message buffer queue size to " + bufferSize);
		this.messageQueue = new ArrayBlockingQueue<SysMessageSending>(bufferSize);
		// set if need reception
		this.needReception = needReception;
		if (runThread) {
			// start the inner thread - TODO 可以对于每一种通道开一个线程,提高性能,防止通道之间等待
			this.sendingThread = new MessageSendingThread();
			this.sendingThread.start(); // start the thread
			// start the scan thread
			this.timer.schedule(new ScanTask(), 1000, scanInteval);
		}
		
	}

4、Spring的配置:
<bean id="messageSendService" class="com.csair.cbd.sms.services.impl.MessageSendServiceImpl">
<constructor-arg type="int" value="100" /><!-- buffer size -->
<constructor-arg type="int" value="6000" /><!-- scan interval -->
<property name="messageChannels">
<list>
<bean id="smsChannel" class="com.csair.cbd.sms.services.impl.SMSChannel" >
<property name="url" value="http://10.101.116.12:888/SmsCenterWs/services/ISmsWS"></property> <!-- http://10.101.67.20:80/SmsCenterWs/services/ISmsWS -->
<!-- property name="username" value="test"></property>
<property name="password" value="test"></property-->
<property name="smsUser">
<map>
<!-- 实时发送短信接口帐号 -->
<entry key="sms_user_realtime">
<bean class="com.csair.cbd.system.services.dto.SmsUserDto">
<property name="userId" value="test"/>
<property name="password" value="test"/>
</bean>
</entry>
<!-- 非实时发送短信接口帐号 -->
<entry key="sms_user2_delay">
<bean class="com.csair.cbd.system.services.dto.SmsUserDto"> </bean>
</entry>
</map>
</property>

</bean>
<bean id="emailChannel" class="com.csair.cbd.sms.services.impl.EmailChannel">
</property>
</bean>
</list>
</property>
</bean>
分享到:
评论

相关推荐

    Java多线程与线程安全实践-基于Http协议的断点续传(源码)

    本毕业设计题目旨在研究和实现一个基于Java多线程与线程安全机制的断点续传下载工具。随着互联网的普及,文件的下载需求日益增加,而大文件的下载往往需要较长的时间,一旦出现网络中断或意外情况,就需要从头开始...

    基于JAVA的网络通讯系统设计与实现源码+文全套

    使用场景及目标:通过学习和运行该项目源码,用户可以更好地了解JAVA网络编程的核心技术和实际应用。在以下场景中可以应用本项目源码: 对于初学者,可以通过运行该项目源码,了解JAVA网络编程的整体流程和核心技术...

    Java优化编程(第2版)

    第12章 java多线程技术与应用性能优化 12.1 java多线程技术 12.1.1 进程与线程 12.1.2 线程的生命周期 12.2 并行任务与性能 12.2.1 并行任务与多线程 12.2.2 并行任务与死锁 12.3 线程池技术与应用性能优化 12.3.1 ...

    张孝祥Java就业培训教程.pdf

    在以后的章节中,用通俗易懂的手法,紧密联系实际应用的方式,深入浅出地讲解了多线程,常用Java类,Java中的I/O(输入输出)编程,GUI与Applet,网络编程等方面的知识。 本书许多内容都来源于程序员圈子里的非正式...

    JAVA性能瓶颈和漏洞检测

    JProbe在简单易用的集成化套件中,为servlet、JSP和EJB应用代码提供了强大的Java性能分析、内存纠错、代码覆盖及线程分析功能。 JProbe Profiler JProbe Profiler * JProbe Profiler JProbe Profiler内置了Call ...

    java 面试题 总结

    与cgi的区别在于servlet处于服务器进程中,它通过多线程方式运行其service方法,一个实例可以服务于多个请求,并且其实例一般不会销毁,而CGI对每个请求都产生新的进程,服务完成后就销毁,所以效率上低于servlet。...

    xalanjava源码-FreeLunch:用于Java应用程序的连续且低开销的锁分析器

    越重要,阻止线程(以及应用程序)取得进展的锁就越多, 锁争用的定期报告:即使是暂时的锁争用也会损害应用程序性能,因此 Free-Lunch 会定期计算并报告整个应用程序的 CSP,以便在锁问题出现时尽快检测, 低开销...

    超级有影响力霸气的Java面试题大全文档

    与cgi的区别在于servlet处于服务器进程中,它通过多线程方式运行其service方法,一个实例可以服务于多个请求,并且其实例一般不会销毁,而CGI对每个请求都产生新的进程,服务完成后就销毁,所以效率上低于servlet。...

    asp.net知识库

    .NET 2.0 泛型在实际开发中的一次小应用 C#2.0 Singleton 的实现 .Net Framwork 强类型设计实践 通过反射调用類的方法,屬性,字段,索引器(2種方法) ASP.NET: State Server Gems 完整的动态加载/卸载程序集的解决方案 ...

    JAVA性能瓶颈和漏洞检测.JProbe.Suite.v7.0.part2

    JProbe在简单易用的集成化套件中,为servlet、JSP和EJB应用代码提供了强大的Java性能分析、内存纠错、代码覆盖及线程分析功能。 JProbe Profiler JProbe Profiler * JProbe Profiler JProbe Profiler内置了Call ...

    JAVA性能瓶颈和漏洞检测].JProbe.Suite.v7.0.part1

    JProbe在简单易用的集成化套件中,为servlet、JSP和EJB应用代码提供了强大的Java性能分析、内存纠错、代码覆盖及线程分析功能。 JProbe Profiler JProbe Profiler * JProbe Profiler JProbe Profiler内置了Call ...

    深入理解Android:卷I--详细书签版

     结合实际应用开发需求,以情景分析的方式有针对性地对Android的源代码进行了十分详尽的剖析,深刻揭示Android系统的工作原理  机锋网、51CTO、开源中国社区等专业技术网站一致鼎力推荐 内容简介  《深入理解...

    Android程序设计基础

    作为一款多方倾力打造的平台,Android具有许多优点:实际应用程序运行速度快;开发限制少,平台开放;程序多任务性能优秀,切换迅速等。当然,它也具有系统细节不完善、电源管理不好、软件的界面不太好、支持的软件...

    二十三种设计模式【PDF版】

    经常以那些技术只适合大型项目为由,避开或忽略它们,实际中,Java 的接口或抽象类是真正体现 Java 思想的核心所在,这些 你都将在 GoF 的设计模式里领略到它们变幻无穷的魔力。 GoF 的设计模式表面上好象也是一种...

    javapms门户网站源码

    Java是一种被广泛使用的网络编程语言,主要应用于企业、政府部门、电信、银行、手机平台开发等多个领域,是现在大型软件项目开发中的主角,长期占据《TIOBE世界编程语言排行榜》第一的宝座。随着Internet的发展,...

    平台设计方案.doc

    针对系统的具体特点和系统要求,我们在进行数据库方案设计时对数据库平台提出下列 性能方面的要求: 标准化程度高,符合标准ANSI SQL 数据库语言的规范 支持Brower/SERVER 模式应用,支持对称处理和多线程技术 所...

    C#微软培训资料

    &lt;&lt;page 3&gt;&gt; page begin==================== 14.2 多 态 性 .159 14.3 抽象与密封 .163 14.4 继承中关于属性的一些问题.169 14.5 小 结 .172 第四部分 深入了解 C#.174 第十五章 接 口 .174 15.1 组件...

    JMeter操作手册大全.docx

    从35个方面对Jmeter从原理到实际演示,一册在手,天下我有 1.性能测试基本概念 1.1.RT -Response time 请求响应时间 从客户端发出请求到得到响应的整个时间 一般包括网络响应时间+server的响应时间。 用户接受...

Global site tag (gtag.js) - Google Analytics