`
m635674608
  • 浏览: 4956601 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Mycat 分布式事务的实现

 
阅读更多

引言:Mycat已经成为了一个强大的开源分布式数据库中间件产品。面对企业应用的海量数据事务处理,是目前最好的开源解决方案。但是如果想让多台机器中的数据保存一致,比较常规的解决方法是引入“协调者”来统一调度所有节点的执行。 
本文选自《分布式数据库架构及企业实践——基于Mycat中间件》。

  随着并发量、数据量越来越大及业务已经细化到不能再按照业务划分,我们不得不使用分布式数据库提高系统的性能。在分布式系统中,各个节点在物理上都是相对独立的,每个节点上的数据操作都可以满足 ACID。但是,各独立节点之间无法知道其他节点事务的执行情况,如果想让多台机器中的数据保存一致,就必须保证所有节点上的数据操作要么全部执行成功,要么全部不执行,比较常规的解决方法是引入“协调者”来统一调度所有节点的执行。

XA 规范

  X/Open 组织(即现在的 Open Group)定义了分布式事务处理模型。X/Open DTP 模型(1994)包括应用程序(AP)、事务管理器(TM)、资源管理器(RM)、通信资源管理器(CRM)四部分。事务管理器(TM)是交易中间件,资源管理器(RM)是数据库,通信资源管理器(CRM)是消息中间件。通常把一个数据库内部的事务处理看作本地事务,而分布式事务处理的对象是全局事务。全局事务是指在分布式事务处理环境中,多个数据库可能需要共同完成一个工作,这个工作就是一个全局事务。在一个事务中可能更新几个不同的数据库,此时一个数据库对自己内部所做操作的提交不仅需要本身的操作成功,还需要全局事务相关的其他数据库的操作成功。如果任一数据库的任一操作失败,则参与此事务的所有数据库所做的所有操作都必须回滚。XA就是X/Open DTP 定义的交易中间件与数据库之间的接口规范(即接口函数),交易中间件用它来通知数据库事务的开始、结束、提交、回滚等,XA 接口函数由数据库厂商提供,根据这一思想衍生出二阶段提交协议和三阶段提交协议。

二阶段提交

  所谓的两个阶段是指准备阶段和提交阶段。 
  准备阶段指事务协调者(事务管理器)向每个参与者(资源管理器)发送准备消息,每个参与者要么直接返回失败消息(如权限验证失败),要么在本地执行事务,写本地的 redo 和undo日志但不提交,可以进一步将准备阶段分为以下三步。 
  (1)协调者节点向所有参与者节点询问是否可以执行提交操作(vote),并开始等待各参与者节点的响应。 
  (2)参与者节点执行询问发起为止的所有事务操作,并将 undo 信息和 redo 信息写入日志。 
  (3)各参与者节点响应协调者节点发起的询问。如果参与者节点的事务操作实际执行成功,则它返回一个“同意”消息;如果参与者节点的事务操作实际执行失败,则它返回一个“中止”消息。 
  提交阶段指如果协调者收到了参与者的失败消息或者超时,则直接向每个参与者发送回滚(Rollback)消息,否则发送提交(Commit)消息,参与者根据协调者的指令执行提交或者回滚操作,释放所有事务在处理过程中使用的锁资源。 
  二阶段提交所存在的缺点如下。 
  (1)同步阻塞问题,在执行过程中所有参与节点都是事务阻塞型的,当参与者占用公共资源时,其他第三方节点访问公共资源时不得不处于阻塞状态。 
  (2)单点故障,由于协调者的重要性,一旦协调者发生故障,则参与者会一直阻塞下去。 
  (3)数据不一致,在二阶段提交的第 2 个阶段中,当协调者向参与者发送 commit 请求之后发生了局部网络异常或者在发送 commit 请求的过程中协调者发生了故障,则会导致只有一部分参与者接收到了 commit 请求,而在这部分参与者在接收到 commit 请求之后就会执行commit操作,其他部分未接收到 commit 请求的机器则无法执行事务提交,于是整个分布式系统便出现了数据不一致的现象。 
  由于二阶段提交存在诸如同步阻塞、单点问题、数据不一致、宕机等缺陷,所以,研究者们在二阶段提交的基础上做了改进,提出了三阶段提交。

三阶段提交

  三阶段提交(Three-phase commit,3PC),也叫作三阶段提交协议(Three-phase commitprotocol),是二阶段提交(2PC)的改进版本。三阶段提交把二阶段提交的准备阶段再次一分为二,这样三阶段提交就有 CanCommit、PreCommit、DoCommit 三个阶段。 
  (1)CanCommit 阶段:三阶段提交的 CanCommit 阶段其实和二阶段提交的准备阶段很像,协调者向参与者发送 commit 请求,参与者如果可以提交就返回 Yes 响应,否则返回 No 响应。 
  (2)PreCommit 阶段:协调者根据参与者的反应情况来决定是否可以记录事务的 PreCommit操作。根据响应情况,有以下两种可能。

  • 假如协调者从所有参与者那里获得的反馈都是 Yes 响应,则执行事务。
  • 假如有任何一个参与者向协调者发送了 No 响应,或者等待超时之后协调者都没有接到参与者的响应,则执行事务的中断。

(3)DoCommit阶段:该阶段进行真正的事务提交,也可以分为执行提交、中断事务两种执行情况。

  执行提交的过程如下。

  • 协调者接收到参与者发送的ACK响应后,将从预提交状态进入提交状态,并向所有参与者发送doCommit请求。
  • 事务提交参与者接收到doCommit请求之后,执行正式的事务提交,并在完成事务提交之后释放所有的事务资源。
  • 事务提交完之后,向协调者发送ACK响应。
  • 协调者接收到所有参与者的ACK响应之后,完成事务。中断事务的过程如下。
  • 协调者向所有参与者发送abort请求。
  • 参与者接收到 abort 请求之后,利用其在第 2 个阶段记录的 undo 信息来执行事务的回滚操作,并在完成回滚之后释放所有的事务资源。
  • 参与者完成事务回滚之后,向协调者发送 ACK 消息。
  • 协调者接收到参与者反馈的 ACK 消息之后,执行事务的中断。

Mycat 中分布式事务的实现

  Mycat在1.6版本以后已经完全支持 XA 分布式强事务类型了,先通过一个简单的示例来了解Mycat中XA的用法。 
  用户应用侧(AP)的使用流程如下: 
  (1)set autocommit=0 
  在应用层需要设置事务不能自动提交; 
  (2)set xa=on 
  在 SQL 中设置 XA 为开启状态; 
  (3)执行 SQL 
   insert into travelrecord(id,name) values(1,’N’),(6000000,’A’),(321,’D’),(13400000,’C’),(59,’E’); 
  (4)commit 或者 rollback 
  对事务进行提交(提交成功或者回滚异常)。 
  完整的流程图如图所示。 
<iframe id="iframe_0.4215102749876678" style="margin: 0px; padding: 0px; border-style: none; border-width: initial; width: 730px; height: 275px;" src="data:text/html;charset=utf8,%3Cstyle%3Ebody%7Bmargin:0;padding:0%7D%3C/style%3E%3Cimg%20id=%22img%22%20src=%22http://download.broadview.com.cn/Original/1701ac4c16f450169022?_=6273515%22%20style=%22border:none;max-width:730px%22%3E%3Cscript%3Ewindow.onload%20=%20function%20()%20%7Bvar%20img%20=%20document.getElementById('img');%20window.parent.postMessage(%7BiframeId:'iframe_0.4215102749876678',width:img.width,height:img.height%7D,%20'http://www.cnblogs.com');%7D%3C/script%3E" frameborder="0" scrolling="no"></iframe> 
  Mycat 内部实现侧的实现流程如下: 
  (1)set autocommit=0 
  将 MysqlConnection 中的 autocommit 设置为 false; 
  (2)set xa=on 
  在Mycat中开启 XA 事务管理器,用 MycatServer.getInstance().genXATXID()生成 XID,用XA START XID 命令进行 XA 事务开始标记,继续拼装 SQL 业务(Mycat 会将上面的 insert 数据分片到不同的节点上),拼装 XA END XID,XA PREPARE XID 最后进行 1pc 提交并记录日志到 tm.log 中,如果 1pc 阶段有异常,则直接回滚事务 XA ROLLBACK xid。 
  (3)在多节点 MySQL 中全部进行 2pc 提交(XA COMMIT),提交成功后,事务结束;如果有异常,则对事务进行重新提交或者回滚。 
  Mycat 中的 XA 分布式事务的异常处理流程如下: 
  (1)一阶段 commit 异常:如果 1pc 提交任意一个 mysql 节点无法提交或者异常,则全部节点的事务进行回滚,抛出异常给应用侧事务回滚。 
  (2)Mycat Crash Recovery 
  Mycat 崩溃以后,根据 tm.log 事务日志再进行重启恢复,mycat 启动后执行事务日志查找各个节点中已经 prepared 的 XA 事务,进行 commit 或者 rollback。

1. 相关类说明

  通过用户应用侧发送 set xa = on ; SQL 开启 Mycat 内部 XA 事务管理器的功能,事务管理器将对 MySQL 数据库进行 XA 方式的事务管理,具体事务管理功能的实现代码如下:

  • MySQLConnection:数据库连接。
  • NonBlockingSession:用户连接 Session。
  • MultiNodeCoordinator:协调者。
  • CommitNodeHandler:分片提交处理。
  • RollbackNodeHandler:分片回滚处理。

2. 代码解析

  XA 事务启动的源码如下:

public class MySQLConnection extends BackendAIOConnection {
    //设置开启事务
    private void getAutocommitCommand(StringBuilder sb, boolean autoCommit) {
        if (autoCommit) {
            sb.append("SET autocommit=1;");
        } else {
            sb.append("SET autocommit=0;");
        }
    }
    public void execute(RouteResultsetNode rrn, ServerConnection sc,boolean autocommit) throws UnsupportedEncodingException {
        if(!modifiedSQLExecuted && rrn.isModifySQL()) {
            modifiedSQLExecuted = true;
        }
        //获取当前事务 ID
        String xaTXID = sc.getSession2().getXaTXID();
        synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(),autocommit);
    }
……
……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 MySQLConnection.java源码
}

  用户应用侧设置手动提交以后,Mycat 会在当前连接中加入

  SET autocommit=0;

  将该语句加入到 StringBuffer 中,等待提交到数据库。 
  用户连接 Session 的源码如下:

public class NonBlockingSession implements Session {
    ……
……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 NonBlockingSession.java 源码
}
SET XA = ON ;语句分析

  用户应用侧发送该语句到 Mycat 中,由 SQL 语句解析器解析后交由 SetHandle 进行处理c.getSession2().setXATXEnabled (true); 
  调用 NonBlockSession 中的 setXATXEnable d 方法设置 XA 开关启动,并生成 XID,代码如下:

public void setXATXEnabled(boolean xaTXEnabled) {
    LOGGER.info("XA Transaction enabled ,con " + this.getSource());
    if (xaTXEnabled && this.xaTXID == null) {
        xaTXID = genXATXID();
    }
}

  另外,NonBlockSession 会接收来自于用户应用侧的 commit, 调用 commit 方法进行处理事务提交的逻辑。 
  在 commit()方法中,首先会 check 节点个数,一个节点和多个节点分为不同的处理过程,这里只讲下多个节点的处理方法 checkDistriTransaxAndExecute(); 
  该方法会对多个节点的事务进行提交。 
  协调者的源码如下:

public class MultiNodeCoordinator implements ResponseHandler {
    ……
……//省略此处代码,建议读者参考 GitHub 仓库 MyCAT-Server 项目的 MultiNodeCoordinator.java 源码
}

  在 NonBlockSession 的 checkDistriTransaxAndExecute()方法中, NonBlockSession 会话类会调用专门进行多节点协同的 MultiNodeCoordinator 类进行具体的处理,在 MultiNodeCoordinator类中,executeBatchNodeCmd 方法加入 XA 1PC 提交的处理,代码片段如下:

for (RouteResultsetNode rrn : session.getTargetKeys()) {
    ……
    if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE){
        //recovery Log
        participantLogEntry[started] = new
        ParticipantLogEntry(xaTxId,conn.getHost(),0,conn.getSchema(),((MySQLConnection) conn).getXaStatus());
        String[] cmds = new String[]{"XA END " + xaTxId,"XA PREPARE " + xaTxId};
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Start execute the batch cmd : "+ cmds[0] + ";" +cmds[1]+","+"current connection:"+conn.getHost()+":"+conn.getPort());
        }
    mysqlCon.execBatchCmd(cmds);
    }
……
}

  在 MultiNodeCoordinator 类的 okResponse 方法中,则进行 2pc 的事务提交

MySQLConnection mysqlCon = (MySQLConnection) conn;
switch (mysqlCon.getXaStatus()){
    case TxState.TX_STARTED_STATE:
    if (mysqlCon.batchCmdFinished()){
        String xaTxId = session.getXaTXID();
        String cmd = "XA COMMIT " + xaTxId;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Start execute the cmd :"+cmd+",current host:"+mysqlCon.getHost()+":"+mysqlCon.getPort());
        }
        //recovery log
        CoordinatorLogEntry coordinatorLogEntry =inMemoryRepository.get(xaTxId);
        for(int i=0; i<coordinatorLogEntry.participants.length;i++){
            LOGGER.debug("[In MemoryCoordinatorLogEntry]"+coordinatorLogEntry.participants[i]);
            if(coordinatorLogEntry.participants[i].resourceName.equals(conn.getSchema())){
                coordinatorLogEntry.participants[i].txState =TxState.TX_PREPARED_STATE;
            }
        }
        inMemoryRepository.put(session.getXaTXID(),coordinatorLogEntry);
        fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries());
        //send commit
        mysqlCon.setXaStatus(TxState.TX_PREPARED_STATE);
        mysqlCon.execCmd(cmd);
    }
    return;
……
}

  分片事务提交处理的源码如下:

public class CommitNodeHandler implements ResponseHandler {
    //结束 XA
    public void commit(BackendConnection conn) {
        ……
……//省略此处代码,建议读者参考 GitHub 仓库 MyCAT-Server 项目的 CommitNodeHandler.java源码
    }
    //提交 XA
    @Override
    public void okResponse(byte[] ok, BackendConnection conn) {
        ……
……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 CommitNodeHandler.java 源码
}

  在 Mycat 中同样支持单节点 MySQL 数据库的 XA 事务处理,在 CommitNodeHandler 类中就是对单节点的 XA 二阶段处理,处理方式与 MultiNodeCoordinator 类同,通过 commit 方法进行 1pc 的提交,而通过 okResponse 的方法进行 2pc 阶段的事务提交。 
  分片事务回滚处理的源码如下:

public class RollbackNodeHandler extends MultiNodeHandler {
    ……
……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 RollbackNodeHandler.java 源码
}

  在 RollbackNodeHandler 的 rollback 方法中加入了对 XA 事务的 rollback 处理,用户应用侧发起的 rollback 会在这个方法中进行处理。

for (final RouteResultsetNode node : session.getTargetKeys()) {
    ……
    //support the XA rollback
    MySQLConnection mysqlCon = (MySQLConnection) conn;
    if(session.getXaTXID()!=null) {
        String xaTxId = session.getXaTXID();
        mysqlCon.execCmd("XA END " + xaTxId + ";");
        mysqlCon.execCmd("XA ROLLBACK " + xaTxId + ";");
    }else {
    conn.rollback();
    }
……
}

http://www.cnblogs.com/broadview/p/6273515.html
分享到:
评论

相关推荐

    Mycat分布式事务的实现

    但是,各独立节点之间无法知道其他节点事务的执行情况,如果想让多台机器中的数据保存一致,就必须保证所有节点上的数据操作要么全部执行成功,要么全部不执行,比较常规的解决方法是引入“协调者”来统一调度所有...

    分布式数据库架构及企业实践-基于Mycat中间件

    本书对 Mycat 从入门到进阶、从高级技术实践到架构剖析、从网络通信协议解析到系统工作原理的方方面面进行了详细讲解,并剖析了 Mycat的 SQL 路由、跨库联合查询、分布式事务及原生 MySQL、PostgreSQL 协议等核心...

    分布式数据库架构及企业实践-基于Mycat中间件.pdf

    Mycat 从入门到进阶、从高级技术实践到架构剖析、从网络通信协议解析到系统工作原理的方方面面进行了详细讲解,并剖析了 Mycat的 SQL 路由、跨库联合查询、分布式事务及原生 MySQL、PostgreSQL 协议等核心技术。通过...

    分布式数据库架构及企业实践_基于mycat中间件

    Mycat 从入门到进阶、从高级技术实践到架构剖析、从网络通信协议解析到系统工作原理的方方面面进行了详细讲解,并剖析了 Mycat的 SQL 路由、跨库联合查询、分布式事务及原生 MySQL、PostgreSQL 协议等核心技术。通过...

    分布式数据库架构及企业实践-基于Mycat中间件 学习书籍

    Mycat 从入门到进阶、从高级技术实践到架构剖析、从网络通信协议解析到系统工作原理的方方面面进行了详细讲解,并剖析了 Mycat的 SQL 路由、跨库联合查询、分布式事务及原生 MySQL、PostgreSQL 协议等核心技术。通过...

    分布式数据库架构及企业实践 基于Mycat中间件

    Mycat 从入门到进阶、从高级技术实践到架构剖析、从网络通信协议解析到系统工作原理的方方面面进行了详细讲解,并剖析了 Mycat的 SQL 路由、跨库联合查询、分布式事务及原生 MySQL、PostgreSQL 协议等核心技术。通过...

    分布式数据库架构及企业实践:基于Mycat中间件

    Mycat 从入门到进阶、从高级技术实践到架构剖析、从网络通信协议解析到系统工作原理的方方面面进行了详细讲解,并剖析了 Mycat的 SQL 路由、跨库联合查询、分布式事务及原生 MySQL、PostgreSQL 协议等核心技术。通过...

    分布式数据库架构及企业实践-基于Mycat.pdf

    本书对 Mycat 从入门到进阶、从高级技术实践到架构剖析、从网络通信协议解析到系统工作原理的方方面面进行了详细讲解,并剖析了 Mycat的 SQL 路由、跨库联合查询、分布式事务及原生 MySQL、PostgreSQL 协议等核心...

    程序员面试刷题的书哪个好-mysqlsplit:mysql分库分表,分布式事务

    分布式事务 1:XA分布式事务 2:TCC分布式事务 3:消息分布式事务 Mycat分片规则 Mycat读写分离 Mycat故障切换 Mycat+Percona+Haproxy+keepalived Zookeeper搭建Mycat高可用集群 Mycat注解技术 Mycat性能监控 Mycat架构...

    全面讲解开源数据库中间件MyCat使用及原理视频教程

    MyCat是一个彻底开源的,面向企业应用数据库中间件,支持事务、ACID、可以视为MySQL集群的企业级数据库,用来替代昂贵的Oracle集群,并结合传统数据库和新型分布式数据仓库的新一代企业级数据库中间件产品。...

    Mycat-server-1.6-RELEASE源码

    支持分布式事务(弱xa)。 支持XA分布式事务(1.6.5)。 支持全局序列号,解决分布式下的主键生成问题。 分片规则丰富,插件化开发,易于扩展。 强大的web,命令行监控。 支持前端作为MySQL通用代理,后端JDBC方式...

    mycat window 安装包

    mycat window 安装包,博客中包含 使用 MyCAT实现MySQL的读写分离的文章 什么是MYCAT 一个彻底开源的,面向企业应用开发的大数据库集群 支持事务、ACID、可以替代MySQL的加强版数据库 一个可以视为MySQL集群的企业级...

    最详细MyCat学习资料(源码)

    分布式架构:MyCat采用分布式架构,通过将数据分片(Sharding)存储在多个MySQL实例上,实现数据的水平扩展和负载均衡。 读写分离:MyCat支持读写分离功能,可以将读操作和写操作分发到不同的MySQL实例上,有效减轻...

    MyCat技术分享

    原理、实现和应用 什么是MyCAT?简单的说,MyCAT就是: 一个彻底开源的,面向企业应用开发的“大数据库集群” 支持事务、ACID、可以替代Mysql的加强版数据库 一个可以视为“Mysql”集群的企业级数据库,用来替代...

    Mycat数据库中间件-其他

    支持分布式事务(弱xa)。 支持全局序列号,解决分布式下的主键生成问题。 分片规则丰富,插件化开发,易于扩展。 强大的web,命令行监控。 支持前端作为mysq通用代理,后端JDBC方式支持Oracle、DB2、SQL Server、...

    Mycat2数据库中间件-其他

    支持分布式事务(弱xa)。支持全局序列号,解决分布式下的主键生成问题。分片规则丰富,插件化开发,易于扩展。强大的web,命令行监控。支持前端作为mysq通用代理,后端JDBC方式支持Oracle、DB2、SQL Server、...

    B站 (锋迷商城) 学习资料

    分布式数据库中间件Mycat/Sharding-jdbc、分布式事务Seata、分布式全局ID、接口幂等性、SpringCloud之Eureka服务治理、SpringCloud之Ribbon和Feign、SpringCloud之Hystrix和Config、SpringCloud之ZUUL(Gateway)和...

    基于Mycat的拟态数据库中间件研究

    本文通过Mycat作为数据库中间件,通过对数据库的读写分离,集群的高可用,分布式事务处理,对指纹化SQL识别,以减轻单一数据库的数据存取和处理压力.Mycat将数据库模块变为异构动态冗余模式,实现数据库拟态化,使...

    Mycat数据库中间件 v1.13

    支持分布式事务(弱xa)。 支持全局序列号,解决分布式下的主键生成问题。 分片规则丰富,插件化开发,易于扩展。 强大的web,命令行监控。 支持前端作为mysq通用代理,后端JDBC方式支持Oracle、DB2、SQL Server...

Global site tag (gtag.js) - Google Analytics