Netty 实现多用户聊天室:服务器、客户端和消息处理
Netty 多用户聊天室
本示例使用 Netty 库构建了一个多用户聊天室,涵盖了服务器、客户端和消息处理等部分。
服务器端 (MultiChatServer.java)javapackage org.example;
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.util.concurrent.DefaultEventExecutorGroup;import io.netty.util.concurrent.EventExecutorGroup;
import java.util.Map;import java.util.concurrent.ConcurrentHashMap;
public class MultiChatServer { private final int port; private final ConcurrentHashMap<String, ChannelHandlerContext> userMap = new ConcurrentHashMap<>(); private final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
public MultiChatServer(int port) { this.port = port; }
public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup();// 创建主线程组 EventLoopGroup workerGroup = new NioEventLoopGroup();// 创建工作线程组 try { ServerBootstrap b = new ServerBootstrap();// 配置服务器启动参数 b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new MultiChatServerHandler(userMap, group)); // 添加处理器到通道的pipeline中 } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync(); // 绑定端口并启动服务器 f.channel().closeFuture().sync(); // 等待服务器关闭 } finally { workerGroup.shutdownGracefully(); // 关闭工作线程组 bossGroup.shutdownGracefully(); // 关闭主线程组 } }
public static void main(String[] args) throws Exception { int port = 8080; // 创建服务器实例并运行 new MultiChatServer(port).run(); // 创建服务器实例并运行 }}
服务器端消息处理 (MultiChatServerHandler.java)javapackage org.example;
import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.concurrent.EventExecutorGroup;
import java.util.Map;import java.util.concurrent.ConcurrentHashMap;
//MultiChatServerHandler类,实现服务器端的消息处理逻辑,包括用户登录、聊天室列表、消息发送和私聊功能。public class MultiChatServerHandler extends SimpleChannelInboundHandler
switch (command) { case 'LOGIN': handleLogin(ctx, tokens[1], tokens[2]); // 处理登录命令 break; case 'LIST': handleList(ctx); // 处理获取聊天室列表命令 break; case 'MSG': handleMessage(ctx, tokens[1], tokens[2]); // 处理发送消息命令 break; case 'PRIVATE': handlePrivateMessage(ctx, tokens[1], tokens[2]); // 处理发送私聊消息命令 break; case 'RECEIVED': handleReceivedMessage(ctx, tokens[1]); // 处理接收到消息的确认命令 break;
default: ctx.writeAndFlush('Invalid command: ' + command); // 发送无效命令的错误消息 break; } }
private void handleLogin(ChannelHandlerContext ctx, String username, String password) { if (userMap.containsKey(username)) { // 如果用户已经登录则发送错误消息 ctx.writeAndFlush('User already logged in: ' + username); } else { userMap.put(username, ctx); // 将用户添加到用户列表中 ctx.writeAndFlush('Login successful: ' + username); // 发送登录成功消息 } }
private void handleList(ChannelHandlerContext ctx) { StringBuilder sb = new StringBuilder(); sb.append('Available chat rooms:
'); for (Map.Entry<String, ChannelHandlerContext> entry : userMap.entrySet()) { sb.append(entry.getKey()).append(' '); // 将聊天室名称添加到消息中 } ctx.writeAndFlush(sb.toString()); // 发送聊天室列表消息 }
//群发 private void handleMessage(ChannelHandlerContext ctx, String roomName, String message) { StringBuilder sb = new StringBuilder(); sb.append('[').append(ctx.channel().remoteAddress()).append('] ').append(message).append('
'); for (Map.Entry<String, ChannelHandlerContext> entry : userMap.entrySet()) { if (entry.getKey().equals(roomName)) { // 找到指定聊天室的客户端并发送消息 entry.getValue().writeAndFlush(sb.toString()); } } }
//私发 private void handlePrivateMessage(ChannelHandlerContext ctx, String recipient, String message) { StringBuilder sb = new StringBuilder(); sb.append('[').append(ctx.channel().remoteAddress()).append('] (private) ').append(message).append('
'); if (userMap.containsKey(recipient)) { // 找到指定收件人的客户端并发送私聊消息 userMap.get(recipient).writeAndFlush(sb.toString()); } else { ctx.writeAndFlush('User not found: ' + recipient); // 发送收件人未找到的错误消息 } }
private void handleReceivedMessage(ChannelHandlerContext ctx, String message) { if (message.contains(ctx.channel().remoteAddress().toString())) { // 只有当消息中包含当前客户端的地址时,才打印消息 System.out.println(message); } }
// 异常处理方法,打印异常信息并关闭连接 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
客户端 (MultiChatClient.java)javapackage org.example;
import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;
import java.io.BufferedReader;import java.io.InputStreamReader;
public class MultiChatClient { private final String host; private final int port; private ChannelHandlerContext ctx;
public MultiChatClient(String host, int port) { this.host = host; this.port = port; }
public void run() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); // 创建一个NioEventLoopGroup对象用于处理I/O操作 try { Bootstrap b = new Bootstrap();// 创建一个Bootstrap对象用于配置客户端 b.group(group) .channel(NioSocketChannel.class)// 指定使用NioSocketChannel作为客户端通道的实现 .option(ChannelOption.SO_KEEPALIVE, true)// 设置TCP连接的KeepAlive选项为true .handler(new ChannelInitializer<SocketChannel>() {// 设置ChannelInitializer用于初始化通道 @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new MultiChatClientHandler()); // 添加StringDecoder、StringEncoder和MultiChatClientHandler到通道的pipeline中 } });
ChannelFuture f = b.connect(host, port).sync(); // // 连接服务器并获取ChannelFuture对象 ctx = f.channel().pipeline().context(MultiChatClientHandler.class);// 获取MultiChatClientHandler的ChannelHandlerContext对象 BufferedReader in = new BufferedReader(new InputStreamReader(System.in));// 创建一个BufferedReader用于读取用户输入的消息 while (true) { String line = in.readLine(); //// 读取用户输入的消息 if (line == null) {// 如果输入的消息为空,则退出循环 break; }
ctx.writeAndFlush(line + '
'); // 发送消息给服务器 } } finally { group.shutdownGracefully(); // 关闭线程组 } }
public static void main(String[] args) throws Exception { String host = 'localhost';// 设置服务器主机名 int port = 8080; new MultiChatClient(host, port).run(); // 创建客户端实例并运行 }}
客户端消息处理 (MultiChatClientHandler.java)javapackage org.example;
import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;
public class MultiChatClientHandler extends SimpleChannelInboundHandler
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 打印异常堆栈信息 ctx.close(); // 打印异常堆栈信息
原文地址: https://www.cveoy.top/t/topic/f3Fw 著作权归作者所有。请勿转载和采集!