学习RPC框架,化繁为简,了解其本质原理
文章目录
- 项目简介
- 什么是RPC?
- 项目模块
- 项目代码
- common模块
- client模块
- server模块
- framework模块
- 测试
项目简介
什么是RPC?
RPC(Remote Procedure Call)即远程过程调用,不同于本地调用,RPC是指调用远端机器的函数或方法,且不需要关心底层的调用细节,如网络协议和传输协议等,对于调用者来说,和调用本地方法没有什么区别。
项目模块
- common模块:定义了用户接口和实体类User
- client模块:调用RPC框架的代理类,获取结果
- server模块:
- 实现common的接口,把实现类注册到注册中心中
- 调用RpcServer开启socket
- 根据RpcRequest类的信息,获取到注册中心的实现类
- 执行方法,返回结果,通过socket返回
- framework模块(RPC框架):
- 注册中心
- RpcRequest,装载类的信息
- RpcServer:创建socket,接收客户端的请求
项目代码
common模块
实体类和定义的接口
package com.rpc.common;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;/*** @Author: Yangmiao* @Date: 2023/2/8 10:37* @Desc: 网络中传输的信息*/
// Plain data carrier exchanged between the RPC client and server.
// Lombok generates the accessors, equals/hashCode/toString, both constructors
// and a builder; Serializable lets instances travel through object streams.
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
public class User implements Serializable {

    /** Identifier of the user. */
    private Integer id;

    /** Display name of the user. */
    private String name;

    /** Age of the user. */
    private Integer age;

    /** Sex code; the value encoding is not defined in this file. */
    private int sex;
}
package com.rpc.common;/*** @Author: Yangmiao* @Date: 2023/2/8 10:38* @Desc:*/
// Service contract shared by the client (which proxies it) and the server
// (which implements and registers it).
public interface IUserService {

    /**
     * Fetches a user by identifier.
     *
     * @param id identifier of the requested user
     * @return the matching user
     */
    User getById(Integer id);

    /**
     * Fetches a user by name.
     *
     * @param userName name of the requested user
     * @return the matching user
     */
    User getUsername(String userName);
}
client模块
package com.rpc.client;import com.rpc.common.IUserService;
import com.rpc.framework.proxy.RpcProxy;/*** @Author: Yangmiao* @Date: 2023/2/8 11:39* @Desc:*/
// Demo client: builds a proxy for IUserService and performs one remote call.
public class Client {

    /**
     * Creates a dynamic proxy that targets the server on localhost:10000,
     * calls {@code getById(10)} through it and prints the returned value.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        RpcProxy<IUserService> proxyFactory = new RpcProxy<>();
        IUserService userService = proxyFactory.remoteCall("localhost", 10000, IUserService.class);

        Object reply = userService.getById(10);
        System.out.println("productService = " + reply);
    }
}
server模块
package com.rpc.server;import com.rpc.common.IUserService;
import com.rpc.framework.Registry;
import com.rpc.framework.RpcServer;/*** @Author: Yangmiao* @Date: 2023/2/8 11:37* @Desc:* https://www.cnblogs.com/fantongxue/p/16004920.html*/
// Demo server entry point.
public class Server {

    /**
     * Registers the interface name together with its implementation class in
     * the RPC registry, then starts an {@code RpcServer} listening on port 10000.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        String serviceName = IUserService.class.getName();
        Registry.put(serviceName, UserServiceImpl.class);

        RpcServer rpcServer = new RpcServer();
        rpcServer.provide(10000);
    }
}
package com.rpc.server;import com.rpc.common.User;
import com.rpc.common.IUserService;/*** @Author: Yangmiao* @Date: 2023/2/8 11:35* @Desc:*/
public class UserServiceImpl implements IUserService {@Overridepublic User getById(Integer id) {return User.builder().id(id).name("yangmiao").age(100).sex(1).build();}@Overridepublic User getUsername(String userName) {return User.builder().name(userName).build();}
}
framework模块
package com.rpc.framework;import java.util.HashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;/*** @Author: Yangmiao* @Date: 2023/2/8 10:39* @Desc: 注册中心*/
// Process-wide registry that maps a service name to its implementation class.
// Every access to the backing HashMap is guarded by one read/write lock:
// lookups share the read lock, registrations take the write lock.
public class Registry {

    private static final HashMap<String, Class<?>> map = new HashMap<>();

    private static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /**
     * Looks up the class registered under {@code key}.
     *
     * <p>A miss simply returns {@code null}. The lookup never writes to the
     * map, so a missing key is not turned into a {@code null} entry that
     * {@link #containsKey(String)} would afterwards report as present.
     *
     * @param key service name used at registration time
     * @return the registered class, or {@code null} when nothing is registered
     */
    public static Class get(String key) {
        Lock readLock = readWriteLock.readLock();
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Registers {@code value} under {@code key}, replacing any previous entry.
     *
     * @param key   service name
     * @param value implementation class to associate with the name
     * @return the class previously registered under {@code key}, or {@code null}
     */
    public static Class put(String key, Class value) {
        Lock writeLock = readWriteLock.writeLock();
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Tells whether an entry exists for {@code key}.
     *
     * @param key service name
     * @return {@code true} when the key is present in the registry
     */
    public static boolean containsKey(String key) {
        // HashMap is not safe for unsynchronized reads during a concurrent put,
        // so this read takes the same read lock as get().
        Lock readLock = readWriteLock.readLock();
        readLock.lock();
        try {
            return map.containsKey(key);
        } finally {
            readLock.unlock();
        }
    }
}
package com.rpc.framework;import lombok.Data;import java.io.Serializable;/*** @Author: Yangmiao* @Date: 2023/2/8 10:41* @Desc:*/
// Serializable description of one remote method invocation, written by the
// client proxy and read back by the server-side handler.
@Data
public class RpcRequest implements Serializable {

    /** Name under which the target class is looked up in the registry. */
    private String className;

    /** Name of the method to invoke. */
    private String methodName;

    /** Declared parameter types, used to resolve the method reflectively. */
    private Class[] types;

    /** Argument values passed to the method. */
    private Object[] params;
}
package com.rpc.framework;import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @Author: Yangmiao* @Date: 2023/2/8 10:42* @Desc:*/
// Blocking socket server that hands each accepted connection to a worker pool.
public class RpcServer {

    /** Fixed pool of five workers that run the per-connection handlers. */
    private final ExecutorService executors = Executors.newFixedThreadPool(5);

    /**
     * Listens on {@code port} and dispatches every accepted connection to a
     * {@code ProcessHandler} running on the worker pool. The call blocks in the
     * accept loop and only returns after an {@link IOException}, which is
     * printed to standard error.
     *
     * @param port TCP port to listen on
     */
    public void provide(int port) {
        // try-with-resources releases the listening port when the loop ends
        // because of an I/O failure, instead of leaking the ServerSocket.
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            while (true) {
                Socket socket = serverSocket.accept();
                executors.execute(new ProcessHandler(socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package com.rpc.framework;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;/*** @Author: Yangmiao* @Date: 2023/2/8 10:50* @Desc: 处理服务端逻辑*/
public class ProcessHandler implements Runnable {private Socket socket;public ProcessHandler(Socket socket) {this.socket = socket;}@Overridepublic void run() {ObjectInputStream objectInputStream = null;ObjectOutputStream objectOutputStream = null;try {objectInputStream = new ObjectInputStream(socket.getInputStream());RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();Class clazz = null;// 判断是否存在于注册中心中if (Registry.containsKey(rpcRequest.getClassName())){clazz = Registry.get(rpcRequest.getClassName());}Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getTypes());Object result = method.invoke(clazz.newInstance(), rpcRequest.getParams());// 返回结果objectOutputStream = new ObjectOutputStream(socket.getOutputStream());objectOutputStream.writeObject(result);objectOutputStream.flush();} catch (IOException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();} catch (NoSuchMethodException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();} catch (IllegalAccessException e) {e.printStackTrace();} catch (InstantiationException e) {e.printStackTrace();}finally {try {if (objectInputStream !=null){objectInputStream.close();}if (objectOutputStream !=null){objectOutputStream.close();}}catch (IOException e){e.printStackTrace();}}}
}
代理
package com.rpc.framework.proxy;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;/*** @Author: Yangmiao* @Date: 2023/2/8 11:18* @Desc: 创建动态代理*/
// Factory for dynamic proxies whose method calls are forwarded to a remote server.
public class RpcProxy<T> {

    /**
     * Creates a proxy implementing {@code clazz} whose invocations are handled
     * by a {@code RemoteInvocationHandler} bound to the given host and port.
     *
     * @param host  host name of the RPC server
     * @param port  TCP port of the RPC server
     * @param clazz interface the proxy should implement
     * @return the proxy instance, cast to {@code T}
     */
    public T remoteCall(String host, int port, Class clazz) {
        ClassLoader loader = clazz.getClassLoader();
        Class<?>[] interfaces = new Class<?>[]{clazz};
        RemoteInvocationHandler handler = new RemoteInvocationHandler(host, port, clazz);

        Object proxy = Proxy.newProxyInstance(loader, interfaces, handler);
        return (T) proxy;
    }
}
package com.rpc.framework.proxy;import com.rpc.framework.RpcRequest;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;/*** @Author: Yangmiao* @Date: 2023/2/8 11:22* @Desc: 代理类执行的逻辑*/
public class RemoteInvocationHandler implements InvocationHandler {private String host;private int port;private Class clazz;public RemoteInvocationHandler(String host,int port,Class clazz){this.host = host;this.port = port;this.clazz = clazz;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {RpcRequest rpcRequest = new RpcRequest();rpcRequest.setClassName(clazz.getName());rpcRequest.setMethodName(method.getName());rpcRequest.setTypes(method.getParameterTypes());rpcRequest.setParams(args);ObjectOutputStream objectOutputStream = null;ObjectInputStream objectInputStream = null;try {Socket socket = new Socket(host,port);// 发送消息objectOutputStream = new ObjectOutputStream(socket.getOutputStream());objectOutputStream.writeObject(rpcRequest);objectOutputStream.flush();// 接受结果objectInputStream = new ObjectInputStream(socket.getInputStream());Object readObject = objectInputStream.readObject();return readObject;}catch (Exception e){e.printStackTrace();}finally {try {if (objectInputStream !=null){objectInputStream.close();}if (objectOutputStream!=null){objectOutputStream.close();}}catch (IOException e){e.printStackTrace();}}return null;}
}
测试