MyBatis Base Support Layer: The DataSource Module

Factory method pattern / data sources

Posted by Claire on June 27, 2020

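  • MyBatis provides two implementations of the javax.sql.DataSource interface, PooledDataSource and UnpooledDataSource, each created by its own DataSourceFactory implementation: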
                |--UnpooledDataSource <- UnpooledDataSourceFactory --|
                |                                                    |   
DataSource   ---|                                                    | --- DataSourceFactory
                |                                                    |
                |--PooledDataSource <-PooledDataSourceFactory--------|

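  • Creating objects through factory classes in this way is a typical application of the factory method pattern.

I. The Factory Method Pattern

  • The pattern defines a factory interface for creating objects; which concrete product gets instantiated is decided by the concrete class that implements that interface.

  • Factory interface (Factory): the core of the pattern. Callers interact directly with the factory interface to obtain concrete product instances.
  • Concrete factory (XXFactory): implements the methods of the factory interface and instantiates product objects. Different factories instantiate different products.
  • Product interface (Product): defines what a product can do; each product implements it in its own way.
  • Concrete product (XXProduct): a class implementing the product interface, providing the actual behavior of one product.

  • The factory method pattern follows the open-closed principle: a new product is supported by adding a new factory/product pair rather than by modifying existing code.
  • Drawback: every XXFactory must be paired with an XXProduct, and the extra interfaces add complexity and indirection.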
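The four roles above can be sketched in a few lines of Java. This is only an illustration of the pattern, not MyBatis code: Product, Factory, and the Plain/Cached classes are hypothetical names chosen for the example.

```java
// Product interface: the only type callers depend on.
interface Product {
    String describe();
}

// Concrete products: each provides its own behavior.
class PlainProduct implements Product {
    @Override
    public String describe() { return "plain"; }
}

class CachedProduct implements Product {
    @Override
    public String describe() { return "cached"; }
}

// Factory interface: callers ask it for a Product without naming a concrete class.
interface Factory {
    Product create();
}

// Concrete factories: each one is paired with exactly one concrete product.
class PlainFactory implements Factory {
    @Override
    public Product create() { return new PlainProduct(); }
}

class CachedFactory implements Factory {
    @Override
    public Product create() { return new CachedProduct(); }
}

class FactoryMethodDemo {
    // Caller code sees only the two interfaces.
    static String build(Factory factory) {
        return factory.create().describe();
    }

    public static void main(String[] args) {
        System.out.println(build(new PlainFactory()));
        System.out.println(build(new CachedFactory()));
    }
}
```

Supporting a third product means adding one more factory/product pair; build() and the existing classes stay untouched, which is the open-closed property mentioned above.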

1.DataSourceFactory

public interface DataSourceFactory {

  // Pass in the configuration properties the DataSource needs
  void setProperties(Properties props);

  // Return the DataSource instance built by this factory
  DataSource getDataSource();

}
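To make the contract concrete, here is a minimal sketch of what an implementation of such a factory could look like. It is not the MyBatis implementation: SketchDataSourceFactory, SketchDataSource, SketchUnpooledFactory, and the property keys "url", "username", and "password" are assumptions made for this example, and the sketch copies properties by hand where a real factory may do something more general.

```java
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import java.util.logging.Logger;
import javax.sql.DataSource;

// Hypothetical mirror of the DataSourceFactory interface shown above.
interface SketchDataSourceFactory {
    void setProperties(Properties props);
    DataSource getDataSource();
}

// Hypothetical unpooled DataSource: every getConnection() opens a new connection.
class SketchDataSource implements DataSource {
    String url;
    String username;
    String password;

    @Override
    public Connection getConnection() throws SQLException {
        return getConnection(username, password);
    }

    @Override
    public Connection getConnection(String user, String pass) throws SQLException {
        return DriverManager.getConnection(url, user, pass);
    }

    // The remaining methods are required by the interface and simply delegate.
    @Override
    public PrintWriter getLogWriter() { return DriverManager.getLogWriter(); }

    @Override
    public void setLogWriter(PrintWriter out) { DriverManager.setLogWriter(out); }

    @Override
    public void setLoginTimeout(int seconds) { DriverManager.setLoginTimeout(seconds); }

    @Override
    public int getLoginTimeout() { return DriverManager.getLoginTimeout(); }

    @Override
    public Logger getParentLogger() { return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME); }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        throw new SQLException(getClass().getName() + " is not a wrapper.");
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) { return false; }
}

// Concrete factory: copies the supplied properties onto the product it owns.
class SketchUnpooledFactory implements SketchDataSourceFactory {
    private final SketchDataSource dataSource = new SketchDataSource();

    @Override
    public void setProperties(Properties props) {
        dataSource.url = props.getProperty("url");
        dataSource.username = props.getProperty("username");
        dataSource.password = props.getProperty("password");
    }

    @Override
    public DataSource getDataSource() { return dataSource; }
}
```

A caller would create the factory, call setProperties once with its configuration, and from then on work only with the javax.sql.DataSource returned by getDataSource().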

2.UnpooledDataSource

  • Does not use a connection pool: each getConnection call opens a new database connection.
public class UnpooledDataSource implements DataSource {

  // Class loader used to load the driver
  private ClassLoader driverClassLoader;
  // Extra driver properties
  private Properties driverProperties;
  // Drivers that have already been registered, keyed by class name
  private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();

  // Driver class name
  private String driver;
  // JDBC URL
  private String url;
  // User name
  private String username;
  // Password
  private String password;

  // Whether connections auto-commit
  private Boolean autoCommit;
  // Default transaction isolation level
  private Integer defaultTransactionIsolationLevel;
  // Default network timeout
  private Integer defaultNetworkTimeout;

  // Static initializer
  static {
    // Enumerate the JDBC drivers already registered with DriverManager
    Enumeration<Driver> drivers = DriverManager.getDrivers();
    while (drivers.hasMoreElements()) {
      Driver driver = drivers.nextElement();
      // Keep a copy in the local map
      registeredDrivers.put(driver.getClass().getName(), driver);
    }
  }
  ...
  @Override
  public Connection getConnection() throws SQLException {
    // Use the configured user name and password
    return doGetConnection(username, password);
  }
  ...
  // Every getConnection variant ends up in this overload
  private Connection doGetConnection(Properties properties) throws SQLException {
    // Make sure the driver is loaded and registered
    initializeDriver();
    // Open the connection
    Connection connection = DriverManager.getConnection(url, properties);
    // Apply the timeout, auto-commit, and isolation settings
    configureConnection(connection);
    return connection;
  }

  private synchronized void initializeDriver() throws SQLException {
    if (!registeredDrivers.containsKey(driver)) {
      Class<?> driverType;
      try {
        if (driverClassLoader != null) {
          driverType = Class.forName(driver, true, driverClassLoader);
        } else {
          driverType = Resources.classForName(driver);
        }
        // DriverManager requires the driver to be loaded via the system ClassLoader.
        // http://www.kfu.com/~nsayer/Java/dyn-jdbc.html
        // Instantiate the driver class
        Driver driverInstance = (Driver) driverType.getDeclaredConstructor().newInstance();
        // Register a DriverProxy wrapping the driver instance
        DriverManager.registerDriver(new DriverProxy(driverInstance));
        registeredDrivers.put(driver, driverInstance);
      } catch (Exception e) {
        throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);
      }
    }
  }

  private void configureConnection(Connection conn) throws SQLException {
    if (defaultNetworkTimeout != null) {
      conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), defaultNetworkTimeout);
    }
    if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
      conn.setAutoCommit(autoCommit);
    }
    if (defaultTransactionIsolationLevel != null) {
      conn.setTransactionIsolation(defaultTransactionIsolationLevel);
    }
  }
...
}

3.PooledDataSource

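  • PooledDataSource hands out connections from a database connection pool; it is a simple pool implementation.
  • Benefits: connections are reused, response time improves, the database is protected from hanging under too many connections, and connection leaks are avoided.
  • How it works: a caller first tries to take a connection from the pool and, when done, returns it to the pool instead of closing it. If an idle connection exists, it is handed out directly. Otherwise a new connection is created as long as the number of active connections is below the configured maximum. Once that maximum is reached and every connection is in use, the caller either reclaims the oldest active connection if it has been checked out for too long, or blocks and waits. On return, a connection goes back to the idle list unless that list is already full, in which case the connection is really closed.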
            PooledDataSource
                   |
                   |
          |-------------------|
    PooledConnection        PoolState
          |
     Connection
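The borrow/return cycle described above can be sketched with two lists and the monitor's wait/notifyAll. This is a simplified illustration under stated assumptions, not the MyBatis code: TinyPool, borrow, and giveBack are hypothetical names, the pool holds arbitrary objects instead of JDBC connections, and a caller that cannot be served gets null after a timeout instead of reclaiming an overdue connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class TinyPool<T> {
    private final List<T> idle = new ArrayList<>();
    private final List<T> active = new ArrayList<>();
    private final int maxActive;
    private final int maxIdle;
    private final Supplier<T> creator;

    TinyPool(int maxActive, int maxIdle, Supplier<T> creator) {
        this.maxActive = maxActive;
        this.maxIdle = maxIdle;
        this.creator = creator;
    }

    // Take an item: reuse an idle one, else create one if below maxActive, else wait.
    // Returns null if nothing became available within timeoutMillis or on interruption.
    synchronized T borrow(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            T item = null;
            if (!idle.isEmpty()) {
                item = idle.remove(0);
            } else if (active.size() < maxActive) {
                item = creator.get();
            }
            if (item != null) {
                active.add(item);
                return item;
            }
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return null;
            }
            try {
                wait(remaining); // woken by giveBack() via notifyAll()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    // Return an item: keep it for reuse if the idle list has room, otherwise drop it.
    synchronized void giveBack(T item) {
        if (!active.remove(item)) {
            return; // not handed out by this pool
        }
        if (idle.size() < maxIdle) {
            idle.add(item);
        }
        notifyAll(); // either an idle item or a free active slot is now available
    }

    synchronized int idleCount() { return idle.size(); }

    synchronized int activeCount() { return active.size(); }
}
```

With new TinyPool<>(1, 1, Object::new), a second borrow while the first item is still out returns null after the timeout, and a borrow after giveBack receives the same object again.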
  @Override
  public Connection getConnection(String username, String password) throws SQLException {
    return popConnection(username, password).getProxyConnection();
  }
  • PooledConnection implements InvocationHandler and hands out a dynamic proxy of the real Connection; the interception logic lives in its invoke method:
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    // close() returns the connection to the pool instead of destroying it
    if (CLOSE.equals(methodName)) {
      dataSource.pushConnection(this);
      return null;
    }
    try {
      if (!Object.class.equals(method.getDeclaringClass())) {
        // issue #579 toString() should never fail
        // throw an SQLException instead of a Runtime
        // Check that this connection is still valid
        checkConnection();
      }
      return method.invoke(realConnection, args);
    } catch (Throwable t) {
      throw ExceptionUtil.unwrapThrowable(t);
    }

  }
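The interception idea in invoke can be tried out in isolation with java.lang.reflect.Proxy. The sketch below is an illustration, not the MyBatis class: Resource, RealResource, and ReturnToPoolHandler are hypothetical names, with Resource standing in for java.sql.Connection and a plain Deque standing in for the pool.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Deque;

// Hypothetical stand-in for java.sql.Connection.
interface Resource {
    String query(String sql);
    void close();
}

class RealResource implements Resource {
    boolean closed = false;

    @Override
    public String query(String sql) { return "result of " + sql; }

    @Override
    public void close() { closed = true; }
}

class ReturnToPoolHandler implements InvocationHandler {
    private final RealResource real;
    private final Deque<RealResource> idle; // stand-in for the pool's idle list
    private boolean valid = true;

    ReturnToPoolHandler(RealResource real, Deque<RealResource> idle) {
        this.real = real;
        this.idle = idle;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // close() is intercepted: the real resource goes back to the pool, still open.
        if ("close".equals(method.getName())) {
            if (valid) {
                valid = false;
                idle.push(real);
            }
            return null;
        }
        // Methods inherited from Object (toString, hashCode, equals) always work;
        // everything else requires a proxy that has not been returned yet.
        if (!Object.class.equals(method.getDeclaringClass()) && !valid) {
            throw new IllegalStateException("resource was already returned to the pool");
        }
        try {
            return method.invoke(real, args);
        } catch (InvocationTargetException e) {
            throw e.getCause(); // surface the real exception, not the reflective wrapper
        }
    }

    // Build the proxy that callers receive instead of the real resource.
    static Resource wrap(RealResource real, Deque<RealResource> idle) {
        return (Resource) Proxy.newProxyInstance(
            Resource.class.getClassLoader(),
            new Class<?>[] {Resource.class},
            new ReturnToPoolHandler(real, idle));
    }
}
```

Callers use the proxy exactly like the real object; calling close() on it leaves RealResource.closed false and places the real object in the idle queue, while later calls on the stale proxy are rejected.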
  • PoolState is the component that tracks the state of the PooledConnection objects.
  • PooledConnection popConnection(String username, String password) is the core method for obtaining and validating a connection:
  private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;

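    // Loop until a connection has been obtained
    while (conn == null) {
      // All pool bookkeeping is guarded by the state monitor
      synchronized (state) {
        if (!state.idleConnections.isEmpty()) {
          // Pool has available connection
          // Take the first connection out of the idle list
          conn = state.idleConnections.remove(0);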
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else {
          // Pool does not have available connection
          // Active connections are still below poolMaximumActiveConnections
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            // Can create new connection
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          } else {
            // Cannot create new connection
            // The connection that has been checked out the longest
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            // How long it has been checked out
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // Checked out longer than poolMaximumCheckoutTime
              // Can claim overdue connection
              // Count one more claimed overdue connection
              state.claimedOverdueConnectionCount++;
              // Add to the total checkout time of overdue connections
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              // Add to the total checkout time (from checkout to return)
              state.accumulatedCheckoutTime += longestCheckoutTime;
              // Remove it from the active list
              state.activeConnections.remove(oldestActiveConnection);
              // If the overdue connection is not in auto-commit mode
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                try {
                  // roll back whatever it has left uncommitted
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  /*
                     Just log a message for debug and continue to execute the following
                     statement like nothing happened.
                     Wrap the bad connection with a new PooledConnection, this will help
                     to not interrupt current executing thread and give current thread a
                     chance to join the next competition for another valid/good database
                     connection. At the end of this loop, bad {@link @conn} will be set as null.
                   */
                  log.debug("Bad connection. Could not roll back");
                }
              }
              // Wrap the overdue connection's real connection in a new PooledConnection
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              // Invalidate the old wrapper
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else {
              // Must wait
              // The oldest connection is not overdue yet, so this request has to wait
              try {
                if (!countedWait) {
                  // Count this request as one that had to wait (once per request)
                  state.hadToWaitCount++;
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                // Block on state; pushConnection wakes waiters with notifyAll when a connection is returned
                state.wait(poolTimeToWait);
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        }
        if (conn != null) {
          // conn is non-null: one of the branches above produced a connection
          // ping to server and check the connection is valid or not
          if (conn.isValid()) {
            if (!conn.getRealConnection().getAutoCommit()) {
              // Roll back any uncommitted work left on the connection
              conn.getRealConnection().rollback();
            }
            // Stamp the connection for this checkout
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            // Add it to the active list
            state.activeConnections.add(conn);
            state.requestCount++;
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            state.badConnectionCount++;
            localBadConnectionCount++;
            conn = null;
            if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }

    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    return conn;
  }
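  • Connections are released through PooledConnection's invoke method, which calls pushConnection when close() is intercepted. Here is pushConnection:
  // Return a connection to the pool
  protected void pushConnection(PooledConnection conn) throws SQLException {
    // Synchronize on the state object
    synchronized (state) {
      // Remove it from the active list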
      state.activeConnections.remove(conn);
      if (conn.isValid()) {
        // The idle list has room and the connection type code matches the current configuration
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          // Add to the total checkout time
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            // Roll back any uncommitted work
            conn.getRealConnection().rollback();
          }
          // Wrap the same real connection in a fresh PooledConnection
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          state.idleConnections.add(newConn);
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          // Invalidate the old wrapper so the caller's proxy can no longer be used
          conn.invalidate();
          if (log.isDebugEnabled()) {
            log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
          }
          // Wake up threads waiting in popConnection
          state.notifyAll();
        } else {
          // The idle list is full or the type code does not match: really close the connection
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          conn.getRealConnection().close();
          if (log.isDebugEnabled()) {
            log.debug("Closed connection " + conn.getRealHashCode() + ".");
          }
          conn.invalidate();
        }
      } else {
        if (log.isDebugEnabled()) {
          log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
        }
        state.badConnectionCount++;
      }
    }
  }

  • Validity check: isValid relies mainly on pingConnection to test the connection:
  protected boolean pingConnection(PooledConnection conn) {
    boolean result = true;

    try {
      result = !conn.getRealConnection().isClosed();
    } catch (SQLException e) {
      if (log.isDebugEnabled()) {
        log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
      }
      result = false;
    }

    // Only connections unused for longer than poolPingConnectionsNotUsedFor are probed with the ping query
    if (result && poolPingEnabled && poolPingConnectionsNotUsedFor >= 0
        && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
      try {
        if (log.isDebugEnabled()) {
          log.debug("Testing connection " + conn.getRealHashCode() + " ...");
        }
        Connection realConn = conn.getRealConnection();
        // Create a statement
        try (Statement statement = realConn.createStatement()) {
          // Run the ping query
          statement.executeQuery(poolPingQuery).close();
        }
        if (!realConn.getAutoCommit()) {
          realConn.rollback();
        }
        result = true;
        if (log.isDebugEnabled()) {
          log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
        }
      } catch (Exception e) {
        log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
        try {
          conn.getRealConnection().close();
        } catch (Exception e2) {
          // ignore
        }
        result = false;
        if (log.isDebugEnabled()) {
          log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
        }
      }
    }
    return result;
  }
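  • Whenever a connection-related parameter is changed through a setXXX() method, forceCloseAll is called to close every active and idle connection. The code follows the same style as the flow above:
  public void forceCloseAll() {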
    synchronized (state) {
      expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
      // Active connections
      for (int i = state.activeConnections.size(); i > 0; i--) {
        try {
          PooledConnection conn = state.activeConnections.remove(i - 1);
          // Invalidate the wrapper
          conn.invalidate();

          Connection realConn = conn.getRealConnection();
          if (!realConn.getAutoCommit()) {
            realConn.rollback();
          }
          realConn.close();
        } catch (Exception e) {
          // ignore
        }
      }
      // Idle connections
      for (int i = state.idleConnections.size(); i > 0; i--) {
        try {
          PooledConnection conn = state.idleConnections.remove(i - 1);
          conn.invalidate();

          Connection realConn = conn.getRealConnection();
          if (!realConn.getAutoCommit()) {
            realConn.rollback();
          }
          realConn.close();
        } catch (Exception e) {
          // ignore
        }
      }
    }
    if (log.isDebugEnabled()) {
      log.debug("PooledDataSource forcefully closed/removed all connections.");
    }
  }
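forceCloseAll recomputes expectedConnectionTypeCode, and pushConnection compares each returning connection's type code against it. The body of assembleConnectionTypeCode is not shown in this post, so the sketch below assumes it hashes the concatenated url, user name, and password; TypeCodeDemo and its methods are hypothetical names. Under that assumption, it shows why a connection checked out under an old configuration would not re-enter the idle list.

```java
class TypeCodeDemo {
    // Assumption: the type code is a hash over url + username + password.
    static int typeCode(String url, String username, String password) {
        return ("" + url + username + password).hashCode();
    }

    // Mirrors the second half of the condition in pushConnection.
    static boolean mayReturnToIdle(int connectionTypeCode, int expectedTypeCode) {
        return connectionTypeCode == expectedTypeCode;
    }

    public static void main(String[] args) {
        int checkedOutWith = typeCode("jdbc:demo://db", "app", "old");
        // The password is changed while the connection is still checked out.
        int expectedNow = typeCode("jdbc:demo://db", "app", "new");
        System.out.println(mayReturnToIdle(checkedOutWith, expectedNow));
    }
}
```

In the real pushConnection such a connection would fall through to the else branch shown earlier and have its underlying connection closed.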