import time
from time import sleep_ms

import network
import onewire, ds18x20
from machine import Pin
from machine import PWM
from machine import ADC
from machine import Timer

from umqttsimple import MQTTClient

def do_connect(ssid='许一多的iPhone', password='745326830'):
    """Bring up the Wi-Fi station interface and block until it is connected.

    Args:
        ssid: Name of the access point to join. Defaults to the value that
            used to be hard-coded in the body, so existing callers that pass
            no arguments behave as before.
        password: Passphrase for that access point (same default rule).

    There is no timeout: while the interface reports it is not connected,
    the function prints an attempt counter and sleeps one second per poll.
    The interface configuration is printed once a connection exists.
    """
    # NOTE(review): credentials are still embedded in source as defaults;
    # consider loading them from a config file kept out of version control.
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print('connecting to network...')
        wlan.connect(ssid, password)
        attempt = 1
        while not wlan.isconnected():
            print("正在链接...{}".format(attempt))
            attempt += 1
            time.sleep(1)
    print('network config:', wlan.ifconfig())

# Voltage monitoring
def time0_irq():
    """Take one ADC sample, print it, and return it as a voltage.

    The raw reading from the module-level ADC object ``ad`` is scaled by
    3.3 / 4095, i.e. a raw value of 4095 maps to 3.3.

    Despite the name, the visible code calls this directly from
    ``detect()`` rather than registering it as a timer interrupt handler.

    Returns:
        The scaled reading as a float.
    """
    # The ADC instance created at module level is named `ad`; the previous
    # body referenced an undefined name `adc`, which raised NameError.
    adc_vol = 3.3 * ad.read() / 4095
    print("ADC检测电压:%.2fv" % adc_vol)
    return adc_vol

# Temperature monitoring
def read_ds_sensor():
    """Read the temperature from the DS18x20 sensor(s) on the 1-Wire bus.

    Scans the bus, starts a conversion, waits for it to finish, then reads
    and prints every device found.

    Returns:
        The temperature of the last device returned by the scan.

    Raises:
        RuntimeError: if the scan finds no device. Previously this case
            failed with an unbound-local error on ``temp``.
    """
    roms = ds_sensor.scan()
    print('发现设备: ', roms)
    if not roms:
        raise RuntimeError('no DS18B20 device found on the 1-Wire bus')

    ds_sensor.convert_temp()
    # A conversion must be given time to complete before the scratchpad is
    # read; the MicroPython driver documentation asks for at least 750 ms.
    time.sleep_ms(750)

    temp = None
    for rom in roms:
        temp = ds_sensor.read_temp(rom)
        print("DS18B20检测温度:%.1f度" % temp)
    return temp

def detect():
    """Sample temperature and voltage, sound the buzzer on alarm, publish both.

    Reads the temperature via ``read_ds_sensor()`` and the voltage via
    ``time0_irq()``. For each reading that exceeds its limit (temperature
    above 50, voltage above 235) the buzzer pin is toggled 10000 times with
    a 250 microsecond pause between toggles. Both readings are then
    published as strings on the MQTT topics ``temperature`` and ``voltage``.
    """
    temperature = read_ds_sensor()
    voltage = time0_irq()

    # NOTE(review): time0_irq() scales the raw reading by 3.3 / 4095, so its
    # result would not normally exceed 3.3 and the `> 235` alarm looks
    # unreachable. Confirm the intended units/scaling.
    for limit_exceeded in (temperature > 50, voltage > 235):
        if limit_exceeded:
            level = 0
            for _ in range(10000):
                level = not level
                beep.value(level)
                time.sleep_us(250)

    c.publish(b"temperature", str(temperature))
    c.publish(b"voltage", str(voltage))

# Callback function: invoked whenever a message is received from the server.

def sub_cb(topic, msg):
    """Handle one incoming MQTT message.

    Every message is printed. Only the ``ledctl`` topic is acted on:

    * ``on``          - set ``led_pin1`` and ``relay_pin`` to 1.
    * ``off``         - set ``led_pin1`` and ``relay_pin`` to 0.
    * ``overvoltage`` - toggle the buzzer pin 10000 times (250 us apart),
                        then set ``relay_pin`` to 1.
    * ``easy``        - run a three-pass chase over the first three LEDs in
                        ``led_pin2``, then set ``relay_pin`` to 0.

    Any other topic or payload is ignored after printing.

    Args:
        topic: Topic of the message, as bytes.
        msg: Payload of the message, as bytes.

    NOTE(review): ``relay_pin`` is not defined in the visible part of this
    file; confirm it is created elsewhere before this callback can fire.
    """
    print(topic, msg)

    # Compare raw bytes instead of decoding: an arbitrary payload that is
    # not valid UTF-8 can then never raise inside the callback.
    if topic != b"ledctl":
        return

    if msg == b"on":
        led_pin1.value(1)
        # Switch the relay on
        relay_pin.value(1)

    elif msg == b"off":
        led_pin1.value(0)
        # Switch the relay off
        relay_pin.value(0)

    elif msg == b"overvoltage":
        level = 0
        for _ in range(10000):
            level = not level
            beep.value(level)
            time.sleep_us(250)
        # Switch the relay on
        relay_pin.value(1)

    elif msg == b"easy":
        # Create the LED Pin objects only once. The previous code appended
        # three new Pin objects on every "easy" message, so the global
        # `leds` list grew without bound.
        while len(leds) < 3:
            leds.append(Pin(led_pin2[len(leds)], Pin.OUT))
        chase = leds[:3]

        for led in chase:
            led.value(0)

        for _ in range(3):
            for led in chase:
                led.value(1)
                time.sleep(0.05)
            for led in chase:
                led.value(0)
                time.sleep(0.05)
        # Switch the relay off
        relay_pin.value(0)

# ---- Hardware setup -------------------------------------------------------
# NOTE(review): sub_cb() drives a `relay_pin` that is not created here or
# anywhere else in the visible file; confirm where it is defined.

# Push button on GPIO 14 with the internal pull-up enabled; the main loop
# treats a reading of 0 as "pressed".
key = Pin(14, Pin.IN, Pin.PULL_UP)

# Status LED and buzzer outputs.
led_pin1 = Pin(15, Pin.OUT)
beep = Pin(19, Pin.OUT)

# GPIO numbers for the LED chase; the Pin objects are built lazily into
# `leds` by sub_cb().
# NOTE(review): GPIO 15 appears both here and as led_pin1; confirm intended.
led_pin2 = [15, 16, 21, 22]
leds = []

# ADC input on GPIO 39 with 11 dB attenuation.
ad = ADC(Pin(39))
ad.atten(ADC.ATTN_11DB)

# DS18x20 temperature sensor on a 1-Wire bus attached to GPIO 26.
ds_pin = Pin(26)
ds_sensor = ds18x20.DS18X20(onewire.OneWire(ds_pin))

# Connect to the network. The call must sit on its own line: when fused
# behind the comment marker it was never executed.
do_connect()

# Create the MQTT client

# Build the MQTT client (client id, broker address).
c = MQTTClient("umqtt_client", "101.42.0.39")
# Route incoming messages to sub_cb.
c.set_callback(sub_cb)
# Open the connection to the broker.
c.connect()
# Listen on the control topic for commands ...
c.subscribe(b"ledctl")
# ... and on the two topics that detect() publishes to.
c.subscribe(b"temperature")
c.subscribe(b"voltage")

try:
    # The original program body lives inside this try block.
    # Outer loop: idle, polling the key every 10 ms, until it reads 0.
    while True:
        time.sleep_ms(10)
        if key.value() != 0:
            continue

        # Key pressed: LED on, start a monitoring session.
        led_pin1.value(1)
        counter = -1
        while True:
            counter += 1

            if counter == 0:
                # First pass of the session: measure immediately, then
                # pause one second before polling starts.
                detect()
                time.sleep(1)
                continue

            # Subsequent passes: poll MQTT and the key every 10 ms.
            time.sleep_ms(10)
            c.check_msg()

            if key.value() == 0:
                # Key pressed again: LED off and leave the session.
                led_pin1.value(0)
                time.sleep(1)
                break

            if counter == 300:
                # 300 polls of 10 ms each have elapsed: measure again.
                # Restart at 1 so the first-pass branch is not re-entered.
                counter = 1
                detect()

except Exception as e:
    # Catch every exception and print its message.
    print("Error:", e)

finally:
    # Always runs last: close the MQTT connection.
    c.disconnect()

# ESP32 MQTT-based temperature/humidity and voltage monitoring system

# Original source: https://www.cveoy.top/t/topic/nUxP  Copyright belongs to the author. Do not repost or scrape.

# Free AI: click here, no registration or login required.