`
m635674608
  • 浏览: 4929180 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Spring+MyBatis实现数据库读写分离方案

 
阅读更多

百度关键词:spring mybatis 多数据源 读写分离

 

 

推荐第四种

方案1

通过MyBatis配置文件创建读写分离两个DataSource,每个SqlSessionFactoryBean对象的mapperLocations属性制定两个读写数据源的配置文件。将所有读的操作配置在读文件中,所有写的操作配置在写文件中。

优点:实现简单

缺点:维护麻烦,需要对原有的xml文件进行重新修改,不支持多读,不易扩展

实现方式

<bean id="abstractDataSource" abstract="true" class="com.alibaba.druid.pool.DruidDataSource" init-method="init"

      destroy-method="close">

    <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>

    <!-- 配置获取连接等待超时的时间 -->

    <property name="maxWait" value="60000"/>

    <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->

    <property name="timeBetweenEvictionRunsMillis" value="60000"/>

    <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->

    <property name="minEvictableIdleTimeMillis" value="300000"/>

    <property name="validationQuery" value="SELECT 'x'"/>

    <property name="testWhileIdle" value="true"/>

    <property name="testOnBorrow" value="false"/>

    <property name="testOnReturn" value="false"/>

    <!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->

    <property name="poolPreparedStatements" value="true"/>

    <property name="maxPoolPreparedStatementPerConnectionSize" value="20"/>

    <property name="filters" value="config"/>

    <property name="connectionProperties" value="config.decrypt=true" />

</bean>

 

<bean id="readDataSource" parent="abstractDataSource">

    <!-- 基本属性 url、user、password -->

    <property name="url" value="${read.jdbc.url}"/>

    <property name="username" value="${read.jdbc.user}"/>

    <property name="password" value="${read.jdbc.password}"/>

    <!-- 配置初始化大小、最小、最大 -->

    <property name="initialSize" value="${read.jdbc.initPoolSize}"/>

    <property name="minIdle" value="10"/>

    <property name="maxActive" value="${read.jdbc.maxPoolSize}"/>

</bean>

 

<bean id="writeDataSource" parent="abstractDataSource">

    <!-- 基本属性 url、user、password -->

    <property name="url" value="${write.jdbc.url}"/>

    <property name="username" value="${write.jdbc.user}"/>

    <property name="password" value="${write.jdbc.password}"/>

    <!-- 配置初始化大小、最小、最大 -->

    <property name="initialSize" value="${write.jdbc.initPoolSize}"/>

    <property name="minIdle" value="10"/>

    <property name="maxActive" value="${write.jdbc.maxPoolSize}"/>

</bean>

 

<bean id="readSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">

    <!-- 实例化sqlSessionFactory时需要使用上述配置好的数据源以及SQL映射文件 -->

    <property name="dataSource" ref="readDataSource"/>

    <property name="mapperLocations" value="classpath:mapper/read/*.xml"/>

</bean>

 

<bean id="writeSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">

    <!-- 实例化sqlSessionFactory时需要使用上述配置好的数据源以及SQL映射文件 -->

    <property name="dataSource" ref="writeDataSource"/>

    <property name="mapperLocations" value="classpath:mapper/write/*.xml"/>

</bean>

方案2

通过Spring AOP在业务层实现读写分离,在DAO层调用前定义切面,利用Spring的AbstractRoutingDataSource解决多数据源的问题,实现动态选择数据源

优点:通过注解的方法在DAO每个方法上配置数据源,原有代码改动量少,易扩展,支持多读

缺点:需要在DAO每个方法上配置注解,人工管理,容易出错

实现方式

//定义枚举类型,读写

public enum DynamicDataSourceGlobal {

    READ, WRITE;

}

import java.lang.annotation.ElementType;

import java.lang.annotation.Retention;

import java.lang.annotation.RetentionPolicy;

import java.lang.annotation.Target;

 

/**

 * RUNTIME

 * 定义注解

 * 编译器将把注释记录在类文件中,在运行时 VM 将保留注释,因此可以反射性地读取。

 * @author shma1664

 *

 */

@Retention(RetentionPolicy.RUNTIME)

@Target(ElementType.METHOD)

public @interface DataSource {

 

    public DynamicDataSourceGlobal value() default DynamicDataSourceGlobal.READ;

 

}

/**

 * Created by IDEA

 * 本地线程设置和获取数据源信息

 * User: mashaohua

 * Date: 2016-07-07 13:35

 * Desc:

 */

public class DynamicDataSourceHolder {

 

    private static final ThreadLocal<DynamicDataSourceGlobal> holder = new ThreadLocal<DynamicDataSourceGlobal>();

 

    public static void putDataSource(DynamicDataSourceGlobal dataSource){

        holder.set(dataSource);

    }

 

    public static DynamicDataSourceGlobal getDataSource(){

        return holder.get();

    }

 

    public static void clearDataSource() {

        holder.remove();

    }

 

}

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

 

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.concurrent.ThreadLocalRandom;

import java.util.concurrent.atomic.AtomicLong;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

 

/**

 * Created by IDEA

 * User: mashaohua

 * Date: 2016-07-14 10:56

 * Desc: 动态数据源实现读写分离

 */

public class DynamicDataSource extends AbstractRoutingDataSource {

 

    private Object writeDataSource; //写数据源

 

    private List<Object> readDataSources; //多个读数据源

 

    private int readDataSourceSize; //读数据源个数

 

    private int readDataSourcePollPattern = 0; //获取读数据源方式,0:随机,1:轮询

 

    private AtomicLong counter = new AtomicLong(0);

 

    private static final Long MAX_POOL = Long.MAX_VALUE;

 

    private final Lock lock = new ReentrantLock();

 

    @Override

    public void afterPropertiesSet() {

        if (this.writeDataSource == null) {

            throw new IllegalArgumentException("Property 'writeDataSource' is required");

        }

        setDefaultTargetDataSource(writeDataSource);

        Map<Object, Object> targetDataSources = new HashMap<>();

        targetDataSources.put(DynamicDataSourceGlobal.WRITE.name(), writeDataSource);

        if (this.readDataSources == null) {

            readDataSourceSize = 0;

        } else {

            for(int i=0; i<readDataSources.size(); i++) {

                targetDataSources.put(DynamicDataSourceGlobal.READ.name() + i, readDataSources.get(i));

            }

            readDataSourceSize = readDataSources.size();

        }

        setTargetDataSources(targetDataSources);

        super.afterPropertiesSet();

    }

 

    @Override

    protected Object determineCurrentLookupKey() {

 

        DynamicDataSourceGlobal dynamicDataSourceGlobal = DynamicDataSourceHolder.getDataSource();

 

        if(dynamicDataSourceGlobal == null

                || dynamicDataSourceGlobal == DynamicDataSourceGlobal.WRITE

                || readDataSourceSize <= 0) {

            return DynamicDataSourceGlobal.WRITE.name();

        }

 

        int index = 1;

 

        if(readDataSourcePollPattern == 1) {

            //轮询方式

            long currValue = counter.incrementAndGet();

            if((currValue + 1) >= MAX_POOL) {

                try {

                    lock.lock();

                    if((currValue + 1) >= MAX_POOL) {

                        counter.set(0);

                    }

                } finally {

                    lock.unlock();

                }

            }

            index = (int) (currValue % readDataSourceSize);

        } else {

            //随机方式

            index = ThreadLocalRandom.current().nextInt(0, readDataSourceSize);

        }

        return dynamicDataSourceGlobal.name() + index;

    }

 

    public void setWriteDataSource(Object writeDataSource) {

        this.writeDataSource = writeDataSource;

    }

 

    public void setReadDataSources(List<Object> readDataSources) {

        this.readDataSources = readDataSources;

    }

 

    public void setReadDataSourcePollPattern(int readDataSourcePollPattern) {

        this.readDataSourcePollPattern = readDataSourcePollPattern;

    }

}

import org.apache.log4j.Logger;

import org.aspectj.lang.JoinPoint;

import org.aspectj.lang.reflect.MethodSignature;

 

import java.lang.reflect.Method;

 

/**

 * Created by IDEA

 * User: mashaohua

 * Date: 2016-07-07 13:39

 * Desc: 定义选择数据源切面

 */

public class DynamicDataSourceAspect {

 

    private static final Logger logger = Logger.getLogger(DynamicDataSourceAspect.class);

 

    public void pointCut(){};

 

    public void before(JoinPoint point)

    {

        Object target = point.getTarget();

        String methodName = point.getSignature().getName();

        Class<?>[] clazz = target.getClass().getInterfaces();

        Class<?>[] parameterTypes = ((MethodSignature) point.getSignature()).getMethod().getParameterTypes();

        try {

            Method method = clazz[0].getMethod(methodName, parameterTypes);

            if (method != null && method.isAnnotationPresent(DataSource.class)) {

                DataSource data = method.getAnnotation(DataSource.class);

                DynamicDataSourceHolder.putDataSource(data.value());

            }

        } catch (Exception e) {

            logger.error(String.format("Choose DataSource error, method:%s, msg:%s", methodName, e.getMessage()));

        }

    }

 

    public void after(JoinPoint point) {

        DynamicDataSourceHolder.clearDataSource();

    }

}

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xmlns:tx="http://www.springframework.org/schema/tx"

       xmlns:aop="http://www.springframework.org/schema/aop"

       xsi:schemaLocation="http://www.springframework.org/schema/beans

       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd

       http://www.springframework.org/schema/tx

       http://www.springframework.org/schema/tx/spring-tx-4.1.xsd

       http://www.springframework.org/schema/aop

       http://www.springframework.org/schema/aop/spring-aop-4.1.xsd">

 

    <bean id="abstractDataSource" abstract="true" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">

        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>

        <!-- 配置获取连接等待超时的时间 -->

        <property name="maxWait" value="60000"/>

        <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->

        <property name="timeBetweenEvictionRunsMillis" value="60000"/>

        <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->

        <property name="minEvictableIdleTimeMillis" value="300000"/>

        <property name="validationQuery" value="SELECT 'x'"/>

        <property name="testWhileIdle" value="true"/>

        <property name="testOnBorrow" value="false"/>

        <property name="testOnReturn" value="false"/>

        <!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->

        <property name="poolPreparedStatements" value="true"/>

        <property name="maxPoolPreparedStatementPerConnectionSize" value="20"/>

        <property name="filters" value="config"/>

        <property name="connectionProperties" value="config.decrypt=true" />

    </bean>

 

    <bean id="dataSourceRead1" parent="abstractDataSource">

        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>

        <!-- 基本属性 url、user、password -->

        <property name="url" value="${read1.jdbc.url}"/>

        <property name="username" value="${read1.jdbc.user}"/>

        <property name="password" value="${read1.jdbc.password}"/>

        <!-- 配置初始化大小、最小、最大 -->

        <property name="initialSize" value="${read1.jdbc.initPoolSize}"/>

        <property name="minIdle" value="${read1.jdbc.minPoolSize}"/>

        <property name="maxActive" value="${read1.jdbc.maxPoolSize}"/>

    </bean>

 

    <bean id="dataSourceRead2" parent="abstractDataSource">

        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>

        <!-- 基本属性 url、user、password -->

        <property name="url" value="${read2.jdbc.url}"/>

        <property name="username" value="${read2.jdbc.user}"/>

        <property name="password" value="${read2.jdbc.password}"/>

        <!-- 配置初始化大小、最小、最大 -->

        <property name="initialSize" value="${read2.jdbc.initPoolSize}"/>

        <property name="minIdle" value="${read2.jdbc.minPoolSize}"/>

        <property name="maxActive" value="${read2.jdbc.maxPoolSize}"/>

    </bean>

 

    <bean id="dataSourceWrite" parent="abstractDataSource">

        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>

        <!-- 基本属性 url、user、password -->

        <property name="url" value="${write.jdbc.url}"/>

        <property name="username" value="${write.jdbc.user}"/>

        <property name="password" value="${write.jdbc.password}"/>

        <!-- 配置初始化大小、最小、最大 -->

        <property name="initialSize" value="${write.jdbc.initPoolSize}"/>

        <property name="minIdle" value="${write.jdbc.minPoolSize}"/>

        <property name="maxActive" value="${write.jdbc.maxPoolSize}"/>

    </bean>

 

    <bean id="dataSource" class="com.test.api.dao.datasource.DynamicDataSource">

        <property name="writeDataSource" ref="dataSourceWrite" />

        <property name="readDataSources">

            <list>

                <ref bean="dataSourceRead1" />

                <ref bean="dataSourceRead2" />

            </list>

        </property>

        <!--轮询方式-->

        <property name="readDataSourcePollPattern" value="1" />

        <property name="defaultTargetDataSource" ref="dataSourceWrite"/>

    </bean>

 

    <tx:annotation-driven transaction-manager="transactionManager"/>

 

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">

        <property name="dataSource" ref="dataSource"/>

    </bean>

 

    <!-- 针对myBatis的配置项 -->

    <!-- 配置sqlSessionFactory -->

    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">

        <!-- 实例化sqlSessionFactory时需要使用上述配置好的数据源以及SQL映射文件 -->

        <property name="dataSource" ref="dataSource"/>

        <property name="mapperLocations" value="classpath:mapper/*.xml"/>

    </bean>

 

    <!-- 配置扫描器 -->

    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">

        <!-- 扫描包以及它的子包下的所有映射接口类 -->

        <property name="basePackage" value="com.test.api.dao.inte"/>

        <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/>

    </bean>

 

    <!-- 配置数据库注解aop -->

    <bean id="dynamicDataSourceAspect" class="com.test.api.dao.datasource.DynamicDataSourceAspect" />

    <aop:config>

        <aop:aspect id="c" ref="dynamicDataSourceAspect">

            <aop:pointcut id="tx" expression="execution(* com.test.api.dao.inte..*.*(..))"/>

            <aop:before pointcut-ref="tx" method="before"/>

            <aop:after pointcut-ref="tx" method="after"/>

        </aop:aspect>

    </aop:config>

    <!-- 配置数据库注解aop -->

</beans>

方案3

通过Mybatis的Plugin在业务层实现数据库读写分离,在MyBatis创建Statement对象前通过拦截器选择真正的数据源,在拦截器中根据方法名称不同(select、update、insert、delete)选择数据源。

优点:原有代码不变,支持多读,易扩展

缺点:

实现方式

/**

 * Created by IDEA

 * User: mashaohua

 * Date: 2016-07-19 15:40

 * Desc: 创建Connection代理接口

 */

public interface ConnectionProxy extends Connection {

 

    /**

     * 根据传入的读写分离需要的key路由到正确的connection

     * @param key 数据源标识

     * @return

     */

    Connection  getTargetConnection(String key);

}

import java.lang.reflect.InvocationHandler;

import java.lang.reflect.InvocationTargetException;

import java.lang.reflect.Method;

import java.lang.reflect.Proxy;

import java.sql.Connection;

import java.sql.SQLException;

import java.util.ArrayList;

import java.util.List;

import java.util.logging.Logger;

 

import javax.sql.DataSource;

 

import org.springframework.beans.factory.InitializingBean;

import org.springframework.jdbc.datasource.AbstractDataSource;

import org.springframework.jdbc.datasource.lookup.DataSourceLookup;

import org.springframework.jdbc.datasource.lookup.JndiDataSourceLookup;

import org.springframework.util.Assert;

 

public abstract class AbstractDynamicDataSourceProxy extends AbstractDataSource implements InitializingBean {

 

    private List<Object> readDataSources;

    private List<DataSource> resolvedReadDataSources;

 

    private Object writeDataSource;

    private DataSource resolvedWriteDataSource;

 

    private int readDataSourcePollPattern = 0;

 

    private int readDsSize;

 

    private boolean defaultAutoCommit = true;

    private int defaultTransactionIsolation = Connection.TRANSACTION_READ_COMMITTED;

 

    public static final String READ = "read";

 

    public static final String WRITE = "write";

 

    private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();

 

    @Override

    public Connection getConnection() throws SQLException {

        return (Connection) Proxy.newProxyInstance(

                com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class.getClassLoader(),

                new Class[] {com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class},

                new RWConnectionInvocationHandler());

    }

 

    @Override

    public Connection getConnection(String username, String password)

            throws SQLException {

        return (Connection) Proxy.newProxyInstance(

                com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class.getClassLoader(),

                new Class[] {com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class},

                new RWConnectionInvocationHandler(username,password));

    }

 

    public int getReadDsSize(){

        return readDsSize;

    }

 

    public List<DataSource> getResolvedReadDataSources() {

        return resolvedReadDataSources;

    }

 

    public void afterPropertiesSet() throws Exception {

 

        if(writeDataSource == null){

            throw new IllegalArgumentException("Property 'writeDataSource' is required");

        }

        this.resolvedWriteDataSource = resolveSpecifiedDataSource(writeDataSource);

 

        resolvedReadDataSources = new ArrayList<DataSource>(readDataSources.size());

        for(Object item : readDataSources){

            resolvedReadDataSources.add(resolveSpecifiedDataSource(item));

        }

        readDsSize = readDataSources.size();

    }

 

    protected DataSource determineTargetDataSource(String key) {

        Assert.notNull(this.resolvedReadDataSources, "DataSource router not initialized");

        if(WRITE.equals(key)){

            return resolvedWriteDataSource;

        }else{

            return loadReadDataSource();

        }

    }

 

    public Logger getParentLogger() {

        // NOOP Just ignore

        return null;

    }

 

    /**

     * 获取真实的data source

     * @param dataSource (jndi | real data source)

     * @return

     * @throws IllegalArgumentException

     */

    protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {

        if (dataSource instanceof DataSource) {

            return (DataSource) dataSource;

        }

        else if (dataSource instanceof String) {

            return this.dataSourceLookup.getDataSource((String) dataSource);

        }

        else {

            throw new IllegalArgumentException(

                    "Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);

        }

    }

 

    protected abstract DataSource loadReadDataSource();

 

    public void setReadDsSize(int readDsSize) {

        this.readDsSize = readDsSize;

    }

 

    public List<Object> getReadDataSources() {

        return readDataSources;

    }

 

    public void setReadDataSources(List<Object> readDataSources) {

        this.readDataSources = readDataSources;

    }

 

    public Object getWriteDataSource() {

        return writeDataSource;

    }

 

    public void setWriteDataSource(Object writeDataSource) {

        this.writeDataSource = writeDataSource;

    }

 

    public void setResolvedReadDataSources(List<DataSource> resolvedReadDataSources) {

        this.resolvedReadDataSources = resolvedReadDataSources;

    }

 

    public DataSource getResolvedWriteDataSource() {

        return resolvedWriteDataSource;

    }

 

    public void setResolvedWriteDataSource(DataSource resolvedWriteDataSource) {

        this.resolvedWriteDataSource = resolvedWriteDataSource;

    }

 

    public int getReadDataSourcePollPattern() {

        return readDataSourcePollPattern;

    }

 

    public void setReadDataSourcePollPattern(int readDataSourcePollPattern) {

        this.readDataSourcePollPattern = readDataSourcePollPattern;

    }

 

    /**

     * Invocation handler that defers fetching an actual JDBC Connection

     * until first creation of a Statement.

     */

    private class RWConnectionInvocationHandler implements InvocationHandler {

 

        private String username;

 

        private String password;

 

        private Boolean readOnly = Boolean.FALSE;

 

        private Integer transactionIsolation;

 

        private Boolean autoCommit;

 

        private boolean closed = false;

 

        private Connection target;

 

        public RWConnectionInvocationHandler() {

 

        }

 

        public RWConnectionInvocationHandler(String username, String password) {

            this();

            this.username = username;

            this.password = password;

        }

 

        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

            // Invocation on ConnectionProxy interface coming in...

 

            if (method.getName().equals("equals")) {

                // We must avoid fetching a target Connection for "equals".

                // Only consider equal when proxies are identical.

                return (proxy == args[0] ? Boolean.TRUE : Boolean.FALSE);

            }

            else if (method.getName().equals("hashCode")) {

                // We must avoid fetching a target Connection for "hashCode",

                // and we must return the same hash code even when the target

                // Connection has been fetched: use hashCode of Connection proxy.

                return new Integer(System.identityHashCode(proxy));

            }

            else if (method.getName().equals("getTargetConnection")) {

                // Handle getTargetConnection method: return underlying connection.

                return getTargetConnection(method,args);

            }

 

            if (!hasTargetConnection()) {

                // No physical target Connection kept yet ->

                // resolve transaction demarcation methods without fetching

                // a physical JDBC Connection until absolutely necessary.

 

                if (method.getName().equals("toString")) {

                    return "RW Routing DataSource Proxy";

                }

                else if (method.getName().equals("isReadOnly")) {

                    return this.readOnly;

                }

                else if (method.getName().equals("setReadOnly")) {

                    this.readOnly = (Boolean) args[0];

                    return null;

                }

                else if (method.getName().equals("getTransactionIsolation")) {

                    if (this.transactionIsolation != null) {

                        return this.transactionIsolation;

                    }

                    return defaultTransactionIsolation;

                    // Else fetch actual Connection and check there,

                    // because we didn't have a default specified.

                }

                else if (method.getName().equals("setTransactionIsolation")) {

                    this.transactionIsolation = (Integer) args[0];

                    return null;

                }

                else if (method.getName().equals("getAutoCommit")) {

                    if (this.autoCommit != null)

                        return this.autoCommit;

                    return defaultAutoCommit;

                    // Else fetch actual Connection and check there,

                    // because we didn't have a default specified.

                }

                else if (method.getName().equals("setAutoCommit")) {

                    this.autoCommit = (Boolean) args[0];

                    return null;

                }

                else if (method.getName().equals("commit")) {

                    // Ignore: no statements created yet.

                    return null;

                }

                else if (method.getName().equals("rollback")) {

                    // Ignore: no statements created yet.

                    return null;

                }

                else if (method.getName().equals("getWarnings")) {

                    return null;

                }

                else if (method.getName().equals("clearWarnings")) {

                    return null;

                }

                else if (method.getName().equals("isClosed")) {

                    return (this.closed ? Boolean.TRUE : Boolean.FALSE);

                }

                else if (method.getName().equals("close")) {

                    // Ignore: no target connection yet.

                    this.closed = true;

                    return null;

                }

                else if (this.closed) {

                    // Connection proxy closed, without ever having fetched a

                    // physical JDBC Connection: throw corresponding SQLException.

                    throw new SQLException("Illegal operation: connection is closed");

                }

            }

 

            // Target Connection already fetched,

            // or target Connection necessary for current operation ->

            // invoke method on target connection.

            try {

                return method.invoke(target, args);

            }

            catch (InvocationTargetException ex) {

                throw ex.getTargetException();

            }

        }

 

        /**

         * Return whether the proxy currently holds a target Connection.

         */

        private boolean hasTargetConnection() {

            return (this.target != null);

        }

 

        /**

         * Return the target Connection, fetching it and initializing it if necessary.

         */

        private Connection getTargetConnection(Method operation,Object[] args) throws SQLException {

 

            if (this.target == null) {

                String key = (String) args[0];

                // No target Connection held -> fetch one.

                if (logger.isDebugEnabled()) {

                    logger.debug("Connecting to database for operation '" + operation.getName() + "'");

                }

 

                // Fetch physical Connection from DataSource.

                this.target = (this.username != null) ?

                        determineTargetDataSource(key).getConnection(this.username, this.password) :

                        determineTargetDataSource(key).getConnection();

 

                // If we still lack default connection properties, check them now.

                //checkDefaultConnectionProperties(this.target);

 

                // Apply kept transaction settings, if any.

                if (this.readOnly.booleanValue()) {

                    this.target.setReadOnly(this.readOnly.booleanValue());

                }

                if (this.transactionIsolation != null) {

                    this.target.setTransactionIsolation(this.transactionIsolation.intValue());

                }

                if (this.autoCommit != null && this.autoCommit.booleanValue() != this.target.getAutoCommit()) {

                    this.target.setAutoCommit(this.autoCommit.booleanValue());

                }

            }

 

            else {

                // Target Connection already held -> return it.

                if (logger.isDebugEnabled()) {

                    logger.debug("Using existing database connection for operation '" + operation.getName() + "'");

                }

            }

 

            return this.target;

        }

    }

 

}

import javax.sql.DataSource;

import java.util.concurrent.ThreadLocalRandom;

import java.util.concurrent.atomic.AtomicLong;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

 

/**

 * Created by IDEA

 * User: mashaohua

 * Date: 2016-07-19 16:04

 * Desc:

 */

public class DynamicRoutingDataSourceProxy extends AbstractDynamicDataSourceProxy {

 

    private AtomicLong counter = new AtomicLong(0);

 

    private static final Long MAX_POOL = Long.MAX_VALUE;

 

    private final Lock lock = new ReentrantLock();

 

    @Override

    protected DataSource loadReadDataSource() {

        int index = 1;

 

        if(getReadDataSourcePollPattern() == 1) {

            //轮询方式

            long currValue = counter.incrementAndGet();

            if((currValue + 1) >= MAX_POOL) {

                try {

                    lock.lock();

                    if((currValue + 1) >= MAX_POOL) {

                        counter.set(0);

                    }

                } finally {

                    lock.unlock();

                }

            }

            index = (int) (currValue % getReadDsSize());

        } else {

            //随机方式

            index = ThreadLocalRandom.current().nextInt(0, getReadDsSize());

        }

        return getResolvedReadDataSources().get(index);

    }

}

import org.apache.ibatis.executor.statement.RoutingStatementHandler;

import org.apache.ibatis.executor.statement.StatementHandler;

import org.apache.ibatis.mapping.MappedStatement;

import org.apache.ibatis.mapping.SqlCommandType;

import org.apache.ibatis.plugin.*;

 

import java.sql.Connection;

import java.util.Properties;

 

/**

 * 拦截器

 */

@Intercepts({ @Signature(type = StatementHandler.class, method = "prepare", args = { Connection.class }) })

public class DynamicPlugin implements Interceptor {

 

    public Object intercept(Invocation invocation) throws Throwable {

 

 

        Connection conn = (Connection)invocation.getArgs()[0];

        //如果是采用了我们代理,则路由数据源

        if(conn instanceof com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy){

            StatementHandler statementHandler = (StatementHandler) invocation

                    .getTarget();

 

            MappedStatement mappedStatement = null;

            if (statementHandler instanceof RoutingStatementHandler) {

                StatementHandler delegate = (StatementHandler) ReflectionUtils

                        .getFieldValue(statementHandler, "delegate");

                mappedStatement = (MappedStatement) ReflectionUtils.getFieldValue(

                        delegate, "mappedStatement");

            } else {

                mappedStatement = (MappedStatement) ReflectionUtils.getFieldValue(

                        statementHandler, "mappedStatement");

            }

            String key = AbstractDynamicDataSourceProxy.WRITE;

 

            if(mappedStatement.getSqlCommandType() == SqlCommandType.SELECT){

                key = AbstractDynamicDataSourceProxy.READ;

            }else{

                key = AbstractDynamicDataSourceProxy.WRITE;

            }

 

            ConnectionProxy connectionProxy = (ConnectionProxy)conn;

            connectionProxy.getTargetConnection(key);

 

        }

 

        return invocation.proceed();

 

    }

 

    public Object plugin(Object target) {

 

        return Plugin.wrap(target, this);

    }

 

    public void setProperties(Properties properties) {

        //NOOP

 

    }

 

}

import org.apache.ibatis.logging.Log;

import org.apache.ibatis.logging.LogFactory;

 

import java.lang.reflect.*;

 

public class ReflectionUtils {

 

    private static final Log logger = LogFactory.getLog(ReflectionUtils.class);

 

    /**

     * 直接设置对象属性值,无视private/protected修饰符,不经过setter函数.

     */

    public static void setFieldValue(final Object object, final String fieldName, final Object value) {

        Field field = getDeclaredField(object, fieldName);

 

        if (field == null)

            throw new IllegalArgumentException("Could not find field [" + fieldName + "] on target [" + object + "]");

 

        makeAccessible(field);

 

        try {

            field.set(object, value);

        } catch (IllegalAccessException e) {

 

        }

    }

 

    /**

     * 直接读取对象属性值,无视private/protected修饰符,不经过getter函数.

     */

    public static Object getFieldValue(final Object object, final String fieldName) {

        Field field = getDeclaredField(object, fieldName);

 

        if (field == null)

            throw new IllegalArgumentException("Could not find field [" + fieldName + "] on target [" + object + "]");

 

        makeAccessible(field);

 

        Object result = null;

        try {

            result = field.get(object);

        } catch (IllegalAccessException e) {

 

        }

        return result;

    }

 

    /**

     * 直接调用对象方法,无视private/protected修饰符.

     */

    public static Object invokeMethod(final Object object, final String methodName, final Class<?>[] parameterTypes,

            final Object[] parameters) throws InvocationTargetException {

        Method method = getDeclaredMethod(object, methodName, parameterTypes);

        if (method == null)

            throw new IllegalArgumentException("Could not find method [" + methodName + "] on target [" + object + "]");

 

        method.setAccessible(true);

 

        try {

            return method.invoke(object, parameters);

        } catch (IllegalAccessException e) {

 

        }

 

        return null;

    }

 

    /**

     * 循环向上转型,获取对象的DeclaredField.

     */

    protected static Field getDeclaredField(final Object object, final String fieldName) {

        for (Class<?> superClass = object.getClass(); superClass != Object.class; superClass = superClass

                .getSuperclass()) {

            try {

                return superClass.getDeclaredField(fieldName);

            } catch (NoSuchFieldException e) {

            }

        }

        return null;

    }

 

    /**

     * 循环向上转型,获取对象的DeclaredField.

     */

    protected static void makeAccessible(final Field field) {

        if (!Modifier.isPublic(field.getModifiers()) || !Modifier.isPublic(field.getDeclaringClass().getModifiers())) {

            field.setAccessible(true);

        }

    }

 

    /**

     * 循环向上转型,获取对象的DeclaredMethod.

     */

    protected static Method getDeclaredMethod(Object object, String methodName, Class<?>[] parameterTypes) {

        for (Class<?> superClass = object.getClass(); superClass != Object.class; superClass = superClass

                .getSuperclass()) {

            try {

                return superClass.getDeclaredMethod(methodName, parameterTypes);

            } catch (NoSuchMethodException e) {

 

            }

        }

        return null;

    }

 

    /**

     * 通过反射,获得Class定义中声明的父类的泛型参数的类型.

     * eg.

     * public UserDao extends HibernateDao<User>

     *

     * @param clazz The class to introspect

     * @return the first generic declaration, or Object.class if cannot be determined

     */

    @SuppressWarnings("unchecked")

    public static <T> Class<T> getSuperClassGenricType(final Class clazz) {

        return getSuperClassGenricType(clazz, 0);

    }

 

    /**

     * 通过反射,获得Class定义中声明的父类的泛型参数的类型.

     * eg.

     * public UserDao extends HibernateDao<User>

     *

     * @param clazz The class to introspect

     * @return the first generic declaration, or Object.class if cannot be determined

     */

    @SuppressWarnings("unchecked")

    public static Class getSuperClassGenricType(final Class clazz, final int index) {

 

        Type genType = clazz.getGenericSuperclass();

 

        if (!(genType instanceof ParameterizedType)) {

            logger.warn(clazz.getSimpleName() + "'s superclass not ParameterizedType");

            return Object.class;

        }

 

        Type[] params = ((ParameterizedType) genType).getActualTypeArguments();

 

        if (index >= params.length || index < 0) {

            logger.warn("Index: " + index + ", Size of " + clazz.getSimpleName() + "'s Parameterized Type: "

                    + params.length);

            return Object.class;

        }

        if (!(params[index] instanceof Class)) {

            logger.warn(clazz.getSimpleName() + " not set the actual class on superclass generic parameter");

            return Object.class;

        }

 

        return (Class) params[index];

    }

 

    /**

     * 将反射时的checked exception转换为unchecked exception.

     */

    public static IllegalArgumentException convertToUncheckedException(Exception e) {

        if (e instanceof IllegalAccessException || e instanceof IllegalArgumentException

                || e instanceof NoSuchMethodException)

            return new IllegalArgumentException("Refelction Exception.", e);

        else

            return new IllegalArgumentException(e);

    }

}

<?xml version="1.0" encoding="UTF-8"?>

<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD SQL Map Config 3.0//EN"  

    "http://mybatis.org/dtd/mybatis-3-config.dtd">

<configuration>

    <plugins>

        <plugin interceptor="com.test.api.dao.mybatis.DynamicPlugin">

        </plugin>

    </plugins>

 

</configuration>

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xmlns:tx="http://www.springframework.org/schema/tx"

       xmlns:aop="http://www.springframework.org/schema/aop"

       xsi:schemaLocation="http://www.springframework.org/schema/beans

       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd

       http://www.springframework.org/schema/tx

       http://www.springframework.org/schema/tx/spring-tx-4.1.xsd

       http://www.springframework.org/schema/aop

       http://www.springframework.org/schema/aop/spring-aop-4.1.xsd">

 

    <bean id="abstractDataSource" abstract="true" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">

        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>

        <!-- 配置获取连接等待超时的时间 -->

        <property name="maxWait" value="60000"/>

        <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->

        <property name="timeBetweenEvictionRunsMillis" value="60000"/>

        <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->

        <property name="minEvictableIdleTimeMillis" value="300000"/>

        <property name="validationQuery" value="SELECT 'x'"/>

        <property name="testWhileIdle" value="true"/>

        <property name="testOnBorrow" value="false"/>

        <property name="testOnReturn" value="false"/>

        <!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->

        <property name="poolPreparedStatements" value="true"/>

        <property name="maxPoolPreparedStatementPerConnectionSize" value="20"/>

        <property name="filters" value="config"/>

        <property name="connectionProperties" value="config.decrypt=true" />

    </bean>

 

    <bean id="dataSourceRead1" parent="abstractDataSource">

        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>

        <!-- 基本属性 url、user、password -->

        <property name="url" value="${read1.jdbc.url}"/>

        <property name="username" value="${read1.jdbc.user}"/>

        <property name="password" value="${read1.jdbc.password}"/>

        <!-- 配置初始化大小、最小、最大 -->

        <property name="initialSize" value="${read1.jdbc.initPoolSize}"/>

        <property name="minIdle" value="${read1.jdbc.minPoolSize}"/>

        <property name="maxActive" value="${read1.jdbc.maxPoolSize}"/>

    </bean>

 

    <bean id="dataSourceRead2" parent="abstractDataSource">

        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>

        <!-- 基本属性 url、user、password -->

        <property name="url" value="${read2.jdbc.url}"/>

        <property name="username" value="${read2.jdbc.user}"/>

        <property name="password" value="${read2.jdbc.password}"/>

        <!-- 配置初始化大小、最小、最大 -->

        <property name="initialSize" value="${read2.jdbc.initPoolSize}"/>

        <property name="minIdle" value="${read2.jdbc.minPoolSize}"/>

        <property name="maxActive" value="${read2.jdbc.maxPoolSize}"/>

    </bean>

 

    <bean id="dataSourceWrite" parent="abstractDataSource">

        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>

        <!-- 基本属性 url、user、password -->

        <property name="url" value="${write.jdbc.url}"/>

        <property name="username" value="${write.jdbc.user}"/>

        <property name="password" value="${write.jdbc.password}"/>

        <!-- 配置初始化大小、最小、最大 -->

        <property name="initialSize" value="${write.jdbc.initPoolSize}"/>

        <property name="minIdle" value="${write.jdbc.minPoolSize}"/>

        <property name="maxActive" value="${write.jdbc.maxPoolSize}"/>

    </bean>

 

    <bean id="dataSource" class="com.test.api.dao.datasource.DynamicRoutingDataSourceProxy">

        <property name="writeDataSource" ref="dataSourceWrite" />

        <property name="readDataSources">

            <list>

                <ref bean="dataSourceRead1" />

                <ref bean="dataSourceRead2" />

            </list>

        </property>

        <!--轮询方式-->

        <property name="readDataSourcePollPattern" value="1" />

    </bean>

 

    <tx:annotation-driven transaction-manager="transactionManager"/>

 

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">

        <property name="dataSource" ref="dataSource"/>

    </bean>

 

    <!-- 针对myBatis的配置项 -->

    <!-- 配置sqlSessionFactory -->

    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">

        <!-- 实例化sqlSessionFactory时需要使用上述配置好的数据源以及SQL映射文件 -->

        <property name="dataSource" ref="dataSource"/>

        <property name="mapperLocations" value="classpath:mapper/*.xml"/>

        <property name="configLocation" value="classpath:mybatis-plugin-config.xml" />

    </bean>

 

    <bean id="sqlSessionTemplate" class="org.mybatis.spring.SqlSessionTemplate">

        <constructor-arg ref="sqlSessionFactory" />

    </bean>

    <!-- 通过扫描的模式,扫描目录下所有的mapper, 根据对应的mapper.xml为其生成代理类-->

    <bean id="mapper" class="org.mybatis.spring.mapper.MapperScannerConfigurer">

        <property name="basePackage" value="com.test.api.dao.inte" />

        <property name="sqlSessionTemplate" ref="sqlSessionTemplate"></property>

    </bean>

 

</beans>

方案4

如果你的后台结构是spring+mybatis,可以通过spring的AbstractRoutingDataSource和mybatis Plugin拦截器实现非常友好的读写分离,原有代码不需要任何改变。推荐第四种方案

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

 

import java.util.HashMap;

import java.util.Map;

 

/**

 * Created by IDEA

 * User: mashaohua

 * Date: 2016-07-14 10:56

 * Desc: 动态数据源实现读写分离

 */

public class DynamicDataSource extends AbstractRoutingDataSource {

 

    private Object writeDataSource; //写数据源

 

    private Object readDataSource; //读数据源

 

    @Override

    public void afterPropertiesSet() {

        if (this.writeDataSource == null) {

            throw new IllegalArgumentException("Property 'writeDataSource' is required");

        }

        setDefaultTargetDataSource(writeDataSource);

        Map<Object, Object> targetDataSources = new HashMap<>();

        targetDataSources.put(DynamicDataSourceGlobal.WRITE.name(), writeDataSource);

        if(readDataSource != null) {

            targetDataSources.put(DynamicDataSourceGlobal.READ.name(), readDataSource);

        }

        setTargetDataSources(targetDataSources);

        super.afterPropertiesSet();

    }

 

    @Override

    protected Object determineCurrentLookupKey() {

 

        DynamicDataSourceGlobal dynamicDataSourceGlobal = DynamicDataSourceHolder.getDataSource();

 

        if(dynamicDataSourceGlobal == null

                || dynamicDataSourceGlobal == DynamicDataSourceGlobal.WRITE) {

            return DynamicDataSourceGlobal.WRITE.name();

        }

 

        return DynamicDataSourceGlobal.READ.name();

    }

 

    public void setWriteDataSource(Object writeDataSource) {

        this.writeDataSource = writeDataSource;

    }

 

    public Object getWriteDataSource() {

        return writeDataSource;

    }

 

    public Object getReadDataSource() {

        return readDataSource;

    }

 

    public void setReadDataSource(Object readDataSource) {

        this.readDataSource = readDataSource;

    }

}

/**

 * Created by IDEA

 * User: mashaohua

 * Date: 2016-07-14 10:58

 * Desc:

 */

public enum DynamicDataSourceGlobal {

    READ, WRITE;

}

public final class DynamicDataSourceHolder {

 

    private static final ThreadLocal<DynamicDataSourceGlobal> holder = new ThreadLocal<DynamicDataSourceGlobal>();

 

    private DynamicDataSourceHolder() {

        //

    }

 

    public static void putDataSource(DynamicDataSourceGlobal dataSource){

        holder.set(dataSource);

    }

 

    public static DynamicDataSourceGlobal getDataSource(){

        return holder.get();

    }

 

    public static void clearDataSource() {

        holder.remove();

    }

 

}

import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import org.springframework.transaction.TransactionDefinition;

 

/**

 * Created by IDEA

 * User: mashaohua

 * Date: 2016-08-10 14:34

 * Desc:

 */

public class DynamicDataSourceTransactionManager extends DataSourceTransactionManager {

 

    /**

     * 只读事务到读库,读写事务到写库

     * @param transaction

     * @param definition

     */

    @Override

    protected void doBegin(Object transaction, TransactionDefinition definition) {

 

        //设置数据源

        boolean readOnly = definition.isReadOnly();

        if(readOnly) {

            DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.READ);

        } else {

            DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.WRITE);

        }

        super.doBegin(transaction, definition);

    }

 

    /**

     * 清理本地线程的数据源

     * @param transaction

     */

    @Override

    protected void doCleanupAfterCompletion(Object transaction) {

        super.doCleanupAfterCompletion(transaction);

        DynamicDataSourceHolder.clearDataSource();

    }

}

import org.apache.ibatis.executor.Executor;

import org.apache.ibatis.executor.keygen.SelectKeyGenerator;

import org.apache.ibatis.mapping.BoundSql;

import org.apache.ibatis.mapping.MappedStatement;

import org.apache.ibatis.mapping.SqlCommandType;

import org.apache.ibatis.plugin.*;

import org.apache.ibatis.session.ResultHandler;

import org.apache.ibatis.session.RowBounds;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.transaction.support.TransactionSynchronizationManager;

 

import java.util.Locale;

import java.util.Map;

import java.util.Properties;

import java.util.concurrent.ConcurrentHashMap;

 

/**

 * Created by IDEA

 * User: mashaohua

 * Date: 2016-08-10 11:09

 * Desc:

 */

@Intercepts({

@Signature(type = Executor.class, method = "update", args = {

        MappedStatement.class, Object.class }),

@Signature(type = Executor.class, method = "query", args = {

        MappedStatement.class, Object.class, RowBounds.class,

        ResultHandler.class }) })

public class DynamicPlugin implements Interceptor {

 

    protected static final Logger logger = LoggerFactory.getLogger(DynamicPlugin.class);

 

    private static final String REGEX = ".*insert\\u0020.*|.*delete\\u0020.*|.*update\\u0020.*";

 

    private static final Map<String, DynamicDataSourceGlobal> cacheMap = new ConcurrentHashMap<>();

 

    @Override

    public Object intercept(Invocation invocation) throws Throwable {

 

        boolean synchronizationActive = TransactionSynchronizationManager.isSynchronizationActive();

        if(!synchronizationActive) {

            Object[] objects = invocation.getArgs();

            MappedStatement ms = (MappedStatement) objects[0];

 

            DynamicDataSourceGlobal dynamicDataSourceGlobal = null;

 

            if((dynamicDataSourceGlobal = cacheMap.get(ms.getId())) == null) {

                //读方法

                if(ms.getSqlCommandType().equals(SqlCommandType.SELECT)) {

                    //!selectKey 为自增id查询主键(SELECT LAST_INSERT_ID() )方法,使用主库

                    if(ms.getId().contains(SelectKeyGenerator.SELECT_KEY_SUFFIX)) {

                        dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;

                    } else {

                        BoundSql boundSql = ms.getSqlSource().getBoundSql(objects[1]);

                        String sql = boundSql.getSql().toLowerCase(Locale.CHINA).replaceAll("[\\t\\n\\r]", " ");

                        if(sql.matches(REGEX)) {

                            dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;

                        } else {

                            dynamicDataSourceGlobal = DynamicDataSourceGlobal.READ;

                        }

                    }

                }else{

                    dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;

                }

                logger.warn("设置方法[{}] use [{}] Strategy, SqlCommandType [{}]..", ms.getId(), dynamicDataSourceGlobal.name(), ms.getSqlCommandType().name());

                cacheMap.put(ms.getId(), dynamicDataSourceGlobal);

            }

            DynamicDataSourceHolder.putDataSource(dynamicDataSourceGlobal);

        }

 

        return invocation.proceed();

    }

 

    @Override

    public Object plugin(Object target) {

        if (target instanceof Executor) {

            return Plugin.wrap(target, this);

        } else {

            return target;

        }

    }

 

    @Override

    public void setProperties(Properties properties) {

        //

    }

http://blog.csdn.net/xwnxwn/article/details/53893972

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics