使用 Netty 构建多用户聊天室 - 代码示例

本文通过 Java 代码示例展示如何使用 Netty 库构建一个简单但功能完备的多用户聊天室应用程序,包括用户登录、聊天室列表、群发消息、私聊消息等功能,并深入分析代码实现细节。

服务器端代码

package org.example;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MultiChatServer {
    private final int port;
    private final ConcurrentHashMap<String, ChannelHandlerContext> userMap = new ConcurrentHashMap<>();
    private final EventExecutorGroup group = new DefaultEventExecutorGroup(16);

    public MultiChatServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();// 创建主线程组
        EventLoopGroup workerGroup = new NioEventLoopGroup();// 创建工作线程组
        try {
            ServerBootstrap b = new ServerBootstrap();// 配置服务器启动参数
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new MultiChatServerHandler(userMap, group));
                        // 添加处理器到通道的pipeline中
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync(); // 绑定端口并启动服务器
            f.channel().closeFuture().sync(); // 等待服务器关闭
        } finally {
            workerGroup.shutdownGracefully(); // 关闭工作线程组
            bossGroup.shutdownGracefully(); // 关闭主线程组
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        // 创建服务器实例并运行
        new MultiChatServer(port).run(); // 创建服务器实例并运行
    }
}

package org.example;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.EventExecutorGroup;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
//MultiChatServerHandler类,实现服务器端的消息处理逻辑,包括用户登录、聊天室列表、消息发送和私聊功能。
public class MultiChatServerHandler extends SimpleChannelInboundHandler<String> {
    // 用户映射表,用于保存用户和上下文的映射关系
    private final ConcurrentHashMap<String, ChannelHandlerContext> userMap;
    private final EventExecutorGroup group;
    // 构造函数,初始化用户映射表和事件执行器组
    public MultiChatServerHandler(ConcurrentHashMap<String, ChannelHandlerContext> userMap, EventExecutorGroup group) {
        this.userMap = userMap;
        this.group = group;
    }
    // 当客户端连接到服务器时调用的方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println('Client connected: ' + ctx.channel().remoteAddress()); // 客户端连接时输出连接地址
    }
    // 当客户端断开连接时调用的方法
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println('Client disconnected: ' + ctx.channel().remoteAddress()); // 客户端断开连接时输出连接地址
        userMap.values().remove(ctx); // 从用户列表中移除断开连接的客户端
    }
    // 接收到客户端消息时调用的方法
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
////
        if (msg.contains(ctx.channel().remoteAddress().toString())) { // 只有当消息中包含当前客户端的地址时,才打印消息
            System.out.println(msg);
        }
////
        String[] tokens = msg.split('\|'); // 以'|'分割消息
        String command = tokens[0]; // 获取命令类型

        switch (command) {
            case 'LOGIN':
                handleLogin(ctx, tokens[1], tokens[2]); // 处理登录命令
                break;
            case 'LIST':
                handleList(ctx); // 处理获取聊天室列表命令
                break;
            case 'MSG':
                handleMessage(ctx, tokens[1], tokens[2]); // 处理发送消息命令
                break;
            case 'PRIVATE':
                handlePrivateMessage(ctx, tokens[1], tokens[2]); // 处理发送私聊消息命令
                break;
            case 'RECEIVED':
                handleReceivedMessage(ctx, tokens[1]); // 处理接收到消息的确认命令
                break;

            default:
                ctx.writeAndFlush('Invalid command: ' + command); // 发送无效命令的错误消息
                break;
        }
    }

    private void handleLogin(ChannelHandlerContext ctx, String username, String password) {
        if (userMap.containsKey(username)) { // 如果用户已经登录则发送错误消息
            ctx.writeAndFlush('User already logged in: ' + username);
        } else {
            userMap.put(username, ctx); // 将用户添加到用户列表中
            ctx.writeAndFlush('Login successful: ' + username); // 发送登录成功消息
        }
    }

    private void handleList(ChannelHandlerContext ctx) {
        StringBuilder sb = new StringBuilder();
        sb.append('Available chat rooms:
');
        for (Map.Entry<String, ChannelHandlerContext> entry : userMap.entrySet()) {
            sb.append(entry.getKey()).append('
'); // 将聊天室名称添加到消息中
        }
        ctx.writeAndFlush(sb.toString()); // 发送聊天室列表消息
    }
/*
    private void handleMessage(ChannelHandlerContext ctx, String roomName, String message) {
        StringBuilder sb = new StringBuilder();
        sb.append('[').append(ctx.channel().remoteAddress()).append('] ').append(message).append('
');
        for (Map.Entry<String, ChannelHandlerContext> entry : userMap.entrySet()) {
            if (entry.getKey().equals(roomName) && entry.getValue() == ctx) { // 只有当接收消息的客户端与当前客户端相同时,才发送消息给客户端
                entry.getValue().writeAndFlush(sb.toString());
            }
        }
    }


 */
//群发
    private void handleMessage(ChannelHandlerContext ctx, String roomName, String message) {
        StringBuilder sb = new StringBuilder();
        sb.append('[').append(ctx.channel().remoteAddress()).append('] ').append(message).append('
');
        for (Map.Entry<String, ChannelHandlerContext> entry : userMap.entrySet()) {
            if (entry.getKey().equals(roomName)) { // 找到指定聊天室的客户端并发送消息
                entry.getValue().writeAndFlush(sb.toString());
            }
        }
    }

    //私发

    private void handlePrivateMessage(ChannelHandlerContext ctx, String recipient, String message) {
        StringBuilder sb = new StringBuilder();
        sb.append('[').append(ctx.channel().remoteAddress()).append('] (private) ').append(message).append('
');
        if (userMap.containsKey(recipient)) { // 找到指定收件人的客户端并发送私聊消息
            userMap.get(recipient).writeAndFlush(sb.toString());
        } else {
            ctx.writeAndFlush('User not found: ' + recipient); // 发送收件人未找到的错误消息
        }
    }
    ////
    private void handleReceivedMessage(ChannelHandlerContext ctx, String message) {
        if (message.contains(ctx.channel().remoteAddress().toString())) { // 只有当消息中包含当前客户端的地址时,才打印消息
            System.out.println(message);
        }
    }
    ///
/*
    private void handleReceivedMessage(ChannelHandlerContext ctx, String message) {
        System.out.println('Received message: ' + message); // 处理接收到的消息的确认命令
    }


 */
    // 异常处理方法,打印异常信息并关闭连接
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}


package org.example;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class MultiChatClient {
    private final String host;
    private final int port;
    private ChannelHandlerContext ctx;

    public MultiChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup(); // 创建一个NioEventLoopGroup对象用于处理I/O操作
        try {
            Bootstrap b = new Bootstrap();// 创建一个Bootstrap对象用于配置客户端
            b.group(group)
                    .channel(NioSocketChannel.class)// 指定使用NioSocketChannel作为客户端通道的实现
                    .option(ChannelOption.SO_KEEPALIVE, true)// 设置TCP连接的KeepAlive选项为true
                    .handler(new ChannelInitializer<SocketChannel>() {// 设置ChannelInitializer用于初始化通道
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new MultiChatClientHandler());
                            // 添加StringDecoder、StringEncoder和MultiChatClientHandler到通道的pipeline中
                        }
                    });

            ChannelFuture f = b.connect(host, port).sync(); // // 连接服务器并获取ChannelFuture对象
            ctx = f.channel().pipeline().context(MultiChatClientHandler.class);// 获取MultiChatClientHandler的ChannelHandlerContext对象
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));// 创建一个BufferedReader用于读取用户输入的消息
            while (true) {
                String line = in.readLine(); //// 读取用户输入的消息
                if (line == null) {// 如果输入的消息为空,则退出循环
                    break;
                }

                ctx.writeAndFlush(line + '
'); // 发送消息给服务器
            }
        } finally {
            group.shutdownGracefully(); // 关闭线程组
        }
    }

    public static void main(String[] args) throws Exception {
        String host = 'localhost';// 设置服务器主机名
        int port = 8080;
        new MultiChatClient(host, port).run(); // 创建客户端实例并运行
    }
}

package org.example;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class MultiChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg); // 打印接收到的消息
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace(); // 打印异常堆栈信息
        ctx.close(); // 打印异常堆栈信息
    }
}

上述代码中,自定义协议是什么内容:根据代码分析,自定义协议是基于字符串的协议,使用竖线'|'作为分隔符来分割不同字段。客户端和服务器之间通过字符串消息进行通信,包括登录、获取聊天室列表、发送消息、发送私聊消息等功能。

自定义协议分析

该代码使用了一种简单的基于字符串的自定义协议,使用竖线 '|' 作为分隔符来分割不同的字段。

  • 登录命令: LOGIN|用户名|密码
  • 获取聊天室列表命令: LIST
  • 发送消息命令: MSG|聊天室名称|消息内容
  • 发送私聊消息命令: PRIVATE|接收者用户名|消息内容
  • 消息接收确认命令: RECEIVED|消息内容

代码解读

服务器端代码

  1. 创建 ServerBootstrap 对象并配置参数:

    • group(bossGroup, workerGroup):指定主线程组和工作线程组。
    • channel(NioServerSocketChannel.class):指定使用 NIO 的 ServerSocket 通道。
    • childHandler(new ChannelInitializer<SocketChannel>() {}):设置通道初始化器,在新的客户端连接时会调用 initChannel 方法。
    • option(ChannelOption.SO_BACKLOG, 128):设置 TCP 监听队列大小。
    • childOption(ChannelOption.SO_KEEPALIVE, true):设置 TCP 连接的 KeepAlive 选项。
  2. 创建 ChannelInitializer 对象并添加处理器:

    • ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new MultiChatServerHandler(userMap, group)):在通道的 pipeline 中添加 StringDecoder、StringEncoder 和 MultiChatServerHandler 处理器。
  3. 绑定端口并启动服务器:

    • b.bind(port).sync():绑定指定的端口并启动服务器。
    • f.channel().closeFuture().sync():等待服务器关闭。
  4. 关闭线程组:

    • workerGroup.shutdownGracefully():关闭工作线程组。
    • bossGroup.shutdownGracefully():关闭主线程组。

MultiChatServerHandler 类

  1. 处理客户端连接和断开连接:

    • channelActive(ChannelHandlerContext ctx):当客户端连接到服务器时被调用,打印连接地址。
    • channelInactive(ChannelHandlerContext ctx):当客户端断开连接时被调用,打印断开连接地址,并将该客户端从 userMap 中移除。
  2. 处理接收到的消息:

    • channelRead0(ChannelHandlerContext ctx, String msg):当接收到客户端消息时被调用,解析消息并根据不同的命令类型调用相应的处理方法。
  3. 处理不同的命令:

    • handleLogin(ChannelHandlerContext ctx, String username, String password):处理登录命令,将用户添加到 userMap 中。
    • handleList(ChannelHandlerContext ctx):处理获取聊天室列表命令,将所有用户列表发送给客户端。
    • handleMessage(ChannelHandlerContext ctx, String roomName, String message):处理发送消息命令,将消息转发给指定聊天室的所有用户。
    • handlePrivateMessage(ChannelHandlerContext ctx, String recipient, String message):处理发送私聊消息命令,将消息发送给指定用户。
    • handleReceivedMessage(ChannelHandlerContext ctx, String message):处理接收到的消息的确认命令,打印消息内容。

客户端代码

  1. 创建 Bootstrap 对象并配置参数:

    • group(group):指定线程组。
    • channel(NioSocketChannel.class):指定使用 NIO 的 Socket 通道。
    • option(ChannelOption.SO_KEEPALIVE, true):设置 TCP 连接的 KeepAlive 选项。
    • handler(new ChannelInitializer<SocketChannel>() {}):设置通道初始化器,在连接到服务器时会调用 initChannel 方法。
  2. 创建 ChannelInitializer 对象并添加处理器:

    • ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new MultiChatClientHandler()):在通道的 pipeline 中添加 StringDecoder、StringEncoder 和 MultiChatClientHandler 处理器。
  3. 连接服务器:

    • b.connect(host, port).sync():连接到指定的服务器和端口。
  4. 获取 MultiChatClientHandler 的 ChannelHandlerContext 对象:

    • ctx = f.channel().pipeline().context(MultiChatClientHandler.class):获取 MultiChatClientHandler 的 ChannelHandlerContext 对象,用于发送消息给服务器。
  5. 读取用户输入并发送消息:

    • BufferedReader in = new BufferedReader(new InputStreamReader(System.in)):创建一个 BufferedReader 对象用于读取用户输入。
    • ctx.writeAndFlush(line + ' '):将用户输入的消息发送给服务器。
  6. 关闭线程组:

    • group.shutdownGracefully():关闭线程组。

MultiChatClientHandler 类

  1. 处理接收到的消息:

    • channelRead0(ChannelHandlerContext ctx, String msg):当接收到服务器消息时被调用,打印消息内容。
  2. 处理异常:

    • exceptionCaught(ChannelHandlerContext ctx, Throwable cause):当出现异常时被调用,打印异常堆栈信息并关闭连接。

总结

本文通过代码示例展示了如何使用 Netty 库构建一个简单的多用户聊天室应用程序,并介绍了自定义协议的定义和代码解读。该代码示例展示了 Netty 库强大的功能,可以轻松地实现各种网络应用程序。

Netty 实现多用户聊天室 - Java 代码示例

原文地址: https://www.cveoy.top/t/topic/f3Dj 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录