- 浏览: 954042 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Java动态代理:http://www.cnblogs.com/xiaoluo501395377/p/3383130.html
http://www.360doc.com/content/14/0801/14/1073512_398598312.shtml
JDBC驱动初始化-Mysql:http://donald-draper.iteye.com/blog/2342010
JDBC连接的获取:http://donald-draper.iteye.com/blog/2342011
前面我们讲过单机Server数据库连接的获取,今天来说一下,负载均衡集群下,连接的获取
url为jdbc:mysql:loadbalance://的数据库连接获取方法
//NonRegisteringDriver
//负载均衡连接获取方法
//LoadBalancingConnectionProxy
下面分三步来看LoadBalancingConnectionProxy的构建
//构建负载均衡策略
//Util
再看负载均衡器的初始化,这里我们以BestResponseTimeBalanceStrategy为例:
回到LoadBalancingConnectionProxy的构造方法,从负载均衡器获取连接
p
查看BestResponseTimeBalanceStrategy的pickConnection方法
//BestResponseTimeBalanceStrategy
来看LoadBalancingConnectionProxy创建连接:
在回到connectLoadBalanced函数:
//通过代理新建代理实例Connection
//Proxy
回到LoadBalancingConnectionProxy
这里我们总结一下:
NonRegisteringDriver的负载均衡连接获取方法connectLoadBalanced,首先
构造负载均衡连接代理LoadBalancingConnectionProxy,再通过java动态代理Proxy
产生新建代理实例Connection,当我们调用Connection的prepareStatement等方法时,
实际上通过LoadBalancingConnectionProxy的currentConn(ConnectionImpl)调用其相应的方法。在构建LoadBalancingConnectionProxy的过程中,首先,初始化存活连接liveConnections,Host连接的相应时间responseTimes,构建负载均衡策略BestResponseTimeBalanceStrategy,RandomBalanceStrategy或InvalidLoadBalanceStrategy,然后初始化负载均衡策略,最后从负载均衡器获取连接,BestResponseTimeBalanceStrategy实际上是从liveConnections获取除host黑名单以外,相应时间最小的Connection,如果没有,则创建连接。
http://www.360doc.com/content/14/0801/14/1073512_398598312.shtml
JDBC驱动初始化-Mysql:http://donald-draper.iteye.com/blog/2342010
JDBC连接的获取:http://donald-draper.iteye.com/blog/2342011
前面我们讲过单机Server数据库连接的获取,今天来说一下,负载均衡集群下,连接的获取
url为jdbc:mysql:loadbalance://的数据库连接获取方法
if(StringUtils.startsWithIgnoreCase(url, "jdbc:mysql:loadbalance://")) return connectLoadBalanced(url, info);
//NonRegisteringDriver
//负载均衡连接获取方法
private Connection connectLoadBalanced(String url, Properties info) throws SQLException { //解析url Properties parsedProps = parseURL(url, info); parsedProps.remove("roundRobinLoadBalance"); if(parsedProps == null) return null; String hostValues = parsedProps.getProperty("HOST"); List hostList = null; if(hostValues != null) hostList = StringUtils.split(hostValues, ",", true); if(hostList == null) { hostList = new ArrayList(); hostList.add("localhost:3306"); } //构造负载均衡连接代理 LoadBalancingConnectionProxy proxyBal = new LoadBalancingConnectionProxy(hostList, parsedProps); //通过代理新建代理实例Connection return (Connection)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] { java.sql.Connection.class }, proxyBal); }
//LoadBalancingConnectionProxy
public class LoadBalancingConnectionProxy implements InvocationHandler, PingTarget { private static Method getLocalTimeMethod; public static final String BLACKLIST_TIMEOUT_PROPERTY_KEY = "loadBalanceBlacklistTimeout"; private Connection currentConn; private List hostList; private Map liveConnections; private Map connectionsToHostsMap; private long responseTimes[]; private Map hostsToListIndexMap; private boolean inTransaction; private long transactionStartTime; private Properties localProps; private boolean isClosed; private BalanceStrategy balancer;//负载均衡策略 private int retriesAllDown; private static Map globalBlacklist = new HashMap(); private int globalBlacklistTimeout; static { try { getLocalTimeMethod = (java.lang.System.class).getMethod("nanoTime", new Class[0]); } catch(SecurityException e) { } catch(NoSuchMethodException e) { } } LoadBalancingConnectionProxy(List hosts, Properties props) throws SQLException { inTransaction = false; transactionStartTime = 0L; isClosed = false; globalBlacklistTimeout = 0; hostList = hosts; int numHosts = hostList.size(); //存活连接 liveConnections = new HashMap(numHosts); connectionsToHostsMap = new HashMap(numHosts); //Host连接的相应时间 responseTimes = new long[numHosts]; hostsToListIndexMap = new HashMap(numHosts); for(int i = 0; i < numHosts; i++) hostsToListIndexMap.put(hostList.get(i), new Integer(i)); localProps = (Properties)props.clone(); localProps.remove("HOST"); localProps.remove("PORT"); localProps.setProperty("useLocalSessionState", "true"); String strategy = localProps.getProperty("loadBalanceStrategy", "random"); String retriesAllDownAsString = localProps.getProperty("retriesAllDown", "120"); try { retriesAllDown = Integer.parseInt(retriesAllDownAsString); } catch(NumberFormatException nfe) { throw SQLError.createSQLException(Messages.getString("LoadBalancingConnectionProxy.badValueForRetriesAllDown", new Object[] { retriesAllDownAsString }), "S1009", null); } String blacklistTimeoutAsString = localProps.getProperty("loadBalanceBlacklistTimeout", "0"); try { globalBlacklistTimeout = Integer.parseInt(blacklistTimeoutAsString); } catch(NumberFormatException nfe) { throw SQLError.createSQLException(Messages.getString("LoadBalancingConnectionProxy.badValueForLoadBalanceBlacklistTimeout", new Object[] { retriesAllDownAsString }), "S1009", null); } //构建负载均衡策略 if("random".equals(strategy)) balancer = (BalanceStrategy)Util.loadExtensions(null, props, "com.mysql.jdbc.RandomBalanceStrategy", "InvalidLoadBalanceStrategy", null).get(0); else if("bestResponseTime".equals(strategy)) balancer = (BalanceStrategy)Util.loadExtensions(null, props, "com.mysql.jdbc.BestResponseTimeBalanceStrategy", "InvalidLoadBalanceStrategy", null).get(0); else balancer = (BalanceStrategy)Util.loadExtensions(null, props, strategy, "InvalidLoadBalanceStrategy", null).get(0); //初始化负载均衡器 balancer.init(null, props); //从负载均衡器获取连接 pickNewConnection(); } }
下面分三步来看LoadBalancingConnectionProxy的构建
//构建负载均衡策略
//Util
public static List loadExtensions(Connection conn, Properties props, String extensionClassNames, String errorMessageKey, ExceptionInterceptor exceptionInterceptor) throws SQLException { List extensionList = new LinkedList(); List interceptorsToCreate = StringUtils.split(extensionClassNames, ",", true); Iterator iter = interceptorsToCreate.iterator(); String className = null; try { Extension extensionInstance; for(; iter.hasNext(); extensionList.add(extensionInstance)) { className = iter.next().toString(); //加载className extensionInstance = (Extension)Class.forName(className).newInstance(); //初始化class extensionInstance.init(conn, props); } } catch(Throwable t) { SQLException sqlEx = SQLError.createSQLException(Messages.getString(errorMessageKey, new Object[] { className }), exceptionInterceptor); sqlEx.initCause(t); throw sqlEx; } return extensionList; }
再看负载均衡器的初始化,这里我们以BestResponseTimeBalanceStrategy为例:
public class BestResponseTimeBalanceStrategy implements BalanceStrategy { public void init(Connection connection, Properties properties) throws SQLException { //初始化为空,待扩展 } }
回到LoadBalancingConnectionProxy的构造方法,从负载均衡器获取连接
p
rivate synchronized void pickNewConnection() throws SQLException { if(currentConn == null) { currentConn = balancer.pickConnection(this, Collections.unmodifiableList(hostList), Collections.unmodifiableMap(liveConnections), (long[])responseTimes.clone(), retriesAllDown); return; } else { Connection newConn = balancer.pickConnection(this, Collections.unmodifiableList(hostList), Collections.unmodifiableMap(liveConnections), (long[])responseTimes.clone(), retriesAllDown); newConn.setTransactionIsolation(currentConn.getTransactionIsolation()); newConn.setAutoCommit(currentConn.getAutoCommit()); currentConn = newConn; return; } }
查看BestResponseTimeBalanceStrategy的pickConnection方法
//BestResponseTimeBalanceStrategy
public Connection pickConnection(LoadBalancingConnectionProxy proxy, List configuredHosts, Map liveConnections, long responseTimes[], int numRetries) throws SQLException { SQLException ex; label0: { Map blackList = proxy.getGlobalBlacklist(); ex = null; int attempts = 0; Connection conn; do { if(attempts >= numRetries) break label0; long minResponseTime = 9223372036854775807L; int bestHostIndex = 0; //获取代理host黑名单 if(blackList.size() == configuredHosts.size()) blackList = proxy.getGlobalBlacklist(); //从responseTimes筛选出相应时间最小的host索引index for(int i = 0; i < responseTimes.length; i++) { long candidateResponseTime = responseTimes[i]; if(candidateResponseTime >= minResponseTime || blackList.containsKey(configuredHosts.get(i))) continue; if(candidateResponseTime == 0L) { bestHostIndex = i; break; } bestHostIndex = i; minResponseTime = candidateResponseTime; } //从configuredHosts获取host String bestHost = (String)configuredHosts.get(bestHostIndex); //从liveConnections获取连接 conn = (Connection)liveConnections.get(bestHost); if(conn != null) break; try { //如果liveConnections不存在host对应的连接,则通过代理去创建一个连接 conn = proxy.createConnectionForHost(bestHost); break; } catch(SQLException sqlEx) { ex = sqlEx; if((sqlEx instanceof CommunicationsException) || "08S01".equals(sqlEx.getSQLState())) { //如果创建连接异常,则加入黑名单 proxy.addToGlobalBlacklist(bestHost); blackList.put(bestHost, null); if(blackList.size() == configuredHosts.size()) { attempts++; try { Thread.sleep(250L); } catch(InterruptedException e) { } blackList = proxy.getGlobalBlacklist(); } } else { throw sqlEx; } } } while(true); return conn; } if(ex != null) throw ex; else return null; }
来看LoadBalancingConnectionProxy创建连接:
public synchronized Connection createConnectionForHost(String hostPortSpec) throws SQLException { Properties connProps = (Properties)localProps.clone(); String hostPortPair[] = NonRegisteringDriver.parseHostPortPair(hostPortSpec); if(hostPortPair[1] == null) hostPortPair[1] = "3306"; connProps.setProperty("HOST", hostPortSpec); connProps.setProperty("PORT", hostPortPair[1]); //返回的实际为ConnectionImpl Connection conn = ConnectionImpl.getInstance(hostPortSpec, Integer.parseInt(hostPortPair[1]), connProps, connProps.getProperty("DBNAME"), "jdbc:mysql://" + hostPortPair[0] + ":" + hostPortPair[1] + "/"); liveConnections.put(hostPortSpec, conn); connectionsToHostsMap.put(conn, hostPortSpec); return conn; }
在回到connectLoadBalanced函数:
//通过代理新建代理实例Connection
return (Connection)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] { java.sql.Connection.class }, proxyBal);
//Proxy
public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h) throws IllegalArgumentException { if (h == null) { throw new NullPointerException(); } /* * Look up or generate the designated proxy class. */ Class<?> cl = getProxyClass0(loader, interfaces); // stack walk magic: do not refactor /* * Invoke its constructor with the designated invocation handler. */ try { final Constructor<?> cons = cl.getConstructor(constructorParams); final InvocationHandler ih = h; SecurityManager sm = System.getSecurityManager(); if (sm != null && ProxyAccessHelper.needsNewInstanceCheck(cl)) { // create proxy instance with doPrivilege as the proxy class may // implement non-public interfaces that requires a special permission return AccessController.doPrivileged(new PrivilegedAction<Object>() { public Object run() { //创建实例 return newInstance(cons, ih); } }); } else { return newInstance(cons, ih); } } catch (NoSuchMethodException e) { throw new InternalError(e.toString()); } } //创建实例 private static Object newInstance(Constructor<?> cons, InvocationHandler h) { try { return cons.newInstance(new Object[] {h} ); } catch (IllegalAccessException e) { throw new InternalError(e.toString()); } catch (InstantiationException e) { throw new InternalError(e.toString()); } catch (InvocationTargetException e) { Throwable t = e.getCause(); if (t instanceof RuntimeException) { throw (RuntimeException) t; } else { throw new InternalError(t.toString()); } } }
回到LoadBalancingConnectionProxy
public Object invoke(Object proxy, Method method, Object args[]) throws Throwable { String methodName = method.getName(); if("equals".equals(methodName) && args.length == 1) if(args[0] instanceof Proxy) return Boolean.valueOf(((Proxy)args[0]).equals(this)); else return Boolean.valueOf(equals(args[0])); if("close".equals(methodName)) { synchronized(this) { for(Iterator allConnections = liveConnections.values().iterator(); allConnections.hasNext(); ((Connection)allConnections.next()).close()); if(!isClosed) balancer.destroy(); liveConnections.clear(); connectionsToHostsMap.clear(); } return null; } if("isClosed".equals(methodName)) return Boolean.valueOf(isClosed); if(isClosed) throw SQLError.createSQLException("No operations allowed after connection closed.", "08003", null); if(!inTransaction) { inTransaction = true; transactionStartTime = getLocalTimeBestResolution(); } Object result = null; try { //关键在这里,当调用Connection的方法是,实际上调用的currentConn的对应方法 //这个currentConn我们前面有说 result = method.invoke(currentConn, args); if(result != null) { if(result instanceof Statement) ((Statement)result).setPingTarget(this); result = proxyIfInterfaceIsJdbc(result, result.getClass()); } } catch(InvocationTargetException e) { dealWithInvocationException(e); } finally { if("commit".equals(methodName) || "rollback".equals(methodName)) { inTransaction = false; String host = (String)connectionsToHostsMap.get(currentConn); if(host != null) { int hostIndex = ((Integer)hostsToListIndexMap.get(host)).intValue(); synchronized(responseTimes) { responseTimes[hostIndex] = getLocalTimeBestResolution() - transactionStartTime; } } pickNewConnection(); } } return result; }
这里我们总结一下:
NonRegisteringDriver的负载均衡连接获取方法connectLoadBalanced,首先
构造负载均衡连接代理LoadBalancingConnectionProxy,再通过java动态代理Proxy
产生新建代理实例Connection,当我们调用Connection的prepareStatement等方法时,
实际上通过LoadBalancingConnectionProxy的currentConn(ConnectionImpl)调用其相应的方法。在构建LoadBalancingConnectionProxy的过程中,首先,初始化存活连接liveConnections,Host连接的相应时间responseTimes,构建负载均衡策略BestResponseTimeBalanceStrategy,RandomBalanceStrategy或InvalidLoadBalanceStrategy,然后初始化负载均衡策略,最后从负载均衡器获取连接,BestResponseTimeBalanceStrategy实际上是从liveConnections获取除host黑名单以外,相应时间最小的Connection,如果没有,则创建连接。
发表评论
-
Mysql PreparedStatement 批处理
2016-12-06 18:09 1323JDBC驱动初始化-Mysql:http://donald-d ... -
MySQL ServerPreparedStatement查询
2016-12-06 14:42 1248JDBC驱动初始化-Mysql:http://donald-d ... -
MysqlSQL PreparedStatement的查询
2016-12-06 11:40 1975JDBC驱动初始化-Mysql:http://donald-d ... -
Mysql预编译SQL
2016-12-05 19:06 1110JDBC驱动初始化-Mysql:http://donald-d ... -
ConnectionImp创建MysqlIO
2016-12-05 19:01 963JDBC驱动初始化-Mysql:http://donald-d ... -
Mysql主从复制读写分离连接的获取
2016-12-01 08:43 1111JDBC驱动初始化-Mysql:http://donald-d ... -
JDBC连接的获取
2016-11-30 12:44 2874JDBC驱动初始化-Mysql:http://donald-d ... -
JDBC驱动初始化-Mysql
2016-11-30 12:41 3002JDBC驱动初始化-Mysql:http://donald-d ... -
mysql 存储过程
2016-07-17 14:49 862存储过程基础:http://sishuok.com/forum ... -
java.sql.Date,java.util.Date,java.sql.Timestamp的区别
2016-07-17 11:50 841java.sql.Date,jdbc映射数据库中的date类型 ... -
JDBC PreparedStatement 的用法
2016-07-15 17:27 851import java.sql.Connection; im ...
相关推荐
负载均衡是应用中使用非常普遍的一种优化方法,它的机制就是利用某种均衡算法,将固定的负载量分布到不同的服务器上, 以此来降低单台服务器的负载,达到优化的效果。通过MySQL的主从复制,实现读写分离,使增删改...
第26讲 配置mysql+lvs+keeplived实现mysql读操作的负载均衡 课程目标: 熟练掌握mysql+lvs+keeplived的部署方式 keeplived简介 通过配置lvs+keeplived实现mysql读操作的负载均衡 测试mysql+lvs+keeplived负载均衡...
- 使用 Nginx 的 TCP 负载均衡功能,将客户端请求分派到多个服务器上,以提高并发处理能力 - 基于发布-订阅的服务器中间件redis消息队列,解决跨服务器通信难题 - 封装 MySQL 接口,将用户数据储存到磁盘中,实现...
2 负载均衡与反向代理 18 2.1 upstream配置 20 2.2 负载均衡算法 21 2.3 失败重试 23 2.4 健康检查 24 2.4.1 TCP心跳检查 24 2.4.2 HTTP心跳检查 25 2.5 其他配置 25 2.5.1 域名上游服务器 25 2.5.2 备份上游服务器 ...
作为负载均衡服务器:Nginx 既可以在内部直接支持 Rails 和 PHP,也可以支持作为 HTTP代理服务器 对外进行服务。Nginx 用 C 编写, 不论是系统资源开销还是 CPU 使用效率都比 Perlbal 要好的多。 作为邮件代理...
springcloud-gateway:网关接口,暴露给调用方调用,包含负载均衡、重试、熔断等功能。 springcloud-zipkin:链路跟踪工具,监控并就持久化微服务集群中调用链路的通畅情况,采用rabbitmq异步传输、elasticsearch...
负载均衡 自动缩放 受SSL保护 使用Route53路由的DNS 仅限来自允许的IP地址列表的流量 可通过SSH访问 Web应用程序已部署到Web服务器实例,以直观地演示基础架构的成功部署。 该应用程序的构建驻留在Amazon S3存储中...
15.3.1 数据库服务器的连接与断开 241 15.3.2 选择数据库 242 15.3.3 执行SQL语句 242 15.3.4 获得查询结果集中的记录数 243 15.3.5 获得结果集中的某一条记录 244 15.3.6 逐行获取结果集中的每一条 15.3.6 记录 245...
待更新或目标:session 分布式管理 、日志系统、URL事件绑定、Web 网页控制台,修复负载均衡稳定性; 更新日期:2020-06-02 --> 2020-06-27 更新功能: 修复【fix】: 1、修复非80端口访问失败错误,感谢...
│ 04.nginx的反向代理及负载均衡.avi │ 05.FastDFS介绍.avi │ 06.FastDFS安装步骤-文件上传.avi │ 07.配置nginx插件访问图片.avi │ 08.测试图片上传.avi │ 09.FastDFS工具类的使用.avi │ 10.图片上传过程分析...
在Asp.net中如何用SQLDMO来获取SQL Server中的对象信息 使用Relations建立表之间的关系并却使用PagedDataSource类对DataList进行分页 通过作业,定时同步两个数据库 SQLSERVER高级注入技巧 利用反射实现ASP.NET控件和...