`
Donald_Draper
  • 浏览: 954042 次
社区版块
存档分类
最新评论

Mysql负载均衡连接的获取

    博客分类:
  • JDBC
阅读更多
Java动态代理:http://www.cnblogs.com/xiaoluo501395377/p/3383130.html
http://www.360doc.com/content/14/0801/14/1073512_398598312.shtml
JDBC驱动初始化-Mysql:http://donald-draper.iteye.com/blog/2342010
JDBC连接的获取:http://donald-draper.iteye.com/blog/2342011
前面我们讲过单机Server数据库连接的获取,今天来说一下,负载均衡集群下,连接的获取
url为jdbc:mysql:loadbalance://的数据库连接获取方法
if(StringUtils.startsWithIgnoreCase(url, "jdbc:mysql:loadbalance://"))
       return connectLoadBalanced(url, info);

//NonRegisteringDriver
//负载均衡连接获取方法
private Connection connectLoadBalanced(String url, Properties info)
        throws SQLException
    {
        //解析url
        Properties parsedProps = parseURL(url, info);
        parsedProps.remove("roundRobinLoadBalance");
        if(parsedProps == null)
            return null;
        String hostValues = parsedProps.getProperty("HOST");
        List hostList = null;
        if(hostValues != null)
            hostList = StringUtils.split(hostValues, ",", true);
        if(hostList == null)
        {
            hostList = new ArrayList();
            hostList.add("localhost:3306");
        }
	//构造负载均衡连接代理
        LoadBalancingConnectionProxy proxyBal = new LoadBalancingConnectionProxy(hostList, parsedProps);
	//通过代理新建代理实例Connection
        return (Connection)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {
            java.sql.Connection.class
        }, proxyBal);
    }

//LoadBalancingConnectionProxy
public class LoadBalancingConnectionProxy
    implements InvocationHandler, PingTarget
{
    private static Method getLocalTimeMethod;
    public static final String BLACKLIST_TIMEOUT_PROPERTY_KEY = "loadBalanceBlacklistTimeout";
    private Connection currentConn;
    private List hostList;
    private Map liveConnections;
    private Map connectionsToHostsMap;
    private long responseTimes[];
    private Map hostsToListIndexMap;
    private boolean inTransaction;
    private long transactionStartTime;
    private Properties localProps;
    private boolean isClosed;
    private BalanceStrategy balancer;//负载均衡策略
    private int retriesAllDown;
    private static Map globalBlacklist = new HashMap();
    private int globalBlacklistTimeout;

    static 
    {
        try
        {
            getLocalTimeMethod = (java.lang.System.class).getMethod("nanoTime", new Class[0]);
        }
        catch(SecurityException e) { }
        catch(NoSuchMethodException e) { }
    }
    LoadBalancingConnectionProxy(List hosts, Properties props)
        throws SQLException
    {
        inTransaction = false;
        transactionStartTime = 0L;
        isClosed = false;
        globalBlacklistTimeout = 0;
        hostList = hosts;
        int numHosts = hostList.size();
	//存活连接
        liveConnections = new HashMap(numHosts);
        connectionsToHostsMap = new HashMap(numHosts);
	//Host连接的相应时间
        responseTimes = new long[numHosts];
        hostsToListIndexMap = new HashMap(numHosts);
        for(int i = 0; i < numHosts; i++)
            hostsToListIndexMap.put(hostList.get(i), new Integer(i));

        localProps = (Properties)props.clone();
        localProps.remove("HOST");
        localProps.remove("PORT");
        localProps.setProperty("useLocalSessionState", "true");
        String strategy = localProps.getProperty("loadBalanceStrategy", "random");
        String retriesAllDownAsString = localProps.getProperty("retriesAllDown", "120");
        try
        {
            retriesAllDown = Integer.parseInt(retriesAllDownAsString);
        }
        catch(NumberFormatException nfe)
        {
            throw SQLError.createSQLException(Messages.getString("LoadBalancingConnectionProxy.badValueForRetriesAllDown", new Object[] {
                retriesAllDownAsString
            }), "S1009", null);
        }
        String blacklistTimeoutAsString = localProps.getProperty("loadBalanceBlacklistTimeout", "0");
        try
        {
            globalBlacklistTimeout = Integer.parseInt(blacklistTimeoutAsString);
        }
        catch(NumberFormatException nfe)
        {
            throw SQLError.createSQLException(Messages.getString("LoadBalancingConnectionProxy.badValueForLoadBalanceBlacklistTimeout", new Object[] {
                retriesAllDownAsString
            }), "S1009", null);
        }
	//构建负载均衡策略
        if("random".equals(strategy))
            balancer = (BalanceStrategy)Util.loadExtensions(null, props, "com.mysql.jdbc.RandomBalanceStrategy", "InvalidLoadBalanceStrategy", null).get(0);
        else
        if("bestResponseTime".equals(strategy))
            balancer = (BalanceStrategy)Util.loadExtensions(null, props, "com.mysql.jdbc.BestResponseTimeBalanceStrategy", "InvalidLoadBalanceStrategy", null).get(0);
        else
            balancer = (BalanceStrategy)Util.loadExtensions(null, props, strategy, "InvalidLoadBalanceStrategy", null).get(0);
        //初始化负载均衡器
	balancer.init(null, props);
	//从负载均衡器获取连接
        pickNewConnection();
    }
}

下面分三步来看LoadBalancingConnectionProxy的构建
//构建负载均衡策略
//Util
public static List loadExtensions(Connection conn, Properties props, String extensionClassNames, String errorMessageKey, ExceptionInterceptor exceptionInterceptor)
        throws SQLException
    {
        List extensionList = new LinkedList();
        List interceptorsToCreate = StringUtils.split(extensionClassNames, ",", true);
        Iterator iter = interceptorsToCreate.iterator();
        String className = null;
        try
        {
            Extension extensionInstance;
            for(; iter.hasNext(); extensionList.add(extensionInstance))
            {
                className = iter.next().toString();
		//加载className
                extensionInstance = (Extension)Class.forName(className).newInstance();
		//初始化class
                extensionInstance.init(conn, props);
            }

        }
        catch(Throwable t)
        {
            SQLException sqlEx = SQLError.createSQLException(Messages.getString(errorMessageKey, new Object[] {
                className
            }), exceptionInterceptor);
            sqlEx.initCause(t);
            throw sqlEx;
        }
        return extensionList;
    }

再看负载均衡器的初始化,这里我们以BestResponseTimeBalanceStrategy为例:
public class BestResponseTimeBalanceStrategy
    implements BalanceStrategy
{
   public void init(Connection connection, Properties properties)
        throws SQLException
    {
      //初始化为空,待扩展
    }
}


回到LoadBalancingConnectionProxy的构造方法,从负载均衡器获取连接
p
rivate synchronized void pickNewConnection()
        throws SQLException
    {
        if(currentConn == null)
        {
            currentConn = balancer.pickConnection(this, Collections.unmodifiableList(hostList), Collections.unmodifiableMap(liveConnections), (long[])responseTimes.clone(), retriesAllDown);
            return;
        } else
        {
            Connection newConn = balancer.pickConnection(this, Collections.unmodifiableList(hostList), Collections.unmodifiableMap(liveConnections), (long[])responseTimes.clone(), retriesAllDown);
            newConn.setTransactionIsolation(currentConn.getTransactionIsolation());
            newConn.setAutoCommit(currentConn.getAutoCommit());
            currentConn = newConn;
            return;
        }
    }

查看BestResponseTimeBalanceStrategy的pickConnection方法
//BestResponseTimeBalanceStrategy
 
public Connection pickConnection(LoadBalancingConnectionProxy proxy, List configuredHosts, Map liveConnections, long responseTimes[], int numRetries)
        throws SQLException
    {
        SQLException ex;
label0:
        {
            Map blackList = proxy.getGlobalBlacklist();
            ex = null;
            int attempts = 0;
            Connection conn;
            do
            {
                if(attempts >= numRetries)
                    break label0;
                long minResponseTime = 9223372036854775807L;
                int bestHostIndex = 0;
		//获取代理host黑名单
                if(blackList.size() == configuredHosts.size())
                    blackList = proxy.getGlobalBlacklist();
		//从responseTimes筛选出相应时间最小的host索引index
                for(int i = 0; i < responseTimes.length; i++)
                {
                    long candidateResponseTime = responseTimes[i];
                    if(candidateResponseTime >= minResponseTime || blackList.containsKey(configuredHosts.get(i)))
                        continue;
                    if(candidateResponseTime == 0L)
                    {
                        bestHostIndex = i;
                        break;
                    }
                    bestHostIndex = i;
                    minResponseTime = candidateResponseTime;
                }
                //从configuredHosts获取host
                String bestHost = (String)configuredHosts.get(bestHostIndex);
		//从liveConnections获取连接
                conn = (Connection)liveConnections.get(bestHost);
                if(conn != null)
                    break;
                try
                {
		    //如果liveConnections不存在host对应的连接,则通过代理去创建一个连接
                    conn = proxy.createConnectionForHost(bestHost);
                    break;
                }
                catch(SQLException sqlEx)
                {
                    ex = sqlEx;
                    if((sqlEx instanceof CommunicationsException) || "08S01".equals(sqlEx.getSQLState()))
                    {
		        //如果创建连接异常,则加入黑名单
                        proxy.addToGlobalBlacklist(bestHost);
                        blackList.put(bestHost, null);
                        if(blackList.size() == configuredHosts.size())
                        {
                            attempts++;
                            try
                            {
                                Thread.sleep(250L);
                            }
                            catch(InterruptedException e) { }
                            blackList = proxy.getGlobalBlacklist();
                        }
                    } else
                    {
                        throw sqlEx;
                    }
                }
            } while(true);
            return conn;
        }
        if(ex != null)
            throw ex;
        else
            return null;
    }

来看LoadBalancingConnectionProxy创建连接:
public synchronized Connection createConnectionForHost(String hostPortSpec)
        throws SQLException
    {
        Properties connProps = (Properties)localProps.clone();
        String hostPortPair[] = NonRegisteringDriver.parseHostPortPair(hostPortSpec);
        if(hostPortPair[1] == null)
            hostPortPair[1] = "3306";
        connProps.setProperty("HOST", hostPortSpec);
        connProps.setProperty("PORT", hostPortPair[1]);
	//返回的实际为ConnectionImpl
        Connection conn = ConnectionImpl.getInstance(hostPortSpec, Integer.parseInt(hostPortPair[1]), connProps, connProps.getProperty("DBNAME"), "jdbc:mysql://" + hostPortPair[0] + ":" + hostPortPair[1] + "/");
        liveConnections.put(hostPortSpec, conn);
        connectionsToHostsMap.put(conn, hostPortSpec);
        return conn;
    }

在回到connectLoadBalanced函数:
//通过代理新建代理实例Connection
return (Connection)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {
           java.sql.Connection.class
        }, proxyBal);

//Proxy
public static Object newProxyInstance(ClassLoader loader,
					  Class<?>[] interfaces,
					  InvocationHandler h)
	throws IllegalArgumentException
    {
	if (h == null) {
	    throw new NullPointerException();
	}

	/*
	 * Look up or generate the designated proxy class.
	 */
        Class<?> cl = getProxyClass0(loader, interfaces); // stack walk magic: do not refactor

	/*
	 * Invoke its constructor with the designated invocation handler.
	 */
	try {
            final Constructor<?> cons = cl.getConstructor(constructorParams);
            final InvocationHandler ih = h;
            SecurityManager sm = System.getSecurityManager();
            if (sm != null && ProxyAccessHelper.needsNewInstanceCheck(cl)) {
                // create proxy instance with doPrivilege as the proxy class may
                // implement non-public interfaces that requires a special permission
                return AccessController.doPrivileged(new PrivilegedAction<Object>() {
                    public Object run() {
		        //创建实例
                        return newInstance(cons, ih);
                    }
                });
            } else {
                return newInstance(cons, ih);
            }
	} catch (NoSuchMethodException e) {
	    throw new InternalError(e.toString());
	} 
    }
//创建实例
private static Object newInstance(Constructor<?> cons, InvocationHandler h) {
        try {
            return cons.newInstance(new Object[] {h} );
        } catch (IllegalAccessException e) {
            throw new InternalError(e.toString());
        } catch (InstantiationException e) {
            throw new InternalError(e.toString());
        } catch (InvocationTargetException e) {
            Throwable t = e.getCause();
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new InternalError(t.toString());
            }
        }
    }

回到LoadBalancingConnectionProxy
public Object invoke(Object proxy, Method method, Object args[])
        throws Throwable
    {
        String methodName = method.getName();
        if("equals".equals(methodName) && args.length == 1)
            if(args[0] instanceof Proxy)
                return Boolean.valueOf(((Proxy)args[0]).equals(this));
            else
                return Boolean.valueOf(equals(args[0]));
        if("close".equals(methodName))
        {
            synchronized(this)
            {
                for(Iterator allConnections = liveConnections.values().iterator(); allConnections.hasNext(); ((Connection)allConnections.next()).close());
                if(!isClosed)
                    balancer.destroy();
                liveConnections.clear();
                connectionsToHostsMap.clear();
            }
            return null;
        }
        if("isClosed".equals(methodName))
            return Boolean.valueOf(isClosed);
        if(isClosed)
            throw SQLError.createSQLException("No operations allowed after connection closed.", "08003", null);
        if(!inTransaction)
        {
            inTransaction = true;
            transactionStartTime = getLocalTimeBestResolution();
        }
        Object result = null;
        try
        {
	    //关键在这里,当调用Connection的方法是,实际上调用的currentConn的对应方法
	    //这个currentConn我们前面有说
            result = method.invoke(currentConn, args);
            if(result != null)
            {
                if(result instanceof Statement)
                    ((Statement)result).setPingTarget(this);
                result = proxyIfInterfaceIsJdbc(result, result.getClass());
            }
        }
        catch(InvocationTargetException e)
        {
            dealWithInvocationException(e);
        }
        finally
        {
            if("commit".equals(methodName) || "rollback".equals(methodName))
            {
                inTransaction = false;
                String host = (String)connectionsToHostsMap.get(currentConn);
                if(host != null)
                {
                    int hostIndex = ((Integer)hostsToListIndexMap.get(host)).intValue();
                    synchronized(responseTimes)
                    {
                        responseTimes[hostIndex] = getLocalTimeBestResolution() - transactionStartTime;
                    }
                }
                pickNewConnection();
            }
        }
        return result;
    }

这里我们总结一下:
NonRegisteringDriver的负载均衡连接获取方法connectLoadBalanced,首先
构造负载均衡连接代理LoadBalancingConnectionProxy,再通过java动态代理Proxy
产生新建代理实例Connection,当我们调用Connection的prepareStatement等方法时,
实际上通过LoadBalancingConnectionProxy的currentConn(ConnectionImpl)调用其相应的方法。在构建LoadBalancingConnectionProxy的过程中,首先,初始化存活连接liveConnections,Host连接的相应时间responseTimes,构建负载均衡策略BestResponseTimeBalanceStrategy,RandomBalanceStrategy或InvalidLoadBalanceStrategy,然后初始化负载均衡策略,最后从负载均衡器获取连接,BestResponseTimeBalanceStrategy实际上是从liveConnections获取除host黑名单以外,相应时间最小的Connection,如果没有,则创建连接。
分享到:
评论

相关推荐

    mysql数据优化详细教程

    负载均衡是应用中使用非常普遍的一种优化方法,它的机制就是利用某种均衡算法,将固定的负载量分布到不同的服务器上, 以此来降低单台服务器的负载,达到优化的效果。通过MySQL的主从复制,实现读写分离,使增删改...

    linux集群应用实战

    第26讲 配置mysql+lvs+keeplived实现mysql读操作的负载均衡 课程目标: 熟练掌握mysql+lvs+keeplived的部署方式 keeplived简介 通过配置lvs+keeplived实现mysql读操作的负载均衡 测试mysql+lvs+keeplived负载均衡...

    基于muduo开发的集群聊天服务器c++源码+数据库+使用说明.zip

    - 使用 Nginx 的 TCP 负载均衡功能,将客户端请求分派到多个服务器上,以提高并发处理能力 - 基于发布-订阅的服务器中间件redis消息队列,解决跨服务器通信难题 - 封装 MySQL 接口,将用户数据储存到磁盘中,实现...

    开涛高可用高并发-亿级流量核心技术

    2 负载均衡与反向代理 18 2.1 upstream配置 20 2.2 负载均衡算法 21 2.3 失败重试 23 2.4 健康检查 24 2.4.1 TCP心跳检查 24 2.4.2 HTTP心跳检查 25 2.5 其他配置 25 2.5.1 域名上游服务器 25 2.5.2 备份上游服务器 ...

    LNMP 一键安装包.zip

    作为负载均衡服务器:Nginx 既可以在内部直接支持 Rails 和 PHP,也可以支持作为 HTTP代理服务器 对外进行服务。Nginx 用 C 编写, 不论是系统资源开销还是 CPU 使用效率都比 Perlbal 要好的多。 作为邮件代理...

    springcloud入门

    springcloud-gateway:网关接口,暴露给调用方调用,包含负载均衡、重试、熔断等功能。 springcloud-zipkin:链路跟踪工具,监控并就持久化微服务集群中调用链路的通畅情况,采用rabbitmq异步传输、elasticsearch...

    terraform-aws-vpc-example

    负载均衡 自动缩放 受SSL保护 使用Route53路由的DNS 仅限来自允许的IP地址列表的流量 可通过SSH访问 Web应用程序已部署到Web服务器实例,以直观地演示基础架构的成功部署。 该应用程序的构建驻留在Amazon S3存储中...

    php网络开发完全手册

    15.3.1 数据库服务器的连接与断开 241 15.3.2 选择数据库 242 15.3.3 执行SQL语句 242 15.3.4 获得查询结果集中的记录数 243 15.3.5 获得结果集中的某一条记录 244 15.3.6 逐行获取结果集中的每一条 15.3.6 记录 245...

    【E4W】HTTP Web 服务器开发框架,mysql 插件内多线程稳定-易语言

    待更新或目标:session 分布式管理 、日志系统、URL事件绑定、Web 网页控制台,修复负载均衡稳定性; 更新日期:2020-06-02 --&gt; 2020-06-27 更新功能:  修复【fix】:  1、修复非80端口访问失败错误,感谢...

    Eclipse开发分布式商城系统+完整视频代码及文档

    │ 04.nginx的反向代理及负载均衡.avi │ 05.FastDFS介绍.avi │ 06.FastDFS安装步骤-文件上传.avi │ 07.配置nginx插件访问图片.avi │ 08.测试图片上传.avi │ 09.FastDFS工具类的使用.avi │ 10.图片上传过程分析...

    asp.net知识库

    在Asp.net中如何用SQLDMO来获取SQL Server中的对象信息 使用Relations建立表之间的关系并却使用PagedDataSource类对DataList进行分页 通过作业,定时同步两个数据库 SQLSERVER高级注入技巧 利用反射实现ASP.NET控件和...

Global site tag (gtag.js) - Google Analytics