1. Create a ChannelGroup object to hold the Channel of every connected client.
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {

    // Static, so the same group is shared by every handler instance (one per connection).
    private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    //...
}
2. When a new client connects, add its Channel to the ChannelGroup.
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    Channel incoming = ctx.channel();
    channels.add(incoming);
    //...
}
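3. When a client disconnects, remove its Channel from the ChannelGroup. (A ChannelGroup also drops a closed Channel automatically, so the explicit call mainly makes the intent visible.)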
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    Channel incoming = ctx.channel();
    channels.remove(incoming);
    //...
}
4. When a client sends a message, iterate over the Channels in the ChannelGroup and forward the message to every client except the sender.
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    Channel incoming = ctx.channel();
    for (Channel channel : channels) {
        if (channel != incoming) {
            channel.writeAndFlush("[" + incoming.remoteAddress() + "] " + msg + "\n");
        }
    }
}
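
The four steps above boil down to "keep a shared set of peers, and relay each message to everyone but its sender". The following is a minimal plain-Java sketch of that logic with no Netty dependency, so it can be exercised without a network. `Peer` and `BroadcastGroup` are hypothetical names introduced only for this illustration; they stand in for `Channel` and the `ChannelGroup`-based handler and are not part of Netty.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-in for a connected client: an address label plus a sink for outgoing text.
final class Peer {
    final String address;
    private final Consumer<String> sink;

    Peer(String address, Consumer<String> sink) {
        this.address = address;
        this.sink = sink;
    }

    void send(String text) {
        sink.accept(text);
    }
}

// Hypothetical stand-in for the handler's ChannelGroup logic.
final class BroadcastGroup {
    // Concurrent set, because peers may join and leave from different threads.
    private final Set<Peer> peers = ConcurrentHashMap.newKeySet();

    void join(Peer peer) {
        peers.add(peer);
    }

    void leave(Peer peer) {
        peers.remove(peer);
    }

    // Relays msg to every peer except the sender and returns the number of recipients.
    int broadcast(Peer sender, String msg) {
        String line = "[" + sender.address + "] " + msg + "\n";
        int delivered = 0;
        for (Peer peer : peers) {
            if (peer != sender) {
                peer.send(line);
                delivered++;
            }
        }
        return delivered;
    }
}
```

For example, after `join(alice)` and `join(bob)`, calling `broadcast(alice, "hi")` hands the line `[alice] hi` to Bob's sink only; Alice never receives her own message.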

The complete code is as follows:

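import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class ChatServerHandler extends SimpleChannelInboundHandler<String> {

    // Static, so the same group is shared by every handler instance (one per connection).
    private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        channels.add(incoming);
        // Tell every other client that someone has joined.
        for (Channel channel : channels) {
            if (channel != incoming) {
                channel.writeAndFlush("[SYSTEM] " + incoming.remoteAddress() + " joined\n");
            }
        }
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        // A closed Channel is also removed from the group automatically.
        channels.remove(incoming);
        // Tell the remaining clients that someone has left.
        for (Channel channel : channels) {
            if (channel != incoming) {
                channel.writeAndFlush("[SYSTEM] " + incoming.remoteAddress() + " left\n");
            }
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel incoming = ctx.channel();
        // Forward the message to everyone except the sender, prefixed with the sender's address.
        for (Channel channel : channels) {
            if (channel != incoming) {
                channel.writeAndFlush("[" + incoming.remoteAddress() + "] " + msg + "\n");
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("[SYSTEM] " + incoming.remoteAddress() + " error");
        cause.printStackTrace();
        // Close the connection; handlerRemoved will then announce the departure.
        ctx.close();
    }
}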
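
The handler ends every outgoing message with `\n`, which suggests a newline-delimited text protocol. Since TCP delivers a byte stream rather than discrete messages, one chat line may arrive split across several reads, or several lines may arrive in one read. In a Netty pipeline this is usually handled by a frame decoder such as `LineBasedFrameDecoder` placed ahead of a `StringDecoder`. The plain-Java sketch below only illustrates the underlying idea; `LineFramer` is a hypothetical helper, and it works on already-decoded text, whereas a real decoder operates on bytes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reassembles newline-terminated messages from arbitrary chunks.
final class LineFramer {
    // Holds text received so far that has not yet been terminated by "\n".
    private final StringBuilder buffer = new StringBuilder();

    // Feeds one received chunk and returns every line it completes (without the "\n").
    List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> lines = new ArrayList<>();
        int newline;
        while ((newline = buffer.indexOf("\n")) >= 0) {
            lines.add(buffer.substring(0, newline));
            buffer.delete(0, newline + 1);
        }
        return lines;
    }
}
```

Feeding `"hel"` returns an empty list, and a following `"lo\n"` returns the single line `hello`; any trailing partial line stays buffered until its terminator arrives.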
Implementing group chat with Netty: group chat logic in the business-logic handler, including message broadcasting and forwarding

Original URL: https://www.cveoy.top/t/topic/db5f Copyright belongs to the author. Do not reproduce or scrape.
