SpringBoot + Netty + WebSocket 聊天系统:实现离线消息、未读信息统计、单聊和群聊功能
使用 SpringBoot + Netty + WebSocket 搭建聊天系统:离线消息、未读信息统计、单聊和群聊
本文将介绍如何使用 SpringBoot、Netty 和 WebSocket 开发一个完整的聊天系统,并涵盖以下核心功能:
- 离线消息存储和发送
- 未读信息统计
- 单聊
- 群聊
1. 项目搭建和配置
首先,使用 Spring Boot 搭建项目,并集成 Netty 和 WebSocket。以下是一个简单的示例:
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
@Bean
public NettyServer nettyServer() {
return new NettyServer();
}
}
public class NettyServer {
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private Channel channel;
public void start() throws Exception {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler());
}
});
ChannelFuture future = bootstrap.bind(8080).sync();
channel = future.channel();
}
public void stop() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
channel.closeFuture().syncUninterruptibly();
}
}
2. 消息处理器
编写消息处理器,根据消息类型对消息进行处理,例如单聊、群聊等。以下是一个示例:
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private final ObjectMapper objectMapper = new ObjectMapper();
private final Map<String, Channel> channels = new ConcurrentHashMap<>();
private final Map<String, List<Message>> offlineMessages = new ConcurrentHashMap<>();
private final Map<String, Integer> unreadMessages = new ConcurrentHashMap<>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channels.put(channel.id().asLongText(), channel);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channels.remove(channel.id().asLongText());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
String text = msg.text();
Message message = objectMapper.readValue(text, Message.class);
handleMessage(ctx, message);
}
private void handleMessage(ChannelHandlerContext ctx, Message message) throws Exception {
String type = message.getType();
if ("login".equals(type)) {
handleLogin(ctx, message);
} else if ("logout".equals(type)) {
handleLogout(ctx, message);
} else if ("chat".equals(type)) {
handleChat(ctx, message);
}
}
private void handleLogin(ChannelHandlerContext ctx, Message message) throws Exception {
String userId = message.getUserId();
channels.put(userId, ctx.channel());
List<Message> messages = offlineMessages.remove(userId);
if (messages != null) {
for (Message msg : messages) {
handleChat(ctx, msg);
}
}
}
private void handleLogout(ChannelHandlerContext ctx, Message message) throws Exception {
String userId = message.getUserId();
channels.remove(userId);
}
private void handleChat(ChannelHandlerContext ctx, Message message) throws Exception {
String receiverId = message.getReceiverId();
Channel receiverChannel = channels.get(receiverId);
if (receiverChannel != null) {
receiverChannel.writeAndFlush(new TextWebSocketFrame(objectMapper.writeValueAsString(message)));
} else {
List<Message> messages = offlineMessages.computeIfAbsent(receiverId, k -> new ArrayList<>());
messages.add(message);
}
int count = unreadMessages.compute(receiverId, (k, v) -> v == null ? 1 : v + 1);
}
}
3. 离线消息存储和发送
可以使用缓存或数据库来存储离线消息。以下示例使用 ConcurrentHashMap 来存储离线消息:
private final Map<String, List<Message>> offlineMessages = new ConcurrentHashMap<>();
private void handleLogin(ChannelHandlerContext ctx, Message message) throws Exception {
String userId = message.getUserId();
channels.put(userId, ctx.channel());
List<Message> messages = offlineMessages.remove(userId);
if (messages != null) {
for (Message msg : messages) {
handleChat(ctx, msg);
}
}
}
private void handleChat(ChannelHandlerContext ctx, Message message) throws Exception {
String receiverId = message.getReceiverId();
Channel receiverChannel = channels.get(receiverId);
if (receiverChannel != null) {
receiverChannel.writeAndFlush(new TextWebSocketFrame(objectMapper.writeValueAsString(message)));
} else {
List<Message> messages = offlineMessages.computeIfAbsent(receiverId, k -> new ArrayList<>());
messages.add(message);
}
}
4. 未读信息统计
同样可以使用缓存或数据库来统计未读信息数量。以下示例使用 ConcurrentHashMap 来统计未读信息数量:
private final Map<String, Integer> unreadMessages = new ConcurrentHashMap<>();
private void handleChat(ChannelHandlerContext ctx, Message message) throws Exception {
String receiverId = message.getReceiverId();
Channel receiverChannel = channels.get(receiverId);
if (receiverChannel != null) {
receiverChannel.writeAndFlush(new TextWebSocketFrame(objectMapper.writeValueAsString(message)));
} else {
List<Message> messages = offlineMessages.computeIfAbsent(receiverId, k -> new ArrayList<>());
messages.add(message);
}
int count = unreadMessages.compute(receiverId, (k, v) -> v == null ? 1 : v + 1);
}
5. 单聊和群聊逻辑
在消息处理器中根据消息的 receiverId 判断是单聊还是群聊,并进行相应的处理。以下示例演示了单聊和群聊的逻辑:
private void handleChat(ChannelHandlerContext ctx, Message message) throws Exception {
String receiverId = message.getReceiverId();
if (receiverId == null) {
// 群聊
for (Channel channel : channels.values()) {
if (channel != ctx.channel()) {
channel.writeAndFlush(new TextWebSocketFrame(objectMapper.writeValueAsString(message)));
}
}
} else {
// 单聊
Channel receiverChannel = channels.get(receiverId);
if (receiverChannel != null) {
receiverChannel.writeAndFlush(new TextWebSocketFrame(objectMapper.writeValueAsString(message)));
} else {
List<Message> messages = offlineMessages.computeIfAbsent(receiverId, k -> new ArrayList<>());
messages.add(message);
}
int count = unreadMessages.compute(receiverId, (k, v) -> v == null ? 1 : v + 1);
}
}
总结
以上代码示例仅供参考,实际开发过程中可能需要根据具体需求进行调整。通过以上步骤,可以快速搭建一个完整的聊天系统,实现离线消息、未读信息统计、单聊和群聊等核心功能。
注意:
Message类需要根据实际需求定义,包含消息类型、发送者 ID、接收者 ID、消息内容等属性。- 为了提高性能,可以考虑使用 Redis 或其他缓存数据库来存储离线消息和未读信息统计数据。
- 可以根据实际需求添加其他功能,例如好友管理、群组管理、图片和语音发送等。
- 为了保证安全性,需要对用户输入进行校验和过滤,防止恶意攻击。
- 建议使用单元测试对代码进行验证,确保代码的正确性和稳定性。
希望本文能够帮助您搭建自己的聊天系统。
原文地址: http://www.cveoy.top/t/topic/nI9g 著作权归作者所有。请勿转载和采集!