组长将项目中使用Kafka的地方,进行了优化,最终kafka的作用只是用来收集日志,其他用到kafka的地方,使用了gRPC进行了优化。本文就用来记录下,学习RPC原理的过程。
本文源码meethigher/simple-rpc: Java实现一个简单RPC功能,开启Server,再启动Client
抄袭自Java实现简单的RPC框架 - 苍穹2018 - 博客园
其他参考
- RPC系列:基本概念 - 海米傻傻 - 博客园
- java RMI原理详解_xinghun_4的专栏-CSDN博客_rmi
- Java RMI与RPC的区别 - Silentdoer - 博客园
- RMI和RPC比较_Baron-CSDN博客
一、RPC概念
RPC,全称是Remote Procedure Call,远程过程调用。
RPC是一种技术思想,通过网络从远程计算机程序上调用服务,而不需要去了解底层网络技术。
常见的RPC技术和框架
- 应用级的服务框架:阿里的Dubbo/Dubbox、谷歌的gRPC(基于HTTP2协议)、SpringCloud
- 远程通信协议:RMI、REST、SOAP
- 通信框架:Netty、Miner
RPC能够让本地应用简单、高效地调用服务器中过程,它主要应用在分布式系统。
二、RPC实现
使用原生Java实现RPC框架,使用Socket、动态代理、反射。
RPC架构分为三个部分
- 提供者,运行在服务端,提供Service定义(接口)与ServiceImpl(接口实现类)
- 注册中心:运行在服务端,将本地服务发布成远程服务,管理远程服务,提供给调用者使用。
- 调用者:运行在客户端,通过远程代理对象调用远程服务
2.1 服务端
使用IDEA创建一个maven,项目,命名为simple-rpc-server
HelloService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public interface HelloService {
String sayHi(String name);
String buyMix4(Integer money,String name); }
|
HelloServiceImpl
1 2 3 4 5 6 7 8 9
| public class HelloServiceImpl implements HelloService { public String sayHi(String name) { return "Hi, " + name; }
public String buyMix4(Integer money,String name) { return "从"+name+"手中花费"+money+"元买到MIX4 12G+256G"; } }
|
ServerCenter
1 2 3 4 5 6 7 8 9 10 11 12 13
| public interface ServerCenter {
void stop();
void start() throws IOException;
void register(Class serviceInterface, Class impl);
boolean isRunning();
int getPort();
}
|
ServerCenterImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
| public class ServerCenterImpl implements ServerCenter { private int port;
private static boolean isRunning=false;
private static final HashMap<String,Class> serviceRegistry=new HashMap<String, Class>();
private static ExecutorService executor= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public ServerCenterImpl(int port) { this.port=port; }
public void stop() { isRunning=false; executor.shutdown();
}
public void start() throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(port)); System.out.println("启动Socket服务器"); try { while(true) { Socket client = serverSocket.accept(); System.out.println("收到新的调用"+new Date()); executor.execute(new ServiceTask(client)); } }finally { serverSocket.close(); } }
public void register(Class serviceInterface, Class impl) { serviceRegistry.put(serviceInterface.getName(), impl); }
public boolean isRunning() { return false; }
public int getPort() { return 0; }
class ServiceTask implements Runnable { Socket client=null;
public ServiceTask(Socket client) { this.client = client; }
public void run() { ObjectInputStream inputStream=null; ObjectOutputStream outputStream=null;
try { inputStream=new ObjectInputStream(client.getInputStream());
String serviceName = inputStream.readUTF(); String methodName = inputStream.readUTF(); Class<?>[] parameterTypes = (Class<?>[]) inputStream.readObject(); Object[] arguments = (Object[]) inputStream.readObject();
Class serviceClass = serviceRegistry.get(serviceName); if(serviceClass==null) { throw new ClassNotFoundException(serviceName+" not found"); } Method method = serviceClass.getMethod(methodName, parameterTypes); Object result = method.invoke(serviceClass.newInstance(), arguments); outputStream=new ObjectOutputStream(client.getOutputStream()); outputStream.writeObject(result); }catch (Exception e) { e.printStackTrace(); }finally { if(outputStream!=null) { try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if(inputStream!=null) { try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if(client!=null) { try { client.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
}
|
RPCServerTest
1 2 3 4 5 6 7 8 9 10 11
| public class RPCServerTest { public static void main(String[] args) { try { ServerCenter serverCenter=new ServerCenterImpl(1234); serverCenter.register(HelloService.class, HelloServiceImpl.class); serverCenter.start(); }catch (Exception e) { e.printStackTrace(); } } }
|
将服务端启动起来,监听1234端口
2.2 客户端
使用IDEA创建一个maven,项目,命名为simple-rpc-client
将服务端的Service带过来,不用带ServiceImpl,一般实际使用场景中,会由服务端提供一个Service的jar包。
RPCClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public class RPCClient<T> { public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket=null; ObjectOutputStream outputStream=null; ObjectInputStream inputStream=null; try { socket=new Socket(); socket.connect(addr); outputStream=new ObjectOutputStream(socket.getOutputStream()); outputStream.writeUTF(serviceInterface.getName()); outputStream.writeUTF(method.getName()); outputStream.writeObject(method.getParameterTypes()); outputStream.writeObject(args); inputStream=new ObjectInputStream(socket.getInputStream()); return inputStream.readObject(); }catch (Exception e) { e.printStackTrace(); }finally { if(socket!=null){ socket.close(); } if(outputStream!=null) { outputStream.close(); } if(inputStream!=null) { inputStream.close(); } } return null; } }); } }
|
RPCTest
1 2 3 4 5 6 7 8
| public class RPCTest { public static void main(String[] args) { HelloService service= RPCClient.getRemoteProxyObj(HelloService.class,new InetSocketAddress("localhost",1234)); System.out.println(service.sayHi("雷军")); System.out.println(service.buyMix4(4099,"雷军")); } }
|
启动客户端,调用本地1234端口获取代理对象。
输出结果,当然了,这是本地调用的远程实现类
1 2
| Hi, 雷军 从雷军手中花费4099元买到MIX4 12G+256G
|