Netty 群聊功能实现:使用 ChannelGroup 实现消息广播和转发
在 Netty 中实现群聊功能,可以通过以下步骤来完成:
- 创建一个 ChannelGroup 对象,用于保存所有连接到服务器的 Channel 对象。
ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
- 在 ChannelHandler 中实现处理消息的方法,包括广播和转发等操作。
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel incoming = ctx.channel();
for (Channel channel : channelGroup) {
if (channel != incoming) {
channel.writeAndFlush('[' + incoming.remoteAddress() + '] ' + msg + '\n');
} else {
channel.writeAndFlush('[you] ' + msg + '\n');
}
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
for (Channel channel : channelGroup) {
channel.writeAndFlush('[SERVER] - ' + incoming.remoteAddress() + ' 加入\n');
}
channelGroup.add(incoming);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
for (Channel channel : channelGroup) {
channel.writeAndFlush('[SERVER] - ' + incoming.remoteAddress() + ' 离开\n');
}
channelGroup.remove(incoming);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
System.out.println('Client:' + incoming.remoteAddress() + ' 在线');
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
System.out.println('Client:' + incoming.remoteAddress() + ' 掉线');
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel incoming = ctx.channel();
System.out.println('Client:' + incoming.remoteAddress() + ' 异常');
cause.printStackTrace();
ctx.close();
}
}
- 在 ServerBootstrap 中添加 ChannelHandler,并启动服务器。
public class ChatServer {
private final int port;
public ChatServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChatServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync();
System.out.println('ChatServer 已启动,端口:' + port);
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
new ChatServer(port).run();
}
}
通过以上步骤,我们就可以在 Netty 中实现群聊功能了。当有新的客户端连接到服务器时,服务器会将其加入到 ChannelGroup 中,并向其他客户端发送提示消息。当有客户端发送消息时,服务器会将消息广播给所有客户端,同时在消息前添加发送者的地址信息。当有客户端断开连接时,服务器会将其从 ChannelGroup 中移除,并向其他客户端发送提示消息。
原文地址: http://www.cveoy.top/t/topic/kzmW 著作权归作者所有。请勿转载和采集!