dubbo网络层功能
dubbo网络通讯层主要实现了以下功能:
- 多种网络通讯框架的抽象封装(netty,mina,grizzly)
- 每个客户端主机和服务端保存单个长链接通信
- 异步调用转同步
- tcp链接的心跳和自动重连
- 基于header头通讯协议,请求的编解码器
dubbo网络通讯框架抽象
dubbo的网络通信基于NIO框架,一般基于事件的NIO网络框架都涉及到 channel , channelHandle核心概念,网络数据buffer, 网络数据编解码器,dubbo为了能够适配多种NIO框架,将以上概念全部又抽象了一层接口。如果有netty开发经验或者了解netty helloworld demo程序对于理解这个章节非常有帮助。
/**以下是dubbo对于channel , channelHandle封装的抽象接口*/public interface Endpoint { /** * get url. * * @return url */ URL getUrl(); /** * get channel handler. * * @return channel handler */ ChannelHandler getChannelHandler(); /** * get local address. * * @return local address. */ InetSocketAddress getLocalAddress(); /** * send message. * * @param message * @throws RemotingException */ void send(Object message) throws RemotingException; /** * send message. * * @param message * @param sent 是否已发送完成 */ void send(Object message, boolean sent) throws RemotingException; /** * close the channel. */ void close(); /** * Graceful close the channel. */ void close(int timeout); /** * is closed. * * @return closed */ boolean isClosed();}public interface Channel extends Endpoint { /** * get remote address. * * @return remote address. */ InetSocketAddress getRemoteAddress(); /** * is connected. * * @return connected */ boolean isConnected(); /** * has attribute. * * @param key key. * @return has or has not. */ boolean hasAttribute(String key); /** * get attribute. * * @param key key. * @return value. */ Object getAttribute(String key); /** * set attribute. * * @param key key. * @param value value. */ void setAttribute(String key,Object value); /** * remove attribute. * * @param key key. */ void removeAttribute(String key);}public interface Client extends Endpoint, Channel, Resetable { /** * reconnect. */ void reconnect() throws RemotingException; @Deprecated void reset(com.alibaba.dubbo.common.Parameters parameters); }@SPIpublic interface ChannelHandler { /** * on channel connected. * * @param channel channel. */ void connected(Channel channel) throws RemotingException; /** * on channel disconnected. * * @param channel channel. */ void disconnected(Channel channel) throws RemotingException; /** * on message sent. * * @param channel channel. * @param message message. */ void sent(Channel channel, Object message) throws RemotingException; /** * on message received.接收到对方消息和回调 * * @param channel channel. * @param message message. */ void received(Channel channel, Object message) throws RemotingException; /** * on exception caught. * * @param channel channel. * @param exception exception. */ void caught(Channel channel, Throwable exception) throws RemotingException;}/**信息交换client,对Client对象接口包装,实现发送请求返回结果Future 以及长链接心跳定时发送*/public class HeaderExchangeClient implements ExchangeClient { private static final Logger logger = LoggerFactory.getLogger( HeaderExchangeClient.class ); private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true)); // 心跳定时器 private ScheduledFuture heatbeatTimer; // 心跳超时,毫秒。缺省0,不会执行心跳。 private int heartbeat; private int heartbeatTimeout; private final Client client; private final ExchangeChannel channel; public HeaderExchangeClient(Client client){ if (client == null) { throw new IllegalArgumentException("client == null"); } this.client = client; this.channel = new HeaderExchangeChannel(client); String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); this.heartbeat = client.getUrl().getParameter( Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0 ); this.heartbeatTimeout = client.getUrl().getParameter( Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3 ); if ( heartbeatTimeout < heartbeat * 2 ) { throw new IllegalStateException( "heartbeatTimeout < heartbeatInterval * 2" ); } startHeatbeatTimer(); } //最核心方法,实现发送请求后返回ResponseFuture对象,通过该Future可以获取RPC请求结果 public ResponseFuture request(Object request) throws RemotingException { return channel.request(request); } public URL getUrl() { return channel.getUrl(); } public InetSocketAddress getRemoteAddress() { return channel.getRemoteAddress(); } public ResponseFuture request(Object request, int timeout) throws RemotingException { return channel.request(request, timeout); } public ChannelHandler getChannelHandler() { return channel.getChannelHandler(); } public boolean isConnected() { return channel.isConnected(); } public InetSocketAddress getLocalAddress() { return channel.getLocalAddress(); } public ExchangeHandler getExchangeHandler() { return channel.getExchangeHandler(); } public void send(Object message) throws RemotingException { channel.send(message); } public void send(Object message, boolean sent) throws RemotingException { channel.send(message, sent); } public boolean isClosed() { return channel.isClosed(); } public void close() { doClose(); channel.close(); } public void close(int timeout) { doClose(); channel.close(timeout); } public void reset(URL url) { client.reset(url); } @Deprecated public void reset(com.alibaba.dubbo.common.Parameters parameters){ reset(getUrl().addParameters(parameters.getParameters())); } public void reconnect() throws RemotingException { client.reconnect(); } public Object getAttribute(String key) { return channel.getAttribute(key); } public void setAttribute(String key, Object value) { channel.setAttribute(key, value); } public void removeAttribute(String key) { channel.removeAttribute(key); } public boolean hasAttribute(String key) { return channel.hasAttribute(key); } private void startHeatbeatTimer() { stopHeartbeatTimer(); if ( heartbeat > 0 ) { heatbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask( new HeartBeatTask.ChannelProvider() { public CollectiongetChannels() { return Collections. singletonList( HeaderExchangeClient.this ); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS ); } } private void stopHeartbeatTimer() { if (heatbeatTimer != null && ! heatbeatTimer.isCancelled()) { try { heatbeatTimer.cancel(true); scheduled.purge(); } catch ( Throwable e ) { if (logger.isWarnEnabled()) { logger.warn(e.getMessage(), e); } } } heatbeatTimer =null; } private void doClose() { stopHeartbeatTimer(); } @Override public String toString() { return "HeaderExchangeClient [channel=" + channel + "]"; }}/**基于Header Request信息交换Channel,对于真实底层Chanel实现比如Netty *Channel进行了一层包装,实现将RPC请求参数Invocation封装为Request对象,*Request对象描述了RPC请求全部信息。协议头数据以及协议body数据。*/final class HeaderExchangeChannel implements ExchangeChannel { private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeChannel.class); private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL"; private final Channel channel; private volatile boolean closed = false; HeaderExchangeChannel(Channel channel){ if (channel == null) { throw new IllegalArgumentException("channel == null"); } this.channel = channel; } static HeaderExchangeChannel getOrAddChannel(Channel ch) { if (ch == null) { return null; } HeaderExchangeChannel ret = (HeaderExchangeChannel) ch.getAttribute(CHANNEL_KEY); if (ret == null) { ret = new HeaderExchangeChannel(ch); if (ch.isConnected()) { ch.setAttribute(CHANNEL_KEY, ret); } } return ret; } static void removeChannelIfDisconnected(Channel ch) { if (ch != null && ! ch.isConnected()) { ch.removeAttribute(CHANNEL_KEY); } } public void send(Object message) throws RemotingException { send(message, getUrl().getParameter(Constants.SENT_KEY, false)); } public void send(Object message, boolean sent) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!"); } if (message instanceof Request || message instanceof Response || message instanceof String) { channel.send(message, sent); } else { Request request = new Request(); request.setVersion("2.0.0"); request.setTwoWay(false); request.setData(message); channel.send(request, sent); } } public ResponseFuture request(Object request) throws RemotingException { return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); } //request 入参类型是Invocation invocation public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request.将Invocation封装为Request对象,request对象创建的时候会生成递增的请求id,private static final AtomicLong INVOKE_ID = new AtomicLong(0); Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); //发送请求数据是异步的,返回future对象,调用方用于同步等待响应,类似于提交到线程池返回Future对象用于等待返回结果 DefaultFuture future = new DefaultFuture(channel, req, timeout); try{ channel.send(req); }catch (RemotingException e) { future.cancel(); throw e; } return future; } public class DefaultFuture implements ResponseFuture { //存储所有requestId 和 通道映射,用于判断当前通道是否有未完成的请求 private static final Map CHANNELS = new ConcurrentHashMap (); //存储请求id和future对象映射,用于收到响应唤醒等待线程 private static final Map FUTURES = new ConcurrentHashMap (); // invoke id. private final long id; private final Channel channel; private final Request request; private final int timeout; private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); private final long start = System.currentTimeMillis(); private volatile long sent; private volatile Response response; private volatile ResponseCallback callback; public DefaultFuture(Channel channel, Request request, int timeout){ this.channel = channel; this.request = request; this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // put into waiting map. FUTURES.put(id, this); CHANNELS.put(id, channel); } public Object get() throws RemotingException { return get(timeout); } public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (! isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (! isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (! isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); } public boolean isDone() { return response != null; } private Object returnFromResponse() throws RemotingException { Response res = response; if (res == null) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { return res.getResult(); } if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); } throw new RemotingException(channel, res.getErrorMessage()); } //channelHandle收到服务端响应后调用该方法唤醒调用方等待线程 public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } } private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { //发生收到相应信号,通知等待线程 done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } }}public interface ChannelHandlerDelegate extends ChannelHandler { public ChannelHandler getHandler();}/**HeaderExchangeHandler实现了ChannelHandler接口。实现了NIO消息回调处理,*本类最关键的方法在于received方法,同时在各个回调方法设置Chanel的读写数据时间用于心跳定时器任务发送心跳判断条件*/public class HeaderExchangeHandler implements ChannelHandlerDelegate { protected static final Logger logger = LoggerFactory.getLogger(HeaderExchangeHandler.class); public static String KEY_READ_TIMESTAMP = HeartbeatHandler.KEY_READ_TIMESTAMP; public static String KEY_WRITE_TIMESTAMP = HeartbeatHandler.KEY_WRITE_TIMESTAMP; private final ExchangeHandler handler; public HeaderExchangeHandler(ExchangeHandler handler){ if (handler == null) { throw new IllegalArgumentException("handler == null"); } this.handler = handler; } void handlerEvent(Channel channel, Request req) throws RemotingException { if (req.getData() != null && req.getData().equals(Request.READONLY_EVENT)) { channel.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, Boolean.TRUE); } } Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null) msg = null; else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data); else msg = data.toString(); res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); return res; } // find handler by message class. Object msg = req.getData(); try { // handle data. Object result = handler.reply(channel, msg); res.setStatus(Response.OK); res.setResult(result); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } return res; } static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { //非常关键的调用,用于将client收到的响应唤醒等待响应结果的用户线程 DefaultFuture.received(channel, response); } } public void connected(Channel channel) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { handler.connected(exchangeChannel); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } public void disconnected(Channel channel) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { handler.disconnected(exchangeChannel); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } public void sent(Channel channel, Object message) throws RemotingException { Throwable exception = null; try { channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { handler.sent(exchangeChannel, message); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } catch (Throwable t) { exception = t; } if (message instanceof Request) { Request request = (Request) message; DefaultFuture.sent(channel, request); } if (exception != null) { if (exception instanceof RuntimeException) { throw (RuntimeException) exception; } else if (exception instanceof RemotingException) { throw (RemotingException) exception; } else { throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(), exception.getMessage(), exception); } } } private static boolean isClientSide(Channel channel) { InetSocketAddress address = channel.getRemoteAddress(); URL url = channel.getUrl(); return url.getPort() == address.getPort() && NetUtils.filterLocalHost(url.getIp()) .equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress())); } //关键方法,实现client端Response响应对应处理以及server端Request请求处理 public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { Response response = handleRequest(exchangeChannel, request); channel.send(response); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } public void caught(Channel channel, Throwable exception) throws RemotingException { if (exception instanceof ExecutionException) { ExecutionException e = (ExecutionException) exception; Object msg = e.getRequest(); if (msg instanceof Request) { Request req = (Request) msg; if (req.isTwoWay() && ! req.isHeartbeat()) { Response res = new Response(req.getId(), req.getVersion()); res.setStatus(Response.SERVER_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); return; } } } ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { handler.caught(exchangeChannel, exception); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } public ChannelHandler getHandler() { if (handler instanceof ChannelHandlerDelegate) { return ((ChannelHandlerDelegate) handler).getHandler(); } else { return handler; } }}/**千呼万唤始出来,是时候展示dubbo协议的入口函数了*/public class DubboProtocol extends AbstractProtocol { //根据服务提供者URL和服务接口类型创建Invoker对象 public Invoker refer(Class serviceType, URL url) throws RpcException { // modified by lishen optimizeSerialization(url); // create rpc invoker. DubboInvoker invoker = new DubboInvoker (serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; } private ExchangeClient[] getClients(URL url){ //默认使用共享链接 boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); //如果connections不配置,则共享连接,否则每服务每连接 if (connections == 0){ service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect){ clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients; } /** *获取共享连接 */ private ExchangeClient getSharedClient(URL url){ String key = url.getAddress(); ReferenceCountExchangeClient client = referenceClientMap.get(key); if ( client != null ){ if ( !client.isClosed()){ client.incrementAndGetCount(); return client; } else {// logger.warn(new IllegalStateException("client is closed,but stay in clientmap .client :"+ client)); referenceClientMap.remove(key); } } ExchangeClient exchagneclient = initClient(url); client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap); referenceClientMap.put(key, client); ghostClientMap.remove(key); return client; } /** * 创建新连接. */ private ExchangeClient initClient(URL url) { // client type setting. String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); String version = url.getParameter(Constants.DUBBO_VERSION_KEY); boolean compatible = (version != null && version.startsWith("1.0.")); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // BIO存在严重性能问题,暂时不允许使用 if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: " + str + "," + " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " ")); } ExchangeClient client ; try { //设置连接应该是lazy的 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){ client = new LazyConnectExchangeClient(url ,requestHandler); } else { //该工厂方法会创建之前介绍的HeaderExchangeClient实例 client = Exchangers.connect(url ,requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; }}/**通过DubboProtocol创建好的ExchangeClient对象来创建DubboInvoker*DubboInvoker可以支持异步调用,同步调用,仅发送请求3种模式*/public class DubboInvoker extends AbstractInvoker { private final ExchangeClient[] clients; private final AtomicPositiveInteger index = new AtomicPositiveInteger(); private final String version; private final ReentrantLock destroyLock = new ReentrantLock(); private final Set > invokers; public DubboInvoker(Class serviceType, URL url, ExchangeClient[] clients){ this(serviceType, url, clients, null); } public DubboInvoker(Class serviceType, URL url, ExchangeClient[] clients, Set > invokers){ super(serviceType, url, new String[] {Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY}); this.clients = clients; // get version. this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0"); this.invokers = invokers; } @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture(new FutureAdapter