`

Spring回顾之八 —— Quartz在集群、分布式系统中的应用

阅读更多
    在Quartz的使用中,简单的任务调度,我们直接在配置文件中进行配置就可以实现,如果需要再复杂点的,我们可以将任务执行信息在数据库中进行管理,然后对任务实现动态的更新,这些上一篇做了基本的介绍,当然这些应用都是基于单节点服务的。然而单节点应用是不能满足典型的企业需求的,假如你需要故障转移的能力并需要运行日益增多的任务调度,必须考虑Quartz集群的问题。使用Quartz集群的应用可以更好的支持更加丰富的业务需求,即使是其中某些机器服务崩溃了也能保证整体系统的正常运行。

    在Quartz集群中,每一个节点是一个独立的应用,它同时又负责管理着其他的节点,每个节点的启动或停止是相互独立的行为,他们之间没有任何通信。那他们是怎么来和其他节点一起协调工作的呢?核心:数据库,Quartz应用节点是通过数据库来和另一节点的进行协作的,接下来先看看Quartz应用如何在数据库中体现。

第一步:Quartz储存方式的尝试
    由于Quartz集群依赖于数据库,所以必须创建Quartz集群所需的数据库表,Quartz提供了几乎所有的数据库支持,并给出了现成的SQL建表脚本,这个我们可以去Quartz官网直接下载,当前可以访问 http://www.quartz-scheduler.org/downloads/ 链接,我们可以看到目前最新版的是 quartz-2.2.3
   

    然后直接下载解压,我们可以得到一整套使用Quartz所需的jar包、示例以及各种文档,我们在docs/dbTables的路径下可以看到如下脚本,由于我们使用的是MySQL数据库进行测试使用,使用 tables_mysql.sql 文件即可
   

    考虑到测试的独立性,我们新建一个名为quartz的数据库,然后将上边的脚本内容跑一下,可以看到生成了一组数据表
   

    接下来,我们需要提供一下Quartz的配置文件,即quartz.properties,代码如下
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 3

org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.dataSource = quartzDS

org.quartz.dataSource.quartzDS.driver = com.mysql.cj.jdbc.Driver
org.quartz.dataSource.quartzDS.URL = jdbc:mysql://localhost:3306/quartz?useUnicode=true&serverTimezone=UTC&characterEncoding=utf-8
org.quartz.dataSource.quartzDS.user = root
org.quartz.dataSource.quartzDS.password = 123456
org.quartz.dataSource.quartzDS.maxConnections = 5


    接着定义一个实现Job接口的任务类
package test.demo.job.store;

import org.apache.log4j.Logger;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class SampleStoreJob implements Job{
	
	private Logger logger = Logger.getLogger(SampleStoreJob.class); 

	@Override
	public void execute(JobExecutionContext context) throws JobExecutionException {
		logger.info("SampleStoreJob===========execute()");
	}

}

    然后我们需要写一个主调程序来进行任务调度的实现,代码如下
package test.demo.job.store;

import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class SampleStoreQuartz {

	public void run() throws Exception{
		//使用SchedulerFactory创建一个Scheduler
		SchedulerFactory schedulerFactory = new StdSchedulerFactory();
		Scheduler scheduler = schedulerFactory.getScheduler();
		scheduler.clear(); //测试用,避免因为调度存在报错,可以在job未delete的情况下删掉看下效果

		//定义一个具体的Job
		JobDetail jobDetail = JobBuilder.newJob(SampleStoreJob.class).withIdentity("sampleStoreJob", "sampleJobGroup").build();
		//定义一个具体的Trigger
		CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/5 * * * * ?");//具体的执行时间定义
		CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity("sampleStoreTrigger", "sampleTriggerGroup").withSchedule(scheduleBuilder).build();
		//将Job和Trigger绑定至Scheduler
		scheduler.scheduleJob(jobDetail, trigger);
		scheduler.start();//启动运行
		
		Thread.sleep(10*1000);//情节需要,10秒钟
		//定义一个JobKey,用来做删除Job测试
		JobKey jobKey = JobKey.jobKey("sampleStoreJob", "sampleJobGroup");
		scheduler.deleteJob(jobKey);
		
		scheduler.shutdown();//关闭Scheduler
	}
	
	public static void main(String[] args) throws Exception{
		SampleStoreQuartz sampleStoreQuartz = new SampleStoreQuartz();
		sampleStoreQuartz.run();
	}
}


    这里面我们要注意几个点,Quartz在Scheduler的创建过程中会自己去读取加载quartz.properties中的相关信息,我们得确保配置文件信息是准确可用的。还要注意下程序中的注释信息,基本上涵盖了测试运行时的相关问题。
    一切安排妥当,运行程序,可以看到输出栏打印出如下信息
   

    我们可以看到任务运行的效果符合预期。在程序运行过程中,我们可以看到数据库的qrtz_triggers表中添加了一条信息
   

    qrtz_job_details表中添加了如下信息
   

    qrtz_cron_triggers表中添加了如下信息
   

    这些信息将会在程序运行完被删掉,不方便看可以将程序中的 Thread.sleep(10*1000)调大或者将 scheduler.deleteJob(jobKey)注释掉即可。
    通过测试,我们可以看到Quartz当前运行的调度信息都体现在数据库里,如果做好相关配置,多个Quartz节点都围绕这个库进行运行,就可以实现集群了。接下来我们试一下Quartz同Spring结合,实现集群功能。

第二步:Quartz与Spring一起整合实现集群
    上一步做Quartz储存方式实践的时候,我们已经创建好了相关数据库和表,这里我们直接修改下原来的jdbc.properties文件,如下
#MySQL驱动
jdbc.driver=com.mysql.cj.jdbc.Driver
#在使用连接mysql的jdbc驱动最新版时,会遇到数据库和系统时区差异引起的问题,这时候有两种解决方案,一种是降版本,这个我们知道就行了,适可而行,还有一种是在jdbc连接的url后面加上serverTimezone=UTC或GMT即可,如果需要使用gmt+8时区,需要写成GMT%2B8,否则可能会被解析为空。
jdbc.url=jdbc:mysql://localhost:3306/quartz?useUnicode=true&serverTimezone=UTC&characterEncoding=utf-8
jdbc.username=root
jdbc.password=123456
#初始连接数
jdbc.initialSize=0
#定义最大连接数
jdbc.maxActive=20
#最大空闲
jdbc.maxIdle=20
#最小空闲
jdbc.minIdle=1
#最长等待时间
jdbc.maxWait=60000

    接下来我们在原有的Spring配置文件applicationContext.xml上进行修改,代码如下
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:p="http://www.springframework.org/schema/p" 
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd 
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.3.xsd 
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.3.xsd ">

    <!-- 加载配置文件 -->
    <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="location" value="classpath:jdbc.properties" />
    </bean>
    <!-- ========================= ORM BEGIN  ========================= -->
    <!-- 数据源配置 -->
    <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
        <property name="driverClassName" value="${jdbc.driver}" />
        <property name="url" value="${jdbc.url}" />
        <property name="username" value="${jdbc.username}" />
        <property name="password" value="${jdbc.password}" />
        <!-- 初始化连接大小 -->
        <property name="initialSize" value="${jdbc.initialSize}"></property>
        <!-- 连接池最大数量 -->
        <property name="maxActive" value="${jdbc.maxActive}"></property>
        <!-- 连接池最大空闲 -->
        <property name="maxIdle" value="${jdbc.maxIdle}"></property>
        <!-- 连接池最小空闲 -->
        <property name="minIdle" value="${jdbc.minIdle}"></property>
        <!-- 获取连接最大等待时间 -->
        <property name="maxWait" value="${jdbc.maxWait}"></property>
    </bean> 
    <!-- spring和MyBatis完美整合,不需要mybatis的配置映射文件,mapperLocations的设置将会自动扫描MyBatis的xml文件-->  
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">  
        <property name="dataSource" ref="dataSource"/>
        <property name="mapperLocations" value="classpath*:test/demo/mapper/*Mapper.xml"/>
    </bean>
    <!-- DAO接口所在包名,Spring会自动寻找其路径下的接口 -->  
    <bean id="demoDaoFactory" class="org.mybatis.spring.mapper.MapperScannerConfigurer">  
        <property name="basePackage" value="test.demo.dao" />  
        <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"></property>  
    </bean>
    <!-- 数据事务管理 ( Spring允许允许 MyBatis参与到事务管理中,所以MyBatis没有特定的事务管理器,直接利用了Spring中的 DataSourceTransactionManager) -->  
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
        <property name="dataSource" ref="dataSource"/>
    </bean> 
    <!-- ========================= ORM END  ========================= -->
    <!-- ========================= Quartz BEGIN  ========================= -->
    <bean id="storeJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
    	<property name="jobClass" value="test.demo.job.SimpleExtendsJob"/>
        <property name="durability" value="true" />
        <property name="requestsRecovery" value="true" />
    </bean>
    <bean id="storeTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
        <property name="jobDetail" ref="storeJobDetail" />
        <property name="cronExpression" value="0/5 * * * * ?" />
    </bean>
    <bean name="schedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="dataSource" ref="dataSource"/>
        <property name="applicationContextSchedulerContextKey" value="applicationContextKey" />
        <property name="configLocation" value="classpath:quartz.properties" />
        <property name="triggers">
            <list>
                <ref bean="storeTrigger" />
            </list>
        </property>
    </bean>
	<!-- ========================= Quartz END  ========================= -->
	 
</beans> 

    其中需要注意的是storeJobDetail的requestsRecovery属性值必须为true,当Quartz服务被中止后,再次启动或其他节点将会恢复执行之前未完成的所有任务。这里我们用的是连接池的方式来做的数据源配置,具体任务用的是原来的SimpleExtendsJob类,然后还需要重新配置下quartz.properties文件
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 3

org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.dataSource = quartzDS

org.quartz.dataSource.quartzDS.driver = com.mysql.cj.jdbc.Driver
org.quartz.dataSource.quartzDS.URL = jdbc:mysql://localhost:3306/quartz?useUnicode=true&serverTimezone=UTC&characterEncoding=utf-8
org.quartz.dataSource.quartzDS.user = root
org.quartz.dataSource.quartzDS.password = 123456
org.quartz.dataSource.quartzDS.maxConnections = 5

    这里比较重要的是org.quartz.jobStore.isClustered属性要配置为true,表明Scheduler实例要它参与到一个集群当中。然后打包部署,启动服务器,我们可以在输出栏看到如下内容
   

    这说明在当前节点,Quartz已经是正常运行了,我们可以去看下数据库表中发生的变化,然后设置不同的tomcat服务端口启动,停掉其中的一个看下效果。

第三步:总结
    集群通常有两种方式:节点在同一台机器上的称为垂直集群,垂直集群依赖于机器本身,机器崩溃了集群本身也就没意义了;节点放在不同的机器上的称为水平集群,水平集群可以避免单点故障的问题,但要注意个节点之间的机器时钟要保持同步,Quartz会在时钟不同步时出现运行异常,这个使用过程中一定要避免。关于集群时钟问题,比较简单的方式是使用Internet 时间服务器(Internet Time Server ITS)来解决。
    最后我们在附件中添加了quartz-2.2.3的压缩包,需要可以直接下载。





  • 大小: 7.9 KB
  • 大小: 3.7 KB
  • 大小: 59.1 KB
  • 大小: 8.7 KB
  • 大小: 13.9 KB
  • 大小: 13.4 KB
  • 大小: 11.8 KB
  • 大小: 3.4 KB
0
2
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics