`
Donald_Draper
  • 浏览: 954029 次
社区版块
存档分类
最新评论

Mysql主从复制读写分离连接的获取

    博客分类:
  • JDBC
阅读更多
JDBC驱动初始化-Mysql:http://donald-draper.iteye.com/blog/2342010
JDBC连接的获取:http://donald-draper.iteye.com/blog/2342011
Mysql负载均衡连接的获取:http://donald-draper.iteye.com/blog/2342089
Mysql主从复制读写分离连接的获取:http://donald-draper.iteye.com/blog/2342108
ConnectionImpl创建MysqlIO:http://donald-draper.iteye.com/blog/2342959
Mysql预编译SQL:http://donald-draper.iteye.com/blog/2342960
MysqlSQL PreparedStatement的查询:http://donald-draper.iteye.com/blog/2343083
MySQL ServerPreparedStatement查询:http://donald-draper.iteye.com/blog/2343124
前面讲过Driver的初始化、单机Server连接的获取、负载均衡连接的获取,今天讲一下主从复制、读写分离连接的获取:
从下面一段代码来看:
// If the URL starts with "jdbc:mysql:replication://", hand off to the
// replication (master/slave) connection factory method instead.
if(StringUtils.startsWithIgnoreCase(url, "jdbc:mysql:replication://"))
return connectReplicationConnection(url, info);

//NonRegisteringDriver
/**
 * Creates a {@link ReplicationConnection} for a replication URL.
 *
 * The parsed "HOST" property is treated as a comma-separated list: the first
 * entry becomes the master host, every following entry becomes a slave host.
 * Master and slave settings are carried in two independent clones of the
 * parsed properties; the slave clone is additionally tagged with
 * "com.mysql.jdbc.ReplicationConnection.isSlave" = "true".
 *
 * @param url  the JDBC URL to parse
 * @param info caller-supplied connection properties
 * @return the replication connection, or {@code null} when {@code parseURL}
 *         yields {@code null} for this URL
 * @throws SQLException if a "HOST" list is present but names no slave host,
 *         or if parsing / connection creation fails
 */
private Connection connectReplicationConnection(String url, Properties info)
        throws SQLException
{
    Properties parsedProps = parseURL(url, info);
    if (parsedProps == null) {
        return null;
    }

    Properties masterProps = (Properties) parsedProps.clone();
    Properties slavesProps = (Properties) parsedProps.clone();
    slavesProps.setProperty("com.mysql.jdbc.ReplicationConnection.isSlave", "true");

    String hostValues = parsedProps.getProperty("HOST");
    if (hostValues == null) {
        // No host list to split: both property sets keep the parsed values as-is.
        return new ReplicationConnection(masterProps, slavesProps);
    }

    StringTokenizer hostTokens = new StringTokenizer(hostValues, ",");

    // The first host in the URL is the master.
    StringBuffer masterHost = new StringBuffer();
    if (hostTokens.hasMoreTokens()) {
        String[] masterPair = parseHostPortPair(hostTokens.nextToken());
        if (masterPair[0] != null) {
            masterHost.append(masterPair[0]);
        }
        if (masterPair[1] != null) {
            masterHost.append(":");
            masterHost.append(masterPair[1]);
        }
    }

    // Every remaining host is a slave; rebuild them as a comma-separated list.
    StringBuffer slaveHosts = new StringBuffer();
    String separator = "";
    while (hostTokens.hasMoreTokens()) {
        String[] slavePair = parseHostPortPair(hostTokens.nextToken());
        slaveHosts.append(separator);
        separator = ",";
        if (slavePair[0] != null) {
            slaveHosts.append(slavePair[0]);
        }
        if (slavePair[1] != null) {
            slaveHosts.append(":");
            slaveHosts.append(slavePair[1]);
        }
    }

    if (slaveHosts.length() == 0) {
        throw SQLError.createSQLException("Must specify at least one slave host to connect to for master/slave replication load-balancing functionality", "01S00", null);
    }

    masterProps.setProperty("HOST", masterHost.toString());
    slavesProps.setProperty("HOST", slaveHosts.toString());
    return new ReplicationConnection(masterProps, slavesProps);
}

我们来看一下ReplicationConnection
/**
 * Connection facade over one master connection and one slaves connection.
 *
 * All delegated calls go to {@link #currentConnection}, which starts out as the
 * master connection. {@link #setReadOnly(boolean)} selects which of the two
 * underlying connections is current: {@code true} selects the slaves
 * connection, {@code false} selects the master connection.
 */
public class ReplicationConnection
    implements Connection, PingTarget
{
    /** Connection that delegated calls are currently routed to. */
    protected Connection currentConnection;
    /** Connection to the master host. */
    protected Connection masterConnection;
    /** Connection to the slave host(s); put into read-only mode at construction. */
    protected Connection slavesConnection;

    /**
     * Opens the master and slaves connections and makes the master current.
     *
     * Each URL is built as {@code jdbc:mysql://<HOST>/<DBNAME>} from the "HOST"
     * and "DBNAME" entries of the matching property set (missing entries are
     * left empty). "roundRobinLoadBalance" is set to "true" on
     * {@code slaveProperties} before connecting.
     *
     * If anything fails after the first connection has been opened, every
     * connection opened so far is closed before the failure propagates, so a
     * failed construction does not leave open connections behind.
     *
     * @param masterProperties properties for the master connection
     * @param slaveProperties  properties for the slaves connection (modified:
     *                         "roundRobinLoadBalance" is set to "true")
     * @throws SQLException if either connection cannot be established or the
     *                      slaves connection cannot be made read-only
     */
    public ReplicationConnection(Properties masterProperties, Properties slaveProperties)
        throws SQLException
    {
        NonRegisteringDriver driver = new NonRegisteringDriver();
        StringBuffer masterUrl = new StringBuffer("jdbc:mysql://");
        StringBuffer slaveUrl = new StringBuffer("jdbc:mysql://");

        // Host part of both URLs.
        String masterHost = masterProperties.getProperty("HOST");
        if (masterHost != null) {
            masterUrl.append(masterHost);
        }
        String slaveHost = slaveProperties.getProperty("HOST");
        if (slaveHost != null) {
            slaveUrl.append(slaveHost);
        }

        // Database part of both URLs.
        String masterDb = masterProperties.getProperty("DBNAME");
        masterUrl.append("/");
        if (masterDb != null) {
            masterUrl.append(masterDb);
        }
        String slaveDb = slaveProperties.getProperty("DBNAME");
        slaveUrl.append("/");
        if (slaveDb != null) {
            slaveUrl.append(slaveDb);
        }

        slaveProperties.setProperty("roundRobinLoadBalance", "true");

        boolean initialized = false;
        try {
            // Obtain the master connection from the driver.
            masterConnection = (Connection) driver.connect(masterUrl.toString(), masterProperties);
            // Obtain the slaves connection from the driver.
            slavesConnection = (Connection) driver.connect(slaveUrl.toString(), slaveProperties);
            slavesConnection.setReadOnly(true);
            initialized = true;
        } finally {
            if (!initialized) {
                // The caller never receives this object when construction fails,
                // so nobody else could close what has been opened so far.
                closeQuietly(slavesConnection);
                closeQuietly(masterConnection);
            }
        }

        // The master connection is the initial current connection.
        currentConnection = masterConnection;
    }

    /**
     * Closes a connection during failed construction, ignoring close errors.
     *
     * @param connection the connection to close; may be {@code null}
     */
    private static void closeQuietly(Connection connection)
    {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (SQLException ignored) {
            // Deliberately ignored: the failure that aborted construction is
            // the one that must reach the caller.
        }
    }

    /**
     * Selects the underlying connection: the slaves connection when
     * {@code readOnly} is {@code true}, the master connection otherwise.
     * Nothing happens if the requested connection is already current.
     *
     * @param readOnly {@code true} to route to the slaves connection
     * @throws SQLException if transferring state to the new connection fails
     */
    public synchronized void setReadOnly(boolean readOnly)
        throws SQLException
    {
        if (readOnly) {
            if (currentConnection != slavesConnection) {
                switchToSlavesConnection();
            }
        } else if (currentConnection != masterConnection) {
            switchToMasterConnection();
        }
    }

    /** Makes the master connection current, copying state from the slaves connection. */
    private synchronized void switchToMasterConnection()
        throws SQLException
    {
        swapConnections(masterConnection, slavesConnection);
    }

    /** Makes the slaves connection current, copying state from the master connection. */
    private synchronized void switchToSlavesConnection()
        throws SQLException
    {
        swapConnections(slavesConnection, masterConnection);
    }

    /**
     * Copies catalog, auto-commit mode and transaction isolation from
     * {@code switchFromConnection} to {@code switchToConnection}, then makes
     * {@code switchToConnection} the current connection.
     *
     * @param switchToConnection   connection that becomes current
     * @param switchFromConnection connection whose session state is copied
     * @throws SQLException if reading or applying any of the settings fails
     */
    private synchronized void swapConnections(Connection switchToConnection, Connection switchFromConnection)
        throws SQLException
    {
        String switchFromCatalog = switchFromConnection.getCatalog();
        String switchToCatalog = switchToConnection.getCatalog();
        // The catalog is applied when the target's catalog differs from the
        // source's, and also whenever the source has a catalog at all.
        boolean targetCatalogDiffers =
            switchToCatalog != null && !switchToCatalog.equals(switchFromCatalog);
        if (targetCatalogDiffers || switchFromCatalog != null) {
            switchToConnection.setCatalog(switchFromCatalog);
        }

        boolean switchToAutoCommit = switchToConnection.getAutoCommit();
        boolean switchFromConnectionAutoCommit = switchFromConnection.getAutoCommit();
        if (switchFromConnectionAutoCommit != switchToAutoCommit) {
            switchToConnection.setAutoCommit(switchFromConnectionAutoCommit);
        }

        int switchToIsolation = switchToConnection.getTransactionIsolation();
        int switchFromIsolation = switchFromConnection.getTransactionIsolation();
        if (switchFromIsolation != switchToIsolation) {
            switchToConnection.setTransactionIsolation(switchFromIsolation);
        }

        currentConnection = switchToConnection;
    }

    /** Rolls back on the current connection. */
    public synchronized void rollback()
        throws SQLException
    {
        currentConnection.rollback();
    }

    /** Rolls back to {@code savepoint} on the current connection. */
    public synchronized void rollback(Savepoint savepoint)
        throws SQLException
    {
        currentConnection.rollback(savepoint);
    }

    /** Sets the auto-commit mode on the current connection. */
    public synchronized void setAutoCommit(boolean autoCommit)
        throws SQLException
    {
        currentConnection.setAutoCommit(autoCommit);
    }

    /**
     * Prepares a statement on the current connection and registers this
     * object as the statement's ping target.
     */
    public PreparedStatement prepareStatement(String sql)
        throws SQLException
    {
        PreparedStatement pstmt = currentConnection.prepareStatement(sql);
        ((com.mysql.jdbc.Statement)pstmt).setPingTarget(this);
        return pstmt;
    }

    /** Prepares a callable statement on the current connection. */
    public CallableStatement prepareCall(String sql)
        throws SQLException
    {
        return currentConnection.prepareCall(sql);
    }
}

总结:
从上面可以看出,主从集群连接的获取,首先解析url分离出Master host和Slave host,第一个为Master,后面的为Slave;NonRegisteringDriver创建复制连接时,返回的是一个ReplicationConnection,而ReplicationConnection内有三个连接,分别为currentConnection、masterConnection、slavesConnection。
这三个连接实际上是ConnectionImpl;默认情况下currentConnection为masterConnection,当我们设置readonly为true时,切换到slavesConnection,设置为false时,切换到masterConnection;ReplicationConnection的相关方法的实现,实际上是调用ConnectionImpl的相应方法。

//NonRegisteringReplicationDriver
/**
 * Driver variant whose {@link #connect(String, Properties)} always produces a
 * {@link ReplicationConnection}: the first host of the parsed "HOST" list is
 * used as the master, all remaining hosts as slaves.
 */
public class NonRegisteringReplicationDriver extends NonRegisteringDriver
{

    public NonRegisteringReplicationDriver()
        throws SQLException
    {
    }

    /**
     * Parses the URL, splits its host list into master and slave hosts and
     * opens a replication connection over them.
     *
     * @param url  the JDBC URL to parse
     * @param info caller-supplied connection properties
     * @return the replication connection, or {@code null} when
     *         {@code parseURL} yields {@code null} for this URL
     * @throws SQLException if a "HOST" list is present but names no slave
     *         host, or if parsing / connection creation fails
     */
    public Connection connect(String url, Properties info)
        throws SQLException
    {
        Properties parsedProps = parseURL(url, info);
        if (parsedProps == null) {
            return null;
        }

        Properties masterProps = (Properties) parsedProps.clone();
        Properties slavesProps = (Properties) parsedProps.clone();
        slavesProps.setProperty("com.mysql.jdbc.ReplicationConnection.isSlave", "true");

        String hostValues = parsedProps.getProperty("HOST");
        if (hostValues != null) {
            StringTokenizer tokens = new StringTokenizer(hostValues, ",");
            StringBuffer master = new StringBuffer();
            StringBuffer slaves = new StringBuffer();

            // First entry of the list: the master.
            if (tokens.hasMoreTokens()) {
                appendHostPortPair(master, parseHostPortPair(tokens.nextToken()));
            }

            // Remaining entries: the slaves, joined with commas.
            for (int slaveCount = 0; tokens.hasMoreTokens(); slaveCount++) {
                String[] slavePair = parseHostPortPair(tokens.nextToken());
                if (slaveCount > 0) {
                    slaves.append(",");
                }
                appendHostPortPair(slaves, slavePair);
            }

            if (slaves.length() == 0) {
                throw SQLError.createSQLException("Must specify at least one slave host to connect to for master/slave replication load-balancing functionality", "01S00", null);
            }

            masterProps.setProperty("HOST", master.toString());
            slavesProps.setProperty("HOST", slaves.toString());
        }

        return new ReplicationConnection(masterProps, slavesProps);
    }

    /**
     * Appends "host", then ":port", to {@code target}, skipping whichever of
     * the two elements is {@code null}.
     */
    private static void appendHostPortPair(StringBuffer target, String[] hostPortPair)
    {
        if (hostPortPair[0] != null) {
            target.append(hostPortPair[0]);
        }
        if (hostPortPair[1] != null) {
            target.append(":");
            target.append(hostPortPair[1]);
        }
    }
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics