- 浏览: 15746 次
- 性别:
- 来自: 广州
最新评论
多线程发现短信的实例:
1、生产获取处理数据的类:
2、消费者,发送短信的线程类:
3、短信发送主线程类:
4、Spring的配置:
<bean id="messageSendService" class="com.csair.cbd.sms.services.impl.MessageSendServiceImpl">
<constructor-arg type="int" value="100" /><!-- buffer size -->
<constructor-arg type="int" value="6000" /><!-- scan interval -->
<property name="messageChannels">
<list>
<bean id="smsChannel" class="com.csair.cbd.sms.services.impl.SMSChannel" >
<property name="url" value="http://10.101.116.12:888/SmsCenterWs/services/ISmsWS"></property> <!-- http://10.101.67.20:80/SmsCenterWs/services/ISmsWS -->
<!-- property name="username" value="test"></property>
<property name="password" value="test"></property-->
<property name="smsUser">
<map>
<!-- 实时发送短信接口帐号 -->
<entry key="sms_user_realtime">
<bean class="com.csair.cbd.system.services.dto.SmsUserDto">
<property name="userId" value="test"/>
<property name="password" value="test"/>
</bean>
</entry>
<!-- 非实时发送短信接口帐号 -->
<entry key="sms_user2_delay">
<bean class="com.csair.cbd.system.services.dto.SmsUserDto"> </bean>
</entry>
</map>
</property>
</bean>
<bean id="emailChannel" class="com.csair.cbd.sms.services.impl.EmailChannel">
</property>
</bean>
</list>
</property>
</bean>
1、生产获取处理数据的类:
private class ScanTask extends TimerTask { @Override public void run() { // if there is no capacity, cancel this scan if (messageQueue.remainingCapacity() <= 0) { return; } // 取出比剩余容量两倍的信息,为下一分钟的扫描间隙准备,put方法可以挂在queue上 List<SysMessageSending> candidate = sysMessageSendingDao.findForSending(messageQueue.remainingCapacity()*2); //System.out.println("取出" + messageQueue.remainingCapacity()*2); for (SysMessageSending msg : candidate ) { if (checkAppoint(msg)) { try { messageQueue.put(msg); } catch (InterruptedException e) { LOG.warn("putting in message Queue is interrupted."); } } else { // invalid message, delete from the message queue SysMessageSendingLog log = new SysMessageSendingLog(); BeanUtils.copyProperties(msg, log); log.setId(null); log.setOid(msg.getId()); //copy the msg's ID as orginal ID log.setSendStatus(SysMessageSendingLog.SEND_STATUS_FAILED); sysMessageSendingLogDao.insert(log); sysMessageSendingDao.delete(msg); } } if (needReception) { LOG.info("start read receptions."); // scan the log table and find the message's whose reception is null List<SysMessageSendingLog> receptWanted = sysMessageSendingLogDao.findMessageLogForReception(); for (SysMessageSendingLog log : receptWanted) { MessageChannel channel = messageChannelMap.get(log.getSendType()); if (channel != null ) { String recept = channel.getReception(log.getSid()); if (recept != null) { sysMessageSendingLogDao.updateReception(log.getSid(), recept); } } } LOG.info("end read receptions."); } } }
2、消费者,发送短信的线程类:
private class MessageSendingThread extends Thread { public MessageSendingThread() { this.setName("MessageSendService.MessageSendingThread"); } /** * write log and delete original message from queue * @param msg * @param success - true: send succefully, false: send failed and expired */ private void afterMessageSending(SysMessageSending msg, boolean success, String sid) { // write success log SysMessageSendingLog log = new SysMessageSendingLog(); BeanUtils.copyProperties(msg, log); log.setId(null); log.setSid(sid); log.setOid(msg.getId()); //copy the msg's ID as orginal ID if (success) { log.setRealDate(new Date()); log.setSendStatus(SysMessageSendingLog.SEND_STATUS_SUCCESS); } else { log.setSendStatus(SysMessageSendingLog.SEND_STATUS_FAILED); } sysMessageSendingLogDao.insert(log); // delete the message from the queue if (msg.getId() != null) { // if it's from the database, delete it sysMessageSendingDao.delete(msg); } } // do job public void run() { LOG.debug("SendingThread is started"); while (true) { try { SysMessageSending msg = messageQueue.take(); LOG.debug("a message was taken from the message queue. ready for sending"); // try to get the correct message channel MessageChannel channel = messageChannelMap.get(msg.getSendType()); if (channel != null ) { try { if (msg.getId() != null && !sysMessageSendingDao.checkForSending(msg.getId())) { // NOTE: just locking the msg and status not synchronized yet! continue; // omit this msg, it's has been processed by another thread } // all the message in the queue take them as ready for sending Message message = new Message(msg.getContent()); String sid = channel.sendMessage(msg.getAddress(), message.getSubject(), message.getText(),msg.getSmsUser()); try { if (transactionManager != null) { // 把操作消息队列表和日志表的操作放置在一个事务中 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); TransactionStatus status = transactionManager.getTransaction(def); LOG.debug("sending successfully, start transaction for writing message log"); try { afterMessageSending(msg, true, sid); } catch (RuntimeException e) { // to prevent DAO exception interfering the message sending Exeption LOG.error("dao exception happend when transfering message to message log table."); LOG.error("detail error message:" + e.getMessage()); transactionManager.rollback(status); } // commit the transaction if (!status.isCompleted()) { transactionManager.commit(status); LOG.debug("commit transaction."); } LOG.debug("end transaction for message log."); } else { afterMessageSending(msg, true, sid); } } catch (Exception e) { LOG.error("unknown error happend when transfering message to message log table."); e.printStackTrace(); // print detail unknown error } } catch (Exception e) { LOG.warn("error happended when try to send message to:" + msg.getAddress() + " on message channel:" + channel); LOG.warn("error message:" + e.getMessage()); // check if is it expired? if ( new Date().after(msg.getExpireDate()) ) { // write fail message log if (transactionManager != null) { // 把操作消息队列表和日志表的操作放置在一个事务中 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); TransactionStatus status = transactionManager.getTransaction(def); LOG.debug("send failed and message expired, start transaction for writing message log"); try { afterMessageSending(msg, false, null); } catch (RuntimeException re) { LOG.error("dao exception happened and prevent operations on message sending tables. rolling back."); LOG.error("detail error message:" + e.getMessage()); transactionManager.rollback(status); } if (!status.isCompleted()) { transactionManager.commit(status); LOG.debug("commit transaction."); } LOG.debug("end transaction for message log."); } else { afterMessageSending(msg, false, null); } } else { // not expired but failed if (msg.getId() == null) { LOG.info("message sending not succesful now, persist the message to the queue table."); msg.setStatus("0"); // error happend for the first time sysMessageSendingDao.insert(msg); // save the message if it's not persisted yet } else { // increase the error count sysMessageSendingDao.addErrorCount(msg); } } } } else { LOG.warn("ignored this message because cannot find right channel for the send type:" + msg.getSendType()); LOG.info("set the status to FAILED directly for msg which cann't find right channel."); if (msg.getId() == null) { msg.setStatus(SysMessageSending.STATUS_FAILED); sysMessageSendingDao.insert(msg); } else { sysMessageSendingDao.changeMessageStatus(msg.getId(), SysMessageSending.STATUS_FAILED); } } } catch (InterruptedException e) { LOG.warn("taking on the blocking message queue is interrupted."); } } } }
3、短信发送主线程类:
public MessageSendServiceImpl(int bufferSize, int scanInteval, boolean needReception, boolean runThread) { LOG.debug("initialize message buffer queue size to " + bufferSize); this.messageQueue = new ArrayBlockingQueue<SysMessageSending>(bufferSize); // set if need reception this.needReception = needReception; if (runThread) { // start the inner thread - TODO 可以对于每一种通道开一个线程,提高性能,防止通道之间等待 this.sendingThread = new MessageSendingThread(); this.sendingThread.start(); // start the thread // start the scan thread this.timer.schedule(new ScanTask(), 1000, scanInteval); } }
4、Spring的配置:
<bean id="messageSendService" class="com.csair.cbd.sms.services.impl.MessageSendServiceImpl">
<constructor-arg type="int" value="100" /><!-- buffer size -->
<constructor-arg type="int" value="6000" /><!-- scan interval -->
<property name="messageChannels">
<list>
<bean id="smsChannel" class="com.csair.cbd.sms.services.impl.SMSChannel" >
<property name="url" value="http://10.101.116.12:888/SmsCenterWs/services/ISmsWS"></property> <!-- http://10.101.67.20:80/SmsCenterWs/services/ISmsWS -->
<!-- property name="username" value="test"></property>
<property name="password" value="test"></property-->
<property name="smsUser">
<map>
<!-- 实时发送短信接口帐号 -->
<entry key="sms_user_realtime">
<bean class="com.csair.cbd.system.services.dto.SmsUserDto">
<property name="userId" value="test"/>
<property name="password" value="test"/>
</bean>
</entry>
<!-- 非实时发送短信接口帐号 -->
<entry key="sms_user2_delay">
<bean class="com.csair.cbd.system.services.dto.SmsUserDto"> </bean>
</entry>
</map>
</property>
</bean>
<bean id="emailChannel" class="com.csair.cbd.sms.services.impl.EmailChannel">
</property>
</bean>
</list>
</property>
</bean>
发表评论
-
一个容易误解的finally和return的执行顺序
2011-02-24 19:22 688try { System.out.println(&q ... -
系统国际化1
2011-02-24 14:26 794对于稍微大一点的系统,国际化都是一个必须的步骤,很多系统为了不 ... -
事务1-spring 事务支持
2011-02-23 09:51 972当你涉及到两个其他业务方法一起绑定为一个业务操作的时候 ... -
java多线程体验2-生产者消费者
2011-02-22 21:51 1071今天接着复习一下多线 ... -
为何使用SSH
2011-02-19 09:28 636一、说下Hibernate的工作机制 1.读取并解析 ... -
Java运行时的查找路径
2011-02-18 20:01 2465一直使用IDE或者基于web容器来运行Java程序,所以很少去 ... -
java多线程体验1
2011-02-18 10:32 753多线程估计大家都在用,大家每天面对的操作系统都是多线程的应用, ... -
java序列化声明一个显式的UID
2011-02-18 09:48 1567简单来说,Java的序列化机制是通过在运行时判断类的seria ... -
事务2-解惑 spring 嵌套事务
2011-02-18 09:28 815在所有使用 spring 的应用中, 声明式事务管理可能是使用 ...
相关推荐
本毕业设计题目旨在研究和实现一个基于Java多线程与线程安全机制的断点续传下载工具。随着互联网的普及,文件的下载需求日益增加,而大文件的下载往往需要较长的时间,一旦出现网络中断或意外情况,就需要从头开始...
使用场景及目标:通过学习和运行该项目源码,用户可以更好地了解JAVA网络编程的核心技术和实际应用。在以下场景中可以应用本项目源码: 对于初学者,可以通过运行该项目源码,了解JAVA网络编程的整体流程和核心技术...
第12章 java多线程技术与应用性能优化 12.1 java多线程技术 12.1.1 进程与线程 12.1.2 线程的生命周期 12.2 并行任务与性能 12.2.1 并行任务与多线程 12.2.2 并行任务与死锁 12.3 线程池技术与应用性能优化 12.3.1 ...
在以后的章节中,用通俗易懂的手法,紧密联系实际应用的方式,深入浅出地讲解了多线程,常用Java类,Java中的I/O(输入输出)编程,GUI与Applet,网络编程等方面的知识。 本书许多内容都来源于程序员圈子里的非正式...
JProbe在简单易用的集成化套件中,为servlet、JSP和EJB应用代码提供了强大的Java性能分析、内存纠错、代码覆盖及线程分析功能。 JProbe Profiler JProbe Profiler * JProbe Profiler JProbe Profiler内置了Call ...
与cgi的区别在于servlet处于服务器进程中,它通过多线程方式运行其service方法,一个实例可以服务于多个请求,并且其实例一般不会销毁,而CGI对每个请求都产生新的进程,服务完成后就销毁,所以效率上低于servlet。...
越重要,阻止线程(以及应用程序)取得进展的锁就越多, 锁争用的定期报告:即使是暂时的锁争用也会损害应用程序性能,因此 Free-Lunch 会定期计算并报告整个应用程序的 CSP,以便在锁问题出现时尽快检测, 低开销...
与cgi的区别在于servlet处于服务器进程中,它通过多线程方式运行其service方法,一个实例可以服务于多个请求,并且其实例一般不会销毁,而CGI对每个请求都产生新的进程,服务完成后就销毁,所以效率上低于servlet。...
.NET 2.0 泛型在实际开发中的一次小应用 C#2.0 Singleton 的实现 .Net Framwork 强类型设计实践 通过反射调用類的方法,屬性,字段,索引器(2種方法) ASP.NET: State Server Gems 完整的动态加载/卸载程序集的解决方案 ...
JProbe在简单易用的集成化套件中,为servlet、JSP和EJB应用代码提供了强大的Java性能分析、内存纠错、代码覆盖及线程分析功能。 JProbe Profiler JProbe Profiler * JProbe Profiler JProbe Profiler内置了Call ...
JProbe在简单易用的集成化套件中,为servlet、JSP和EJB应用代码提供了强大的Java性能分析、内存纠错、代码覆盖及线程分析功能。 JProbe Profiler JProbe Profiler * JProbe Profiler JProbe Profiler内置了Call ...
结合实际应用开发需求,以情景分析的方式有针对性地对Android的源代码进行了十分详尽的剖析,深刻揭示Android系统的工作原理 机锋网、51CTO、开源中国社区等专业技术网站一致鼎力推荐 内容简介 《深入理解...
作为一款多方倾力打造的平台,Android具有许多优点:实际应用程序运行速度快;开发限制少,平台开放;程序多任务性能优秀,切换迅速等。当然,它也具有系统细节不完善、电源管理不好、软件的界面不太好、支持的软件...
经常以那些技术只适合大型项目为由,避开或忽略它们,实际中,Java 的接口或抽象类是真正体现 Java 思想的核心所在,这些 你都将在 GoF 的设计模式里领略到它们变幻无穷的魔力。 GoF 的设计模式表面上好象也是一种...
Java是一种被广泛使用的网络编程语言,主要应用于企业、政府部门、电信、银行、手机平台开发等多个领域,是现在大型软件项目开发中的主角,长期占据《TIOBE世界编程语言排行榜》第一的宝座。随着Internet的发展,...
针对系统的具体特点和系统要求,我们在进行数据库方案设计时对数据库平台提出下列 性能方面的要求: 标准化程度高,符合标准ANSI SQL 数据库语言的规范 支持Brower/SERVER 模式应用,支持对称处理和多线程技术 所...
<<page 3>> page begin==================== 14.2 多 态 性 .159 14.3 抽象与密封 .163 14.4 继承中关于属性的一些问题.169 14.5 小 结 .172 第四部分 深入了解 C#.174 第十五章 接 口 .174 15.1 组件...
从35个方面对Jmeter从原理到实际演示,一册在手,天下我有 1.性能测试基本概念 1.1.RT -Response time 请求响应时间 从客户端发出请求到得到响应的整个时间 一般包括网络响应时间+server的响应时间。 用户接受...