为什么要学习RPC
如下是Http请求案例:
请求过程会有3次握手4次挥手:
1:浏览器请求服务器(订单服务),请求建立链接 1次握手
2:服务器(订单服务)响应浏览器,可以建立链接,并询问浏览器是否可以建立链接 2次握手
3:浏览器响应服务器(订单服务),可以建立链接 3次握手
------开始传输数据------
1:浏览器向服务端(订单服务)发起请求,要求断开链接 1次挥手
2:服务器(订单服务)回应浏览器,数据还在传输中 2次挥手
3:服务器(订单服务)接收完数据后,向浏览器发消息要求断开链接 3次挥手
4:浏览器收到服务器消息后,回复服务器(订单服务)同意断开链接 4次挥手
1.1 PRC概述
RPC 的主要功能目标是让构建分布式计算(应用)更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。为实现该目标,RPC 框架需提供一种透明调用机制,让使用者不必显式的区分本地调用和远程调用。
RPC的优点:
-
分布式设计
-
部署灵活
-
解耦服务
-
扩展性强
RPC框架优势:
- RPC框架一般使用长链接,不必每次通信都要3次握手,减少网络开销。
- RPC框架一般都有注册中心,有丰富的监控管理、发布、下线接口、动态扩展等,对调用方来说是无感知、统一化的操作、协议私密,安全性较高
- RPC 协议更简单内容更小,效率更高,服务化架构、服务化治理,RPC框架是一个强力的支撑。
- RPC基于TCP实现,也可以基于Http2实现
1.2 RPC框架
主流RPC框架:
- Dubbo:国内最早开源的 RPC 框架,由阿里巴巴公司开发并于 2011 年末对外开源,仅支持 Java 语言。
- Motan:新浪微博内部使用的 RPC 框架,于 2016 年对外开源,仅支持 Java 语言。
- Tars:腾讯内部使用的 RPC 框架,于 2017 年对外开源,仅支持 C++ 语言。
- Spring Cloud:国外 Pivotal 公司 2014 年对外开源的 RPC 框架,提供了丰富的生态组件。
- gRPC:Google 于 2015 年对外开源的跨语言 RPC 框架,支持多种语言。
- Thrift:最初是由 Facebook 开发的内部系统跨语言的 RPC 框架,2007 年贡献给了 Apache 基金,成为 Apache 开源项目之一,支持多种语言。
1.3 应用场景
应用例举:
- 分布式操作系统的进程间通讯
进程间通讯是操作系统必须提供的基本设施之一,分布式操作系统必须提供分布于异构的结点机上进程间的通讯机制,RPC是实现消息传送模式的分布式进程间通讯方式之一。 - 构造分布式设计的软件环境
由于分布式软件设计,服务与环境的分布性, 它的各个组成成份之间存在大量的交互和通讯, RPC是其基本的实现方法之一。Dubbo分布式服务框架基于RPC实现,Hadoop也采用了RPC方式实现客户端与服务端的交互。 - 远程数据库服务
在分布式数据库系统中,数据库一般驻存在服务器上,客户机通过远程数据库服务功能访问数据库服务器,现有的远程数据库服务是使用RPC模式的。例如,Sybase和Oracle都提供了存储过程机制,系统与用户定义的存储过程存储在数据库服务器上,用户在客户端使用RPC模式调用存储过程。 - 分布式应用程序设计
RPC机制与RPC工具为分布式应用程序设计提供了手段和方便, 用户可以无需知道网络结构和协议细节而直接使用RPC工具设计分布式应用程序。 - 分布式程序的调试
RPC可用于分布式程序的调试。使用反向RPC使服务器成为客户并向它的客户进程发出RPC,可以调试分布式程序。例如,在服务器上运行一个远端调试程序,它不断接收客户端的RPC,当遇到一个调试程序断点时,它向客户机发回一个RPC,通知断点已经到达,这也是RPC用于进程通讯的例子。
2. 深入RPC原理
2.1 设计与调用流程
具体调用过程:
- 服务消费者(client客户端)通过本地调用的方式调用服务。
- 客户端存根(client stub)接收到请求后负责将方法、入参等信息序列化(组装)成能够进行网络传输的消息体。
- 客户端存根(client stub)找到远程的服务地址,并且将消息通过网络发送给服务端。
- 服务端存根(server stub)收到消息后进行解码(反序列化操作)。
- 服务端存根(server stub)根据解码结果调用本地的服务进行相关处理。
- 本地服务执行具体业务逻辑并将处理结果返回给服务端存根(server stub)。
- 服务端存根(server stub)将返回结果重新打包成消息(序列化)并通过网络发送至消费方。
- 客户端存根(client stub)接收到消息,并进行解码(反序列化)。
- 服务消费方得到最终结果。
所涉及的技术:
-
动态代理
生成Client Stub(客户端存根)和Server Stub(服务端存根)的时候需要用到java动态代理技术。
-
序列化
在网络中,所有的数据都将会被转化为字节进行传送,需要对这些参数进行序列化和反序列化操作。目前主流高效的开源序列化框架有Kryo、fastjson、Hessian、Protobuf等。
-
NIO通信
Java 提供了 NIO 的解决方案,Java 7 也提供了更优秀的 NIO.2 支持。可以采用Netty或者mina框架来解决NIO数据传输的问题。开源的RPC框架Dubbo就是采用NIO通信,集成支持netty、mina、grizzly。
-
服务注册中心
通过注册中心,让客户端连接调用服务端所发布的服务。主流的注册中心组件:Redis、Nacos、Zookeeper、Consul 、Etcd。Dubbo采用的是ZooKeeper提供服务注册与发现功能。
-
负载均衡
在高并发的场景下,需要多个节点或集群来提升整体吞吐能力。
-
健康检查
健康检查包括,客户端心跳和服务端主动探测两种方式。
2.2 RPC深入解析
2.2.1 序列化技术
-
序列化的作用
在网络传输中,数据必须采用二进制形式, 所以在RPC调用过程中, 需要采用序列化技术,对入参对象和返回值对象进行序列化与反序列化。
-
序列化原理
自定义的二进制协议来实现序列化:
一个对象是如何进行序列化? 下面以User对象例举讲解:User对象:
package com.itcast; public class User { /** * 用户编号 */ private String userNo = "0001"; /** * 用户名称 */ private String name = "zhangsan"; }
包体的数据组成:
业务指令为0x00000001占1个字节,类的包名com.itcast占10个字节, 类名User占4个字节;
属性UserNo名称占6个字节,属性类型string占2个字节表示,属性值为0001占4个字节;
属性name名称占4个字节,属性类型string占2个字节表示,属性值为zhangsan占8个字节;
包体共计占有1+10+4+6+2+4+4+2+8 = 41字节。
包头的数据组成:
版本号v1.0占4个字节,消息包体实际长度为41占4个字节表示,序列号0001占4个字节,校验码32位表示占4个字节。
包头共计占有4+4+4+4 = 16字节。
包尾的数据组成:
通过回车符标记结束\r\n,占用1个字节。
整个包的序列化二进制字节流共41+16+1 = 58字节。这里讲解的是整个序列化的处理思路, 在实际的序列化处理中还要考虑更多细节,比如说方法和属性的区分,方法权限的标记,嵌套类型的处理等等。
-
序列化的处理要素
- 解析效率:序列化协议应该首要考虑的因素,像xml/json解析起来比较耗时,需要解析dom树,二进制自定义协议解析起来效率要快很多。
- 压缩率:同样一个对象,xml/json传输起来有大量的标签冗余信息,信息有效性低,二进制自定义协议占用的空间相对来说会小很多。
- 扩展性与兼容性:是否能够利于信息的扩展,并且增加字段后旧版客户端是否需要强制升级,这都是需要考虑的问题,在自定义二进制协议时候,要做好充分考虑设计。
- 可读性与可调试性:xml/json的可读性会比二进制协议好很多,并且通过网络抓包是可以直接读取,二进制则需要反序列化才能查看其内容。
- 跨语言:有些序列化协议是与开发语言紧密相关的,例如dubbo的Hessian序列化协议就只能支持Java的RPC调用。
- 通用性:xml/json非常通用,都有很好的第三方解析库,各个语言解析起来都十分方便,二进制数据的处理方面也有Protobuf和Hessian等插件,在做设计的时候尽量做到较好的通用性。
-
常用的序列化技术
-
JDK原生序列化
代码:
... public static void main(String[] args) throws IOException, ClassNotFoundException { String basePath = "D:/TestCode"; FileOutputStream fos = new FileOutputStream(basePath + "tradeUser.clazz"); TradeUser tradeUser = new TradeUser(); tradeUser.setName("Mirson"); ObjectOutputStream oos = new ObjectOutputStream(fos); oos.writeObject(tradeUser); oos.flush(); oos.close(); FileInputStream fis = new FileInputStream(basePath + "tradeUser.clazz"); ObjectInputStream ois = new ObjectInputStream(fis); TradeUser deStudent = (TradeUser) ois.readObject(); ois.close(); System.out.println(deStudent); } ...
(1) 在Java中,序列化必须要实现java.io.Serializable接口。
(2) 通过ObjectOutputStream和ObjectInputStream对象进行序列化及反序列化操作。
(3) 虚拟机是否允许反序列化,不仅取决于类路径和功能代码是否一致,一个非常重要的一点是两个类的序列化 ID 是否一致
(也就是在代码中定义的序列ID private static final long serialVersionUID)(4) 序列化并不会保存静态变量。
(5) 要想将父类对象也序列化,就需要让父类也实现Serializable 接口。
(6) Transient 关键字的作用是控制变量的序列化,在变量声明前加上该关键字,可以阻止该变量被序列化到文件中,在被反序列化后,transient 变量的值被设为初始值,如基本类型 int为 0,封装对象型Integer则为null。
(7) 服务器端给客户端发送序列化对象数据并非加密的,如果对象中有一些敏感数据比如密码等,那么在对密码字段序列化之前,最好做加密处理, 这样可以一定程度保证序列化对象的数据安全。
-
JSON序列化
一般在HTTP协议的RPC框架通信中,会选择JSON方式。
优势:JSON具有较好的扩展性、可读性和通用性。
缺陷:JSON序列化占用空间开销较大,没有JAVA的强类型区分,需要通过反射解决,解析效率和压缩率都较差。
如果对并发和性能要求较高,或者是传输数据量较大的场景,不建议采用JSON序列化方式。
-
Hessian2序列化
Hessian 是一个动态类型,二进制序列化,并且支持跨语言特性的序列化框架。
Hessian 性能上要比 JDK、JSON 序列化高效很多,并且生成的字节数也更小。有非常好的兼容性和稳定性,所以 Hessian 更加适合作为 RPC 框架远程通信的序列化协议。
代码示例:
... TradeUser tradeUser = new TradeUser(); tradeUser.setName("Mirson"); //tradeUser对象序列化处理 ByteArrayOutputStream bos = new ByteArrayOutputStream(); Hessian2Output output = new Hessian2Output(bos); output.writeObject(tradeUser); output.flushBuffer(); byte[] data = bos.toByteArray(); bos.close(); //tradeUser对象反序列化处理 ByteArrayInputStream bis = new ByteArrayInputStream(data); Hessian2Input input = new Hessian2Input(bis); TradeUser deTradeUser = (TradeUser) input.readObject(); input.close(); System.out.println(deTradeUser); ...
Dubbo Hessian Lite序列化流程:
Dubbo Hessian Lite反序列化流程:
Hessian自身也存在一些缺陷,大家在使用过程中要注意:
-
对Linked系列对象不支持,比如LinkedHashMap、LinkedHashSet 等,但可以通过CollectionSerializer类修复。
-
Locale 类不支持,可以通过扩展 ContextSerializerFactory 类修复。
-
Byte/Short 在反序列化的时候会转成 Integer。
Dubbo2.7.3通讯序列化源码实现分析:
- 序列化实现流程:
-
ExchangeCodec的encode方法:
```java
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
```
- 反序列化实现流程:
源码:
ExchangeCodec的decode方法:
```java
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);
return decode(channel, buffer, readable, header);
}
```
ExchangeCodec的decodeBody方法:
```java
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
...
} else {
// decode request.
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(true);
}
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
}
...
}
```
-
Protobuf序列化
Protobuf 是 Google 推出的开源序列库,它是一种轻便、高效的结构化数据存储格式,可以用于结构化数据序列化,支持 Java、Python、C++、Go 等多种语言。
Protobuf 使用的时候需要定义 IDL(Interface description language),然后使用不同语言的 IDL 编译器,生成序列化工具类,它具备以下优点:
- 压缩比高,体积小,序列化后体积相比 JSON、Hessian 小很多;
- IDL 能清晰地描述语义,可以帮助并保证应用程序之间的类型不会丢失,无需类似 XML 解析器;
- 序列化反序列化速度很快,不需要通过反射获取类型;
- 消息格式的扩展、升级和兼容性都不错,可以做到向后兼容。
代码示例:
Protobuf脚本定义:
// 定义Proto版本 syntax = "proto3"; // 是否允许生成多个JAVA文件 option java_multiple_files = false; // 生成的包路径 option java_package = "com.itcast.bulls.stock.struct.netty.trade"; // 生成的JAVA类名 option java_outer_classname = "TradeUserProto"; // 预警通知消息体 message TradeUser { /** * 用户ID */ int64 userId = 1 ; /** * 用户名称 */ string userName = 2 ; }
代码操作:
// 创建TradeUser的Protobuf对象 TradeUserProto.TradeUser.Builder builder = TradeUserProto.TradeUser.newBuilder(); builder.setUserId(101); builder.setUserName("Mirson"); //将TradeUser做序列化处理 TradeUserProto.TradeUser msg = builder.build(); byte[] data = msg.toByteArray(); //反序列化处理, 将刚才序列化的byte数组转化为TradeUser对象 TradeUserProto.TradeUser deTradeUser = TradeUserProto.TradeUser.parseFrom(data); System.out.println(deTradeUser);
2.2.2 动态代理
-
内部接口如何调用实现?
RPC的调用对用户来讲是透明的,那内部是如何实现呢?内部核心技术采用的就是动态代理,RPC 会自动给接口生成一个代理类,当我们在项目中注入接口的时候,运行过程中实际绑定的是这个接口生成的代理类。在接口方法被调用的时候,它实际上是被生成代理类拦截到了,这样就可以在生成的代理类里面,加入其他调用处理逻辑,比如连接负载管理,日志记录等等。
JDK动态代理:
被代理对象必须实现1个接口
-
JDK动态代理的如何实现?
实例代码:
public class JdkProxyTest { /** * 定义用户的接口 */ public interface User { String job(); } /** * 实际的调用对象 */ public static class Teacher { public String invoke(){ return "i'm Teacher"; } } /** * 创建JDK动态代理类 */ public static class JDKProxy implements InvocationHandler { private Object target; JDKProxy(Object target) { this.target = target; } @Override public Object invoke(Object proxy, Method method, Object[] paramValues) { return ((Teacher)target).invoke(); } } public static void main(String[] args){ // 构建代理器 JDKProxy proxy = new JDKProxy(new Teacher()); ClassLoader classLoader = ClassLoaderUtils.getClassLoader(); // 生成代理类 User user = (User) Proxy.newProxyInstance(classLoader, new Class[]{User.class}, proxy); // 接口调用 System.out.println(user.job()); } }
JDK动态代理的实现原理:
JDK内部如何处理?
反编译生成的代理类可以知道,代理类 $Proxy里面会定义相同签名的接口(也就是上面代码User的job接口),然后内部会定义一个变量绑定JDKProxy代理对象,当调用User.job接口方法,实质上调用的是JDKProxy.invoke()方法,从而实现了接口的动态代理。
-
为什么要加入动态代理?
第一, 如果没有动态代理, 服务端大量的接口将不便于管理,需要大量的if判断,如果扩展了新的接口,需要更改调用逻辑, 不利于扩展维护。
第二, 是可以拦截,添加其他额外功能, 比如连接负载管理,日志记录等等。
-
动态代理开源技术
(1) Cglib 动态代理
Cglib是一个强大的、高性能的代码生成包,它广泛被许多AOP框架使用,支持方法级别的拦截。它是高级的字节码生成库,位于ASM之上,ASM是低级的字节码生成工具,ASM的使用对开发人员要求较高,相比较来讲, ASM性能更好。
(2) Javassist 动态代理
一个开源的分析、编辑和创建Java字节码的类库。javassist是jboss的一个子项目,它直接使用java编码的形式,不需要了解虚拟机指令,可以动态改变类的结构,或者动态生成类。Javassist 的定位是能够操纵底层字节码,所以使用起来并不简单,Dubbo 框架的设计者为了追求性能花费了不少精力去适配javassist。
(3) Byte Buddy 字节码增强库
Byte Buddy是致力于解决字节码操作和 简化操作复杂性的开源框架。Byte Buddy 目标是将显式的字节码操作隐藏在一个类型安全的领域特定语言背后。它属于后起之秀,在很多优秀的项目中,像 Spring、Jackson 都用到了 Byte Buddy 来完成底层代理。相比 Javassist,Byte Buddy 提供了更容易操作的 API,编写的代码可读性更高。
几种动态代理性能比较:
单位是纳秒。大括号内代表的是样本标准差,综合结果:
Byte Buddy > CGLIB > Javassist> JDK。
源码剖析:
核心源码:
2.2.3 服务注册发现
-
服务注册发现的作用
在高可用的生产环境中,一般都以集群方式提供服务,集群里面的IP可能随时变化,也可能会随着维护扩充或减少节点,客户端需要能够及时感知服务端的变化,获取集群最新服务节点的连接信息。
-
服务注册发现功能
服务注册:在服务提供方启动的时候,将对外暴露的接口注册到注册中心内,注册中心将这个服务节点的 IP 和接口等连接信息保存下来。为了检测服务的服务端的有效状态,一般会建立双向心跳机制。
服务订阅:在服务调用方启动的时候,客户端去注册中心查找并订阅服务提供方的 IP,然后缓存到本地,并用于后续的远程调用。如果注册中心信息发生变化, 一般会采用推送的方式做更新。
-
服务注册发现的具体流程
主流服务注册工具有Nacos、Consul、Zookeeper等,
基于 ZooKeeper 的服务发现:
ZooKeeper 集群作为注册中心集群,服务注册的时候只需要服务节点向 ZooKeeper 节点写入注册信息即可,利用 ZooKeeper 的 Watcher 机制完成服务订阅与服务下发功能。
A. 先在 ZooKeeper 中创建一个服务根路径,可以根据接口名命名(例如:/dubbo/com.itcast.xxService),在这个路径再创建服务提供方与调用方目录(providers、consumers),分别用来存储服务提供方和调用方的节点信息。
B. 服务端发起注册时,会在服务提供方目录中创建一个临时节点,节点中存储注册信 息,比如IP,端口,服务名称等等。
C. 客户端发起订阅时,会在服务调用方目录中创建一个临时节点,节点中存储调用方的信息,同时watch 服务提供方的目录(/dubbo/com.itcast.xxService/providers)中所有的服务节点数据。当服务端产生变化时,比如下线或宕机等,ZooKeeper 就会通知给订阅的客户端。
ZooKeeper方案的特点:
ZooKeeper 的一大特点就是强一致性,ZooKeeper 集群的每个节点的数据每次发生更新操作,都会通知其它 ZooKeeper 节点同时执行更新。它要求保证每个节点的数据能够实时的完全一致,这样也就会导致ZooKeeper 集群性能上的下降,ZK是采用CP模式(保证强一致性),如果要注重性能, 可以考虑采用AP模式(保证最终一致)的注册中心组件, 比如Nacos等。
-
源码剖析
Dubbo Spring Cloud 订阅的源码(客户端):
核心源码:
RegistryProtocol的doRefer方法:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); registry.register(directory.getRegisteredConsumerUrl()); } directory.buildRouterChain(subscribeUrl); directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
Dubbo Spring Cloud 注册发现的源码(服务端):
核心源码:
RegistryProtocol的export方法:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 获取注册信息 URL registryUrl = getRegistryUrl(originInvoker); // 获取服务提供方信息 URL providerUrl = getProviderUrl(originInvoker); // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // 获取订阅注册器 final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish boolean register = registeredProviderUrl.getParameter("register", true); if (register) { // 进入服务端信息注册处理 register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); } // Deprecated! Subscribe to override rules in 2.6.x or before. // 服务端信息订阅处理 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter); }
2.2.4 网络IO模型
-
有哪些网络IO模型
分为五种:
- 同步阻塞 IO(BIO)
- 同步非阻塞 IO(NIO)
- IO 多路复用
- 信号驱动IO
- 异步非阻塞 IO(AIO)
常用的是同步阻塞 IO 和 IO 多路复用模型。
-
什么是阻塞IO模型
通常由一个独立的 Acceptor 线程负责监听客户端的连接。一般通过在while(true)
循环中服务端会调用 accept()
方法等待接收客户端的连接的方式监听请求,请求一旦接收到一个连接请求,就可以建立通信套接字,在这个通信套接字上进行读写操作,此时不能再接收其他客户端连接请求,直到客户端的操作执行完成。
系统内核处理 IO 操作分为两个阶段——等待数据和拷贝数据。而在这两个阶段中,应用进程中 IO 操作的线程会一直都处于阻塞状态,如果是基于 Java 多线程开发,那么每一个 IO 操作都要占用线程,直至 IO 操作结束。
-
IO多路复用
概念: 服务端采用单线程过select/epoll机制,获取fd列表, 遍历fd中的所有事件, 可以关注多个文件描述符,使其能够支持更多的并发连接。
IO多路复用的实现主要有select,poll和epoll模式。
文件描述符:
在Linux系统中一切皆可以看成是文件,文件又可分为:普通文件、目录文件、链接文件和设备文件。
文件描述符(file descriptor)是内核为了高效管理已被打开的文件所创建的索引,用来指向被打开的文件。文件描述符的值是一个非负整数。
下图说明(左边是进程、中间是内核、右边是文件系统):
1) A的文件描述符1和30都指向了同一个打开的文件句柄, 代表进程多次执行打开操作。
2) A的文件描述符2和B的文件描述符2都指向文件句柄(#73),代表A和程B可能是父子进程或者A和进程B打开了同一个文件(低概率)。
3) (时间紧张可不讲)A的描述符0和B的描述符3分别指向不同的打开文件句柄,但这些句柄均指向i-node表的相同条目(#1936),这种情况是因为每个进程各自对同一个文件发起了打开请求。
程序刚刚启动的时候,0是标准输入,1是标准输出,2是标准错误。如果此时去打开一个新的文件,它的文件描述符会是3。
三者的区别:
select | poll | epoll | |
---|---|---|---|
操作方式 | 遍历 | 遍历 | 回调 |
底层实现 | bitmap | 数组 | 红黑树 |
IO效率 | 每次调用都进行线性遍历,时间复杂度为O(n) | 每次调用都进行线性遍历,时间复杂度为O(n) | 事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到readyList里面,时间复杂度O(1) |
最大连接数 | 1024(x86)或2048(x64) | 无上限 | 无上限 |
fd拷贝 | 每次调用select,都需要把fd集合从用户态拷贝到内核态 | 每次调用poll,都需要把fd集合从用户态拷贝到内核态 | 调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不拷贝 |
select/poll处理流程:
此处是动图
epoll的处理流程:
此处是动图
当连接有I/O流事件产生的时候,epoll就会去告诉进程哪个连接有I/O流事件产生,然后进程就去处理这个进程。这样性能相比要高效很多!
epoll 可以说是I/O 多路复用最新的一个实现,epoll 修复了poll 和select绝大部分问题, 比如
epoll 是线程安全的。
epoll 不仅告诉你sock组里面的数据,还会告诉你具体哪个sock连接有数据,不用进程独自轮询查找。
-
select 模型
使用示例:
while (1) { // 阻塞获取 // 每次需要把fd从用户态拷贝到内核态 nfds = select(max + 1, &read_fd, &write_fd, NULL, &timeout); // 每次需要遍历所有fd,判断有无读写事件发生 for (int i = 0; i <= max && nfds; ++i) { if (i == listenfd) { --nfds; // 这里处理accept事件 FD_SET(i, &read_fd);//将客户端socket加入到集合中 } if (FD_ISSET(i, &read_fd)) { --nfds; // 这里处理read事件 } if (FD_ISSET(i, &write_fd)) { --nfds; // 这里处理write事件 } } }
缺点:
- 单个进程所打开的FD最大数限制为1024。
- 每次调用select,都需要把fd集合从用户态拷贝到内核态,fd数据较大时影响性能。
- 对socket扫描时是线性扫描,效率较低(高并发场景)
-
POLL模型
int max = 0; // 队列的实际长度 while (1) { // 阻塞获取 // 每次需要把fd从用户态拷贝到内核态 nfds = poll(fds, max+1, timeout); if (fds[0].revents & POLLRDNORM) { // 这里处理accept事件 connfd = accept(listenfd); //将新的描述符添加到读描述符集合中 } // 每次需要遍历所有fd,判断有无读写事件发生 for (int i = 1; i < max; ++i) { if (fds[i].revents & POLLRDNORM) { sockfd = fds[i].fd if ((n = read(sockfd, buf, MAXLINE)) <= 0) { // 这里处理read事件 if (n == 0) { close(sockfd); fds[i].fd = -1; } } else { // 这里处理write事件 } if (--nfds <= 0) { break; } } } }
缺点:
- poll与select相比,只是没有fd的限制,都存在相同的缺陷。
-
EPOLL模型
使用示例:
// 需要监听的socket放到ep中 epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev); while(1) { // 阻塞获取 nfds = epoll_wait(epfd,events,20,0); for(i=0;i<nfds;++i) { if(events[i].data.fd==listenfd) { // 这里处理accept事件 connfd = accept(listenfd); // 接收新连接写到内核对象中 epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); } else if (events[i].events&EPOLLIN) { // 这里处理read事件 read(sockfd, BUF, MAXLINE); //读完后准备写 epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); } else if(events[i].events&EPOLLOUT) { // 这里处理write事件 write(sockfd, BUF, n); //写完后准备读 epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); } } }
缺点:
- 目前只能工作在linux环境下
- 数据量很小的时候没有性能优势
epoll下的两种模式(拓展了解):
EPOLLLT和EPOLLET两种触发模式,LT是默认的模式,ET是“高速”模式。
LT(水平触发)模式下,只要这个fd还有数据可读,每次 epoll_wait都会返回它的事件,提醒用户程序去操作
ET(边缘触发)模式下,它只会提示一次,直到下次再有数据流入之前都不会再提示了,无论fd中是否还有数据可读。所以在ET模式下,read一个fd的时候一定要把它的buffer读完,或者遇到EAGAIN错误
-
三种模式对比
对比项 select poll epoll 数据结构 bitmap 数组 红黑树 最大连接数 1024 无上限 无上限 fd拷贝 每次调用select拷贝 每次调用poll拷贝 fd首次调用epoll_ctl拷贝,每次调用epoll_wait不拷贝 工作效率 轮询:O(n) 轮询:O(n) 回调:O(1) -
为什么阻塞 IO 和 IO 多路复用最为常用?
在实际的网络 IO 的应用中,需要的是系统内核的支持以及编程语言的支持。现在大多数系统内核都会支持阻塞 IO、非阻塞 IO 和 IO 多路复用,但像信号驱动 IO、异步 IO,只有高版本的 Linux 系统内核才会支持。
同步阻塞IO、同步非阻塞IO、同步IO多路复用与异步IO区别:
- 同步阻塞IO(实质上, 每个请求不管成功或失败, 都会阻塞)
- 同步非阻塞IO(相比第一种的完全阻塞,如果数据没准备好,会返回EWOULDBLOCK, 这样就不会造成阻塞)
-
同步IO多路复用
(kernel会根据select/epoll等机制, 监听所有select接入的socket,当任何一个socket中的数据准备好了,select就会返回, 使得一个进程能同时等待多个文件描述符。)
- 异步IO
-
RPC 框架采用哪种网络 IO 模型?
- IO 多路复用应用特点:
IO 多路复用更适合高并发的场景,可以用较少的进程(线程)处理较多的 socket 的 IO 请求,但使用难度比较高。
- 阻塞 IO应用特点:
与 IO 多路复用相比,阻塞 IO 每处理一个 socket 的 IO 请求都会阻塞进程(线程),但使用难度较低。在并发量较低、业务逻辑只需要同步进行 IO 操作的场景下,阻塞 IO 已经满足了需求,并且不需要发起 大量的select 调用,开销上要比 IO 多路复用低。
- RPC框架应用:
RPC 调用在大多数的情况下,是一个高并发调用的场景, 在 RPC 框架的实现中,一般会选择 IO 多路复用的方式。在开发语言的网络通信框架的选型上,我们最优的选择是基于 Reactor 模式实现的框架,如 Java 语言,首选的框架便是 Netty 框架(目前 Netty 是应用最为广泛的框架),并且在 Linux 环境下,也要开启 epoll 来提升系统性能(Windows 环境下是无法开启 epoll 的,因为系统内核不支持)。
2.2.5 时间轮
-
为什么需要时间轮?
在Dubbo中,为增强系统的容错能力,会有相应的监听判断处理机制。比如RPC调用的超时机制的实现,消费者判断RPC调用是否超时,如果超时会将超时结果返回给应用层。在Dubbo最开始的实现中,是将所有的返回结果(DefaultFuture)都放入一个集合中,并且通过一个定时任务,每隔一定时间间隔就扫描所有的future,逐个判断是否超时。
这样的实现方式虽然比较简单,但是存在一个问题就是会有很多无意义的遍历操作开销。比如一个RPC调用的超时时间是10秒,而设置的超时判定的定时任务是2秒执行一次,那么可能会有4次左右无意义的循环检测判断操作。
为了解决上述场景中的类似问题,Dubbo借鉴Netty,引入了时间轮算法,减少无意义的轮询判断操作。
-
时间轮原理
对于以上问题, 目的是要减少额外的扫描操作就可以了。比如说一个定时任务是在5 秒之后执行,那么在 4.9 秒之后才扫描这个定时任务,这样就可以极大减少 CPU开销。这时我们就可以利用时钟轮的机制了。
时钟轮的实质上是参考了生活中的时钟跳动的原理,那么具体是如何实现呢?
在时钟轮机制中,有时间槽和时钟轮的概念,时间槽就相当于时钟的刻度;而时钟轮就相当于指针跳动的一个周期,我们可以将每个任务放到对应的时间槽位上。
如果时钟轮有 10 个槽位,而时钟轮一轮的周期是 10 秒,那么我们每个槽位的单位时间就是 1 秒,而下一层时间轮的周期就是 100 秒,每个槽位的单位时间也就是 10 秒,这就好比秒针与分针, 在秒针周期下, 刻度单位为秒, 在分针周期下, 刻度为分。
假设现在我们有 3 个任务,分别是任务 A(0.9秒之后执行)、任务 B(2.1秒后执行)与任务 C(12.1秒之后执行),我们将这 3 个任务添加到时钟轮中,任务 A 被放到第 0 槽位,任务 B 被放到第 2槽位,任务 C 被放到下一层时间轮的第2个槽位,如下图所示:
通过这个场景我们可以了解到,时钟轮的扫描周期仍是最小单位1秒,但是放置其中的任务并没有反复扫描,每个任务会按要求只扫描执行一次, 这样就能够很好的解决CPU 浪费的问题。
叠加时钟轮, 无限增长, 效率会不断下降,该如何解决?设定三个时钟轮, 小时轮, 分钟轮, 秒级轮
Dubbo中的时间轮原理是如何实现?
主要是通过Timer,Timeout,TimerTask几个接口定义了一个定时器的模型,再通过HashedWheelTimer这个类实现了一个时间轮定时器(默认的时间槽的数量是512,可以自定义这个值)。它对外提供了简单易用的接口,只需要调用newTimeout接口,就可以实现对只需执行一次任务的调度。通过该定时器,Dubbo在响应的场景中实现了高效的任务调度。
-
Dubbo源码剖析
时间轮核心类HashedWheelTimer结构:
-
时间轮在RPC的应用
-
调用超时与重试处理: 上面所讲的客户端调用超时的处理,就可以应用到时钟轮,我们每发一次请求,都创建一个处理请求超时的定时任务放到时钟轮里,在高并发、高访问量的情况下,时钟轮每次只轮询一个时间槽位中的任务,这样会节省大量的 CPU。
源码:
FailbackRegistry, 代码片段:
// 构造方法 public FailbackRegistry(URL url) { super(url); this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD); // since the retry task will not be very much. 128 ticks is enough. // 重试器的时间槽数量, 设定为128 retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128); } // 失败时间任务注册器 private void addFailedRegistered(URL url) { FailedRegisteredTask oldOne = failedRegistered.get(url); if (oldOne != null) { return; } FailedRegisteredTask newTask = new FailedRegisteredTask(url, this); oldOne = failedRegistered.putIfAbsent(url, newTask); if (oldOne == null) { // never has a retry task. then start a new task for retry. // 旧任务不存在, 则放置时间轮,开启新一个任务 retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); } }
-
定时心跳检测: RPC 框架调用端定时向服务端发送的心跳检测,来维护连接状态,我们可以将心跳的逻辑封装为一个心跳任务,放到时钟轮里。心跳是要定时重复执行的,而时钟轮中的任务执行一遍就被移除了,对于这种需要重复执行的定时任务我们该如何处理呢?我们在定时任务逻辑结束的最后,再加上一段逻辑, 重设这个任务的执行时间,把它重新丢回到时钟轮里。这样就可以实现循环执行。
源码:
HeaderExchangeServer代码片段:
... // 建立心跳时间轮, 槽位数默认为128 private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL); ... // 启动心跳任务检测 private void startIdleCheckTask(URL url) { if (!server.canHandleIdle()) { AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels()); int idleTimeout = getIdleTimeout(url); long idleTimeoutTick = calculateLeastDuration(idleTimeout); CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout); this.closeTimerTask = closeTimerTask; // init task and start timer. // 开启心跳检测任务 IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS); } } ...
连接检测, 会不断执行, 加入时间轮中。
AbstractTimerTask源码:
@Override public void run(Timeout timeout) throws Exception { Collection<Channel> c = channelProvider.getChannels(); for (Channel channel : c) { if (channel.isClosed()) { continue; } // 调用心跳检测任务 doTask(channel); } // 重新放入时间轮中 reput(timeout, tick); }
还可以参考HeartbeatTimerTask、ReconnectTimerTask源码实现。
-
3. RPC的高级机制
3.1 异步处理机制
-
为什么要采用异步?
如果采用同步调用, CPU 大部分的时间都在等待而没有去计算,从而导致 CPU 的利用率不够。
RPC 请求比较耗时的原因主要是在哪里?
在大多数情况下,RPC 本身处理请求的效率是在毫秒级的。RPC 请求的耗时大部分都是业务耗时,比如业务逻辑中有访问数据库执行慢 SQL 的操作,核心是在I/O瓶颈。所以说,在大多数情况下,影响到 RPC 调用的吞吐量的原因也就是业务逻辑处理慢了,CPU 大部分时间都在等待资源。
-
调用端如何实现异步?
常用的方式就是Future 方式,它是返回 Future 对象,通过GET方式获取结果;或者采用入参为 Callback 对象的回调方式,处理结果。
从DUBBO框架, 来看具体是如何实现异步调用?
-
服务端如何实现异步?
为了提升性能,连接请求与业务处理不会放在一个线程处理, 这个就是服务端的异步化。服务端业务处理逻辑加入异步处理机制。
在RPC 框架提供一种回调方式,让业务逻辑可以异步处理,处理完之后调用 RPC 框架的回调接口。
RPC 框架的异步策略主要是调用端异步与服务端异步。调用端的异步就是通过 Future 方式。
服务端异步则需要一种回调方式,让业务逻辑可以异步处理。这样就实现了RPC调用的全异步化。
-
RPC框架的异步实现
RPC 框架的异步策略主要是调用端异步与服务端异步。调用端的异步就是通过 Future 方式实现异步,调用端发起一次异步请求并且从请求上下文中拿到一个 Future,之后通过 Future 的 get 方法获取结果,如果业务逻辑中同时调用多个其它的服务,则可以通过 Future 的方式减少业务逻辑的耗时,提升吞吐量。
服务端异步则需要一种回调方式,让业务逻辑可以异步处理,之后调用 RPC 框架提供的回调接口,将最终结果异步通知给调用端。这样就实现了RPC调用的全异步。
Dubbo源码:
异步调用: AsyncToSyncInvoker.invoke方法
获取结果:ChannelWrappedInvoker.doInvoke方法
3.2 路由与负载均衡(了解)
我们后面会讲解灰度发布机制,基于Nginx+Lua、扩展SpringCloud Gateway源码灰度发布和负载均衡,只要项目集群、分布式应用就会涉及到路由与负载均衡。
-
为什么要采用路由?
真实的环境中一般是以集群的方式提供服务,对于服务调用方来说,一个接口会有多个服务提供方同时提供服务,所以 RPC 在每次发起请求的时候,都需要从多个服务节点里面选取一个用于处理请求的服务节点。
这就需要在RPC应用中增加路由功能。
-
如何实现路由?
服务注册发现方式:
通过服务发现的方式从逻辑上看是可行,但注册中心是用来保证数据的一致性。通过服务发现方式来实现请求隔离并不理想。
RPC路由策略:
从服务提供方节点集合里面选择一个合适的节点(负载均衡),把符合我们要求的节点筛选出来。这个就是路由策略:
接收请求–>请求校验–>路由策略–>负载均衡–>
使用了 IP 路由策略后,整个集群的调用拓扑如下图所示:
有些场景下,可能还需要更细粒度的路由方式,比如说根据SESSIONID要落到相同的服务节点上以保持会话的有效性;
可以考虑采用参数化路由:
-
RPC框架中的负载均衡
RPC 的负载均衡是由 RPC 框架自身提供实现,自主选择一个最佳的服务节点,发起 RPC 调用请求。
RPC 负载均衡策略一般包括轮询、随机、权重、最少连接等。Dubbo默认就是使用随机负载均衡策略。
-
自适应的负载均衡策略
RPC 的负载均衡完全由 RPC 框架自身实现,服务调用方发起请求时,会通过所配置的负载均衡组件,自主地选择合适服务节点。调用方如果能知道每个服务节点处理请求的能力,再根据服务节点处理请求的能力来判断分配相应的流量,集群资源就能够得到充分的利用, 当一个服务节点负载过高或响应过慢时,就少给它发送请求,反之则多给它发送请求。这个就是自适应的负载均衡策略。
具体如何实现?
这就需要判定服务节点的处理能力。
主要步骤:
(1)添加计分器和指标采集器。
(2)指标采集器收集服务节点 CPU 核数、CPU 负载以及内存占用率等指标。
(3)可以配置开启哪些指标采集器,并设置这些参考指标的具体权重。
(4)通过对服务节点的综合打分,最终计算出服务节点的实际权重,选择合适的服务节点。
3.3 熔断限流(了解)
我们后面课程会详细讲解熔断限流组件Sentinel高级用法、源码剖析、策略机制,但是RPC需要考虑熔断限流机制,我们一起来了解一下。
-
为什么要进行限流?
在实际生产环境中,每个服务节点都可能由于访问量过大而引起一系列问题,就需要业务提供方能够进行自我保护,从而保证在高访问量、高并发的场景下,系统依然能够稳定,高效运行。
-
服务端的自我保护实现
在Dubbo框架中, 可以通过Sentinel来实现更为完善的熔断限流功能,服务端是具体如何实现限流逻辑的?
方法有很多种, 最简单的是计数器,还有平滑限流的滑动窗口、漏斗算法以及令牌桶算法等等。Sentinel采用的是滑动窗口来实现的限流。
windowStart: 时间窗口的开始时间,单位是毫秒
windowLength: 时间窗口的长度,单位是毫秒
value: 时间窗口的内容
初始的时候arrays数组中只有一个窗口,每个时间窗口的长度是500ms,这就意味着只要当前时间与时间窗口的差值在500ms之内,时间窗口就不会向前滑动。
时间继续往前走,当超过500ms时,时间窗口就会向前滑动到下一个,这时就会更新当前窗口的开始时间:
在当前时间点中进入的请求,会被统计到当前时间所对应的时间窗口中。
-
调用方的自我保护
一个服务 A 调用服务 B 时,服务 B 的业务逻辑又调用了服务 C,这时服务 C 响应超时,服务 B 就可能会因为堆积大量请求而导致服务宕机,由此产生服务雪崩的问题。
熔断处理流程:
熔断机制:
熔断器的工作机制主要是关闭、打开和半打开这三个状态之间的切换。
Sentinel 熔断降级组件它可以支持以下降级策略:
- 平均响应时间 (
DEGRADE_GRADE_RT
):当 1s 内持续进入 N 个请求,对应时刻的平均响应时间(秒级)均超过阈值(count
,以 ms 为单位),那么在接下的时间窗口(DegradeRule
中的timeWindow
,以 s 为单位)之内,对这个方法的调用都会自动地熔断(抛出DegradeException
)。注意 Sentinel 默认统计的 RT 上限是 4900 ms,超出此阈值的都会算作 4900 ms,若需要变更此上限可以通过启动配置项-Dcsp.sentinel.statistic.max.rt=xxx
来配置。 - 异常比例 (
DEGRADE_GRADE_EXCEPTION_RATIO
):当资源的每秒请求量 >= N(可配置),并且每秒异常总数占通过量的比值超过阈值(DegradeRule
中的count
)之后,资源进入降级状态,即在接下的时间窗口(DegradeRule
中的timeWindow
,以 s 为单位)之内,对这个方法的调用都会自动地返回。异常比率的阈值范围是[0.0, 1.0]
,代表 0% – 100%。 - 异常数 (
DEGRADE_GRADE_EXCEPTION_COUNT
):当资源近 1 分钟的异常数目超过阈值之后会进行熔断。注意由于统计时间窗口是分钟级别的,若timeWindow
小于 60s,则结束熔断状态后仍可能再进入熔断状态。
更多资料,参考Sentinel官方文档。
本文由传智教育博学谷 – 狂野架构师教研团队发布
如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力
转载请注明出处!