Java 实现简易 MQTT 服务器:代码示例和常见问题解决

本文将提供一个使用 Java 实现简易 MQTT 服务器的代码示例,并解释常见问题,例如接收不到订阅消息的问题。

代码示例

package com.wang.mqtt;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class SimpleMqttBroker {

    public static void main(String[] args) throws IOException {
        int port = 1883;
        ServerSocket serverSocket = new ServerSocket(port);
        System.out.println('MQTT Broker started on port ' + port);

        while (true) {
            Socket clientSocket = serverSocket.accept();
            System.out.println('New client connected: ' + clientSocket.getInetAddress());

            InputStream is = clientSocket.getInputStream();
            OutputStream os = clientSocket.getOutputStream();

            byte[] buffer = new byte[1024];
            int read = is.read(buffer);

            byte[] connectAck = { 0x20, 0x02, 0x00, 0x00 };
            os.write(connectAck);

            while (true) {
                if (is.available() > 0) { // 检查是否有可用数据
                    read = is.read(buffer);
                    if (read == -1) {
                        break;
                    }
                    int messageType = (buffer[0] >> 4) & 0x0f;
                    if (messageType == 0x03) { // PUBLISH message
                        String topic = readMqttString(buffer, 2);
                        byte[] messageData = new byte[read - topic.length() - 4];
                        System.arraycopy(buffer, topic.length() + 4, messageData, 0, messageData.length);
                        int qosLevel = (buffer[0] >> 1) & 0x03;
                        boolean retained = ((buffer[0] >> 0) & 0x01) == 1;
                        System.out.println('Received message on topic ' + topic + ': ' + new String(messageData));
                        forwardMessageToSubscribers(topic, messageData, qosLevel, retained);
                    }
                }
            }
            clientSocket.close();
            System.out.println('Client disconnected');
        }
    }

    private static String readMqttString(byte[] buffer, int offset) {
        int stringLength = ((buffer[offset] & 0xff) << 8) | (buffer[offset + 1] & 0xff);
        byte[] stringData = new byte[stringLength];
        System.arraycopy(buffer, offset + 2, stringData, 0, stringLength);
        return new String(stringData);
    }

    private static void forwardMessageToSubscribers(String topic, byte[] messageData, int qosLevel, boolean retained) {
        String brokerUrl = 'tcp://localhost:1883';
        String clientId = 'mqtt-broker';
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient mqttClient = new MqttClient(brokerUrl, clientId, persistence);
            mqttClient.connect();
            MqttMessage message = new MqttMessage(messageData);
            message.setQos(qosLevel);
            message.setRetained(retained);
            mqttClient.publish(topic, message);
            mqttClient.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

}

常见问题解决

问题: 启动后只能接收到第一次订阅得到的消息,并打印到控制台上,后面都不可以。

原因: 问题可能出在 while 循环中的 read = is.read(buffer) 这一行代码上。这行代码是用来读取客户端发送的数据的,如果客户端没有发送新的数据,就会一直阻塞在这里,导致后续的代码无法执行。

解决方法: 可以在读取数据之前先检查是否有可用的数据。可以使用 available() 方法来检查输入流中是否有可用的字节。

在代码示例中,我们已经将 is.available() > 0 添加到 while 循环的条件中,确保只有在有可用数据时才读取数据。

其他注意事项

  • 确保客户端发送的订阅消息是符合 MQTT 协议的格式,包括固定报头和可变报头。你可以使用 MQTT 客户端库来发送订阅消息,例如使用 Eclipse Paho 库的 MqttClient 类。
  • 由于本代码示例仅用于演示目的,实际应用中需要考虑更复杂的情况,例如处理连接断开、心跳检测等。

希望这篇文章能够帮助你理解如何使用 Java 实现简易 MQTT 服务器,并解决一些常见问题。

Java 实现简易 MQTT 服务器:代码示例和常见问题解决

原文地址: https://www.cveoy.top/t/topic/YqU 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录