使用Reactor模式重构后的代码如下:

private void handle(SocketChannel socketChannel) {
    try {
        SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
        key.attach(new SocketAttachment(socketChannel));
    } catch (ClosedChannelException e) {
        System.out.println(ExceptionUtil.getBriefExceptionStackTrace(e));
    }
}

public void handleRead(SelectionKey key) {
    SocketAttachment attachment = (SocketAttachment) key.attachment();
    SocketChannel socketChannel = attachment.getSocketChannel();
    try {
        ByteBuffer buffer = ByteBuffer.allocate(2048);
        socketChannel.read(buffer);
        buffer.flip();
        attachment.setRecvString(new String(buffer.array()));
        String responseJsonBody = '';
        if (attachment.getRecvString() != null) {
            try {
                responseJsonBody = attachment.getRecvString().substring(attachment.getRecvString().lastIndexOf('
'));
            } catch (Exception e) {
                System.out.println('get responseJsonBody fail.');
            }
        }
        String sendString='HTTP/1.1 200 OK
' + '
' ;
        ByteBuffer buf = ByteBuffer.allocateDirect(1024);
        buf.put(sendString.getBytes());
        buf.flip();
        attachment.setSendBuffer(buf);
        key.interestOps(SelectionKey.OP_WRITE);
        System.out.print('simplehttpserver.java 接收到平台推送的第' + msgCnt + '消息 :');
        System.out.println(responseJsonBody);
        msgCnt++;
        System.out.println();
        /** 
         * string转json.调用推送数据处理方法 
         */
        try {
            Message sendMsg = new Message('DemoTopic', 'DemoTag', responseJsonBody.getBytes());
            DefaultMQProducer defaultMQProducer = SpringUtils.getBean('producer');
            // 默认3秒超时
            SendResult sendResult = defaultMQProducer.send(sendMsg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    } catch (IOException e) {
        System.out.println(ExceptionUtil.getBriefExceptionStackTrace(e));
    }
}

public void handleWrite(SelectionKey key) {
    SocketAttachment attachment = (SocketAttachment) key.attachment();
    SocketChannel socketChannel = attachment.getSocketChannel();
    ByteBuffer buffer = attachment.getSendBuffer();
    try {
        socketChannel.write(buffer);
        if (buffer.remaining() > 0) {
            // 写入未完成,继续注册写事件
            key.interestOps(SelectionKey.OP_WRITE);
        } else {
            // 写入完成,注册读事件
            key.interestOps(SelectionKey.OP_READ);
        }
    } catch (IOException e) {
        System.out.println(ExceptionUtil.getBriefExceptionStackTrace(e));
    }
}

public class SocketAttachment {
    private SocketChannel socketChannel;
    private String recvString;
    private ByteBuffer sendBuffer;

    public SocketAttachment(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    public SocketChannel getSocketChannel() {
        return socketChannel;
    }

    public String getRecvString() {
        return recvString;
    }

    public void setRecvString(String recvString) {
        this.recvString = recvString;
    }

    public ByteBuffer getSendBuffer() {
        return sendBuffer;
    }

    public void setSendBuffer(ByteBuffer sendBuffer) {
        this.sendBuffer = sendBuffer;
    }
}

说明:

  1. 在handle方法中,注册了读事件,并将SocketChannel和一个SocketAttachment对象关联起来,将SocketAttachment作为附件添加到SelectionKey中。
  2. handleRead方法中,从SelectionKey中获取SocketAttachment对象,处理读事件,将接收到的数据保存到SocketAttachment中,并设置发送的响应消息。
  3. 在设置完发送的响应消息后,将SelectionKey的操作设置为写操作,等待下一个事件循环调用handleWrite方法。
  4. handleWrite方法中,从SelectionKey中获取SocketAttachment对象,处理写事件,将响应消息发送给客户端,如果写入未完成则继续注册写事件,否则注册读事件并等待下一个事件循环调用handleRead方法。
  5. 新增了一个SocketAttachment类,用于保存SocketChannel、接收到的数据和要发送的响应消息。

通过使用Reactor模式,将阻塞式的IO操作改为非阻塞式的事件驱动模型,可以提高程序的性能和可扩展性。

使用Reactor模式重构网络代码示例

原文地址: https://www.cveoy.top/t/topic/n6Un 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录