使用reactor修改以下代码:private void handleSocketChannel socketChannel try ByteBuffer buffer = ByteBufferallocate2048; socketChannelreadbuffer; bufferflip; recvString = new Stringbufferarray; String respons
使用reactor模式重构后的代码如下:
private void handle(SocketChannel socketChannel) { try { SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ); key.attach(new SocketAttachment(socketChannel)); } catch (ClosedChannelException e) { System.out.println(ExceptionUtil.getBriefExceptionStackTrace(e)); } }
public void handleRead(SelectionKey key) { SocketAttachment attachment = (SocketAttachment) key.attachment(); SocketChannel socketChannel = attachment.getSocketChannel(); try { ByteBuffer buffer = ByteBuffer.allocate(2048); socketChannel.read(buffer); buffer.flip(); attachment.setRecvString(new String(buffer.array())); String responseJsonBody = ""; if (attachment.getRecvString() != null) { try { responseJsonBody = attachment.getRecvString().substring(attachment.getRecvString().lastIndexOf("\n")); } catch (Exception e) { System.out.println("get responseJsonBody fail."); } } String sendString="HTTP/1.1 200 OK\r\n" + "\r\n" ; ByteBuffer buf = ByteBuffer.allocateDirect(1024); buf.put(sendString.getBytes()); buf.flip(); attachment.setSendBuffer(buf); key.interestOps(SelectionKey.OP_WRITE); System.out.print("simplehttpserver.java 接收到平台推送的第" + msgCnt + "消息 :"); System.out.println(responseJsonBody); msgCnt++; System.out.println(); /** * string转json.调用推送数据处理方法 */ try { Message sendMsg = new Message("DemoTopic", "DemoTag", responseJsonBody.getBytes()); DefaultMQProducer defaultMQProducer = SpringUtils.getBean("producer"); // 默认3秒超时 SendResult sendResult = defaultMQProducer.send(sendMsg); } catch (Exception e) { e.printStackTrace(); } } catch (IOException e) { System.out.println(ExceptionUtil.getBriefExceptionStackTrace(e)); } }
public void handleWrite(SelectionKey key) { SocketAttachment attachment = (SocketAttachment) key.attachment(); SocketChannel socketChannel = attachment.getSocketChannel(); ByteBuffer buffer = attachment.getSendBuffer(); try { socketChannel.write(buffer); if (buffer.remaining() > 0) { // 写入未完成,继续注册写事件 key.interestOps(SelectionKey.OP_WRITE); } else { // 写入完成,注册读事件 key.interestOps(SelectionKey.OP_READ); } } catch (IOException e) { System.out.println(ExceptionUtil.getBriefExceptionStackTrace(e)); } }
public class SocketAttachment { private SocketChannel socketChannel; private String recvString; private ByteBuffer sendBuffer;
public SocketAttachment(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public SocketChannel getSocketChannel() {
return socketChannel;
}
public String getRecvString() {
return recvString;
}
public void setRecvString(String recvString) {
this.recvString = recvString;
}
public ByteBuffer getSendBuffer() {
return sendBuffer;
}
public void setSendBuffer(ByteBuffer sendBuffer) {
this.sendBuffer = sendBuffer;
}
}
说明:
- 在handle方法中,注册了读事件,并将SocketChannel和一个SocketAttachment对象关联起来,将SocketAttachment作为附件添加到SelectionKey中。
- handleRead方法中,从SelectionKey中获取SocketAttachment对象,处理读事件,将接收到的数据保存到SocketAttachment中,并设置发送的响应消息。
- 在设置完发送的响应消息后,将SelectionKey的操作设置为写操作,等待下一个事件循环调用handleWrite方法。
- handleWrite方法中,从SelectionKey中获取SocketAttachment对象,处理写事件,将响应消息发送给客户端,如果写入未完成则继续注册写事件,否则注册读事件并等待下一个事件循环调用handleRead方法。
- 新增了一个SocketAttachment类,用于保存SocketChannel、接收到的数据和要发送的响应消息
原文地址: https://www.cveoy.top/t/topic/feBJ 著作权归作者所有。请勿转载和采集!