使用Reactor模式重构网络代码示例
使用Reactor模式重构后的代码如下:
private void handle(SocketChannel socketChannel) {
try {
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
key.attach(new SocketAttachment(socketChannel));
} catch (ClosedChannelException e) {
System.out.println(ExceptionUtil.getBriefExceptionStackTrace(e));
}
}
public void handleRead(SelectionKey key) {
SocketAttachment attachment = (SocketAttachment) key.attachment();
SocketChannel socketChannel = attachment.getSocketChannel();
try {
ByteBuffer buffer = ByteBuffer.allocate(2048);
socketChannel.read(buffer);
buffer.flip();
attachment.setRecvString(new String(buffer.array()));
String responseJsonBody = '';
if (attachment.getRecvString() != null) {
try {
responseJsonBody = attachment.getRecvString().substring(attachment.getRecvString().lastIndexOf('
'));
} catch (Exception e) {
System.out.println('get responseJsonBody fail.');
}
}
String sendString='HTTP/1.1 200 OK
' + '
' ;
ByteBuffer buf = ByteBuffer.allocateDirect(1024);
buf.put(sendString.getBytes());
buf.flip();
attachment.setSendBuffer(buf);
key.interestOps(SelectionKey.OP_WRITE);
System.out.print('simplehttpserver.java 接收到平台推送的第' + msgCnt + '消息 :');
System.out.println(responseJsonBody);
msgCnt++;
System.out.println();
/**
* string转json.调用推送数据处理方法
*/
try {
Message sendMsg = new Message('DemoTopic', 'DemoTag', responseJsonBody.getBytes());
DefaultMQProducer defaultMQProducer = SpringUtils.getBean('producer');
// 默认3秒超时
SendResult sendResult = defaultMQProducer.send(sendMsg);
} catch (Exception e) {
e.printStackTrace();
}
} catch (IOException e) {
System.out.println(ExceptionUtil.getBriefExceptionStackTrace(e));
}
}
public void handleWrite(SelectionKey key) {
SocketAttachment attachment = (SocketAttachment) key.attachment();
SocketChannel socketChannel = attachment.getSocketChannel();
ByteBuffer buffer = attachment.getSendBuffer();
try {
socketChannel.write(buffer);
if (buffer.remaining() > 0) {
// 写入未完成,继续注册写事件
key.interestOps(SelectionKey.OP_WRITE);
} else {
// 写入完成,注册读事件
key.interestOps(SelectionKey.OP_READ);
}
} catch (IOException e) {
System.out.println(ExceptionUtil.getBriefExceptionStackTrace(e));
}
}
public class SocketAttachment {
private SocketChannel socketChannel;
private String recvString;
private ByteBuffer sendBuffer;
public SocketAttachment(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public SocketChannel getSocketChannel() {
return socketChannel;
}
public String getRecvString() {
return recvString;
}
public void setRecvString(String recvString) {
this.recvString = recvString;
}
public ByteBuffer getSendBuffer() {
return sendBuffer;
}
public void setSendBuffer(ByteBuffer sendBuffer) {
this.sendBuffer = sendBuffer;
}
}
说明:
- 在handle方法中,注册了读事件,并将SocketChannel和一个SocketAttachment对象关联起来,将SocketAttachment作为附件添加到SelectionKey中。
- handleRead方法中,从SelectionKey中获取SocketAttachment对象,处理读事件,将接收到的数据保存到SocketAttachment中,并设置发送的响应消息。
- 在设置完发送的响应消息后,将SelectionKey的操作设置为写操作,等待下一个事件循环调用handleWrite方法。
- handleWrite方法中,从SelectionKey中获取SocketAttachment对象,处理写事件,将响应消息发送给客户端,如果写入未完成则继续注册写事件,否则注册读事件并等待下一个事件循环调用handleRead方法。
- 新增了一个SocketAttachment类,用于保存SocketChannel、接收到的数据和要发送的响应消息。
通过使用Reactor模式,将阻塞式的IO操作改为非阻塞式的事件驱动模型,可以提高程序的性能和可扩展性。
原文地址: https://www.cveoy.top/t/topic/n6Un 著作权归作者所有。请勿转载和采集!