首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

spring动态路由实践二

2012-11-05 
spring动态路由实践2? ?前一节总结了如何配置动态路由,本节讨论如何在同一事物中访问不同数据库即分库事物

spring动态路由实践2

? ?前一节总结了如何配置动态路由,本节讨论如何在同一事物中访问不同数据库即分库事物的实现。

? ?由于同一个事物只能绑定一个数据源连接,当切换数据源时需要解除老数据源连接的绑定,将新数据源绑定到当前线程,访问完毕后在将老数据源绑定回线程。

datasource-config.xml如下:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context"

xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/beans ?

http://www.springframework.org/schema/beans/spring-beans-2.5.xsd ?

http://www.springframework.org/schema/context ?

?http://www.springframework.org/schema/context/spring-context-2.5.xsd ?

http://www.springframework.org/schema/aop ??

http://www.springframework.org/schema/aop/spring-aop-2.5.xsd ?

http://www.springframework.org/schema/tx ??

http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">

?

<!-- AOP aspectj配置 可用于异常处理,权限,及参数验证等 -->

<aop:aspectj-autoproxy proxy-target-/>

<bean id="multiDbAspect" ref="dataSource" />

</bean>

?

?

<!-- 数据源配置 -->

<bean id="dataSourceFirst" value="oracle.jdbc.driver.OracleDriver" />

<property name="url" value="jdbc:oracle:thin:@10.20.151.4:1521:ptdev" />

<property name="username" value="pt" />

<property name="password" value="pt" />

<property name="maxActive" value="200" />

<property name="maxIdle" value="5" />

<property name="poolPreparedStatements" value="true" />

<property name="removeAbandoned" value="true" />

<property name="removeAbandonedTimeout" value="300" />

</bean>

?

<bean id="dataSourceSecond" value="oracle.jdbc.driver.OracleDriver" />

<property name="url" value="jdbc:oracle:thin:@10.20.151.12:1521:pt10g" />

<property name="username" value="pt" />

<property name="password" value="pt" />

<property name="maxActive" value="200" />

<property name="maxIdle" value="5" />

<property name="poolPreparedStatements" value="true" />

<property name="removeAbandoned" value="true" />

<property name="removeAbandonedTimeout" value="300" />

</bean>

?

<bean id="dataSource" value-ref="dataSourceFirst" />

<entry key="2" value-ref="dataSourceSecond" />

</map>

</property>

<property name="defaultTargetDataSource">

<ref local="dataSourceFirst" />

</property>

</bean>

?

<bean id="transactionManager"

/>

</property>

</bean>

?

<bean id="lobHandler" />

?

<bean id="sqlMapClient" ref="dataSource" />

<property name="lobHandler" ref="lobHandler" />

<property name="configLocations" value="classpath*:/ibatis/config/sql-map.xml" />

</bean>

?

<bean id="txAttributeSource"

/>

</property>

</bean>

?

<bean id="transactionDefinition"

/>

</property>

<property name="transactionAttributeSource">

<ref bean="txAttributeSource" />

</property>

<!--若不设置该属性,则运行main方法会报:

Exception in thread "main" java.lang.ClassCastException: $Proxy5 cannot be cast to com.service.impl.DbInfoServiceImpl

at com.transaction.TransactionTest.main(TransactionTest.java:16)

错误-->

<property name="proxyTargetClass" value="true">

</property>

</bean>

?

<bean id="baseDao" />

</property>

<property name="dataSource">

<ref bean="dataSource" />

</property>

</bean>

<--配置事务-->

<bean id="dbInfoService" parent="transactionDefinition">

<property name="target">

<bean ref="dbInfoDao" />

</bean>

</property>

</bean>

<bean id="dbInfoDao" ref="baseDao"></property>

</bean>

</beans>

?

com.service.dao.impl.DbInfoDaoImpl类如下:

package com.service.dao.impl;

?

import java.util.List;

import java.util.Map;

?

import com.common.dao.BaseDao;

import com.common.dao.model.User;

import com.service.dao.DbInfoDao;

?

public class DbInfoDaoImpl implements DbInfoDao{

public BaseDao baseDao;

?

public void setBaseDao(BaseDao baseDao) {

this.baseDao = baseDao;

}

?

public List<Map<String, Object>> addUserInfo(User user, Integer db) {

baseDao.add("login.addUser", user);

List<Map<String, Object>> result = baseDao.getList("login.getUserInfo",

user.getName());

return result;

}

}


com.service.impl.DbInfoServiceImpl类如下:package com.service.impl;
import java.util.List;import java.util.Map;
import com.common.dao.model.User;import com.service.DbInfoService;import com.service.dao.DbInfoDao;
public class DbInfoServiceImpl implements DbInfoService{public DbInfoDao dbInfoDao;public void setDbInfoDao(DbInfoDao dbInfoDao) {this.dbInfoDao = dbInfoDao;}
public List<Map<String,Object>> getUserInfo(User user){? ? ? ? ? ? ? ? //数据源1List<Map<String,Object>> result1=dbInfoDao.addUserInfo(user,1);? ? ? ? ? ? ? ? //数据源2List<Map<String,Object>> result2=dbInfoDao.addUserInfo(user,2);
return result1;}
}
切面类com.aop.MultiDbAspect:package com.aop;
import java.sql.Connection;import java.sql.SQLException;import java.util.ArrayList;import java.util.List;
import javax.sql.DataSource;
import org.apache.log4j.Logger;import org.aspectj.lang.ProceedingJoinPoint;import org.aspectj.lang.annotation.Around;import org.aspectj.lang.annotation.Aspect;import org.springframework.jdbc.datasource.ConnectionHolder;import org.springframework.jdbc.datasource.DataSourceUtils;import org.springframework.jdbc.datasource.DelegatingDataSource;import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;import org.springframework.transaction.UnexpectedRollbackException;import org.springframework.transaction.support.DefaultTransactionDefinition;import org.springframework.transaction.support.TransactionSynchronization;import org.springframework.transaction.support.TransactionSynchronizationAdapter;import org.springframework.transaction.support.TransactionSynchronizationManager;
import com.common.bean.Shard;import com.common.bean.ThreadInfoHolder;import com.common.bean.RoutingDataSource;@Aspectpublic class MultiDbAspect { public final static long NOT_FIND = -1L; ? ?private final Logger ? ? logger ? = Logger.getLogger(MultiDbAspect.class); ? ?private DataSource ? ? ? dataSource; ? ? ? ?@Around("execution (* com.service.dao.impl.*Impl.*(..))") ? ?public Object getDataFromAllDb(ProceedingJoinPoint jointPoint) throws Throwable { ? ? ? ?List<Object> returnList = new ArrayList<Object>(); ? ? ? ?Object[] args = jointPoint.getArgs(); ? ? ? ?Integer db=1; ? ? ? ?for (Object arg : args) { ? ? ? ? ? ?if (arg != null && Integer.class.isAssignableFrom(arg.getClass())) { ? ? ? ? ? ?db = (Integer) arg; ? ? ? ? ? ? ? ?break; ? ? ? ? ? ?} ? ? ? ?}? ? ? ? ? ? ? ? ? ? //设置数据源 ? ? ? ? ? ?Shard shard = new Shard(); ? ? ? ? ? ?shard.setDbId(db.intValue());
? ? ? ? ? ?//动态数据源同步对象 ? ? ? ? ? ?DynamicTransactionSynchonization syn = new DynamicTransactionSynchonization(); ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?syn.initDynamicRouting(shard); ? ? ? ? ? ? ? ?Object retVal = jointPoint.proceed(); ? ? ? ? ? ? ? ?returnList.add(retVal); ? ? ? ? ? ?} catch (Throwable e) { ? ? ? ? ? ? ? ?this.logger.error(e.getMessage(), e); ? ? ? ? ? ? ? ?throw e; ? ? ? ? ? ?} finally { ? ? ? ? ? ? ? ?syn.endDynamicRouting(); ? ? ? ? ? ?} ? ? ? ?return returnList;
? ?}

? ?private Shard getShard(Long accountId, List<Shard> shards) { ? ? ? ?for (Shard shard : shards) { ? ? ? ? ? ?if (shard.getAccountId().longValue() == accountId.longValue()) { ? ? ? ? ? ? ? ?return shard; ? ? ? ? ? ?} ? ? ? ?}
? ? ? ?return null; ? ?}

? ?/** ? ? * 动态事务源同步类,以实现跟其它数据源事务同步,实现类JTA功能 ? ? *? ? ? * @author xiaoming.niexm ? ? */ ? ?final private class DynamicTransactionSynchonization extends TransactionSynchronizationAdapter { ? ? ? ?private Logger ? ? ? ? ? ? ? ? ? ? ? ? ? logger = Logger ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?.getLogger(DynamicTransactionSynchonization.class);
? ? ? ?private ConnectionHolder ? ? ? ? ? ? ? ? connectionHolder;
? ? ? ?private int ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?order;
? ? ? ?private ConnectionHolder ? ? ? ? ? ? ? ? oldConnectionHolder;
? ? ? ?private List<TransactionSynchronization> oldSynchronizations;
? ? ? ?public int getOrder() { ? ? ? ? ? ?return this.order; ? ? ? ?}
? ? ? ?public DynamicTransactionSynchonization() { ? ? ? ? ? ?this.order = this.getConnectionSynchronizationOrder(MultiDbAspect.this.dataSource); ? ? ? ?}
? ? ? ?/** ? ? ? ? * 暂停线程上绑定的事务资源,如Connection ? ? ? ? * 分库访问时,需要暂时本线程上的已有事务资源,从DataSource上获取新Connection(否则只能拿到原事务connection), ? ? ? ? */ ? ? ? ?@SuppressWarnings("unchecked") ? ? ? ?private void suspendThreadTransactionResource() { ? ? ? ? ? ?//获取线程绑定的事务资源 ? ? ? ? ? ?this.oldConnectionHolder = (ConnectionHolder) TransactionSynchronizationManager ? ? ? ? ? ? ? ? ? ?.getResource(dataSource); ? ? ? ? ? ?this.oldSynchronizations = TransactionSynchronizationManager.getSynchronizations();
? ? ? ? ? ?//清除线程绑定的事务资源,之后操作须从DataSource获取新的Connection ? ? ? ? ? ?if (oldConnectionHolder != null) { ? ? ? ? ? ? ? ?TransactionSynchronizationManager.unbindResource(dataSource); ? ? ? ? ? ?}
? ? ? ?}
? ? ? ?/** ? ? ? ? * 根据shard指定的数据源动态获取Connection ? ? ? ? *? ? ? ? ? * @throws SQLException ? ? ? ? */ ? ? ? ?private void initDynamicRouting1(Shard shard) throws SQLException { ? ? ? ?ThreadInfoHolder.addCurrentThreadShard(shard); ? ? ? ?} ? ? ? ?private void initDynamicRouting(Shard shard) throws SQLException { ? ? ? ? ? ?//在当前线程threadLocalMap里面,存储分片信息,以便PtRoutingDataSource获取 ? ? ? ? ? ?ThreadInfoHolder.addCurrentThreadShard(shard);
? ? ? ? ? ?//业务方法已配置事务 ? ? ? ? ? ?if (TransactionSynchronizationManager.hasResource(dataSource)) { ? ? ? ? ? ? ? ?String currentThreadDb = getCurrentDbId(); ? ? ? ? ? ? ? ?String changeToDb = ((RoutingDataSource) dataSource).getTargetDbId();
? ? ? ? ? ? ? ?//将要切换的DB为当前DB,不做事务资源切换,等同于默认的require事务 ? ? ? ? ? ? ? ?if (currentThreadDb != null && currentThreadDb.equals(changeToDb)) { ? ? ? ? ? ? ? ? ? ?return; ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?Integer isolationLevel = TransactionSynchronizationManager ? ? ? ? ? ? ? ? ? ? ? ?.getCurrentTransactionIsolationLevel();
? ? ? ? ? ? ? ?//暂停线程上绑定的事务资源,如Connection ? ? ? ? ? ? ? ?this.suspendThreadTransactionResource();
? ? ? ? ? ? ? ?//事务数据源标志 ? ? ? ? ? ? ? ?boolean transactionAware = (dataSource instanceof TransactionAwareDataSourceProxy);
? ? ? ? ? ? ? ?//根据 上一步设置的shard信息,根据shard指定的数据源动态获取Connection(会自动绑定到线程,调用XxxDao.xxx()方法访问数据库时,使用绑定的Connection) ? ? ? ? ? ? ? ?if (transactionAware) { ? ? ? ? ? ? ? ? ? ?dataSource.getConnection(); ? ? ? ? ? ? ? ?} else { ? ? ? ? ? ? ? ? ? ?DataSourceUtils.doGetConnection(dataSource); ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?//获取上一步绑定到线程的事务资源 ? ? ? ? ? ? ? ?this.connectionHolder = (ConnectionHolder) TransactionSynchronizationManager ? ? ? ? ? ? ? ? ? ? ? ?.getResource(dataSource);
? ? ? ? ? ? ? ?if (logger.isDebugEnabled()) { ? ? ? ? ? ? ? ? ? ?Connection con = this.connectionHolder.getConnection(); ? ? ? ? ? ? ? ? ? ?logger.debug("PT change con:" + con.toString() + " URL:" ? ? ? ? ? ? ? ? ? ? ? ? ? ?+ con.getMetaData().getURL()); ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?//自动创建的connection为autocommit,须设置Connection属性后重新绑定 ? ? ? ? ? ? ? ?TransactionSynchronizationManager.unbindResource(dataSource);
? ? ? ? ? ? ? ?//设置Connection连接属性 ?? ? ? ? ? ? ? ? ?if (isolationLevel != null) { ? ? ? ? ? ? ? ? ? ?DefaultTransactionDefinition txnDef = new DefaultTransactionDefinition(); ? ? ? ? ? ? ? ? ? ?txnDef.setIsolationLevel(isolationLevel); ? ? ? ? ? ? ? ? ? ?DataSourceUtils.prepareConnectionForTransaction(this.connectionHolder ? ? ? ? ? ? ? ? ? ? ? ? ? ?.getConnection(), txnDef); ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?this.connectionHolder.getConnection().setAutoCommit(false);
? ? ? ? ? ? ? ?//重新绑定 ? ? ? ? ? ? ? ?TransactionSynchronizationManager.bindResource(dataSource, this.connectionHolder);
? ? ? ? ? ? ? ?//主要删除新connection的Synchronization ? ? ? ? ? ? ? ?if (TransactionSynchronizationManager.isSynchronizationActive()) { ? ? ? ? ? ? ? ? ? ?TransactionSynchronizationManager.clearSynchronization(); ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?//向业务方法事务注册同步对象,以便业务方法事务提交/回滚前,调用本类动态创建的Connection相关事务 ? ? ? ? ? ? ? ?TransactionSynchronizationManager.initSynchronization(); ? ? ? ? ? ? ? ?TransactionSynchronizationManager.registerSynchronization(this);
? ? ? ? ? ?} ? ? ? ? ? ?//else 业务方法没有配置事务,Ibatis执行时会自动从连接池获取一个新的autocommit Connection
? ? ? ?}
? ? ? ?/** ? ? ? ? * 动态获取Datasource连接操作结束后,恢复 ? ? ? ? */ ? ? ? ?private void endDynamicRouting() { ? ? ? ? ? ?//清空当前线程threadLocalMap里面存储的分片信息,防止影响线程其它方法调用 ? ? ? ? ? ?ThreadInfoHolder.cleanCurrentThreadShard();
? ? ? ? ? ?//恢复initDynamicRouting()暂停的事务资源,如Connection(业务事务资源) ? ? ? ? ? ?if (this.oldConnectionHolder != null) { ? ? ? ? ? ? ? ?//解除initDynamicRouting()方法中绑定的动态数据源(注,提交、回滚操作在外层事务提交,回滚时自动调用beforeCompletion()完成,清除资源自动调用DataSourceUtils.ConnectionSynchronization.afterCompletion() ? ? ? ? ? ? ? ?TransactionSynchronizationManager.unbindResource(dataSource);
? ? ? ? ? ? ? ?TransactionSynchronizationManager.bindResource(dataSource, oldConnectionHolder); ? ? ? ? ? ?}
? ? ? ? ? ?if (TransactionSynchronizationManager.isSynchronizationActive() ? ? ? ? ? ? ? ? ? ?&& this.oldSynchronizations != null && this.oldSynchronizations.size() > 0) { ? ? ? ? ? ? ? ?for (TransactionSynchronization oldSynchronization : oldSynchronizations) { ? ? ? ? ? ? ? ? ? ?TransactionSynchronizationManager.registerSynchronization(oldSynchronization); ? ? ? ? ? ? ? ?} ? ? ? ? ? ?}
? ? ? ?}
? ? ? ?/** ? ? ? ? * 动态事务提交 ? ? ? ? */ ? ? ? ?private void commit() throws UnexpectedRollbackException { ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?this.connectionHolder.getConnection().commit(); ? ? ? ? ? ?} catch (SQLException e) { ? ? ? ? ? ? ? ?String msg = "Commit dynamic transaction error:" + e.getMessage(); ? ? ? ? ? ? ? ?logger.error(msg, e); ? ? ? ? ? ? ? ?throw new UnexpectedRollbackException(msg, e); ? ? ? ? ? ?}
? ? ? ?}
? ? ? ?/** ? ? ? ? * 动态事务回滚 ? ? ? ? */ ? ? ? ?private void rollback() throws UnexpectedRollbackException { ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?this.connectionHolder.getConnection().rollback(); ? ? ? ? ? ?} catch (SQLException e) { ? ? ? ? ? ? ? ?String msg = "Rollback dynamic transaction error:" + e.getMessage(); ? ? ? ? ? ? ? ?logger.error(msg, e); ? ? ? ? ? ? ? ?throw new UnexpectedRollbackException(msg, e); ? ? ? ? ? ?} ? ? ? ?}
? ? ? ?/** ? ? ? ? * 获取外层事务资源 ? ? ? ? *? ? ? ? ? * @return ? ? ? ? */ ? ? ? ?private ConnectionHolder getOutTxnCollectionHolder() { ? ? ? ? ? ?return (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource); ? ? ? ?}
? ? ? ?/** ? ? ? ? * 获取外层事务是否已回滚 ? ? ? ? *? ? ? ? ? * @return ? ? ? ? */ ? ? ? ?private boolean isOutTransactionRollbackOnly() { ? ? ? ? ? ?ConnectionHolder outTxnHolder = this.getOutTxnCollectionHolder(); ? ? ? ? ? ?return (outTxnHolder != null && outTxnHolder.isRollbackOnly()); ? ? ? ?}
? ? ? ?/** ? ? ? ? * 提交、回滚动态创建的事务(外层事务提交、回滚之前调用本方法) ? ? ? ? */ ? ? ? ?@Override ? ? ? ?public void beforeCompletion() { ? ? ? ? ? ?//获取外层事务(PowerTrace 系统XXXService.xxx()方法的事务)提交、回滚状态 ? ? ? ? ? ?boolean isOutTxnRollback = this.isOutTransactionRollbackOnly();
? ? ? ? ? ?//业务逻辑出错需要回滚,动态创建的事务也要跟着回滚 ? ? ? ? ? ?if (isOutTxnRollback) { ? ? ? ? ? ? ? ?this.rollback();
? ? ? ? ? ?} ? ? ? ? ? ?//业务逻辑正常完成提交,动态创建的事务也要跟着提交 ? ? ? ? ? ?else { ? ? ? ? ? ? ? ?this.commit();// ? ? ? ? ? ?} ? ? ? ?}
? ? ? ?@Override ? ? ? ?public void afterCompletion(int status) { ? ? ? ? ? ?if (this.connectionHolder != null) { ? ? ? ? ? ? ? ?Connection con = null; ? ? ? ? ? ? ? ?String url = null; ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?con = this.connectionHolder.getConnection();
? ? ? ? ? ? ? ? ? ?if (con != null) { ? ? ? ? ? ? ? ? ? ? ? ?url = con.getMetaData().getURL();
? ? ? ? ? ? ? ? ? ? ? ?if (logger.isDebugEnabled()) { ? ? ? ? ? ? ? ? ? ? ? ? ? ?logger.debug("PT release con:" + con.toString() + " ,URL:" + url); ? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ? ? ?connectionHolder.released(); ? ? ? ? ? ? ? ? ? ? ? ?DataSourceUtils.releaseConnection(con, dataSource); ? ? ? ? ? ? ? ? ? ?} ? ? ? ? ? ? ? ?} catch (SQLException e) { ? ? ? ? ? ? ? ? ? ?logger.error("Close connection error:" + con.toString() + " ,URL:" + url, e); ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?this.connectionHolder.reset(); ? ? ? ? ? ? ? ?this.connectionHolder = null; ? ? ? ? ? ?} ? ? ? ?}
? ? ? ?private String getCurrentDbId() throws SQLException { ? ? ? ? ? ?ConnectionHolder currentHolder = (ConnectionHolder) TransactionSynchronizationManager ? ? ? ? ? ? ? ? ? ?.getResource(MultiDbAspect.this.dataSource);
? ? ? ? ? ?if (currentHolder.getConnectionHandle() != null) { ? ? ? ? ? ? ? ?Connection conn = currentHolder.getConnectionHandle().getConnection(); ? ? ? ? ? ? ? ?if (conn != null) { ? ? ? ? ? ? ? ? ? ?//SimpleConnectionHandle: jdbc:oracle:thin:@10.20.151.4:1521:ptdev, UserName=xx, Oracle JDBC driver ? ? ? ? ? ? ? ? ? ?String connectionDesc = conn.getMetaData().getURL();
? ? ? ? ? ? ? ? ? ?int beginIdx = connectionDesc.indexOf("@") + 1; ? ? ? ? ? ? ? ? ? ?int endIdx = connectionDesc.indexOf(":", beginIdx);
? ? ? ? ? ? ? ? ? ?return connectionDesc.substring(beginIdx, endIdx); ? ? ? ? ? ? ? ?} ? ? ? ? ? ?}
? ? ? ? ? ?return null; ? ? ? ?}
? ? ? ?/** ? ? ? ? * 获取连接同步对象调用顺序号 ? ? ? ? *? ? ? ? ? * @param dataSource ? ? ? ? * @return ? ? ? ? */ ? ? ? ?private int getConnectionSynchronizationOrder(DataSource dataSource) { ? ? ? ? ? ?//一定要比DataSourceUtils.ConnectionSynchronization同步对象Order值小,要比该对象先运行(该对象主要用来清理获取的资源,如Collection.close) ? ? ? ? ? ?int order = DataSourceUtils.CONNECTION_SYNCHRONIZATION_ORDER - 1; ? ? ? ? ? ?DataSource currDs = dataSource; ? ? ? ? ? ?while (currDs instanceof DelegatingDataSource) { ? ? ? ? ? ? ? ?order--; ? ? ? ? ? ? ? ?currDs = ((DelegatingDataSource) currDs).getTargetDataSource(); ? ? ? ? ? ?} ? ? ? ? ? ?return order; ? ? ? ?}
? ?}
? ?/** ? ? * @param dataSource the dataSource to set ? ? */ ? ?public void setDataSource(DataSource dataSource) { ? ? ? ?this.dataSource = dataSource; ? ?}}
动态数据源com.common.bean.RoutingDataSource.java:package com.common.bean;
import java.sql.Connection;import java.sql.SQLException;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

public class RoutingDataSource extends AbstractRoutingDataSource {
? ? protected Object determineCurrentLookupKey() {? ? ? ? //获取当前线程处理的账号对应分片信息? ? ? ? Shard shard = ThreadInfoHolder.getCurrentThreadShard();
? ? ? ? //动态选定DataSource? ? ? ? String dbId = shard == null ? null : String.valueOf(shard.getDbId());
? ? ? ? return dbId;? ? }
? ? @Override? ? public String toString() {? ? ? ? //获取当前线程处理的账号对应分片信息? ? ? ? Shard shard = ThreadInfoHolder.getCurrentThreadShard();
? ? ? ? //动态选定DataSource? ? ? ? String dbId = shard == null ? null : String.valueOf(shard.getDbId());
? ? ? ? return "DB ID " + dbId + ":" + super.toString();? ? }
? ? public String getTargetDbId() throws SQLException {? ? ? ? Connection conn = null;? ? ? ? try {? ? ? ? ? ? //jdbc:oracle:thin:@10.20.151.4:1521:ptdev, UserName=xx, Oracle JDBC driver? ? ? ? ? ? conn = determineTargetDataSource().getConnection();
? ? ? ? ? ? if (conn != null) {? ? ? ? ? ? ? ? String connectionDesc = conn.getMetaData().getURL();? ? ? ? ? ? ? ? int beginIdx = connectionDesc.indexOf("@") + 1;? ? ? ? ? ? ? ? int endIdx = connectionDesc.indexOf(":", beginIdx);
? ? ? ? ? ? ? ? return connectionDesc.substring(beginIdx, endIdx);? ? ? ? ? ? }? ? ? ? } finally {? ? ? ? ? ? if (conn != null) {? ? ? ? ? ? ? ? conn.close();? ? ? ? ? ? }? ? ? ? }
? ? ? ? return null;? ? }
}
main测试类com.transaction.TransactionTest:package com.transaction;
import java.util.List;import java.util.Map;
import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.common.dao.model.User;import com.service.impl.DbInfoServiceImpl;
public class TransactionTest {
public static void main(String[] args) {ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath*:spring/datasource-config.xml");DbInfoServiceImpl db=(DbInfoServiceImpl)ctx.getBean("dbInfoService");User user=new User();user.setName("xj");user.setPassword("123");List<Map<String,Object>> result =db.getUserInfo(user);System.out.println(result);}
}
运行结果在同一事物中分别向2个数据库插入1条记录。
注意:不可在事物方法上加环绕通知,可能是因为事物是通过代理实现的,所以环绕通知会执行2遍。。

热点排行