mybatis基础支持层-数据源模块
- javax.sql.DataSource接口,Mybatis也提供了实现类PooledDataSource、UnpooledDataSource
|--UnpooledDataSource <- UnpooledDataSourceFactory --|
| |
DataSource ---| | --- DataSourceFactory
| |
|--PooledDataSource <-PooledDataSourceFactory--------|
- 使用工厂类建立对象的方式是工厂方法模式的典型应用
一、工厂方法模式
-
定义用于创建对象的工厂接口,并根据接口的具体实现类决定实例化哪一类具体产品
- 工厂接口 Factory : 是工厂方法模式的核心接口,调用者直接与工厂接口交互用于获取具体实体类
- 具体工厂类 XXFactory : 具体工厂类能够实现工厂接口的方法,并用于实例化产品对象,不同工厂实例化不同的产品
- 产品接口 Product: 用于定义产品的功能,不同的产品各自实现
-
具体产品类 XXProduct: 实现产品接口的实现类,定义了不同产品功能的具体实现
- 工厂方法模式符合”开放-封闭”的原则
- 缺点:XXFactory与XXProduct必须成对出现,并且引入了接口的抽象,增加了复杂度和抽象性
1.DataSourceFactory
public interface DataSourceFactory {
//设置所需的参数
void setProperties(Properties props);
//获取DataSource对象的实现
DataSource getDataSource();
}
2.UnpooledDataSource
- 未采用数据连接池,getConnection获取数据库连接
public class UnpooledDataSource implements DataSource {
//驱动类加载器
private ClassLoader driverClassLoader;
//驱动参数
private Properties driverProperties;
//注册已经加载的驱动
private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();
//驱动
private String driver;
//连接地址
private String url;
//用户名
private String username;
//密码
private String password;
//是否自动commit
private Boolean autoCommit;
//默认的隔离级别
private Integer defaultTransactionIsolationLevel;
//默认网络连接超时时间
private Integer defaultNetworkTimeout;
//静态代码块
static {
//获取在DriverManager注册JDBC驱动
Enumeration<Driver> drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements()) {
Driver driver = drivers.nextElement();
//拷贝一份到本地Map
registeredDrivers.put(driver.getClass().getName(), driver);
}
}
...
@Override
public Connection getConnection() throws SQLException {
//传入用户名密码
return doGetConnection(username, password);
}
...
//最终调用流程
private Connection doGetConnection(Properties properties) throws SQLException {
//初始化驱动
initializeDriver();
//获取连接
Connection connection = DriverManager.getConnection(url, properties);
//配置连接参数
configureConnection(connection);
return connection;
}
private synchronized void initializeDriver() throws SQLException {
if (!registeredDrivers.containsKey(driver)) {
Class<?> driverType;
try {
if (driverClassLoader != null) {
driverType = Class.forName(driver, true, driverClassLoader);
} else {
driverType = Resources.classForName(driver);
}
// DriverManager requires the driver to be loaded via the system ClassLoader.
// http://www.kfu.com/~nsayer/Java/dyn-jdbc.html
//加载驱动类
Driver driverInstance = (Driver) driverType.getDeclaredConstructor().newInstance();
//注册驱动类代理
DriverManager.registerDriver(new DriverProxy(driverInstance));
registeredDrivers.put(driver, driverInstance);
} catch (Exception e) {
throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);
}
}
}
private void configureConnection(Connection conn) throws SQLException {
if (defaultNetworkTimeout != null) {
conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), defaultNetworkTimeout);
}
if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
conn.setAutoCommit(autoCommit);
}
if (defaultTransactionIsolationLevel != null) {
conn.setTransactionIsolation(defaultTransactionIsolationLevel);
}
}
...
}
3.PooledDataSource
- PooledDataSource使用数据库连接池的数据库连接,实现了简易的功能
- 好处: 数据库连接的重用、提高响应速度、当值数据库连接过多造成假死、避免数据库连接泄露
- 按照配置,初始化一些数据库连接供使用,需要使用时先从池中获取连接,不再使用时返回池中,并不直接关闭。当连接使用达到上限,如果设置的总连接数达到上限,且都被占用,则进入等待阻塞队列,如果连接池中有空闲连接数时,直接获取。一段时间超过默认alive的线程数时,销毁部分线程数恢复设置默认的active的连接数量
PooledDataSource
|
|
|-------------------|
PoolConnection PoolState
|
Connection
@Override
public Connection getConnection(String username, String password) throws SQLException {
return popConnection(username, password).getProxyConnection();
}
private PooledConnection popConnection(String username, String password) throws SQLException {
boolean countedWait = false;
PooledConnection conn = null;
long t = System.currentTimeMillis();
int localBadConnectionCount = 0;
while (conn == null) {
synchronized (state) {
if (!state.idleConnections.isEmpty()) {
// Pool has available connection
conn = state.idleConnections.remove(0);
if (log.isDebugEnabled()) {
log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
}
} else {
// Pool does not have available connection
if (state.activeConnections.size() < poolMaximumActiveConnections) {
// Can create new connection
conn = new PooledConnection(dataSource.getConnection(), this);
if (log.isDebugEnabled()) {
log.debug("Created connection " + conn.getRealHashCode() + ".");
}
} else {
// Cannot create new connection
PooledConnection oldestActiveConnection = state.activeConnections.get(0);
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
if (longestCheckoutTime > poolMaximumCheckoutTime) {
// Can claim overdue connection
state.claimedOverdueConnectionCount++;
state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
state.accumulatedCheckoutTime += longestCheckoutTime;
state.activeConnections.remove(oldestActiveConnection);
if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
try {
oldestActiveConnection.getRealConnection().rollback();
} catch (SQLException e) {
/*
Just log a message for debug and continue to execute the following
statement like nothing happened.
Wrap the bad connection with a new PooledConnection, this will help
to not interrupt current executing thread and give current thread a
chance to join the next competition for another valid/good database
connection. At the end of this loop, bad {@link @conn} will be set as null.
*/
log.debug("Bad connection. Could not roll back");
}
}
conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
oldestActiveConnection.invalidate();
if (log.isDebugEnabled()) {
log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
}
} else {
// Must wait
try {
if (!countedWait) {
state.hadToWaitCount++;
countedWait = true;
}
if (log.isDebugEnabled()) {
log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
}
long wt = System.currentTimeMillis();
state.wait(poolTimeToWait);
state.accumulatedWaitTime += System.currentTimeMillis() - wt;
} catch (InterruptedException e) {
break;
}
}
}
}
if (conn != null) {
// ping to server and check the connection is valid or not
if (conn.isValid()) {
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
conn.setCheckoutTimestamp(System.currentTimeMillis());
conn.setLastUsedTimestamp(System.currentTimeMillis());
state.activeConnections.add(conn);
state.requestCount++;
state.accumulatedRequestTime += System.currentTimeMillis() - t;
} else {
if (log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
}
state.badConnectionCount++;
localBadConnectionCount++;
conn = null;
if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Could not get a good connection to the database.");
}
throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
}
}
}
}
}
if (conn == null) {
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
return conn;
}
- PooledConnection 实现InvokeHandler 利用动态代理实现Connection的获取,最终实现在invoke方法中
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
//调用close则重新放回连接池,并不是销毁
if (CLOSE.equals(methodName)) {
dataSource.pushConnection(this);
return null;
}
try {
if (!Object.class.equals(method.getDeclaringClass())) {
// issue #579 toString() should never fail
// throw an SQLException instead of a Runtime
//检测连接是否有效
checkConnection();
}
return method.invoke(realConnection, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
}
- PoolState 是用于管理PooledConnection对象状态的组件
- PooledConnection popConnection(String username, String password) 方式是检验连接获取连接的核心方法
private PooledConnection popConnection(String username, String password) throws SQLException {
boolean countedWait = false;
PooledConnection conn = null;
long t = System.currentTimeMillis();
int localBadConnectionCount = 0;
//当连接为空
while (conn == null) {
//同步控制state对象
synchronized (state) {
//存在可用连接
if (!state.idleConnections.isEmpty()) {
// Pool has available connection
//可用连接-1
conn = state.idleConnections.remove(0);
if (log.isDebugEnabled()) {
log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
}
} else {
//不存在可用连接
// Pool does not have available connection
//活跃的链接数小于最大活跃链接数
if (state.activeConnections.size() < poolMaximumActiveConnections) {
// Can create new connection
//新建一个链接
conn = new PooledConnection(dataSource.getConnection(), this);
if (log.isDebugEnabled()) {
log.debug("Created connection " + conn.getRealHashCode() + ".");
}
} else {
// Cannot create new connection
//最早的活跃连接
PooledConnection oldestActiveConnection = state.activeConnections.get(0);
//获取总的超时时间
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
if (longestCheckoutTime > poolMaximumCheckoutTime) {
//超时时间大于最大超时时间
// Can claim overdue connection
//超时链接数增加
state.claimedOverdueConnectionCount++;
//累计超时连接时间累加
state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
//累计获取连接到放回连接的时间总长累加
state.accumulatedCheckoutTime += longestCheckoutTime;
//活跃链接数-1
state.activeConnections.remove(oldestActiveConnection);
//最早连接的配置不是auto commit
if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
try {
//不是auto commit 就直接将现有操作rollback
oldestActiveConnection.getRealConnection().rollback();
} catch (SQLException e) {
/*
Just log a message for debug and continue to execute the following
statement like nothing happened.
Wrap the bad connection with a new PooledConnection, this will help
to not interrupt current executing thread and give current thread a
chance to join the next competition for another valid/good database
connection. At the end of this loop, bad {@link @conn} will be set as null.
*/
log.debug("Bad connection. Could not roll back");
}
}
//利用最早的链接创建新的connection
conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
//旧的链接使其失效
oldestActiveConnection.invalidate();
if (log.isDebugEnabled()) {
log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
}
} else {
// Must wait
//如果最早的链接还未超时,新的new connection请求只能等待
try {
if (!countedWait) {
//等待数累加
state.hadToWaitCount++;
countedWait = true;
}
if (log.isDebugEnabled()) {
log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
}
long wt = System.currentTimeMillis();
//线程挂起等待,有空闲的会通过pushConnection中的notifyAll进行唤醒
state.wait(poolTimeToWait);
state.accumulatedWaitTime += System.currentTimeMillis() - wt;
} catch (InterruptedException e) {
break;
}
}
}
}
if (conn != null) {
//connection非空,即已经通过上述流程生成新的connection
// ping to server and check the connection is valid or not
if (conn.isValid()) {
if (!conn.getRealConnection().getAutoCommit()) {
//回滚提交的数据
conn.getRealConnection().rollback();
}
//对新的链接设置参数
conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
conn.setCheckoutTimestamp(System.currentTimeMillis());
conn.setLastUsedTimestamp(System.currentTimeMillis());
//放入活跃链接列表
state.activeConnections.add(conn);
state.requestCount++;
state.accumulatedRequestTime += System.currentTimeMillis() - t;
} else {
if (log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
}
state.badConnectionCount++;
localBadConnectionCount++;
conn = null;
if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Could not get a good connection to the database.");
}
throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
}
}
}
}
}
if (conn == null) {
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
return conn;
}
- connection被真正释放是PoolConnection的invoke方法,看一下pushConnectio方法
//释放链接资源
protected void pushConnection(PooledConnection conn) throws SQLException {
//同步state对象
synchronized (state) {
//去除活跃链接记录
state.activeConnections.remove(conn);
if (conn.isValid()) {
//idle链接数小于最大链接数 并且链接类型相同
if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
//获取连接到放回连接的时间总长累加
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
//回滚原有操作
conn.getRealConnection().rollback();
}
//创建新连接
PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
state.idleConnections.add(newConn);
newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
//原有链接销毁
conn.invalidate();
if (log.isDebugEnabled()) {
log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
}
//唤醒线程
state.notifyAll();
} else {
//如果已经超过最大IDLE链接数,则销毁链接
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
conn.getRealConnection().close();
if (log.isDebugEnabled()) {
log.debug("Closed connection " + conn.getRealHashCode() + ".");
}
conn.invalidate();
}
} else {
if (log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
}
state.badConnectionCount++;
}
}
}
- 检测链接是否有效,isValid的方法主要通过ping connection的方法进行检测
protected boolean pingConnection(PooledConnection conn) {
boolean result = true;
try {
result = !conn.getRealConnection().isClosed();
} catch (SQLException e) {
if (log.isDebugEnabled()) {
log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
}
result = false;
}
//长时间未使用的链接需要Ping探测
if (result && poolPingEnabled && poolPingConnectionsNotUsedFor >= 0
&& conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
try {
if (log.isDebugEnabled()) {
log.debug("Testing connection " + conn.getRealHashCode() + " ...");
}
Connection realConn = conn.getRealConnection();
//获取statement
try (Statement statement = realConn.createStatement()) {
//执行Ping query
statement.executeQuery(poolPingQuery).close();
}
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
result = true;
if (log.isDebugEnabled()) {
log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
}
} catch (Exception e) {
log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
try {
conn.getRealConnection().close();
} catch (Exception e2) {
// ignore
}
result = false;
if (log.isDebugEnabled()) {
log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
}
}
}
return result;
}
- 当数据库连接的相关参数做出修改,setXXX()后,都会调用forceCloseAll方法将现有的活跃和空闲的连接全部关闭,写法与上述流程中涉及的类似
public void forceCloseAll() {
synchronized (state) {
expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
//活跃连接
for (int i = state.activeConnections.size(); i > 0; i--) {
try {
PooledConnection conn = state.activeConnections.remove(i - 1);
//使之失效
conn.invalidate();
Connection realConn = conn.getRealConnection();
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
realConn.close();
} catch (Exception e) {
// ignore
}
}
//空闲连接
for (int i = state.idleConnections.size(); i > 0; i--) {
try {
PooledConnection conn = state.idleConnections.remove(i - 1);
conn.invalidate();
Connection realConn = conn.getRealConnection();
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
realConn.close();
} catch (Exception e) {
// ignore
}
}
}
if (log.isDebugEnabled()) {
log.debug("PooledDataSource forcefully closed/removed all connections.");
}
}