`
envy2002
  • 浏览: 149520 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

分布式事务2

 
阅读更多

   1.jar包,应该用带有二步提交事务的驱动,mysql-connector-java-5.1.6-bin.jar

 

 

 

import javax.transaction.xa.Xid;


class MyXid implements Xid{
    int formatId;
    byte globalTransactionId[];
    byte branchQualifier[];
    public MyXid(){
    }
    public MyXid(int formatId,byte[] globalTransactionId,byte[] branchQualifier){
        this.formatId = formatId;
        this.globalTransactionId = globalTransactionId;
        this.branchQualifier = branchQualifier;
    }
    public int getFormatId() {
        return this.formatId;
    }
    public void setFormatId(int formatId){
        this.formatId = formatId;
    }
    public byte[] getGlobalTransactionId() {
        return this.globalTransactionId;
    }
    public void setGlobalTransactionId(byte[] globalTransactionId){
        this.globalTransactionId = globalTransactionId;
    }
    public byte[] getBranchQualifier() {
        return this.branchQualifier;
    }
    public void setBranchQualifier(byte[] branchQualifier){
        this.branchQualifier = branchQualifier;
    }
}

 

package testDistributeTransaction;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;



import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;


public class JTATest {  
	 
    public static void main(String[] args) { 
        XADataSource xaDs1=JTATest.getDataSource("jdbc:mysql://127.0.0.1:3306/test", "root", "root");  
        XAConnection xaCon1=null;  
        XAResource xaRes1 = null;
        Connection conn1=null;
        Statement stmt1 =null;
       
        XADataSource xaDs2=JTATest.getDataSource("jdbc:mysql://192.168.9.106:3306/test", "web", "web");   
        XAConnection xaCon2= null;  
        XAResource xaRes2 = null;
        Connection conn2 = null;
        Statement stmt2 = null;
          
        int ret1=0;
        int ret2=0;  
          
        Xid xid1=new MyXid(100, new byte[]{0x01}, new byte[]{0x02});  
        Xid xid2=new MyXid(100, new byte[]{0x02}, new byte[]{0x03});
        try {             
            xaCon1 = JTATest.getXAConnetion(xaDs1);
            conn1= JTATest.getConnection(xaCon1);
            stmt1=conn1.createStatement();
            xaRes1=xaCon1.getXAResource();
           
            xaCon2 = JTATest.getXAConnetion(xaDs2);
            conn2= JTATest.getConnection(xaCon2);
            stmt2=conn2.createStatement();
            xaRes2=xaCon2.getXAResource();
              
            xaRes1.start(xid1, XAResource.TMNOFLAGS);
            stmt1.execute("insert into user values(1,\"bard\")");
            xaRes1.end(xid1, XAResource.TMSUCCESS); 
           
            xaRes2.start(xid2, XAResource.TMNOFLAGS);
            stmt2.execute("insert into debt values(\"bard\",33)");
            xaRes2.end(xid2, XAResource.TMSUCCESS);
               
            //parepre
            ret1=xaRes1.prepare(xid1);
            ret2=xaRes2.prepare(xid2);
           
            if (ret1 == XAResource.XA_OK&&ret2==XAResource.XA_OK) {   
             xaRes1.commit(xid1, false);
             //在这里,如果断点住,然后关闭数据库2,后跑出xa异常
             xaRes2.commit(xid2, false);
            }else {
                xaRes1.rollback(xid1);
                xaRes2.rollback(xid2);
   }
        } catch (SQLException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        } catch (XAException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
            //即使这里,xaRes1. xaRes2都rollback,xaRes1还是提交了。
            //所以两步提交法也是有缺陷的。
        } finally{
         try {
          conn1.close();
          conn2.close();
          stmt1.close();
          stmt2.close();
          xaCon1.close();
          xaCon2.close();
   } catch (SQLException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
        }
    }  
    private static XADataSource getDataSource(String url,String user,String password) {  
        // TODO Auto-generated method stub  
        MysqlXADataSource dataSource = new MysqlXADataSource();  
        dataSource.setUrl(url);  
        dataSource.setUser(user);  
        dataSource.setPassword(password);  
        return dataSource;  
    }
      
 
    public static XAConnection getXAConnetion(XADataSource dataSource) {  
        XAConnection XAConn = null;  
        try {  
            XAConn = dataSource.getXAConnection();  
        } catch (SQLException e) {  
            e.printStackTrace();  
        }  
        return XAConn;
    }  
 
    public static Connection getConnection(XAConnection XAConn) {  
        Connection conn = null;  
        try {  
            conn = XAConn.getConnection();  
        } catch (SQLException e) {  
            e.printStackTrace();  
        }  
        return conn;  
    }  
} 

 

 三个概念:   网络通讯的危险期、一阶段提交(1PC)和两阶段提交(2PC)   网络通讯的危险期

   由于网络通讯故障随时可能发生,任何发出请求后等待回应的程序都会有失去联系的危险。这种危险发生在发出请求之后,服务器返回应答之前,如果在这个期间网 络通讯发生故障,发出请求一方无法收到回应,于是无法判断服务器是否已经成功地处理请求,因为收不到回应可能是请求没有成功地发送到服务器,也可能是服务 器处理完成后的回应无法传回请求方。这段时间称为网络通讯的危险期(In-doubt Time)。

   一阶段提交(1PC One Phase Commit)   一 阶段提交就是事务处理器向数据库服务器发出提交请求,然后等待数据库服务器的回应,收到回应后完成事务的提交,或者服务器返回提交失败的结果就回撤事务。 危险期从发出请求开始,到收到回应结束,这段时间中数据库完成数据的修改、日志记录等处理,处理越复杂,危险期就越长。    两阶段提交(2PC Two Phase Commit)

    两阶段提交把事务提交分成两个阶段:

  • 第一阶段,事务处理器向数据库服务器发出"准备提交"请求,数据库收到请求后执行相同的数据修改和日志记录等处理,不同的是处理完成后只是把事务的状态改成"可以提交",然后把结果返回给事务处理器。
  • 事务处理器收到回应后进入第二阶段,如果在第一阶段内的危险期中发生了故障,事务处理器收不到回应,则认为事务失败,回撤事务。数据库服务器收不到第二阶段的确认提交请求,把"可以提交"的事务回撤.
  • 两阶段的第二阶段中事务处理器向数据库服务器发出"确认提交"请求,数据库服务器把事务的"可以提交"状态改为"提交完成"状态,然后返回应答。

   从严格意义上说,两阶段提交并没有完全解决网络通讯危险期的问题,但因为第二阶段的处理很简单,只是修改了事务的状态,与第一阶段相比其处理时间极短,所以危险期极短,发生事务提交故障的可能性几乎不存在。 

 

所谓两阶段提交, 即Two Phase Commit (2PC), 是分布式事务采用的一种处理方式.XA基础

   在谈到XA规范之前,必须首先了解分布式事务处理(Distributed Transaction Processing,DTP)的概念。Transaction,即事务,又称之为交易,指一个程序或程序段,在一个或多个资源如数据库或文件上为完成某些功能的执行过程的集合。

   分布式事务处理是指一个事务可能涉及多个数据库操作,分布式事务处理的关键是必须有一种方法可以知道事务在任何地方所做的所有动作,提交或回滚事务的决定必须产生统一的结果(全部提交或全部回滚)。

   X/Open组织(即现在的Open Group)定义了分布式事务处理模型。X/Open DTP模型(1994)包括应用程序(AP)、事务管理器(TM)、资源管理器(RM)、通信资源管理器(CRM)四部分。

   一般,常见的事务管理器(TM)是交易中间件,常见的资源管理器(RM)是数据库,常见的通信资源管理器(CRM)是消息中间件。为表述方便起见,在本文中直接以其常见表现形式进行描述。 

   通常把一个数据库内部的事务处理,如对多个表的操作,作为本地事务看待。数据库的事务处理对象是本地事务,而分布式事务处理的对象是全局事务。

   所谓全局事务,是指分布式事务处理环境中,多个数据库可能需要共同完成一个工作,这个工作即是一个全局事务.

   例如,一个事务中可能更新几个不同的数据库。对数据库的操作发生在系统的各处但必须全部被提交或回滚。此时一个数据库对自己内部所做操作的提交不仅依赖本 身操作是否成功,还要依赖与全局事务相关的其它数据库的操作是否成功,如果任一数据库的任一操作失败,则参与此事务的所有数据库所做的所有操作都必须回 滚。

   一般情况下,某一数据库无法知道其它数据库在做什么,因此,在一个DTP环境中,交易中间件是必需的,由它通知和协调相关数据库的提交或回滚。而一个数据库只将其自己所做的操作(可恢复)影射到全局事务中。

   XA就是X/Open DTP定义的交易中间件与数据库之间的接口规范(即接口函数),交易中间件用它来通知数据库事务的开始、结束以及提交、回滚等。XA接口函数由数据库厂商提供。

   通常情况下,交易中间件与数据库通过XA 接口规范,使用两阶段提交来完成一个全局事务,XA规范的基础是两阶段提交协议。

    在第一阶段,交易中间件请求所有相关数据库准备提交(预提交)各自的事务分支,以确认是否所有相关数据库都可以提交各自的事务分支。

    当某一数据库收到预提交后,如果可以提交属于自己的事务分支,则将自己在该事务分支中所做的操作固定记录下来,并给交易中间件一个同意提交的应答,此时数 据库将不能再在该事务分支中加入任何操作,但此时数据库并没有真正提交该事务,数据库对共享资源的操作还未释放(处于上锁状态)。

   如果由于某种原因数据库无法提交属于自己的事务分支,它将回滚自己的所有操作,释放对共享资源上的锁,并返回给交易中间件失败应答。

  在第二阶段,交易中间件审查所有数据库返回的预提交结果,如所有数据库都可以提交,交易中间件将要求所有数据库做正式提交,这样该全局事务被提交。而如果有任一数据库预提交返回失败,交易中间件将要求所有其它数据库回滚其操作,这样该全局事务被回滚。

----------------------------------

from: http://rdc.taobao.com/blog/cs/?p=1183

 

在分布式系统中,事务往往包含有多个参与者的活动,单个参与者上的活动是能够保证原子性的,而多个参与者之间原子性的保证则需要通过两阶段提交来实现,两阶段提交是分布式事务实现的关键。

很明显,两阶段提交保证了分布式事务的原子性,这些子事务要么都做,要么都不做。而数据库的一致性是由数据库的完整性约束实现的,持久性则是通过commit日志来实现的,不是由两阶段提交来保证的。至于两阶段提交如何保证隔离性,可以参考Large-scale Incremental Processing Using Distributed Transactions and Notifications中两阶段提交的具体实现。

两阶段提交的过程涉及到协调者和参与者。协调者可以看做成事务的发起者,同时也是事务的一个参与者。对于一个分布式事务来说,一个事务是涉及到多个参与者的。具体的两阶段提交的过程如下:

第一阶段:

首先,协调者在自身节点的日志中写入一条的日志记录,然后所有参与者发送消息prepare T,询问这些参与者(包括自身),是否能够提交这个事务;

参与者在接受到这个prepare T 消息以后,会根据自身的情况,进行事务的预处理,如果参与者能够提交该事务,则会将日志写入磁盘,并返回给协调者一个ready T信息,同时自身进入预提交状态状态;如果不能提交该事务,则记录日志,并返回一个not commit T信息给协调者,同时撤销在自身上所做的数据库改;

参与者能够推迟发送响应的时间,但最终还是需要发送的。

第二阶段:

协调者会收集所有参与者的意见,如果收到参与者发来的not commit T信息,则标识着该事务不能提交,协调者会将Abort T 记录到日志中,并向所有参与者发送一个Abort T 信息,让所有参与者撤销在自身上所有的预操作;

如果协调者收到所有参与者发来prepare T信息,那么协调者会将Commit T日志写入磁盘,并向所有参与者发送一个Commit T信息,提交该事务。若协调者迟迟未收到某个参与者发来的信息,则认为该参与者发送了一个VOTE_ABORT信息,从而取消该事务的执行。

参与者接收到协调者发来的Abort T信息以后,参与者会终止提交,并将Abort T 记录到日志中;如果参与者收到的是Commit T信息,则会将事务进行提交,并写入记录

一般情况下,两阶段提交机制都能较好的运行,当在事务进行过程中,有参与者宕机时,他重启以后,可以通过询问其他参与者或者协调者,从而知道这个事务到底提交了没有。当然,这一切的前提都是各个参与者在进行每一步操作时,都会事先写入日志。

唯一一个两阶段提交不能解决的困境是:当协调者在发出commit T消息后宕机了,而唯一收到这条命令的一个参与者也宕机了,这个时候这个事务就处于一个未知的状态,没有人知道这个事务到底是提交了还是未提交,从而需要数据库管理员的介入,防止数据库进入一个不一致的状态。当然,如果有一个前提是:所有节点或者网络的异常最终都会恢复,那么这个问题就不存在了,协调者和参与者最终会重启,其他节点也最终也会收到commit T的信息。

 

        看来分布式事务,不是这样搞的,架构只有有个参与者,协调者,也就是有个中间件才OK了。这个没有试过,不知有没有这样的素材可以试验。

           最后看一下二步提交法解决的场景:还是以银行跨行转账。A数据库和B数据库事务预演能成功的时候,随后一段极小极小的窗口期,才是commit。这个commit已经是能保证的,所以二步提交法是可以用的。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics