Netty 实现多人聊天室:MultiChat 协议详解
使用 Netty 实现多人聊天室:MultiChat 协议详解
本文将详细介绍使用 Netty 库构建多人聊天室的实现,包括 MultiChat 协议的定义、服务器端和客户端的代码示例以及关键功能的实现细节。
1. MultiChat 协议定义
MultiChat 协议定义了用于在服务器和客户端之间进行通信的命令格式,主要包含以下几种命令:
- 登录命令:
LOGIN|username|password,用于用户登录聊天室。 - 聊天室列表命令:
LIST,用于获取当前可用的聊天室列表。 - 发送消息命令:
MSG|roomName|message,用于向指定聊天室发送消息。 - 私聊消息命令:
PRIVATE|recipient|message,用于向指定用户发送私聊消息。 - 接收到消息的确认命令:
RECEIVED|message,用于确认接收到消息。
2. 服务器端实现
服务器端代码主要使用 Netty 的 ServerBootstrap 类创建服务器实例,并通过 ChannelInitializer 对每个新连接的通道进行初始化,添加解码器、编码器和消息处理器。
package org.example;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class MultiChatServer {
private final int port;
private final ConcurrentHashMap<String, ChannelHandlerContext> userMap = new ConcurrentHashMap<>();
private final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
public MultiChatServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();// 创建主线程组
EventLoopGroup workerGroup = new NioEventLoopGroup();// 创建工作线程组
try {
ServerBootstrap b = new ServerBootstrap();// 配置服务器启动参数
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new MultiChatServerHandler(userMap, group));
// 添加处理器到通道的pipeline中
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync(); // 绑定端口并启动服务器
f.channel().closeFuture().sync(); // 等待服务器关闭
} finally {
workerGroup.shutdownGracefully(); // 关闭工作线程组
bossGroup.shutdownGracefully(); // 关闭主线程组
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
// 创建服务器实例并运行
new MultiChatServer(port).run(); // 创建服务器实例并运行
}
}
package org.example;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.EventExecutorGroup;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
//MultiChatServerHandler类,实现服务器端的消息处理逻辑,包括用户登录、聊天室列表、消息发送和私聊功能。
public class MultiChatServerHandler extends SimpleChannelInboundHandler<String> {
// 用户映射表,用于保存用户和上下文的映射关系
private final ConcurrentHashMap<String, ChannelHandlerContext> userMap;
private final EventExecutorGroup group;
// 构造函数,初始化用户映射表和事件执行器组
public MultiChatServerHandler(ConcurrentHashMap<String, ChannelHandlerContext> userMap, EventExecutorGroup group) {
this.userMap = userMap;
this.group = group;
}
// 当客户端连接到服务器时调用的方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println('Client connected: ' + ctx.channel().remoteAddress()); // 客户端连接时输出连接地址
}
// 当客户端断开连接时调用的方法
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println('Client disconnected: ' + ctx.channel().remoteAddress()); // 客户端断开连接时输出连接地址
userMap.values().remove(ctx); // 从用户列表中移除断开连接的客户端
}
// 接收到客户端消息时调用的方法
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
////
if (msg.contains(ctx.channel().remoteAddress().toString())) { // 只有当消息中包含当前客户端的地址时,才打印消息
System.out.println(msg);
}
////
String[] tokens = msg.split('\|'); // 以'|'分割消息
String command = tokens[0]; // 获取命令类型
switch (command) {
case 'LOGIN':
handleLogin(ctx, tokens[1], tokens[2]); // 处理登录命令
break;
case 'LIST':
handleList(ctx); // 处理获取聊天室列表命令
break;
case 'MSG':
handleMessage(ctx, tokens[1], tokens[2]); // 处理发送消息命令
break;
case 'PRIVATE':
handlePrivateMessage(ctx, tokens[1], tokens[2]); // 处理发送私聊消息命令
break;
case 'RECEIVED':
handleReceivedMessage(ctx, tokens[1]); // 处理接收到消息的确认命令
break;
default:
ctx.writeAndFlush('Invalid command: ' + command); // 发送无效命令的错误消息
break;
}
}
private void handleLogin(ChannelHandlerContext ctx, String username, String password) {
if (userMap.containsKey(username)) { // 如果用户已经登录则发送错误消息
ctx.writeAndFlush('User already logged in: ' + username);
} else {
userMap.put(username, ctx); // 将用户添加到用户列表中
ctx.writeAndFlush('Login successful: ' + username); // 发送登录成功消息
}
}
private void handleList(ChannelHandlerContext ctx) {
StringBuilder sb = new StringBuilder();
sb.append('Available chat rooms:
');
for (Map.Entry<String, ChannelHandlerContext> entry : userMap.entrySet()) {
sb.append(entry.getKey()).append('
'); // 将聊天室名称添加到消息中
}
ctx.writeAndFlush(sb.toString()); // 发送聊天室列表消息
}
/*
private void handleMessage(ChannelHandlerContext ctx, String roomName, String message) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(ctx.channel().remoteAddress()).append('] ').append(message).append('
');
for (Map.Entry<String, ChannelHandlerContext> entry : userMap.entrySet()) {
if (entry.getKey().equals(roomName) && entry.getValue() == ctx) { // 只有当接收消息的客户端与当前客户端相同时,才发送消息给客户端
entry.getValue().writeAndFlush(sb.toString());
}
}
}
*/
//群发
private void handleMessage(ChannelHandlerContext ctx, String roomName, String message) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(ctx.channel().remoteAddress()).append('] ').append(message).append('
');
for (Map.Entry<String, ChannelHandlerContext> entry : userMap.entrySet()) {
if (entry.getKey().equals(roomName)) { // 找到指定聊天室的客户端并发送消息
entry.getValue().writeAndFlush(sb.toString());
}
}
}
//私发
private void handlePrivateMessage(ChannelHandlerContext ctx, String recipient, String message) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(ctx.channel().remoteAddress()).append('] (private) ').append(message).append('
');
if (userMap.containsKey(recipient)) { // 找到指定收件人的客户端并发送私聊消息
userMap.get(recipient).writeAndFlush(sb.toString());
} else {
ctx.writeAndFlush('User not found: ' + recipient); // 发送收件人未找到的错误消息
}
}
////
private void handleReceivedMessage(ChannelHandlerContext ctx, String message) {
if (message.contains(ctx.channel().remoteAddress().toString())) { // 只有当消息中包含当前客户端的地址时,才打印消息
System.out.println(message);
}
}
///
/*
private void handleReceivedMessage(ChannelHandlerContext ctx, String message) {
System.out.println('Received message: ' + message); // 处理接收到的消息的确认命令
}
*/
// 异常处理方法,打印异常信息并关闭连接
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
package org.example;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class MultiChatClient {
private final String host;
private final int port;
private ChannelHandlerContext ctx;
public MultiChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup(); // 创建一个NioEventLoopGroup对象用于处理I/O操作
try {
Bootstrap b = new Bootstrap();// 创建一个Bootstrap对象用于配置客户端
b.group(group)
.channel(NioSocketChannel.class)// 指定使用NioSocketChannel作为客户端通道的实现
.option(ChannelOption.SO_KEEPALIVE, true)// 设置TCP连接的KeepAlive选项为true
.handler(new ChannelInitializer<SocketChannel>() {// 设置ChannelInitializer用于初始化通道
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new MultiChatClientHandler());
// 添加StringDecoder、StringEncoder和MultiChatClientHandler到通道的pipeline中
}
});
ChannelFuture f = b.connect(host, port).sync(); // // 连接服务器并获取ChannelFuture对象
ctx = f.channel().pipeline().context(MultiChatClientHandler.class);// 获取MultiChatClientHandler的ChannelHandlerContext对象
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));// 创建一个BufferedReader用于读取用户输入的消息
while (true) {
String line = in.readLine(); //// 读取用户输入的消息
if (line == null) {// 如果输入的消息为空,则退出循环
break;
}
ctx.writeAndFlush(line + '
'); // 发送消息给服务器
}
} finally {
group.shutdownGracefully(); // 关闭线程组
}
}
public static void main(String[] args) throws Exception {
String host = 'localhost';// 设置服务器主机名
int port = 8080;
new MultiChatClient(host, port).run(); // 创建客户端实例并运行
}
}
package org.example;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MultiChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg); // 打印接收到的消息
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace(); // 打印异常堆栈信息
ctx.close(); // 打印异常堆栈信息
}
}
上述代码是怎么定义MultiChat协议的,怎么实现的内容:上述代码实现了一个基于Netty的多人聊天室(MultiChat)的服务器和客户端。下面是MultiChat协议的定义和实现:
1. MultiChat协议的定义:
- 登录命令:LOGIN|username|password,用于用户登录聊天室。
- 聊天室列表命令:LIST,用于获取当前可用的聊天室列表。
- 发送消息命令:MSG|roomName|message,用于向指定聊天室发送消息。
- 私聊消息命令:PRIVATE|recipient|message,用于向指定用户发送私聊消息。
- 接收到消息的确认命令:RECEIVED|message,用于确认接收到消息。
2. MultiChat服务器的实现:
- 使用Netty的ServerBootstrap创建服务器并绑定端口。
- 创建主线程组和工作线程组,分别用于处理服务器的主要逻辑和网络I/O操作。
- 配置服务器启动参数,包括通道类型、处理器和选项。
- 实现MultiChatServerHandler类,继承自SimpleChannelInboundHandler,用于处理服务器端的消息逻辑。
- 在channelActive方法中,当客户端连接到服务器时,输出连接地址。
- 在channelInactive方法中,当客户端断开连接时,输出连接地址,并从用户列表中移除断开连接的客户端。
- 在channelRead0方法中,根据接收到的消息的命令类型,调用相应的处理方法。
- 处理登录命令:判断用户是否已经登录,如果已登录则发送错误消息,否则将用户添加到用户列表中并发送登录成功消息。
- 处理获取聊天室列表命令:遍历用户列表,将聊天室名称添加到消息中并发送。
- 处理发送消息命令:遍历用户列表,找到指定聊天室的客户端并发送消息。
- 处理发送私聊消息命令:根据收件人的用户名找到对应的客户端并发送私聊消息。
- 处理接收到消息的确认命令:打印接收到的消息。
- 实现异常处理方法,打印异常信息并关闭连接。
3. MultiChat客户端的实现:
- 使用Netty的Bootstrap创建客户端。
- 创建一个NioEventLoopGroup对象用于处理I/O操作。
- 配置客户端启动参数,包括通道类型、处理器和选项。
- 实现MultiChatClientHandler类,继承自SimpleChannelInboundHandler,用于处理客户端的消息逻辑。
- 在channelRead0方法中,打印接收到的消息。
- 实现异常处理方法,打印异常信息并关闭连接。
- 在run方法中,连接服务器并获取ChannelFuture对象。
- 获取MultiChatClientHandler的ChannelHandlerContext对象。
- 创建一个BufferedReader用于读取用户输入的消息。
- 循环读取用户输入的消息,并发送给服务器。
通过以上实现,服务器和客户端可以通过MultiChat协议进行通信,实现多人聊天室的功能。
原文地址: https://www.cveoy.top/t/topic/f3ET 著作权归作者所有。请勿转载和采集!