import time
import network
import onewire, ds18x20
from machine import Pin  
from machine import PWM
from machine import ADC
from machine import Timer
from time import sleep_ms
from umqttsimple import MQTTClient


def do_connect():
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print('connecting to network...')
        wlan.connect('许一多的iPhone', '745326830')
        i = 1
        while not wlan.isconnected():
            print("正在链接...{}".format(i))
            i += 1
            time.sleep(1)
    print('network config:', wlan.ifconfig())


#电压监测
def time0_irq():
    try:
        adc_vol = 3.3*adc.read()/4095
        print("ADC检测电压:%.2fv" %adc_vol)
        return adc_vol
    except Exception as e:
        print("电压监测错误:", e)
        return None


#温度监测
def read_ds_sensor():
    try:
        roms = ds_sensor.scan()
        print('发现设备: ', roms)
        ds_sensor.convert_temp()
        for rom in roms:
            temp = ds_sensor.read_temp(rom)
            print("DS18B20检测温度:%.1f度" %temp)
        return temp
    except Exception as e:
        print("温度监测错误:", e)
        return None


def detect():
    try:
        temperature = read_ds_sensor()
        voltage = time0_irq()
        if temperature > 50:
            i = 0
            for t in range(10000):
                i = not i
                beep.value(i)
                time.sleep_us(250)
        if voltage > 235:
            i = 0
            for t in range(10000):
                i = not i
                beep.value(i)
                time.sleep_us(250)
        temperature = str(temperature)
        voltage = str(voltage)
        c.publish(b"temperature",temperature)
        c.publish(b"voltage",voltage)
    except Exception as e:
        print("监测数据发送错误:", e)


# 回调函数,收到服务器消息后会调用这个函数
def sub_cb(topic, msg): 
    try:
        print(topic, msg)
        if topic.decode("utf-8") == "ledctl" and msg.decode("utf-8") == "on":
            led_pin1.value(1)

        elif topic.decode("utf-8") == "ledctl" and msg.decode("utf-8") == "off":
            led_pin1.value(0)

        elif topic.decode("utf-8") == "ledctl" and msg.decode("utf-8") == "overvoltage":
            i = 0
            for t in range(10000):
                i = not i
                beep.value(i)
                time.sleep_us(250)
        
        elif topic.decode("utf-8") == "ledctl" and msg.decode("utf-8") == "easy":
            
            for i in range(3):
                leds.append(Pin(led_pin2[i],Pin.OUT))

            for n in range(3):
                leds[n].value(0)
            
            for n in range(3):
            
                for n in range(3):
                    leds[n].value(1)
                    time.sleep(0.05)
                
                for n in range(3):
                    leds[n].value(0)
                    time.sleep(0.05)
    except Exception as e:
        print("接收服务器指令错误:", e)


key = Pin(14,Pin.IN,Pin.PULL_UP)
led_pin1 = Pin(15, Pin.OUT)
beep = Pin(19,Pin.OUT)
led_pin2 = [15,16,21,22]
leds = []
ad = ADC(Pin(39))
ad.atten(ADC.ATTN_11DB)
ds_pin = Pin(26)
ds_sensor = ds18x20.DS18X20(onewire.OneWire(ds_pin))


#联网
do_connect()

# 创建mqt
try:
    c = MQTTClient("umqtt_client", "101.42.0.39")  # 建立一个MQTT客户端
    c.set_callback(sub_cb)  # 设置回调函数
    c.connect()  # 建立连接
    c.subscribe(b"ledctl")  # 监控ledctl这个通道,接收控制命令
    c.subscribe(b"temperature")
    c.subscribe(b"voltage")
    print("MQTT连接成功")
except Exception as e:
    print("MQTT连接失败:", e)


while True:
    time.sleep_ms(10)
    if key.value() == 0:
        led_pin1.value(1)
        counter = -1
        while True:
            counter += 1
            if counter == 0:
                detect()
                time.sleep(1)
            elif counter > 0:
                time.sleep_ms(10)
                try:
                    c.check_msg()
                except Exception as e:
                    print("检查MQTT消息错误:", e)
                if key.value() == 0:
                    led_pin1.value(0)
                    time.sleep(1)
                    break
                if counter == 300:
                    counter = 1
                    detect()
ESP32 温度、电压监测并通过 MQTT 与服务器通信

原文地址: https://www.cveoy.top/t/topic/nUxD 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录