Java 实现简易 MQTT 服务器:代码示例和常见问题解决
Java 实现简易 MQTT 服务器:代码示例和常见问题解决
本文将提供一个使用 Java 实现简易 MQTT 服务器的代码示例,并解释常见问题,例如接收不到订阅消息的问题。
代码示例
package com.wang.mqtt;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class SimpleMqttBroker {
public static void main(String[] args) throws IOException {
int port = 1883;
ServerSocket serverSocket = new ServerSocket(port);
System.out.println('MQTT Broker started on port ' + port);
while (true) {
Socket clientSocket = serverSocket.accept();
System.out.println('New client connected: ' + clientSocket.getInetAddress());
InputStream is = clientSocket.getInputStream();
OutputStream os = clientSocket.getOutputStream();
byte[] buffer = new byte[1024];
int read = is.read(buffer);
byte[] connectAck = { 0x20, 0x02, 0x00, 0x00 };
os.write(connectAck);
while (true) {
if (is.available() > 0) { // 检查是否有可用数据
read = is.read(buffer);
if (read == -1) {
break;
}
int messageType = (buffer[0] >> 4) & 0x0f;
if (messageType == 0x03) { // PUBLISH message
String topic = readMqttString(buffer, 2);
byte[] messageData = new byte[read - topic.length() - 4];
System.arraycopy(buffer, topic.length() + 4, messageData, 0, messageData.length);
int qosLevel = (buffer[0] >> 1) & 0x03;
boolean retained = ((buffer[0] >> 0) & 0x01) == 1;
System.out.println('Received message on topic ' + topic + ': ' + new String(messageData));
forwardMessageToSubscribers(topic, messageData, qosLevel, retained);
}
}
}
clientSocket.close();
System.out.println('Client disconnected');
}
}
private static String readMqttString(byte[] buffer, int offset) {
int stringLength = ((buffer[offset] & 0xff) << 8) | (buffer[offset + 1] & 0xff);
byte[] stringData = new byte[stringLength];
System.arraycopy(buffer, offset + 2, stringData, 0, stringLength);
return new String(stringData);
}
private static void forwardMessageToSubscribers(String topic, byte[] messageData, int qosLevel, boolean retained) {
String brokerUrl = 'tcp://localhost:1883';
String clientId = 'mqtt-broker';
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient mqttClient = new MqttClient(brokerUrl, clientId, persistence);
mqttClient.connect();
MqttMessage message = new MqttMessage(messageData);
message.setQos(qosLevel);
message.setRetained(retained);
mqttClient.publish(topic, message);
mqttClient.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
常见问题解决
问题: 启动后只能接收到第一次订阅得到的消息,并打印到控制台上,后面都不可以。
原因: 问题可能出在 while 循环中的 read = is.read(buffer) 这一行代码上。这行代码是用来读取客户端发送的数据的,如果客户端没有发送新的数据,就会一直阻塞在这里,导致后续的代码无法执行。
解决方法: 可以在读取数据之前先检查是否有可用的数据。可以使用 available() 方法来检查输入流中是否有可用的字节。
在代码示例中,我们已经将 is.available() > 0 添加到 while 循环的条件中,确保只有在有可用数据时才读取数据。
其他注意事项
- 确保客户端发送的订阅消息是符合 MQTT 协议的格式,包括固定报头和可变报头。你可以使用 MQTT 客户端库来发送订阅消息,例如使用 Eclipse Paho 库的
MqttClient类。 - 由于本代码示例仅用于演示目的,实际应用中需要考虑更复杂的情况,例如处理连接断开、心跳检测等。
希望这篇文章能够帮助你理解如何使用 Java 实现简易 MQTT 服务器,并解决一些常见问题。
原文地址: https://www.cveoy.top/t/topic/YqU 著作权归作者所有。请勿转载和采集!