论坛首页 Java企业应用论坛

浅谈Spring声明式事务管理ThreadLocal和JDKProxy

浏览 29991 次
该帖已经被评为良好帖
作者 正文
   发表时间:2009-11-12   最后修改:2009-11-25
  我写这篇文章的目的,为了使大家更好的理解和摸清事务的规律,希望对新手学习事务这块内容时有所帮助。

   在我们开发一个应用时,很多时候我们的一个业务操作会对数据库进行多次操作,有时候我们需要保证这么一系列的操作要么全部成功,要么全部失败,其实这个这个概念就是我们今天要谈论的事务。
  
   现在我们开发应用一般都采用三层结构,如果我们控制事务的代码都放在DAO(DataAccessObject)对象中,在DAO对象的每个方法当中去打开事务和关闭事务,当Service对象在调用DAO时,如果只调用一个DAO,那我们这样实现则效果不错,但往往我们的Service会调用一系列的DAO对数据库进行多次操作,那么,这个时候我们就无法控制事务的边界了,因为实际应用当中,我们的Service调用的DAO的个数是不确定的,可根据需求而变化,而且还可能出现Service调用Service的情况,看来手工来控制事务对于一个稍微严谨一点的系统来说完全是不现实的。

   那么现在我们有什么好的解决办法吗?还记得EJB引以为傲的声明式事务吗,虽然它现在已经慢慢没落,但是它的思想被后人所吸取,我们的Spring框架是一个轻量级框架,它同样的实现了声明式事务的支持,使我们能够通过配置及可插拔的方式的完成整个应用的事务的管理。

  
   谈到Sping事务,我们今天要说到的一个东东是ThreadLocal,早在JDK 1.2的版本中就提供java.lang.ThreadLocal,ThreadLocal为解决多线程程序的并发问题提供了一种新的思路。简单的说,ThreadLocal是为每个线程保存一份变量,各个线程访问自己对应的变量,所以我们就可以不使用synchronized关键字同样可以实现线程同步,要了解关于ThreadLocal的详细信息,请参看http://hi.baidu.com/cjjic02/blog/item/1ba41813aabde8886438dbe5.html


为了简单明了,今天我们先抛开AOP,还是先用手工的方式通过ThreadLocal来管理连接,废话不多说,先来看代码
TransactionHelper
package com.hwadee.demo;

import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

public final class TransactionHelper {
	
	//使用ThreadLocal持有当前线程的数据库连接
	private final static ThreadLocal<Connection> connection_holder = new ThreadLocal<Connection>();
	
	//连接配置,来自connection.properties
	private final static Properties connectionProp = new Properties();
	
	static{		
		//加载配置文件
		InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("connection.properties");
		try {
			
			connectionProp.load(is);
			is.close();
			//加载驱动程序
			Class.forName(connectionProp.getProperty("driverClassName"));
		} catch (IOException e) {
			 throw new RuntimeException(e.getMessage(),e);
		}catch(ClassNotFoundException e){
			throw new RuntimeException("驱动未找到",e);
		}
	}
	
	//获取当前线程中的数据库连接
	private static Connection getCurrentConnection()
	{
		Connection conn = connection_holder.get();
		if(conn == null){
			conn =  createNotAutoCommitConnection();			
			connection_holder.set(conn);
		}
		return conn;
	}
	
	//执行SQL语句
	public static int executeNonQuery(String sql) throws SQLException{
		
		Connection conn = getCurrentConnection();
		 
		return conn.createStatement().executeUpdate(sql);

	}
	
	//提交事务
	public static void commit(){
		Connection conn = getCurrentConnection();
		try {
			conn.commit();
			conn.close();
			connection_holder.set(null);
		} catch (SQLException e) {
			throw new RuntimeException(e.getMessage(),e);
		}
	}
	
	
	//回滚事务
	public static void rollback(){
		Connection conn = getCurrentConnection();
		try {
			conn.rollback();
			conn.close();
			connection_holder.set(null);
		} catch (SQLException e) {
			throw new RuntimeException(e.getMessage(),e);
		}
	}
	
	//创建一个不自动Commit的数据库连接
	private static Connection createNotAutoCommitConnection() {
		try {
			
			Connection conn = DriverManager.getConnection(connectionProp.getProperty("url")+";databaseName="+ connectionProp.getProperty("databaseName")
					,connectionProp.getProperty("username")
					,connectionProp.getProperty("password"));
			conn.setAutoCommit(false);
			return conn;
		} catch (SQLException e) {
			 throw new RuntimeException(e.getMessage(),e);
		}
	}	
}

这个类实现了基本的连接管理与执行SQL语句的方法,可以在多线程环境下运行


程序入口
package com.hwadee.demo;

import java.sql.SQLException;

public class MainModule {
	
	public static void main(String[] args) {		
		try{
			
			insert1();
			
			insert2();
			
			//方法1和2都无异常,提交事务,任何一个方法出现异常都将导致事务回滚。
			TransactionHelper.commit();
		}catch(SQLException e){			
			TransactionHelper.rollback();
			throw new RuntimeException(e.getMessage(),e);
		}catch(RuntimeException e){			 
			TransactionHelper.rollback();
			throw new RuntimeException(e.getMessage(),e);
		}
	}
	
	
	static void insert1() throws SQLException{		
		String sql = "insert into department values(1,'市场部')";
		
		TransactionHelper.executeNonQuery(sql);		 
	}
	
	static void insert2() throws SQLException{		
		String sql = "insert into department values(2,'研发部')";
		
		TransactionHelper.executeNonQuery(sql);	
		
		//throw new RuntimeException("回滚");		
	}
}



连接字符串配置,请将此文件放入classpath根目录中
connection.properties
url=jdbc:sqlserver://localhost:1433
databaseName=pubs
username=sa
password=password
driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver



建表语句
USE [pubs]
go
CREATE TABLE [Department](
	[DEPT_ID] [int] primary key,
	[DEPT_NAME] [varchar](50)
)
GO



   好了现在运行这个应用,可以正常的插入两条数据,接下来,取消insert2方法里面的注释,再运行看看效果。
static void insert2() throws SQLException{		
		String sql = "insert into department values(2,'研发部')";
		
		TransactionHelper.executeNonQuery(sql);	
		
		throw new RuntimeException("回滚");		
	}


很重要的一点是要想实现事务,我们必须用同一个数据库连接执行这些语句,最终才能做到统一的提交和回滚。
我们可以这样假设
insert1和insert2为不同DAO的方法
仔细观察,我们的insert1和insert2并没有负责打开连接和关闭连接。而是间接的调用TransactionHelper.executeNonQuery(sql);
这样使我们执行的所有方法都是使用同一个连接进行数据库操作。

   其实这个例子只是想告诉大家要实现声明式事务的一部分内容,这个例子只能实现简单的单事务模型,要实现更复杂的事务传播模型如嵌套等,还需要我们使用更多的技术,如AOP等等。先写到这里,希望对大家有所帮助!

   感谢大家的支持,应为最近在比较忙,一直没有更新此贴,周末终于有时间可以继续来完成这篇文章了。
   
    前面我们讲到了ThreadLocal管理连接,当然这么一点内容确实和Spring的声明式事务没有多大联系,前面的例子还是由我们自己在管理事务的起点和终点,但大多数时候我们在编写一个业务逻辑时并不能确定事务的边界,而却随着系统越发复杂化,之前的一个事务可能会作为另一个业务逻辑的子事务,那要做到事务原子性,我们就根本没办法在代码里面去写事务控制的逻辑,我们需要一种能够灵活的配置的方式来管理事务,这样,我们只需要在配置文件里面配哪些对象的哪些方法需要事务,而且可以配置事务的传播特性。

这里简要的看看事务传播特性
Spring在TransactionDefinition接口中7种类型的事务传播行为,它们规定了事务方法和事务方法发生嵌套调用时事务如何进行传播:

1.PROPAGATION_REQUIRED
如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。这是最常见的选择。

2.PROPAGATION_SUPPORTS
支持当前事务,如果当前没有事务,就以非事务方式执行。

3.PROPAGATION_MANDATORY
使用当前的事务,如果当前没有事务,就抛出异常。

4.PROPAGATION_REQUIRES_NEW
新建事务,如果当前存在事务,把当前事务挂起。

5.PROPAGATION_NOT_SUPPORTED
以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。

6.PROPAGATION_NEVER
以非事务方式执行,如果当前存在事务,则抛出异常。

7.PROPAGATION_NESTED
如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与PROPAGATION_REQUIRED类似的操作。

   我们看到了上面的定义的7中事务传播特性,那么Spring到底是如何来实现事务传播特性呢?Proxy,使用Proxy,我们为所有需要实现事务的对象创建一个代理对象,代理对象能够在目标对象的方法被调用的前后,加入事务判断的逻辑,这样子就可以实现事务传播特性,这就是我们下面要讲的AOP。在Java下实现AOP最常用的两种方案分别为JDK动态代理和CGLib动态代理,它们各有优势。下面简单的对比一下。

JDK动态代理                    |                CGlib动态代理
JDK原生支持                    |                需要第三方库
只能针对接口代理                |               可以针对接口和类进行代理
创建代理的速度快                |             创建代理速度慢
代理对象性能一般                |             代理对象性能很好

  根据上面的这些特性,总结如下,对于Singleton的代理对象或者具有实例池的代理,因为无须频繁创建代理对象,比较适合用CGLib动态代理技术,反之适合用JDK动态代理技术。
顺便要提一点,由于CGLib采用动态创建子类的方式生成代理,所以不能对目标类中的final方法进行代理。

   这里我们来看一个JDK动态代理实现Required事务的例子,代码比较多,需要大家一点点耐心,先来看看代码。

接口 DAO 使用JDK动态代理必须要有接口
package com.hwadee.demo.aop;

public interface DAO {
	void doWork();
}


实现类 DAOImpl1
package com.hwadee.demo.aop;

import java.sql.SQLException;

import com.hwadee.demo.TransactionHelper;

public class DAOImpl1 implements DAO {

	public void doWork() {

		System.out.println(this.getClass().getName() + "." + "doWork  Invoke");

		String sql = "insert into department values(1,'市场部')";

		try {
			TransactionHelper.executeNonQuery(sql);
		} catch (SQLException e) {
			throw new RuntimeException(e.getMessage(), e);
		}
		// 调用dao2
		DAO dao2 = (DAO) BeanFactory.getBean("dao2");
		dao2.doWork();
	}
}


实现类 DAOImpl2
package com.hwadee.demo.aop;

import java.sql.SQLException;

import com.hwadee.demo.TransactionHelper;

public class DAOImpl2 implements DAO {

	public void doWork() {

		System.out.println(this.getClass().getName() + "." + "doWork  Invoke");

		String sql = "insert into department values(2,'研发部')";

		try {
			TransactionHelper.executeNonQuery(sql);
		} catch (SQLException e) {
			throw new RuntimeException(e.getMessage(), e);
		}
		//throw new RuntimeException("回滚");
	}
}


修改过后的 TransactionHelper
package com.hwadee.demo;

import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

public final class TransactionHelper {

	// 使用ThreadLocal持有当前线程的数据库连接
	private final static ThreadLocal<Connection> connection_holder = new ThreadLocal<Connection>();

	// 当前是否处于事务环境
	private final static ThreadLocal<Boolean> existsTransaction = new ThreadLocal<Boolean>() {
		@Override
		protected Boolean initialValue() {
			return Boolean.FALSE;
		}
	};

	// 是否必须回滚
	private final static ThreadLocal<Boolean> rollbackOnly = new ThreadLocal<Boolean>() {
		@Override
		protected Boolean initialValue() {
			return Boolean.FALSE;
		}
	};

	// 连接配置,来自connection.properties
	private final static Properties connectionProp = new Properties();

	static {
		// 加载配置文件
		InputStream is = Thread.currentThread().getContextClassLoader()
				.getResourceAsStream("connection.properties");
		try {

			connectionProp.load(is);
			is.close();
			// 加载驱动程序
			Class.forName(connectionProp.getProperty("driverClassName"));
		} catch (IOException e) {
			throw new RuntimeException(e.getMessage(), e);
		} catch (ClassNotFoundException e) {
			throw new RuntimeException("驱动未找到", e);
		}
	}

	/**
	 * 是否必须回滚
	 */
	public static boolean isRollbackOnly() {
		return rollbackOnly.get();
	}

	/**
	 * 设置当前事务环境的回滚状态
	 */
	public static void setRollbackOnly(boolean flag) {
		rollbackOnly.set(flag);
	}

	/**
	 * 当前是否存在事务
	 */
	public static boolean existsTransaction() {
		return existsTransaction.get();
	}

	// 设置当前事务环境
	private static void setExistsTransaction(boolean flag) {
		existsTransaction.set(flag);
	}

	/**
	 * 开始一个事务
	 */
	public static void beginTransaction() {
		Connection conn = createNotAutoCommitConnection();
		connection_holder.set(conn);
		setExistsTransaction(Boolean.TRUE);
	}

	// 获取当前线程中的数据库连接
	private static Connection getCurrentConnection() {
		return connection_holder.get();
	}

	// 执行SQL语句
	public static int executeNonQuery(String sql) throws SQLException {

		Connection conn = getCurrentConnection();

		return conn.createStatement().executeUpdate(sql);

	}

	/**
	 * 提交事务
	 */
	public static void commit() {
		Connection conn = getCurrentConnection();
		try {
			conn.commit();
			conn.close();
			connection_holder.set(null);
			setExistsTransaction(Boolean.FALSE);
		} catch (SQLException e) {
			throw new RuntimeException(e.getMessage(), e);
		}
	}

	/**
	 * 回滚事务
	 */
	public static void rollback() {
		Connection conn = getCurrentConnection();
		try {
			conn.rollback();
			conn.close();
			connection_holder.set(null);
			setExistsTransaction(Boolean.FALSE);
		} catch (SQLException e) {
			throw new RuntimeException(e.getMessage(), e);
		}
	}

	// 创建一个不自动Commit的数据库连接
	private static Connection createNotAutoCommitConnection() {
		try {

			Connection conn = DriverManager.getConnection(connectionProp
					.getProperty("url")
					+ ";databaseName="
					+ connectionProp.getProperty("databaseName"),
					connectionProp.getProperty("username"), connectionProp
							.getProperty("password"));
			conn.setAutoCommit(false);
			return conn;
		} catch (SQLException e) {
			throw new RuntimeException(e.getMessage(), e);
		}
	}

}



RequiredTransactionInterceptor 事务拦截器
package com.hwadee.demo.aop;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import com.hwadee.demo.TransactionHelper;

/**
 * 事务拦截器 代理对象执行接口的任意方法都会被拦截 对方法的调用交由本类的 invoke 方法处理
 */
public class RequiredTransactionInterceptor implements InvocationHandler {

	// 目标对象
	private Object target;

	// 在构造方法中传入目标对象
	public RequiredTransactionInterceptor(Object target) {
		this.target = target;
	}

	/**
	 * 在代理对象调用接口方法时的请求会被此方法拦截
	 * 
	 * @param proxy
	 *            代理对象
	 * @param method
	 *            目标对象当前调用的方法
	 * @param args
	 *            调用此方法时传递的参数
	 */
	public Object invoke(Object proxy, Method method, Object[] args)
			throws Throwable {

		// 在目标方法被调用前织入的逻辑,此处以Required传播属性为例
		// 判断当前的事务环境,是开始一个新事务还是加入已有的事务

		boolean existsTransaction = TransactionHelper.existsTransaction();

		if (existsTransaction == false) {
			TransactionHelper.beginTransaction();
			System.out.println("当前事务环境还没有事务,开启一个新事务");
		} else {
			System.out.println("当前事务环境已存在事务,加入事务");
		}

		// 目标方法的返回值
		Object result = null;

		// 此处才真正调用目标对象的方法
		try {
			result = method.invoke(target, args);
		} catch (InvocationTargetException e) {
			// 捕获调用目标异常,如果目标异常是运行时异常则设置回滚标志
			Throwable cause = e.getCause();
			if (cause instanceof RuntimeException) {
				TransactionHelper.setRollbackOnly(true);
				System.out.println("出现运行时异常,事务环境被设置为必须回滚");
			} else {
				System.out.println("出现非运行时异常,忽略");
			}
		}

		// 在目标方法被调用后织入的逻辑
		System.out.println("判断当前的事务环境,是应该提交事务还是回滚事务");
		if (existsTransaction == false
				&& TransactionHelper.isRollbackOnly() == false) {
			TransactionHelper.commit();
			System.out.println("事务已提交");
		} else if (existsTransaction == false
				&& TransactionHelper.isRollbackOnly() == true) {
			TransactionHelper.rollback();
			System.out.println("事务已回滚");
		} else if (existsTransaction == true) {
			System.out.println("子事务忽略事务提交或回滚");
		}

		System.out.println("=============================");

		return result;
	}
}



BeanFactory  Bean工厂,负责创建代理对象
package com.hwadee.demo.aop;

import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

/**
 * 模拟Spring BeanFactory 最简化的实现
 * 
 * 默认会创建两个Bean dao1 dao2
 * 
 * 它们都是经过了代理过后的对象
 */
public class BeanFactory {

	// Bean容器
	private final static Map<String, Object> beanContainer = new HashMap<String, Object>();

	// 初始化创建两个代理对象
	static {
		DAO dao1 = new DAOImpl1();
		Object dao1Proxy = createTransactionProxy(dao1);
		beanContainer.put("dao1", dao1Proxy);

		DAO dao2 = new DAOImpl2();
		Object dao2Proxy = createTransactionProxy(dao2);
		beanContainer.put("dao2", dao2Proxy);
	}

	// 创建代理对象
	private static Object createTransactionProxy(Object target) {
		// 使用 Proxy.newProxyInstance 方法创建一个代理对象
		Object proxy = Proxy.newProxyInstance(target.getClass()
				.getClassLoader(), target.getClass().getInterfaces(),
				new RequiredTransactionInterceptor(target));
		return proxy;
	}

	// 获取Bean
	public static Object getBean(String id) {
		return beanContainer.get(id);
	}
}


MainModule  程序入口
package com.hwadee.demo.aop;

public class MainModule {

	public static void main(String[] args) {
		DAO dao1 = (DAO) BeanFactory.getBean("dao1");
		// 调用dao1,doa1的doWork方法内又调用了dao2
		dao1.doWork();		
	}
}


   好了,现在我们可以运行一下这个应用,记得先把数据库里已有的记录清空。
看看控制台输出的内容
当前事务环境还没有事务,开启一个新事务
com.hwadee.demo.aop.DAOImpl1.doWork  Invoke
当前事务环境已存在事务,加入事务
com.hwadee.demo.aop.DAOImpl2.doWork  Invoke
判断当前的事务环境,是应该提交事务还是回滚事务
子事务忽略事务提交或回滚
=============================
判断当前的事务环境,是应该提交事务还是回滚事务
事务已提交
=============================


接下来,把DAOImpl2中这段代码的注释取消掉,再次执行此应用,记得先清空数据库数据
//throw new RuntimeException("回滚");


再来看控制台输出的内容
当前事务环境还没有事务,开启一个新事务
com.hwadee.demo.aop.DAOImpl1.doWork  Invoke
当前事务环境已存在事务,加入事务
com.hwadee.demo.aop.DAOImpl2.doWork  Invoke
出现运行时异常,事务环境被设置为必须回滚
判断当前的事务环境,是应该提交事务还是回滚事务
子事务忽略事务提交或回滚
=============================
判断当前的事务环境,是应该提交事务还是回滚事务
事务已回滚
=============================


  朋友们,这不就是Required事务传播模型吗,离Spring的声明式事务已经不远了,附上最新的代码。欢迎拍砖!待续!
  • Demo.rar (5.3 KB)
  • 描述: 附源代码
  • 下载次数: 220
   发表时间:2009-11-12  
多线程下不行吧,每个线程都有自己的connection,如何控制事务的原子化?EJB的事务是JTA,不是这么道理吧?
0 请登录后投票
   发表时间:2009-11-12  
fralepg 写道
多线程下不行吧,每个线程都有自己的connection,如何控制事务的原子化?EJB的事务是JTA,不是这么道理吧?


多线程下ThreadLocal可行。
你说的是非本地线程及RPC的情况,JTA采用两段式提交解决事务问题
0 请登录后投票
   发表时间:2009-11-12  
fralepg 写道
多线程下不行吧,每个线程都有自己的connection,如何控制事务的原子化?EJB的事务是JTA,不是这么道理吧?


本贴并未讨论分布式事务!谢谢。

Tomcat等容器是基于多线程来进行响应的,所以,在Web应用中我们需要考虑线程同步的问题。据我分析Spring的源码及参考一些相关的书籍,为什么我们在使用Spring框架以后,我们的DAO可以是单例,(前提是我们的DAO必须继承至Spring的DaoSupport),是由于在其内部使用Threadlocal持有连接(JDBC为Connection,Hibernate为Session等等)及事务相关信息,这样可以避免线程间资源的冲突,使我们不用关心连接的打开与关闭的同时使DAO可以单例的模式呈现。

欢迎大家共同讨论!
0 请登录后投票
   发表时间:2009-11-12  
linliangyi2007 写道
fralepg 写道
多线程下不行吧,每个线程都有自己的connection,如何控制事务的原子化?EJB的事务是JTA,不是这么道理吧?


多线程下ThreadLocal可行。
你说的是非本地线程及RPC的情况,JTA采用两段式提交解决事务问题


这位仁兄是个老鸟!见解略同!
0 请登录后投票
   发表时间:2009-11-12  
fralepg 写道
多线程下不行吧,每个线程都有自己的connection,如何控制事务的原子化?EJB的事务是JTA,不是这么道理吧?

 

 LZ的事务管理没有问题,对于Connection、Session这样的资源ThreadLocal解决是比较好的,不仅解决了DAO的单例问题,而且还解决了事务传播的问题。而且在非分布式环境下,使用ThreadLocal管理事务往往比JTA管理事务性能上更好,况且大多数企业级应用往往不涉及分布式环境。

1 请登录后投票
   发表时间:2009-11-13  
这个级别的不能算良好贴吧,ThreadLocal已经广泛应用了
0 请登录后投票
   发表时间:2009-11-13  
我对这句话有点疑问,求lz解释下“ //方法1和2都无异常,提交事务,任何一个方法出现异常都将导致事务回滚。"

既然是 1、2两个方法,那就是掉了两次ThreadLocal 为什么说只要一个不成功就回滚?难道ThreadLocal 做了什么机制?请解释。
 
0 请登录后投票
   发表时间:2009-11-13  
xqh1022 写道
我对这句话有点疑问,求lz解释下“ //方法1和2都无异常,提交事务,任何一个方法出现异常都将导致事务回滚。"

既然是 1、2两个方法,那就是掉了两次ThreadLocal 为什么说只要一个不成功就回滚?难道ThreadLocal 做了什么机制?请解释。
 


很重要的一点是要想实现事务,我们必须用同一个数据库连接执行这些语句,最终才能做到统一的提交和回滚。

我们可以这样假设
insert1和insert2为不同DAO的方法
仔细观察,我们的insert1和insert2并没有负责打开连接和关闭连接。而是间接的调用TransactionHelper.executeNonQuery(sql);
这样使我们执行的所有方法都是使用同一个连接进行数据库操作。

仔细去看看TransactionHelper.getCurrentConnection()方法,如果你还不懂的话,证明你对ThreadLocal的作用还没有理解。
0 请登录后投票
   发表时间:2009-11-13  
楼主说的内容没错,但是开始一直再说声明式事务,后来话锋一转提到ThreadLocal,又没有解释两者之间的关系,很容易让别人觉得迷惑的。
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics