博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
dubbo网络通讯(四)
阅读量:6880 次
发布时间:2019-06-27

本文共 25935 字,大约阅读时间需要 86 分钟。

dubbo网络层功能

dubbo网络通讯层主要实现了以下功能:

  1. 多种网络通讯框架的抽象封装(netty,mina,grizzly)
  2. 每个客户端主机与服务端之间保持单个长连接进行通信
  3. 异步调用转同步
  4. TCP 连接的心跳和自动重连
  5. 基于 header(协议头)的通讯协议,以及请求/响应的编解码器

dubbo网络通讯框架抽象

dubbo 的网络通信基于 NIO 框架。一般基于事件的 NIO 网络框架都涉及 Channel、ChannelHandler、网络数据 Buffer、网络数据编解码器等核心概念;dubbo 为了能够适配多种 NIO 框架,将以上概念全部又抽象了一层接口。如果有 netty 开发经验,或者了解 netty 的 helloworld demo 程序,对于理解这个章节非常有帮助。

/**以下是dubbo对于channel , channelHandle封装的抽象接口*/public interface Endpoint {    /**     * get url.     *      * @return url     */    URL getUrl();    /**     * get channel handler.     *      * @return channel handler     */    ChannelHandler getChannelHandler();    /**     * get local address.     *      * @return local address.     */    InetSocketAddress getLocalAddress();        /**     * send message.     *      * @param message     * @throws RemotingException     */    void send(Object message) throws RemotingException;    /**     * send message.     *      * @param message     * @param sent 是否已发送完成     */    void send(Object message, boolean sent) throws RemotingException;    /**     * close the channel.     */    void close();        /**     * Graceful close the channel.     */    void close(int timeout);        /**     * is closed.     *      * @return closed     */    boolean isClosed();}public interface Channel extends Endpoint {    /**     * get remote address.     *      * @return remote address.     */    InetSocketAddress getRemoteAddress();    /**     * is connected.     *      * @return connected     */    boolean isConnected();    /**     * has attribute.     *      * @param key key.     * @return has or has not.     */    boolean hasAttribute(String key);    /**     * get attribute.     *      * @param key key.     * @return value.     */    Object getAttribute(String key);    /**     * set attribute.     *      * @param key key.     * @param value value.     */    void setAttribute(String key,Object value);        /**     * remove attribute.     *      * @param key key.     */    void removeAttribute(String key);}public interface Client extends Endpoint, Channel, Resetable {    /**     * reconnect.     */    void reconnect() throws RemotingException;        @Deprecated    void reset(com.alibaba.dubbo.common.Parameters parameters);    }@SPIpublic interface ChannelHandler {    /**     * on channel connected.     *      * @param channel channel.     
*/    void connected(Channel channel) throws RemotingException;    /**     * on channel disconnected.     *      * @param channel channel.     */    void disconnected(Channel channel) throws RemotingException;    /**     * on message sent.     *      * @param channel channel.     * @param message message.     */    void sent(Channel channel, Object message) throws RemotingException;    /**     * on message received.接收到对方消息和回调     *      * @param channel channel.     * @param message message.     */    void received(Channel channel, Object message) throws RemotingException;    /**     * on exception caught.     *      * @param channel channel.     * @param exception exception.     */    void caught(Channel channel, Throwable exception) throws RemotingException;}/**信息交换client,对Client对象接口包装,实现发送请求返回结果Future 以及长链接心跳定时发送*/public class HeaderExchangeClient implements ExchangeClient {    private static final Logger logger = LoggerFactory.getLogger( HeaderExchangeClient.class );    private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));    // 心跳定时器    private ScheduledFuture
heatbeatTimer; // 心跳超时,毫秒。缺省0,不会执行心跳。 private int heartbeat; private int heartbeatTimeout; private final Client client; private final ExchangeChannel channel; public HeaderExchangeClient(Client client){ if (client == null) { throw new IllegalArgumentException("client == null"); } this.client = client; this.channel = new HeaderExchangeChannel(client); String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); this.heartbeat = client.getUrl().getParameter( Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0 ); this.heartbeatTimeout = client.getUrl().getParameter( Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3 ); if ( heartbeatTimeout < heartbeat * 2 ) { throw new IllegalStateException( "heartbeatTimeout < heartbeatInterval * 2" ); } startHeatbeatTimer(); } //最核心方法,实现发送请求后返回ResponseFuture对象,通过该Future可以获取RPC请求结果 public ResponseFuture request(Object request) throws RemotingException { return channel.request(request); } public URL getUrl() { return channel.getUrl(); } public InetSocketAddress getRemoteAddress() { return channel.getRemoteAddress(); } public ResponseFuture request(Object request, int timeout) throws RemotingException { return channel.request(request, timeout); } public ChannelHandler getChannelHandler() { return channel.getChannelHandler(); } public boolean isConnected() { return channel.isConnected(); } public InetSocketAddress getLocalAddress() { return channel.getLocalAddress(); } public ExchangeHandler getExchangeHandler() { return channel.getExchangeHandler(); } public void send(Object message) throws RemotingException { channel.send(message); } public void send(Object message, boolean sent) throws RemotingException { channel.send(message, sent); } public boolean isClosed() { return channel.isClosed(); } public void close() { doClose(); channel.close(); } public void close(int timeout) { doClose(); channel.close(timeout); } public void reset(URL url) { client.reset(url); } @Deprecated 
public void reset(com.alibaba.dubbo.common.Parameters parameters){ reset(getUrl().addParameters(parameters.getParameters())); } public void reconnect() throws RemotingException { client.reconnect(); } public Object getAttribute(String key) { return channel.getAttribute(key); } public void setAttribute(String key, Object value) { channel.setAttribute(key, value); } public void removeAttribute(String key) { channel.removeAttribute(key); } public boolean hasAttribute(String key) { return channel.hasAttribute(key); } private void startHeatbeatTimer() { stopHeartbeatTimer(); if ( heartbeat > 0 ) { heatbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask( new HeartBeatTask.ChannelProvider() { public Collection
getChannels() { return Collections.
singletonList( HeaderExchangeClient.this ); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS ); } } private void stopHeartbeatTimer() { if (heatbeatTimer != null && ! heatbeatTimer.isCancelled()) { try { heatbeatTimer.cancel(true); scheduled.purge(); } catch ( Throwable e ) { if (logger.isWarnEnabled()) { logger.warn(e.getMessage(), e); } } } heatbeatTimer =null; } private void doClose() { stopHeartbeatTimer(); } @Override public String toString() { return "HeaderExchangeClient [channel=" + channel + "]"; }}/**基于Header Request信息交换Channel,对于真实底层Chanel实现比如Netty *Channel进行了一层包装,实现将RPC请求参数Invocation封装为Request对象,*Request对象描述了RPC请求全部信息。协议头数据以及协议body数据。*/final class HeaderExchangeChannel implements ExchangeChannel { private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeChannel.class); private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL"; private final Channel channel; private volatile boolean closed = false; HeaderExchangeChannel(Channel channel){ if (channel == null) { throw new IllegalArgumentException("channel == null"); } this.channel = channel; } static HeaderExchangeChannel getOrAddChannel(Channel ch) { if (ch == null) { return null; } HeaderExchangeChannel ret = (HeaderExchangeChannel) ch.getAttribute(CHANNEL_KEY); if (ret == null) { ret = new HeaderExchangeChannel(ch); if (ch.isConnected()) { ch.setAttribute(CHANNEL_KEY, ret); } } return ret; } static void removeChannelIfDisconnected(Channel ch) { if (ch != null && ! 
ch.isConnected()) { ch.removeAttribute(CHANNEL_KEY); } } public void send(Object message) throws RemotingException { send(message, getUrl().getParameter(Constants.SENT_KEY, false)); } public void send(Object message, boolean sent) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!"); } if (message instanceof Request || message instanceof Response || message instanceof String) { channel.send(message, sent); } else { Request request = new Request(); request.setVersion("2.0.0"); request.setTwoWay(false); request.setData(message); channel.send(request, sent); } } public ResponseFuture request(Object request) throws RemotingException { return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); } //request 入参类型是Invocation invocation public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request.将Invocation封装为Request对象,request对象创建的时候会生成递增的请求id,private static final AtomicLong INVOKE_ID = new AtomicLong(0); Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); //发送请求数据是异步的,返回future对象,调用方用于同步等待响应,类似于提交到线程池返回Future对象用于等待返回结果 DefaultFuture future = new DefaultFuture(channel, req, timeout); try{ channel.send(req); }catch (RemotingException e) { future.cancel(); throw e; } return future; } public class DefaultFuture implements ResponseFuture { //存储所有requestId 和 通道映射,用于判断当前通道是否有未完成的请求 private static final Map
CHANNELS = new ConcurrentHashMap
(); //存储请求id和future对象映射,用于收到响应唤醒等待线程 private static final Map
FUTURES = new ConcurrentHashMap
(); // invoke id. private final long id; private final Channel channel; private final Request request; private final int timeout; private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); private final long start = System.currentTimeMillis(); private volatile long sent; private volatile Response response; private volatile ResponseCallback callback; public DefaultFuture(Channel channel, Request request, int timeout){ this.channel = channel; this.request = request; this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // put into waiting map. FUTURES.put(id, this); CHANNELS.put(id, channel); } public Object get() throws RemotingException { return get(timeout); } public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (! isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (! isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (! 
isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); } public boolean isDone() { return response != null; } private Object returnFromResponse() throws RemotingException { Response res = response; if (res == null) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { return res.getResult(); } if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); } throw new RemotingException(channel, res.getErrorMessage()); } //channelHandle收到服务端响应后调用该方法唤醒调用方等待线程 public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? 
"" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } } private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { //发生收到相应信号,通知等待线程 done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } }}public interface ChannelHandlerDelegate extends ChannelHandler { public ChannelHandler getHandler();}/**HeaderExchangeHandler实现了ChannelHandler接口。实现了NIO消息回调处理,*本类最关键的方法在于received方法,同时在各个回调方法设置Chanel的读写数据时间用于心跳定时器任务发送心跳判断条件*/public class HeaderExchangeHandler implements ChannelHandlerDelegate { protected static final Logger logger = LoggerFactory.getLogger(HeaderExchangeHandler.class); public static String KEY_READ_TIMESTAMP = HeartbeatHandler.KEY_READ_TIMESTAMP; public static String KEY_WRITE_TIMESTAMP = HeartbeatHandler.KEY_WRITE_TIMESTAMP; private final ExchangeHandler handler; public HeaderExchangeHandler(ExchangeHandler handler){ if (handler == null) { throw new IllegalArgumentException("handler == null"); } this.handler = handler; } void handlerEvent(Channel channel, Request req) throws RemotingException { if (req.getData() != null && req.getData().equals(Request.READONLY_EVENT)) { channel.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, Boolean.TRUE); } } Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null) msg = null; else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data); else msg = data.toString(); res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); return res; } // find handler by message class. Object msg = req.getData(); try { // handle data. 
Object result = handler.reply(channel, msg); res.setStatus(Response.OK); res.setResult(result); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } return res; } static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { //非常关键的调用,用于将client收到的响应唤醒等待响应结果的用户线程 DefaultFuture.received(channel, response); } } public void connected(Channel channel) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { handler.connected(exchangeChannel); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } public void disconnected(Channel channel) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { handler.disconnected(exchangeChannel); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } public void sent(Channel channel, Object message) throws RemotingException { Throwable exception = null; try { channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { handler.sent(exchangeChannel, message); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } catch (Throwable t) { exception = t; } if (message instanceof Request) { Request request = (Request) message; DefaultFuture.sent(channel, request); } if (exception != null) { if (exception instanceof RuntimeException) { throw (RuntimeException) exception; } else if (exception instanceof RemotingException) { throw (RemotingException) exception; 
} else { throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(), exception.getMessage(), exception); } } } private static boolean isClientSide(Channel channel) { InetSocketAddress address = channel.getRemoteAddress(); URL url = channel.getUrl(); return url.getPort() == address.getPort() && NetUtils.filterLocalHost(url.getIp()) .equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress())); } //关键方法,实现client端Response响应对应处理以及server端Request请求处理 public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { Response response = handleRequest(exchangeChannel, request); channel.send(response); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } public void caught(Channel channel, Throwable exception) throws RemotingException { if (exception instanceof ExecutionException) { ExecutionException e = (ExecutionException) exception; Object msg = e.getRequest(); if (msg instanceof Request) { Request req = (Request) msg; if (req.isTwoWay() && ! 
req.isHeartbeat()) { Response res = new Response(req.getId(), req.getVersion()); res.setStatus(Response.SERVER_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); return; } } } ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { handler.caught(exchangeChannel, exception); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } public ChannelHandler getHandler() { if (handler instanceof ChannelHandlerDelegate) { return ((ChannelHandlerDelegate) handler).getHandler(); } else { return handler; } }}/**千呼万唤始出来,是时候展示dubbo协议的入口函数了*/public class DubboProtocol extends AbstractProtocol { //根据服务提供者URL和服务接口类型创建Invoker对象 public
Invoker
refer(Class
serviceType, URL url) throws RpcException { // modified by lishen optimizeSerialization(url); // create rpc invoker. DubboInvoker
invoker = new DubboInvoker
(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; } private ExchangeClient[] getClients(URL url){ //默认使用共享链接 boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); //如果connections不配置,则共享连接,否则每服务每连接 if (connections == 0){ service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect){ clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients; } /** *获取共享连接 */ private ExchangeClient getSharedClient(URL url){ String key = url.getAddress(); ReferenceCountExchangeClient client = referenceClientMap.get(key); if ( client != null ){ if ( !client.isClosed()){ client.incrementAndGetCount(); return client; } else {// logger.warn(new IllegalStateException("client is closed,but stay in clientmap .client :"+ client)); referenceClientMap.remove(key); } } ExchangeClient exchagneclient = initClient(url); client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap); referenceClientMap.put(key, client); ghostClientMap.remove(key); return client; } /** * 创建新连接. */ private ExchangeClient initClient(URL url) { // client type setting. String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); String version = url.getParameter(Constants.DUBBO_VERSION_KEY); boolean compatible = (version != null && version.startsWith("1.0.")); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // BIO存在严重性能问题,暂时不允许使用 if (str != null && str.length() > 0 && ! 
ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: " + str + "," + " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " ")); } ExchangeClient client ; try { //设置连接应该是lazy的 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){ client = new LazyConnectExchangeClient(url ,requestHandler); } else { //该工厂方法会创建之前介绍的HeaderExchangeClient实例 client = Exchangers.connect(url ,requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; }}/**通过DubboProtocol创建好的ExchangeClient对象来创建DubboInvoker*DubboInvoker可以支持异步调用,同步调用,仅发送请求3种模式*/public class DubboInvoker
extends AbstractInvoker
{ private final ExchangeClient[] clients; private final AtomicPositiveInteger index = new AtomicPositiveInteger(); private final String version; private final ReentrantLock destroyLock = new ReentrantLock(); private final Set
> invokers; public DubboInvoker(Class
serviceType, URL url, ExchangeClient[] clients){ this(serviceType, url, clients, null); } public DubboInvoker(Class
serviceType, URL url, ExchangeClient[] clients, Set
> invokers){ super(serviceType, url, new String[] {Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY}); this.clients = clients; // get version. this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0"); this.invokers = invokers; } @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture(new FutureAdapter
(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } @Override public boolean isAvailable() { if (!super.isAvailable()) return false; for (ExchangeClient client : clients){ if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)){ //cannot write == not Available ? return true ; } } return false; } public void destroy() { //防止client被关闭多次.在connect per jvm的情况下,client.close方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭 if (super.isDestroyed()){ return ; } else { //dubbo check ,避免多次关闭 destroyLock.lock(); try{ if (super.isDestroyed()){ return ; } super.destroy(); if (invokers != null){ invokers.remove(this); } for (ExchangeClient client : clients) { try { client.close(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }finally { destroyLock.unlock(); } } }}复制代码

转载于:https://juejin.im/post/5bf29697f265da6172651692

你可能感兴趣的文章
mongos-sharding连接池配置
查看>>
html2canvas的踩坑之路
查看>>
wumii 爆款总结经验
查看>>
芯咖汇沙龙首战告捷,与诸位AI大咖一起探讨人工智能
查看>>
BLUEGUARD-E智能锁,门锁和手机自动组成cp
查看>>
在家也能逛家居商场!宜家推出VR购物应用
查看>>
java学习笔记--常用类(System,Runtime,date类, Math 数学类,Random 随机数类 )
查看>>
面对前车之鉴的AR,现在的VR要做些什么?
查看>>
Fitbit表高兴太早,廉价手环和智能手表的威胁就在眼前
查看>>
为什么SAP默默开始区块链研究测试
查看>>
Redis Sentinel安装、配置和部署
查看>>
英伟达联合达索系统与水晶石,打造VR行业应用推广中心
查看>>
PostgreSQL增强版命令行客户端(pgcli)
查看>>
关于SDN的未来,Linux基金会专访阿里云网络大神
查看>>
凯文·凯利:虚拟现实将取代微信!
查看>>
北航联合哈佛,研制出吸盘式仿生机器人
查看>>
廖方宇:数据与计算是科技创新倍增器
查看>>
redis 集群搭建
查看>>
Creating a pop up TitleWindow using the PopUpButton control in Flex
查看>>
联嘉祥牵手8Manage 打造信息化采购管理平台
查看>>