package org.example; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup;

import java.util.Map; import java.util.concurrent.ConcurrentHashMap;

public class MultiChatServer { private final int port; private final ConcurrentHashMap<String, ChannelHandlerContext> userMap = new ConcurrentHashMap<>(); private final EventExecutorGroup group = new DefaultEventExecutorGroup(16);

public MultiChatServer(int port) {
    this.port = port;
}

public void run() throws Exception {
    EventLoopGroup bossGroup = new NioEventLoopGroup();// 创建主线程组
    EventLoopGroup workerGroup = new NioEventLoopGroup();// 创建工作线程组
    try {
        ServerBootstrap b = new ServerBootstrap();// 配置服务器启动参数
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new MultiChatServerHandler(userMap, group));
                    // 添加处理器到通道的pipeline中
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        ChannelFuture f = b.bind(port).sync(); // 绑定端口并启动服务器
        f.channel().closeFuture().sync(); // 等待服务器关闭
    } finally {
        workerGroup.shutdownGracefully(); // 关闭工作线程组
        bossGroup.shutdownGracefully(); // 关闭主线程组
    }
}

public static void main(String[] args) throws Exception {
    int port = 8080;
    // 创建服务器实例并运行
    new MultiChatServer(port).run(); // 创建服务器实例并运行
}

}

package org.example; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.concurrent.EventExecutorGroup;

import java.util.Map; import java.util.concurrent.ConcurrentHashMap; //MultiChatServerHandler类,实现服务器端的消息处理逻辑,包括用户登录、聊天室列表、消息发送和私聊功能。 public class MultiChatServerHandler extends SimpleChannelInboundHandler { // 用户映射表,用于保存用户和上下文的映射关系 private final ConcurrentHashMap<String, ChannelHandlerContext> userMap; private final EventExecutorGroup group; // 构造函数,初始化用户映射表和事件执行器组 public MultiChatServerHandler(ConcurrentHashMap<String, ChannelHandlerContext> userMap, EventExecutorGroup group) { this.userMap = userMap; this.group = group; } // 当客户端连接到服务器时调用的方法 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println('Client connected: ' + ctx.channel().remoteAddress()); // 客户端连接时输出连接地址 } // 当客户端断开连接时调用的方法 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println('Client disconnected: ' + ctx.channel().remoteAddress()); // 客户端断开连接时输出连接地址 userMap.values().remove(ctx); // 从用户列表中移除断开连接的客户端 } // 接收到客户端消息时调用的方法 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { //// if (msg.contains(ctx.channel().remoteAddress().toString())) { // 只有当消息中包含当前客户端的地址时,才打印消息 System.out.println(msg); } //// String[] tokens = msg.split('|'); // 以'|'分割消息 String command = tokens[0]; // 获取命令类型

    switch (command) {
        case 'LOGIN':
            handleLogin(ctx, tokens[1], tokens[2]); // 处理登录命令
            break;
        case 'LIST':
            handleList(ctx); // 处理获取聊天室列表命令
            break;
        case 'MSG':
            handleMessage(ctx, tokens[1], tokens[2]); // 处理发送消息命令
            break;
        case 'PRIVATE':
            handlePrivateMessage(ctx, tokens[1], tokens[2]); // 处理发送私聊消息命令
            break;
        case 'RECEIVED':
            handleReceivedMessage(ctx, tokens[1]); // 处理接收到消息的确认命令
            break;

        default:
            ctx.writeAndFlush('Invalid command: ' + command); // 发送无效命令的错误消息
            break;
    }
}

private void handleLogin(ChannelHandlerContext ctx, String username, String password) {
    if (userMap.containsKey(username)) { // 如果用户已经登录则发送错误消息
        ctx.writeAndFlush('User already logged in: ' + username);
    } else {
        userMap.put(username, ctx); // 将用户添加到用户列表中
        ctx.writeAndFlush('Login successful: ' + username); // 发送登录成功消息
    }
}

private void handleList(ChannelHandlerContext ctx) {
    StringBuilder sb = new StringBuilder();
    sb.append('Available chat rooms:

'); for (Map.Entry<String, ChannelHandlerContext> entry : userMap.entrySet()) { sb.append(entry.getKey()).append(' '); // 将聊天室名称添加到消息中 } ctx.writeAndFlush(sb.toString()); // 发送聊天室列表消息 } /* private void handleMessage(ChannelHandlerContext ctx, String roomName, String message) { StringBuilder sb = new StringBuilder(); sb.append('[').append(ctx.channel().remoteAddress()).append('] ').append(message).append(' '); for (Map.Entry<String, ChannelHandlerContext> entry : userMap.entrySet()) { if (entry.getKey().equals(roomName) && entry.getValue() == ctx) { // 只有当接收消息的客户端与当前客户端相同时,才发送消息给客户端 entry.getValue().writeAndFlush(sb.toString()); } } }

*/ //群发 private void handleMessage(ChannelHandlerContext ctx, String roomName, String message) { StringBuilder sb = new StringBuilder(); sb.append('[').append(ctx.channel().remoteAddress()).append('] ').append(message).append(' '); for (Map.Entry<String, ChannelHandlerContext> entry : userMap.entrySet()) { if (entry.getKey().equals(roomName)) { // 找到指定聊天室的客户端并发送消息 entry.getValue().writeAndFlush(sb.toString()); } } }

//私发

private void handlePrivateMessage(ChannelHandlerContext ctx, String recipient, String message) {
    StringBuilder sb = new StringBuilder();
    sb.append('[').append(ctx.channel().remoteAddress()).append('] (private) ').append(message).append('

'); if (userMap.containsKey(recipient)) { // 找到指定收件人的客户端并发送私聊消息 userMap.get(recipient).writeAndFlush(sb.toString()); } else { ctx.writeAndFlush('User not found: ' + recipient); // 发送收件人未找到的错误消息 } } //// private void handleReceivedMessage(ChannelHandlerContext ctx, String message) { if (message.contains(ctx.channel().remoteAddress().toString())) { // 只有当消息中包含当前客户端的地址时,才打印消息 System.out.println(message); } } /// /* private void handleReceivedMessage(ChannelHandlerContext ctx, String message) { System.out.println('Received message: ' + message); // 处理接收到的消息的确认命令 }

*/ // 异常处理方法,打印异常信息并关闭连接 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

package org.example; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder;

import java.io.BufferedReader; import java.io.InputStreamReader;

public class MultiChatClient { private final String host; private final int port; private ChannelHandlerContext ctx;

public MultiChatClient(String host, int port) {
    this.host = host;
    this.port = port;
}

public void run() throws Exception {
    EventLoopGroup group = new NioEventLoopGroup(); // 创建一个NioEventLoopGroup对象用于处理I/O操作
    try {
        Bootstrap b = new Bootstrap();// 创建一个Bootstrap对象用于配置客户端
        b.group(group)
                .channel(NioSocketChannel.class)// 指定使用NioSocketChannel作为客户端通道的实现
                .option(ChannelOption.SO_KEEPALIVE, true)// 设置TCP连接的KeepAlive选项为true
                .handler(new ChannelInitializer<SocketChannel>() {// 设置ChannelInitializer用于初始化通道
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new MultiChatClientHandler());
                        // 添加StringDecoder、StringEncoder和MultiChatClientHandler到通道的pipeline中
                    }
                });

        ChannelFuture f = b.connect(host, port).sync(); // // 连接服务器并获取ChannelFuture对象
        ctx = f.channel().pipeline().context(MultiChatClientHandler.class);// 获取MultiChatClientHandler的ChannelHandlerContext对象
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));// 创建一个BufferedReader用于读取用户输入的消息
        while (true) {
            String line = in.readLine(); //// 读取用户输入的消息
            if (line == null) {// 如果输入的消息为空,则退出循环
                break;
            }

            ctx.writeAndFlush(line + '

'); // 发送消息给服务器 } } finally { group.shutdownGracefully(); // 关闭线程组 } }

public static void main(String[] args) throws Exception {
    String host = 'localhost';// 设置服务器主机名
    int port = 8080;
    new MultiChatClient(host, port).run(); // 创建客户端实例并运行
}

}

package org.example; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler;

public class MultiChatClientHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); // 打印接收到的消息 }

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace(); // 打印异常堆栈信息
    ctx.close(); // 打印异常堆栈信息
}

}

上述代码中的自定义协议是什么内容:上述代码中的自定义协议是通过字符串进行通信的简单协议。客户端和服务器之间通过字符串进行消息的传递和处理。客户端发送的消息格式为命令+参数,服务器接收到消息后根据命令类型进行相应的处理。例如,LOGIN命令用于用户登录,LIST命令用于获取聊天室列表,MSG命令用于发送消息到指定聊天室,PRIVATE命令用于发送私聊消息等。

Netty 多人聊天室示例代码:服务器端和客户端实现

原文地址: https://www.cveoy.top/t/topic/f3Es 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录