
分布式 - RPC
2021年3月18日大约 6 分钟
RPC 基本概念
RPC(Remote Procedure Call):远程过程调用,属于跨进程调用的一种方式。
使用场景:微服务系统中,客户端与服务端通常用 REST API 交互,后端不同系统之间通常使用 RPC 调用(微服务架构下的服务调用量上升,rpc 调用可以提升整体性能)。
RPC 的调用过程

一个完整的 RPC 由五部分组成:
- 客户端(服务消费端):发起远程方法调用方
- 客户端 Stub(桩):本质上是一个客户端代理,当客户端发起调用时,stub 会实现请求信息的序列化与反序列化,并找到服务端地址(服务发现/手动配置)
- 网络传输:调用的方法数据包传输到服务端,并接收服务端执行完后的返回结果。网络传输的实现方式有很多种(TCP、HTTP/1.0、HTTP/2.0 等)
- 服务端 Stub(桩):服务端代理,负责反序列化接收到的包并序列化 Server 执行结果
- 服务端(服务提供端):提供远程方法的 Server
RPC 具体调用链路如下:
- 客户端发起调用 xxxService.method()
- client stub 序列化请求数据,并且找到服务端地址 提交到网络层
- 通过网络协议完成请求发送,请求到达服务端
- 发送给服务端 stub
- 服务端 stub 反序列化请求数据 交给服务端处理
- 服务端目标方法执行完后返回
- 服务端 stub 序列化返回结果,提交网络层
- 结果数据网络传输回客户端
- client stub 反序列化结果
- 调用方获取到结果
场景的序列化方法
序列化:对象->二进制数据
反序列化:二进制数据->对象
- JSON : 直观可读性好,但是不支持二进制数据
- XML
- Hessian
- ⭐️ Protocol Buffers
- ⭐️ Thrift
JSON | XML | Protocol Buffers | Thrift | |
---|---|---|---|---|
格式 | 文本 | 文本 | 二进制数据 | 二进制数据 |
优点 | 可读性强 | 结构化清晰 | 序列化性能高、体积小 | 序列化性能高、体积小 |
缺点 | 性能相对较低 | 性能较低 | 可读性差 | 可读性差 |
场景 | Web API | 配置文件 | 微服务 | 微服务 |
个人理解,IDL 与序列化方法的关系像密文与密码本。IDL 是要传输的数据(密文),具体选择哪种"加密"方式就用对应的密码本。
主流 RPC 框架
所谓的 RPC 框架,我理解就是将上图 RPC 调用过程的 2-9 步骤封装起来,让我们使用的更加便利。
为了让客户端 stub 更好地找到服务方,互联网大厂的 RPC 框架通常会支持服务发现能力,也使得 rpc 发挥更强大的作用。梳理下主流的和工作用到的 RPC 框架:
grpc | thrift | mtthrift | Dubbo | kiteX | |
---|---|---|---|---|---|
公司 | 美团 | 阿里巴巴 | 字节 | ||
序列化 | Protocol Buffers (默认) | Thrift 协议 | Thrift 协议 | Hessian、Protocol Buffers | Protocol Buffers |
服务发现 | ❌ | ❌ | ✅ OCTO | ✅ Nacos | ✅ Consul |
跨语言 | ✅ | ✅ | ✅ 多为 Java | ✅ 多为 Java | ❌ |
网络传输 | HTTP/2 | TSocket, TFramedTransport | TTSocket, TFramedTransport | TCP、HTTP 等 | TCP、HTTP/2 |
rpc 框架通常会生成一个 IDL 文件,由于 IDL bug(不兼容)导致的事故也很多,这部分属于质量预防的范畴,需要 RD 与 QA 一起治理。
实现 Demo -- thrift
1. (服务端)定义.thrift 文件(IDL)
namespace java com.swtywang.service.payService
service PayService {
string pay(1:string orderId)
}
2. (服务端)生成 Thrift 代码
执行下面命令或者其他生成工具,可以生成一个 java 文件,里面封装好了服务端请求的协议、序列化等内容
thrift --gen java payService.thrift
/**
* Autogenerated by Thrift Compiler (0.18.1)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
package com.swtywang.service.payService;
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.18.1)", date = "2021-02-26")
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
public class PayService {
public interface Iface {
public java.lang.String pay(java.lang.String orderId) throws org.apache.thrift.TException;
}
public interface AsyncIface {
public void pay(java.lang.String orderId, org.apache.thrift.async.AsyncMethodCallback<java.lang.String> resultHandler) throws org.apache.thrift.TException;
}
public static class Client extends org.apache.thrift.TServiceClient implements Iface {
public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
public Factory() {}
@Override
public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
return new Client(prot);
}
@Override
public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
return new Client(iprot, oprot);
}
}
public Client(org.apache.thrift.protocol.TProtocol prot)
{
super(prot, prot);
}
public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
super(iprot, oprot);
}
@Override
public java.lang.String pay(java.lang.String orderId) throws org.apache.thrift.TException
{
send_pay(orderId);
return recv_pay();
}
public void send_pay(java.lang.String orderId) throws org.apache.thrift.TException
{
pay_args args = new pay_args();
args.setOrderId(orderId);
sendBase("pay", args);
}
public java.lang.String recv_pay() throws org.apache.thrift.TException
{
pay_result result = new pay_result();
receiveBase(result, "pay");
if (result.isSetSuccess()) {
return result.success;
}
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "pay failed: unknown result");
}
}
public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
private org.apache.thrift.async.TAsyncClientManager clientManager;
private org.apache.thrift.protocol.TProtocolFactory protocolFactory;
public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
this.clientManager = clientManager;
this.protocolFactory = protocolFactory;
}
@Override
public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
return new AsyncClient(protocolFactory, clientManager, transport);
}
}
public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) {
super(protocolFactory, clientManager, transport);
}
@Override
public void pay(java.lang.String orderId, org.apache.thrift.async.AsyncMethodCallback<java.lang.String> resultHandler) throws org.apache.thrift.TException {
checkReady();
pay_call method_call = new pay_call(orderId, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}
public static class pay_call extends org.apache.thrift.async.TAsyncMethodCall<java.lang.String> {
private java.lang.String orderId;
public pay_call(java.lang.String orderId, org.apache.thrift.async.AsyncMethodCallback<java.lang.String> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
this.orderId = orderId;
}
@Override
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("pay", org.apache.thrift.protocol.TMessageType.CALL, 0));
pay_args args = new pay_args();
args.setOrderId(orderId);
args.write(prot);
prot.writeMessageEnd();
}
@Override
public java.lang.String getResult() throws org.apache.thrift.TException {
if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
throw new java.lang.IllegalStateException("Method call not finished!");
}
org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
return (new Client(prot)).recv_pay();
}
}
}
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(Processor.class.getName());
public Processor(I iface) {
super(iface, getProcessMap(new java.util.HashMap<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
}
protected Processor(I iface, java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
super(iface, getProcessMap(processMap));
}
private static <I extends Iface> java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
processMap.put("pay", new pay());
return processMap;
}
public static class pay<I extends Iface> extends org.apache.thrift.ProcessFunction<I, pay_args> {
public pay() {
super("pay");
}
@Override
public pay_args getEmptyArgsInstance() {
return new pay_args();
}
@Override
protected boolean isOneway() {
return false;
}
@Override
protected boolean rethrowUnhandledExceptions() {
return false;
}
@Override
public pay_result getResult(I iface, pay_args args) throws org.apache.thrift.TException {
pay_result result = new pay_result();
result.success = iface.pay(args.orderId);
return result;
}
}
}
public static class AsyncProcessor<I extends AsyncIface> extends org.apache.thrift.TBaseAsyncProcessor<I> {
private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(AsyncProcessor.class.getName());
public AsyncProcessor(I iface) {
super(iface, getProcessMap(new java.util.HashMap<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>>()));
}
protected AsyncProcessor(I iface, java.util.Map<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) {
super(iface, getProcessMap(processMap));
}
private static <I extends AsyncIface> java.util.Map<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase,?>> getProcessMap(java.util.Map<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) {
processMap.put("pay", new pay());
return processMap;
}
public static class pay<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, pay_args, java.lang.String> {
public pay() {
super("pay");
}
@Override
public pay_args getEmptyArgsInstance() {
return new pay_args();
}
}
......省略
3. 实现服务端方法
//payService.Iface是上一步生成的
public class payServiceImpl implements payService.Iface {
@Override
public String pay(String orderId) throws TException {
return "Handle this orderId";
}
}
4. 创建 Thrift 客户端
public class payClient {
public static void main(String[] args) {
try (TTransport transport = new TSocket("localhost", 8081)) {
transport.open();
TProtocol protocol = new TBinaryProtocol(transport);
payService.Client client = new payService.Client(protocol);
String response = client.pay("12345");
} catch (TException e) {
e.printStackTrace();
}
}
}
实现 Demo -- Dubbo
1. (服务端)Application
/**
* Create by swtywang on 11/21/23 10:13 PM
*/
@SpringBootApplication
@EnableDubbo
public class ProviderApplication {
public static void main(String[] args) {
SpringApplication.run(ProviderApplication.class, args);
}
}
2. (服务端)提供的接口与实现类
public interface TimeService {
LocalDateTime getTime();
}
/**
* Create by swtywang on 11/21/23 10:53 PM
*/
@DubboService
public class TimeServiceImpl implements TimeService {
@Override
public LocalDateTime getTime() {
System.out.println("------------------------------------");
return LocalDateTime.now();
}
}
3. (客户端)启动类和调用方法
/**
* Create by swtywang on 11/21/23 10:14 PM
*/
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@RestController
public static class IndexController{
@DubboReference
private TimeService timeService;
@GetMapping("/")
public String getTime(){
return timeService.getTime().format(DateTimeFormatter.ISO_DATE_TIME);
}
}
}
4. 配置
# application.properties
dubbo.registry.address==zookeeper://localhost:2181
# application-consumer.properties
dubbo.registry.address=zookeeper://localhost:2181
dubbo.application.name=consumer-app
server.port=8081
# application-provider.properties
dubbo.registry.address=zookeeper://localhost:2181
dubbo.application.name=provider-app
server.port=8082