ESP32 基于MQTT的温湿度和电压监测系统
import time
import network
import onewire, ds18x20
from machine import Pin
from machine import PWM
from machine import ADC
from machine import Timer
from time import sleep_ms
from umqttsimple import MQTTClient
def do_connect(): wlan = network.WLAN(network.STA_IF) wlan.active(True) if not wlan.isconnected(): print('connecting to network...') wlan.connect('许一多的iPhone', '745326830') i = 1 while not wlan.isconnected(): print("正在链接...{}".format(i)) i += 1 time.sleep(1) print('network config:', wlan.ifconfig())
#电压监测 def time0_irq(): adc_vol = 3.3*adc.read()/4095 print("ADC检测电压:%.2fv" %adc_vol) return adc_vol
#温度监测 def read_ds_sensor(): roms = ds_sensor.scan() print('发现设备: ', roms) ds_sensor.convert_temp() for rom in roms: temp = ds_sensor.read_temp(rom) print("DS18B20检测温度:%.1f度" %temp) return temp
def detect(): temperature = read_ds_sensor() voltage = time0_irq() if temperature > 50: i = 0 for t in range(10000): i = not i beep.value(i) time.sleep_us(250) if voltage > 235: i = 0 for t in range(10000): i = not i beep.value(i) time.sleep_us(250) temperature = str(temperature) voltage = str(voltage) c.publish(b"temperature",temperature) c.publish(b"voltage",voltage)
回调函数,收到服务器消息后会调用这个函数
def sub_cb(topic, msg): print(topic, msg) if topic.decode("utf-8") == "ledctl" and msg.decode("utf-8") == "on": led_pin1.value(1) # 控制继电器打开 relay_pin.value(1)
elif topic.decode("utf-8") == "ledctl" and msg.decode("utf-8") == "off":
led_pin1.value(0)
# 控制继电器关闭
relay_pin.value(0)
elif topic.decode("utf-8") == "ledctl" and msg.decode("utf-8") == "overvoltage":
i = 0
for t in range(10000):
i = not i
beep.value(i)
time.sleep_us(250)
# 控制继电器打开
relay_pin.value(1)
elif topic.decode("utf-8") == "ledctl" and msg.decode("utf-8") == "easy":
for i in range(3):
leds.append(Pin(led_pin2[i],Pin.OUT))
for n in range(3):
leds[n].value(0)
for n in range(3):
for n in range(3):
leds[n].value(1)
time.sleep(0.05)
for n in range(3):
leds[n].value(0)
time.sleep(0.05)
# 控制继电器关闭
relay_pin.value(0)
key = Pin(14,Pin.IN,Pin.PULL_UP) led_pin1 = Pin(15, Pin.OUT) beep = Pin(19,Pin.OUT) led_pin2 = [15,16,21,22] leds = [] ad = ADC(Pin(39)) ad .atten(ADC.ATTN_11DB) ds_pin = Pin(26) ds_sensor = ds18x20.DS18X20(onewire.OneWire(ds_pin))
#联网 do_connect()
创建mqt
c = MQTTClient("umqtt_client", "101.42.0.39") # 建立一个MQTT客户端 c.set_callback(sub_cb) # 设置回调函数 c.connect() # 建立连接 c.subscribe(b"ledctl") # 监控ledctl这个通道,接收控制命令 c.subscribe(b"temperature") c.subscribe(b"voltage")
try: #原来的代码放在try语句块中 while True: time.sleep_ms(10) if key.value() == 0: led_pin1.value(1) counter = -1 while True: counter += 1 if counter == 0: detect() time.sleep(1) elif counter > 0: time.sleep_ms(10) c.check_msg() if key.value() == 0: led_pin1.value(0) time.sleep(1) break if counter == 300: counter = 1 detect()
except Exception as e: #捕获所有异常,并打印异常信息 print("Error:", e)
finally: #最终执行的代码,关闭MQTT连接 c.disconnect()
原文地址: https://www.cveoy.top/t/topic/nUxP 著作权归作者所有。请勿转载和采集!