# ESP32: monitor temperature and voltage over MQTT, and control LEDs
import time
from time import sleep_ms

import machine
import network
import onewire, ds18x20
from machine import Pin
from machine import PWM
from machine import ADC
from machine import Timer

from umqttsimple import MQTTClient
def do_connect(ssid='许一多的iPhone', password='745326830', timeout_s=None):
    """Activate the station interface and join a Wi-Fi network.

    Args:
        ssid: Name of the network to join. The default is the value that
            used to be hard-coded here.
        password: Passphrase for the network. Same default as before.
        timeout_s: Maximum number of one-second polls to wait for the link.
            ``None`` (the default) waits indefinitely.

    Raises:
        OSError: If ``timeout_s`` is given and the link is still down after
            that many polls.
    """
    # NOTE(review): the default credentials live in source control; consider
    # moving them to a separate, untracked config module.
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print('connecting to network...')
        wlan.connect(ssid, password)
        attempt = 1
        while not wlan.isconnected():
            if timeout_s is not None and attempt > timeout_s:
                raise OSError('Wi-Fi connection timed out')
            print("正在链接...{}".format(attempt))
            attempt += 1
            time.sleep(1)
    print('network config:', wlan.ifconfig())
# Voltage monitoring
def time0_irq():
    """Take one ADC sample and return it scaled to volts.

    Returns:
        The raw reading multiplied by 3.3 / 4095.
    """
    # The module-level ADC object is named ``ad``; referring to ``adc`` here
    # raised NameError on the first call.
    adc_vol = 3.3 * ad.read() / 4095
    print("ADC检测电压:%.2fv" % adc_vol)
    return adc_vol
# Temperature monitoring
def read_ds_sensor():
    """Read every DS18X20 device found on the bus.

    Each reading is printed; only the last one is returned.

    Returns:
        The value ``read_temp`` reported for the last ROM in the scan
        result (presumably degrees Celsius -- confirm against the driver).

    Raises:
        RuntimeError: If the scan finds no device. Without this check the
            function failed later on an unbound ``temp``.
    """
    roms = ds_sensor.scan()
    print('发现设备: ', roms)
    if not roms:
        raise RuntimeError('no DS18X20 device found on the bus')

    ds_sensor.convert_temp()
    # The driver documentation asks for at least 750 ms between starting a
    # conversion and reading it back; reading earlier returns a stale value.
    time.sleep_ms(750)

    temp = None
    for rom in roms:
        temp = ds_sensor.read_temp(rom)
        print("DS18B20检测温度:%.1f度" % temp)
    return temp
def _sound_buzzer(cycles=10000, half_period_us=250):
    """Toggle the ``beep`` pin ``cycles`` times, pausing between toggles."""
    level = 0
    for _ in range(cycles):
        level = not level
        beep.value(level)
        time.sleep_us(half_period_us)


def detect(temp_limit=50, volt_limit=235):
    """Measure temperature and voltage, sound alarms, and publish both values.

    Args:
        temp_limit: Temperature above which the buzzer is sounded.
        volt_limit: Voltage above which the buzzer is sounded.

    The readings are published as strings on the ``temperature`` and
    ``voltage`` MQTT topics through the module-level client ``c``.
    """
    temperature = read_ds_sensor()
    voltage = time0_irq()

    if temperature > temp_limit:
        _sound_buzzer()
    # NOTE(review): time0_irq scales the reading as 3.3 * raw / 4095, so a
    # limit of 235 looks unreachable unless the value is rescaled somewhere
    # else -- confirm the intended unit.
    if voltage > volt_limit:
        _sound_buzzer()

    c.publish(b"temperature", str(temperature))
    c.publish(b"voltage", str(voltage))
# Callback: invoked for every message received from the server
def sub_cb(topic, msg):
    """Handle an incoming MQTT message.

    Only the ``ledctl`` topic is acted on; anything else is printed and
    ignored. Recognised commands on ``ledctl``:

    * ``on`` / ``off`` -- set ``led_pin1`` high / low.
    * ``overvoltage``  -- toggle the ``beep`` pin 10000 times.
    * ``easy``         -- run a three-LED chase effect three times.

    Args:
        topic: Topic name as bytes.
        msg: Payload as bytes.
    """
    print(topic, msg)
    if topic.decode("utf-8") != "ledctl":
        return

    command = msg.decode("utf-8")
    if command == "on":
        led_pin1.value(1)
    elif command == "off":
        led_pin1.value(0)
    elif command == "overvoltage":
        level = 0
        for _ in range(10000):
            level = not level
            beep.value(level)
            time.sleep_us(250)
    elif command == "easy":
        # Create the three chase LEDs only once. Appending on every command
        # made the global list grow without bound.
        while len(leds) < 3:
            leds.append(Pin(led_pin2[len(leds)], Pin.OUT))
        chase = leds[:3]
        for led in chase:
            led.value(0)
        for _ in range(3):
            for led in chase:
                led.value(1)
                time.sleep(0.05)
            for led in chase:
                led.value(0)
                time.sleep(0.05)
# --- Hardware setup ---------------------------------------------------------
# Input on GPIO14 with the internal pull-up; the main loop treats a low level
# as "key pressed".
key = Pin(14, Pin.IN, Pin.PULL_UP)

# Outputs: LED on GPIO15 and the pin toggled for alarms on GPIO19
# (presumably a buzzer).
led_pin1 = Pin(15, Pin.OUT)
beep = Pin(19, Pin.OUT)

# GPIO numbers for the LED chase effect; sub_cb turns them into Pin objects
# and stores those in ``leds``.
led_pin2 = [15, 16, 21, 22]
leds = list()

# ADC on GPIO39 with 11 dB attenuation.
ad = ADC(Pin(39))
ad.atten(ADC.ATTN_11DB)

# DS18X20 driver on a 1-Wire bus attached to GPIO26.
ds_pin = Pin(26)
ow_bus = onewire.OneWire(ds_pin)
ds_sensor = ds18x20.DS18X20(ow_bus)
# Join the Wi-Fi network
do_connect()

# Create the MQTT client, register the message callback and connect
c = MQTTClient("umqtt_client", "101.42.0.39")
c.set_callback(sub_cb)
c.connect()

# Subscribe to the control channel (ledctl) and to the two measurement topics
for _channel in (b"ledctl", b"temperature", b"voltage"):
    c.subscribe(_channel)
# Main loop: wait for a key press, then monitor until the key is pressed again.
# Any error ends in a cleanup of the outputs followed by a board reset.
try:
    while True:
        time.sleep_ms(10)
        if key.value() != 0:
            continue

        # Key pressed: switch the LED on and take a first measurement.
        led_pin1.value(1)
        detect()
        time.sleep(1)

        counter = 0
        while True:
            counter += 1
            time.sleep_ms(10)
            c.check_msg()
            if key.value() == 0:
                # Second key press: stop monitoring.
                led_pin1.value(0)
                time.sleep(1)
                break
            if counter == 300:
                # Roughly every 300 polls of ~10 ms, measure and publish again.
                counter = 1
                detect()
except Exception as e:
    print("程序出现错误:", e)  # report the error
    # The MQTT connection is often what failed; a second exception from
    # disconnect() must not prevent the cleanup and reset below.
    try:
        c.disconnect()
    except Exception as disconnect_err:
        print("MQTT disconnect failed:", disconnect_err)
    # Drive all outputs low
    led_pin1.value(0)
    beep.value(0)
    for led in leds:
        led.value(0)
    # Restart the board
    machine.reset()
# Source: https://www.cveoy.top/t/topic/nUxH -- copyright belongs to the author; do not reproduce or scrape.