言成言成啊 | Kit Chen's Blog

RPC简单学习

发布于2021-11-10 16:47:53,更新于2021-12-02 22:40:03,标签:java open rpc  文章会持续修订,转载请注明来源地址:https://meethigher.top/blog

组长将项目中使用Kafka的地方,进行了优化,最终kafka的作用只是用来收集日志,其他用到kafka的地方,使用了gRPC进行了优化。本文就用来记录下,学习RPC原理的过程。

本文源码meethigher/simple-rpc: Java实现一个简单RPC功能,开启Server,再启动Client

抄袭自Java实现简单的RPC框架 - 苍穹2018 - 博客园

其他参考

  1. RPC系列:基本概念 - 海米傻傻 - 博客园
  2. java RMI原理详解_xinghun_4的专栏-CSDN博客_rmi
  3. Java RMI与RPC的区别 - Silentdoer - 博客园
  4. RMI和RPC比较_Baron-CSDN博客

一、RPC概念

RPC,全称是Remote Procedure Call,远程过程调用。

RPC是一种技术思想,通过网络从远程计算机程序上调用服务,而不需要去了解底层网络技术。

常见的RPC技术和框架

  • 应用级的服务框架:阿里的Dubbo/Dubbox、谷歌的gRPC(基于HTTP2协议)、SpringCloud
  • 远程通信协议:RMI、REST、SOAP
  • 通信框架:Netty、Miner

RPC能够让本地应用简单、高效地调用服务器中过程,它主要应用在分布式系统。

二、RPC实现

使用原生Java实现RPC框架,使用Socket、动态代理、反射。

RPC架构分为三个部分

  1. 提供者,运行在服务端,提供Service定义(接口)与ServiceImpl(接口实现类)
  2. 注册中心:运行在服务端,将本地服务发布成远程服务,管理远程服务,提供给调用者使用。
  3. 调用者:运行在客户端,通过远程代理对象调用远程服务

2.1 服务端

使用IDEA创建一个maven,项目,命名为simple-rpc-server

HelloService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface HelloService {
/**
* 打招呼
* @param name
* @return
*/
String sayHi(String name);

/**
* 购买mix4
* @param money
* @return
*/
String buyMix4(Integer money,String name);
}

HelloServiceImpl

1
2
3
4
5
6
7
8
9
public class HelloServiceImpl implements HelloService {
public String sayHi(String name) {
return "Hi, " + name;
}

public String buyMix4(Integer money,String name) {
return "从"+name+"手中花费"+money+"元买到MIX4 12G+256G";
}
}

ServerCenter

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface ServerCenter {

void stop();

void start() throws IOException;

void register(Class serviceInterface, Class impl);

boolean isRunning();

int getPort();

}

ServerCenterImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
public class ServerCenterImpl implements ServerCenter {
private int port;

private static boolean isRunning=false;

private static final HashMap<String,Class> serviceRegistry=new HashMap<String, Class>();

private static ExecutorService executor= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());


public ServerCenterImpl(int port) {
this.port=port;
}

public void stop() {
isRunning=false;
executor.shutdown();

}

public void start() throws IOException {
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(port));
System.out.println("启动Socket服务器");
try {
while(true) {
Socket client = serverSocket.accept();
System.out.println("收到新的调用"+new Date());
executor.execute(new ServiceTask(client));
}
}finally {
serverSocket.close();
}
}

public void register(Class serviceInterface, Class impl) {
serviceRegistry.put(serviceInterface.getName(), impl);
}

public boolean isRunning() {
return false;
}

public int getPort() {
return 0;
}


class ServiceTask implements Runnable {
Socket client=null;

public ServiceTask(Socket client) {
this.client = client;
}

public void run() {
ObjectInputStream inputStream=null;
ObjectOutputStream outputStream=null;

try {
//将客户端发送的码流反序列化成对象,反射调用服务实现着,获取执行结果
inputStream=new ObjectInputStream(client.getInputStream());

//读取客户端传过来的 类名、方法名称、参数类型、参数值
String serviceName = inputStream.readUTF();
String methodName = inputStream.readUTF();
Class<?>[] parameterTypes = (Class<?>[]) inputStream.readObject();
Object[] arguments = (Object[]) inputStream.readObject();

Class serviceClass = serviceRegistry.get(serviceName);
if(serviceClass==null) {
throw new ClassNotFoundException(serviceName+" not found");
}
Method method = serviceClass.getMethod(methodName, parameterTypes);
Object result = method.invoke(serviceClass.newInstance(), arguments);
//将结果反序列化,通过socket发送给客户端
outputStream=new ObjectOutputStream(client.getOutputStream());
outputStream.writeObject(result);
}catch (Exception e) {
e.printStackTrace();
}finally {
if(outputStream!=null) {
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(inputStream!=null) {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(client!=null) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}


}

RPCServerTest

1
2
3
4
5
6
7
8
9
10
11
public class RPCServerTest {
public static void main(String[] args) {
try {
ServerCenter serverCenter=new ServerCenterImpl(1234);
serverCenter.register(HelloService.class, HelloServiceImpl.class);
serverCenter.start();
}catch (Exception e) {
e.printStackTrace();
}
}
}

将服务端启动起来,监听1234端口

2.2 客户端

使用IDEA创建一个maven,项目,命名为simple-rpc-client

将服务端的Service带过来,不用带ServiceImpl,一般实际使用场景中,会由服务端提供一个Service的jar包。

RPCClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class RPCClient<T> {
public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
/**
* 将本地接口的调用,转换成JDK动态代理,在动态代理中实现远程调用
* 三个参数
* 1. 类加载器:真实对象.getClass().getClassLoader()
* 2. 接口数组:真实对象.getClass().getInterfaces(),这种写法需要用实现类。如果不想用实现类,就这么搞,new Class[]{serviceInterface}
* 3. 处理器:new InvocationHandler(),这个就是增强对象的核心方法
*/
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),
new Class[]{serviceInterface},
new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket=null;
ObjectOutputStream outputStream=null;
ObjectInputStream inputStream=null;
try {
//创建socket客户端,根据指定地址连接远程服务提供者
socket=new Socket();
socket.connect(addr);
//将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者
outputStream=new ObjectOutputStream(socket.getOutputStream());
outputStream.writeUTF(serviceInterface.getName());
outputStream.writeUTF(method.getName());
outputStream.writeObject(method.getParameterTypes());
outputStream.writeObject(args);
//同步阻塞等待服务器返回应答,获取应答后返回
inputStream=new ObjectInputStream(socket.getInputStream());
return inputStream.readObject();
}catch (Exception e) {
e.printStackTrace();
}finally {
if(socket!=null){
socket.close();
}
if(outputStream!=null) {
outputStream.close();
}
if(inputStream!=null) {
inputStream.close();
}
}
return null;
}
});
}
}

RPCTest

1
2
3
4
5
6
7
8
public class RPCTest {
public static void main(String[] args) {
//获取动态代理对象,调用某个方法时,会由动态代理,去调用远程方法,通过socket将远程方法的返回值获取到
HelloService service= RPCClient.getRemoteProxyObj(HelloService.class,new InetSocketAddress("localhost",1234));
System.out.println(service.sayHi("雷军"));
System.out.println(service.buyMix4(4099,"雷军"));
}
}

启动客户端,调用本地1234端口获取代理对象。

输出结果,当然了,这是本地调用的远程实现类

1
2
Hi, 雷军
从雷军手中花费4099元买到MIX4 12G+256G
发布:2021-11-10 16:47:53
修改:2021-12-02 22:40:03
链接:https://meethigher.top/blog/2021/rpc/
标签:java open rpc 
付款码 打赏 分享
Shift+Ctrl+1 可控制工具栏