使用 SpringBoot + Netty + WebSocket 搭建聊天系统:离线消息、未读信息统计、单聊和群聊

本文将介绍如何使用 SpringBoot、Netty 和 WebSocket 开发一个完整的聊天系统,并涵盖以下核心功能:

  • 离线消息存储和发送
  • 未读信息统计
  • 单聊
  • 群聊

1. 项目搭建和配置

首先,使用 Spring Boot 搭建项目,并集成 Netty 和 WebSocket。以下是一个简单的示例:

@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    @Bean
    public NettyServer nettyServer() {
        return new NettyServer();
    }
}
public class NettyServer {

    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
    private Channel channel;

    public void start() throws Exception {
        ServerBootstrap bootstrap = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new HttpServerCodec());
                        pipeline.addLast(new HttpObjectAggregator(65536));
                        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
                        pipeline.addLast(new TextWebSocketFrameHandler());
                    }
                });

        ChannelFuture future = bootstrap.bind(8080).sync();
        channel = future.channel();
    }

    public void stop() {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        channel.closeFuture().syncUninterruptibly();
    }
}

2. 消息处理器

编写消息处理器,根据消息类型对消息进行处理,例如单聊、群聊等。以下是一个示例:

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Map<String, Channel> channels = new ConcurrentHashMap<>();
    private final Map<String, List<Message>> offlineMessages = new ConcurrentHashMap<>();
    private final Map<String, Integer> unreadMessages = new ConcurrentHashMap<>();

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channels.put(channel.id().asLongText(), channel);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channels.remove(channel.id().asLongText());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        String text = msg.text();
        Message message = objectMapper.readValue(text, Message.class);
        handleMessage(ctx, message);
    }

    private void handleMessage(ChannelHandlerContext ctx, Message message) throws Exception {
        String type = message.getType();
        if ("login".equals(type)) {
            handleLogin(ctx, message);
        } else if ("logout".equals(type)) {
            handleLogout(ctx, message);
        } else if ("chat".equals(type)) {
            handleChat(ctx, message);
        }
    }

    private void handleLogin(ChannelHandlerContext ctx, Message message) throws Exception {
        String userId = message.getUserId();
        channels.put(userId, ctx.channel());
        List<Message> messages = offlineMessages.remove(userId);
        if (messages != null) {
            for (Message msg : messages) {
                handleChat(ctx, msg);
            }
        }
    }

    private void handleLogout(ChannelHandlerContext ctx, Message message) throws Exception {
        String userId = message.getUserId();
        channels.remove(userId);
    }

    private void handleChat(ChannelHandlerContext ctx, Message message) throws Exception {
        String receiverId = message.getReceiverId();
        Channel receiverChannel = channels.get(receiverId);
        if (receiverChannel != null) {
            receiverChannel.writeAndFlush(new TextWebSocketFrame(objectMapper.writeValueAsString(message)));
        } else {
            List<Message> messages = offlineMessages.computeIfAbsent(receiverId, k -> new ArrayList<>());
            messages.add(message);
        }
        int count = unreadMessages.compute(receiverId, (k, v) -> v == null ? 1 : v + 1);
    }
}

3. 离线消息存储和发送

可以使用缓存或数据库来存储离线消息。以下示例使用 ConcurrentHashMap 来存储离线消息:

private final Map<String, List<Message>> offlineMessages = new ConcurrentHashMap<>();

private void handleLogin(ChannelHandlerContext ctx, Message message) throws Exception {
    String userId = message.getUserId();
    channels.put(userId, ctx.channel());
    List<Message> messages = offlineMessages.remove(userId);
    if (messages != null) {
        for (Message msg : messages) {
            handleChat(ctx, msg);
        }
    }
}

private void handleChat(ChannelHandlerContext ctx, Message message) throws Exception {
    String receiverId = message.getReceiverId();
    Channel receiverChannel = channels.get(receiverId);
    if (receiverChannel != null) {
        receiverChannel.writeAndFlush(new TextWebSocketFrame(objectMapper.writeValueAsString(message)));
    } else {
        List<Message> messages = offlineMessages.computeIfAbsent(receiverId, k -> new ArrayList<>());
        messages.add(message);
    }
}

4. 未读信息统计

同样可以使用缓存或数据库来统计未读信息数量。以下示例使用 ConcurrentHashMap 来统计未读信息数量:

private final Map<String, Integer> unreadMessages = new ConcurrentHashMap<>();

private void handleChat(ChannelHandlerContext ctx, Message message) throws Exception {
    String receiverId = message.getReceiverId();
    Channel receiverChannel = channels.get(receiverId);
    if (receiverChannel != null) {
        receiverChannel.writeAndFlush(new TextWebSocketFrame(objectMapper.writeValueAsString(message)));
    } else {
        List<Message> messages = offlineMessages.computeIfAbsent(receiverId, k -> new ArrayList<>());
        messages.add(message);
    }
    int count = unreadMessages.compute(receiverId, (k, v) -> v == null ? 1 : v + 1);
}

5. 单聊和群聊逻辑

在消息处理器中根据消息的 receiverId 判断是单聊还是群聊,并进行相应的处理。以下示例演示了单聊和群聊的逻辑:

private void handleChat(ChannelHandlerContext ctx, Message message) throws Exception {
    String receiverId = message.getReceiverId();
    if (receiverId == null) {
        // 群聊
        for (Channel channel : channels.values()) {
            if (channel != ctx.channel()) {
                channel.writeAndFlush(new TextWebSocketFrame(objectMapper.writeValueAsString(message)));
            }
        }
    } else {
        // 单聊
        Channel receiverChannel = channels.get(receiverId);
        if (receiverChannel != null) {
            receiverChannel.writeAndFlush(new TextWebSocketFrame(objectMapper.writeValueAsString(message)));
        } else {
            List<Message> messages = offlineMessages.computeIfAbsent(receiverId, k -> new ArrayList<>());
            messages.add(message);
        }
        int count = unreadMessages.compute(receiverId, (k, v) -> v == null ? 1 : v + 1);
    }
}

总结

以上代码示例仅供参考,实际开发过程中可能需要根据具体需求进行调整。通过以上步骤,可以快速搭建一个完整的聊天系统,实现离线消息、未读信息统计、单聊和群聊等核心功能。

注意:

  • Message 类需要根据实际需求定义,包含消息类型、发送者 ID、接收者 ID、消息内容等属性。
  • 为了提高性能,可以考虑使用 Redis 或其他缓存数据库来存储离线消息和未读信息统计数据。
  • 可以根据实际需求添加其他功能,例如好友管理、群组管理、图片和语音发送等。
  • 为了保证安全性,需要对用户输入进行校验和过滤,防止恶意攻击。
  • 建议使用单元测试对代码进行验证,确保代码的正确性和稳定性。

希望本文能够帮助您搭建自己的聊天系统。

SpringBoot + Netty + WebSocket 聊天系统:实现离线消息、未读信息统计、单聊和群聊功能

原文地址: http://www.cveoy.top/t/topic/nI9g 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录