RPC解释

  • RPC:Remote Procedure Call(远程过程调用)
  • 客户端获取服务端的服务(不同的计算机)

RPC所需要的技术

  • 反射技术:客户端给服务端发送代表接口名的字符串,服务端需要通过字符串解析出该字符串代表的接口的一切信息
  • socket:客户端与服务端交互(传输信息)
  • 动态代理:服务端需要根据客户端的不同请求,返回不同的接口类型,客户端需要接受到不同的接口类型

    代码实现

  • 客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class Client {

//获取代表服务端接口的动态代理对象
//serviceName:请求的接口名
//addr:待请求服务端的ip:端口
@SuppressWarnings("unchecked")
public static <T> T getRemoteProxyObj(Class serviceInterface, InetSocketAddress addr) {
/*newProxyInstance(a,b,c)
* a:类加载器,需要代理哪个类,就需要将哪个类加载器传入第一个参数
* b:需要代理的对象,具备哪些功能(方法) --接口
*/
return (T)Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] {serviceInterface}, new InvocationHandler() {

//proxy:代理的对象 method:使用的函数 args:函数的参数
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

Object result = null;
ObjectInputStream input = null;
ObjectOutputStream output = null;
try {
//客户端向服务端发送请求:请求某一个具体的接口
Socket socket = new Socket();
//socketaddress: Ip : 端口
socket.connect(addr);
output = new ObjectOutputStream( socket.getOutputStream()); //发送:序列化流(对象流)

//发送 接口名 方法 参数类型 参数

output.writeUTF(serviceInterface.getName());
output.writeUTF(method.getName());

output.writeObject(method.getParameterTypes());
output.writeObject(args);
//等待服务端处理

//接受服务端处理后的返回值
input = new ObjectInputStream(socket.getInputStream());
result = input.readObject();

}catch (Exception e) {
e.printStackTrace();
}finally {
try {
if(null != output) {
output.close();
}
if(null != input) {
input.close();
}
}catch (Exception e) {
e.printStackTrace();
}
}
return result;
}
}) ;
}

}
  • 服务中心代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class ServerCenter implements Server{

private static HashMap<String, Class> serviceRegiser = new HashMap<>();
private static int port;

public ServerCenter() {
}

public ServerCenter(int port) {
this.port = port;
}

//开启服务
@Override
public void start(){

ServerSocket server = null;
Socket socket = null;
ObjectInputStream input = null;
ObjectOutputStream output = null;

try {
server = new ServerSocket();

server.bind(new InetSocketAddress(port));

socket = server.accept(); //等待客户端连接

//接受到客户端连接及请求,处理该请求
input = new ObjectInputStream(socket.getInputStream());
//因为ObjectInputStream对发送数据的顺序严格要求,因此需要按照发送的顺序逐个接受
String serviceName = input.readUTF();
String methodName = input.readUTF();
Class[] parameterTypes = (Class[])input.readObject();
Object[] arguments = (Object[])input.readObject();

Class serviceClass = serviceRegiser.get(serviceName);
Method method = serviceClass.getMethod(methodName, parameterTypes);
Object result = method.invoke(serviceClass.newInstance(), arguments);

//将服务端执行完毕的返回值返回给客户端
output = new ObjectOutputStream(socket.getOutputStream());
output.writeObject(result);
}catch (Exception e) {
e.printStackTrace();
}finally {
try {
if(null != output) {
output.close();
}
if(null != input) {
input.close();
}
}catch (Exception e) {
e.printStackTrace();
}
}

}

@Override
public void stop() {

}

@Override
public void register(Class service, Class serviceImpl) {
serviceRegiser.put(service.getName(), serviceImpl);
}

}

优化

  • 在服务端建立连接池,使服务能够多线程并发执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
public class ServerCenter implements Server{

private static HashMap<String, Class> serviceRegiser = new HashMap<>();
private static int port;
private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

private static boolean isRunning = false;

public ServerCenter() {
}

public ServerCenter(int port) {
this.port = port;
}

//开启服务
@Override
public void start(){

ServerSocket server = null;
try {
server = new ServerSocket();
server.bind(new InetSocketAddress(port));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
Socket socket = null;
System.out.println("启动服务...");
isRunning = true;
while(true) {

try {
socket = server.accept();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} //等待客户端连接
executor.execute(new ServiceTask(socket));

}

}

@Override
public void stop() {
System.out.println("关闭服务...");
isRunning = false;
executor.shutdown();
}

@Override
public void register(Class service, Class serviceImpl) {
serviceRegiser.put(service.getName(), serviceImpl);
}

private static class ServiceTask implements Runnable{

Socket socket = new Socket();

public ServiceTask() {}

public ServiceTask(Socket socket) {
this.socket = socket;
}

@Override
public void run() {

ObjectInputStream input = null;
ObjectOutputStream output = null;

try {

//接受到客户端连接及请求,处理该请求
input = new ObjectInputStream(socket.getInputStream());
//因为ObjectInputStream对发送数据的顺序严格要求,因此需要按照发送的顺序逐个接受
String serviceName = input.readUTF();
String methodName = input.readUTF();
Class[] parameterTypes = (Class[])input.readObject();
Object[] arguments = (Object[])input.readObject();

Class serviceClass = serviceRegiser.get(serviceName);
Method method = serviceClass.getMethod(methodName, parameterTypes);
Object result = method.invoke(serviceClass.newInstance(), arguments);

//将服务端执行完毕的返回值返回给客户端
output = new ObjectOutputStream(socket.getOutputStream());
output.writeObject(result);
}catch (Exception e) {
e.printStackTrace();
}finally {
try {
if(null != output) {
output.close();
}
if(null != input) {
input.close();
}
}catch (Exception e) {
e.printStackTrace();
}
}

}

}

}
  • 使用线程启动服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class RPCServerTest {

public static void main(String[] args) {

new Thread(new Runnable() {

@Override
public void run() {
Server server = new ServerCenter(9999);
server.register(HelloService.class, HelloServiceImpl.class);
server.start();
}
}).start();;

}

}
  • 客户端测试代码
1
2
3
4
5
6
7
8
9
10
11
public class RPCClientTest {

public static void main(String[] args) throws ClassNotFoundException {

HelloService service = Client.getRemoteProxyObj(Class.forName("com.kexing.rpc.service.HelloService"), new InetSocketAddress("127.0.0.1", 9999));
System.out.println(service.sayHi("zs"));


}

}
  • 待优化
    while(true)