Reactor模型

约定不等于承诺〃 2022-11-04 11:30 196阅读 0赞

Reactor是一种设计模式。基于事件驱动,然后通过事件分发器,将事件分发给对应的处理器进行处理。

  1. Reactor:监听网络端口,分发网络连接事件给Acceptor,具体的感兴趣读写事件handler
  2. Acceptor:接受新的连接,连接的读写事件操作交给相应的Handler
  3. Handler:注册为callback对象,并且注册自己感兴趣的读事件或者写事件等等,然后再相应的方法内进行业务操作内容

1.单线程版

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3hjMTIzX2phdmE_size_16_color_FFFFFF_t_70

参考代码:

  1. package com.ddcx.utils;
  2. /**
  3. * @author: xc
  4. * @ClassName: Test
  5. * @Date: 2021-03-03 12:47
  6. * @Description:
  7. */
  8. import java.io.IOException;
  9. import java.net.InetSocketAddress;
  10. import java.nio.channels.SelectionKey;
  11. import java.nio.channels.Selector;
  12. import java.nio.channels.ServerSocketChannel;
  13. import java.nio.channels.SocketChannel;
  14. import java.util.Iterator;
  15. import java.util.Set;
  16. class Reactor implements Runnable {
  17. final Selector selector;
  18. final ServerSocketChannel serverSocket;
  19. Reactor(int port) throws IOException { //Reactor初始化
  20. selector = Selector.open();
  21. serverSocket = ServerSocketChannel.open();
  22. //要监听的网络端口号
  23. serverSocket.socket().bind(new InetSocketAddress(port));
  24. //非阻塞
  25. serverSocket.configureBlocking(false);
  26. //分步处理,第一步,接收accept事件
  27. SelectionKey sk =
  28. serverSocket.register(selector, SelectionKey.OP_ACCEPT);
  29. //attach callback object, Acceptor
  30. sk.attach(new Acceptor());
  31. }
  32. @Override
  33. public void run() {
  34. try {
  35. while (!Thread.interrupted()) {
  36. //阻塞到至少有一个通道在你注册的事件上就绪了。
  37. selector.select();
  38. Set selected = selector.selectedKeys();
  39. Iterator it = selected.iterator();
  40. while (it.hasNext()) {
  41. //Reactor负责dispatch收到的事件
  42. dispatch((SelectionKey) (it.next()));
  43. }
  44. selected.clear();
  45. }
  46. } catch (IOException ex) { /* ... */ }
  47. }
  48. void dispatch(SelectionKey k) {
  49. Runnable r = (Runnable) (k.attachment());
  50. //调用之前注册的callback对象
  51. if (r != null) {
  52. //这里是Acceptor的run方法
  53. r.run();
  54. }
  55. }
  56. // inner class
  57. class Acceptor implements Runnable {
  58. @Override
  59. public void run() {
  60. try {
  61. //阻塞到获取网络连接通道
  62. SocketChannel channel = serverSocket.accept();
  63. if (channel != null) {
  64. //连接已经就绪,将相应的感兴趣的读写事件注册到回调中
  65. new ReadHander(selector, channel);
  66. }
  67. } catch (IOException ex) { /* ... */ }
  68. }
  69. }
  70. public static void main(String[] args) throws IOException {
  71. Reactor reactor = new Reactor(9000);
  72. reactor.run();
  73. }
  74. }
  75. package com.ddcx.utils;
  76. /**
  77. * @author: xc
  78. * @ClassName: ReadHander
  79. * @Date: 2021-03-03 12:48
  80. * @Description:
  81. */
  82. import java.io.IOException;
  83. import java.nio.ByteBuffer;
  84. import java.nio.channels.SelectionKey;
  85. import java.nio.channels.Selector;
  86. import java.nio.channels.SocketChannel;
  87. class ReadHander implements Runnable {
  88. final SocketChannel channel;
  89. final SelectionKey sk;
  90. ByteBuffer input = ByteBuffer.allocate(90);
  91. ByteBuffer output = ByteBuffer.allocate(400);
  92. static final int READING = 0, SENDING = 1;
  93. int state = READING;
  94. ReadHander(Selector selector, SocketChannel c) throws IOException {
  95. channel = c;
  96. c.configureBlocking(false);
  97. // Optionally try first read now
  98. sk = channel.register(selector, 0);
  99. //将Handler作为callback对象
  100. sk.attach(this);
  101. //第二步,注册Read就绪事件
  102. sk.interestOps(SelectionKey.OP_READ);
  103. selector.wakeup();
  104. }
  105. boolean inputIsComplete() {
  106. /* ... */
  107. return false;
  108. }
  109. boolean outputIsComplete() {
  110. /* ... */
  111. return false;
  112. }
  113. void process() {
  114. /* ... */
  115. return;
  116. }
  117. @Override
  118. public void run() {
  119. try {
  120. if (state == READING) {
  121. read();
  122. } else if (state == SENDING) {
  123. send();
  124. }
  125. } catch (IOException ex) { /* ... */ }
  126. }
  127. void read() throws IOException {
  128. channel.read(input);
  129. if (inputIsComplete()) {
  130. process();
  131. state = SENDING;
  132. // Normally also do first write now
  133. //第三步,接收write就绪事件
  134. sk.interestOps(SelectionKey.OP_WRITE);
  135. }
  136. }
  137. void send() throws IOException {
  138. channel.write(output);
  139. //write完就结束了, 关闭select key
  140. if (outputIsComplete()) {
  141. sk.cancel();
  142. }
  143. }
  144. }

①.通过Selector的select()方法可以选择已经准备就绪的通道

②.通过ServerSocketChannel.accept()方法监听新进来的连接。当accept()方法返回的时候,它返回一个包含新进来的连接的SocketChannel。因此, accept()方法会一直阻塞到有新连接到达。通常不会仅仅只监听一个连接

单线程版 Reactor模型,其实就是做了一件事情,就是把要监听的socket端口注册到selector中去,并且轮询线程内可以获取到多个已经准备就绪的socket连接通道,同时进行处理这些事件

2. 多线程Reactor模型

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3hjMTIzX2phdmE_size_16_color_FFFFFF_t_70 1

多线程主要体现在handler处理的时候,因为处理的事件可能耗时相对于久一些,这样做可以更快的处理感兴趣的事件

  1. selectionKey.attach(new HandlerThreadPool(socketChannel));

3.主从模式多线程

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3hjMTIzX2phdmE_size_16_color_FFFFFF_t_70 2

  1. mainReactor负责监听socket连接,用来处理新连接的建立和就绪,将建立的socketChannel指定注册给subReactor。网络连接的建立一般很快,所以这里一个主线程就够了

2.subReactor 一般是cpu的核心数,将连接加入到连接队列进行监听,并创建handler进行各种事件处理;当有新事件发生时, subreactor 就会调用对应的handler处理,而对具体的读写事件业务处理的功能交给handler线程池来完成。

发表评论

表情:
评论列表 (有 0 条评论,196人围观)

还没有评论,来说两句吧...

相关阅读