Reactor模型
Reactor是一种设计模式。基于事件驱动,然后通过事件分发器,将事件分发给对应的处理器进行处理。
Reactor:监听网络端口,分发网络连接事件给Acceptor,具体的感兴趣读写事件handler
Acceptor:接受新的连接,连接的读写事件操作交给相应的Handler
Handler:注册为callback对象,并且注册自己感兴趣的读事件或者写事件等等,然后再相应的方法内进行业务操作内容
1.单线程版
参考代码:
package com.ddcx.utils;
/**
* @author: xc
* @ClassName: Test
* @Date: 2021-03-03 12:47
* @Description:
*/
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException { //Reactor初始化
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
//要监听的网络端口号
serverSocket.socket().bind(new InetSocketAddress(port));
//非阻塞
serverSocket.configureBlocking(false);
//分步处理,第一步,接收accept事件
SelectionKey sk =
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//attach callback object, Acceptor
sk.attach(new Acceptor());
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
//阻塞到至少有一个通道在你注册的事件上就绪了。
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
//Reactor负责dispatch收到的事件
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
//调用之前注册的callback对象
if (r != null) {
//这里是Acceptor的run方法
r.run();
}
}
// inner class
class Acceptor implements Runnable {
@Override
public void run() {
try {
//阻塞到获取网络连接通道
SocketChannel channel = serverSocket.accept();
if (channel != null) {
//连接已经就绪,将相应的感兴趣的读写事件注册到回调中
new ReadHander(selector, channel);
}
} catch (IOException ex) { /* ... */ }
}
}
public static void main(String[] args) throws IOException {
Reactor reactor = new Reactor(9000);
reactor.run();
}
}
package com.ddcx.utils;
/**
* @author: xc
* @ClassName: ReadHander
* @Date: 2021-03-03 12:48
* @Description:
*/
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
class ReadHander implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(90);
ByteBuffer output = ByteBuffer.allocate(400);
static final int READING = 0, SENDING = 1;
int state = READING;
ReadHander(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
// Optionally try first read now
sk = channel.register(selector, 0);
//将Handler作为callback对象
sk.attach(this);
//第二步,注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
boolean inputIsComplete() {
/* ... */
return false;
}
boolean outputIsComplete() {
/* ... */
return false;
}
void process() {
/* ... */
return;
}
@Override
public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
channel.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
//第三步,接收write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
channel.write(output);
//write完就结束了, 关闭select key
if (outputIsComplete()) {
sk.cancel();
}
}
}
①.通过Selector的select()方法可以选择已经准备就绪的通道
②.通过ServerSocketChannel.accept()方法监听新进来的连接。当accept()方法返回的时候,它返回一个包含新进来的连接的SocketChannel。因此, accept()方法会一直阻塞到有新连接到达。通常不会仅仅只监听一个连接
单线程版 Reactor模型,其实就是做了一件事情,就是把要监听的socket端口注册到selector中去,并且轮询线程内可以获取到多个已经准备就绪的socket连接通道,同时进行处理这些事件
2. 多线程Reactor模型
多线程主要体现在handler处理的时候,因为处理的事件可能耗时相对于久一些,这样做可以更快的处理感兴趣的事件
selectionKey.attach(new HandlerThreadPool(socketChannel));
3.主从模式多线程
- mainReactor负责监听socket连接,用来处理新连接的建立和就绪,将建立的socketChannel指定注册给subReactor。网络连接的建立一般很快,所以这里一个主线程就够了
2.subReactor 一般是cpu的核心数,将连接加入到连接队列进行监听,并创建handler进行各种事件处理;当有新事件发生时, subreactor 就会调用对应的handler处理,而对具体的读写事件业务处理的功能交给handler线程池来完成。
还没有评论,来说两句吧...