ESP32 温度、电压监测并通过 MQTT 与服务器通信
import time
import network
import onewire, ds18x20
from machine import Pin
from machine import PWM
from machine import ADC
from machine import Timer
from time import sleep_ms
from umqttsimple import MQTTClient
def do_connect():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('connecting to network...')
wlan.connect('许一多的iPhone', '745326830')
i = 1
while not wlan.isconnected():
print("正在链接...{}".format(i))
i += 1
time.sleep(1)
print('network config:', wlan.ifconfig())
#电压监测
def time0_irq():
try:
adc_vol = 3.3*adc.read()/4095
print("ADC检测电压:%.2fv" %adc_vol)
return adc_vol
except Exception as e:
print("电压监测错误:", e)
return None
#温度监测
def read_ds_sensor():
try:
roms = ds_sensor.scan()
print('发现设备: ', roms)
ds_sensor.convert_temp()
for rom in roms:
temp = ds_sensor.read_temp(rom)
print("DS18B20检测温度:%.1f度" %temp)
return temp
except Exception as e:
print("温度监测错误:", e)
return None
def detect():
try:
temperature = read_ds_sensor()
voltage = time0_irq()
if temperature > 50:
i = 0
for t in range(10000):
i = not i
beep.value(i)
time.sleep_us(250)
if voltage > 235:
i = 0
for t in range(10000):
i = not i
beep.value(i)
time.sleep_us(250)
temperature = str(temperature)
voltage = str(voltage)
c.publish(b"temperature",temperature)
c.publish(b"voltage",voltage)
except Exception as e:
print("监测数据发送错误:", e)
# 回调函数,收到服务器消息后会调用这个函数
def sub_cb(topic, msg):
try:
print(topic, msg)
if topic.decode("utf-8") == "ledctl" and msg.decode("utf-8") == "on":
led_pin1.value(1)
elif topic.decode("utf-8") == "ledctl" and msg.decode("utf-8") == "off":
led_pin1.value(0)
elif topic.decode("utf-8") == "ledctl" and msg.decode("utf-8") == "overvoltage":
i = 0
for t in range(10000):
i = not i
beep.value(i)
time.sleep_us(250)
elif topic.decode("utf-8") == "ledctl" and msg.decode("utf-8") == "easy":
for i in range(3):
leds.append(Pin(led_pin2[i],Pin.OUT))
for n in range(3):
leds[n].value(0)
for n in range(3):
for n in range(3):
leds[n].value(1)
time.sleep(0.05)
for n in range(3):
leds[n].value(0)
time.sleep(0.05)
except Exception as e:
print("接收服务器指令错误:", e)
key = Pin(14,Pin.IN,Pin.PULL_UP)
led_pin1 = Pin(15, Pin.OUT)
beep = Pin(19,Pin.OUT)
led_pin2 = [15,16,21,22]
leds = []
ad = ADC(Pin(39))
ad.atten(ADC.ATTN_11DB)
ds_pin = Pin(26)
ds_sensor = ds18x20.DS18X20(onewire.OneWire(ds_pin))
#联网
do_connect()
# 创建mqt
try:
c = MQTTClient("umqtt_client", "101.42.0.39") # 建立一个MQTT客户端
c.set_callback(sub_cb) # 设置回调函数
c.connect() # 建立连接
c.subscribe(b"ledctl") # 监控ledctl这个通道,接收控制命令
c.subscribe(b"temperature")
c.subscribe(b"voltage")
print("MQTT连接成功")
except Exception as e:
print("MQTT连接失败:", e)
while True:
time.sleep_ms(10)
if key.value() == 0:
led_pin1.value(1)
counter = -1
while True:
counter += 1
if counter == 0:
detect()
time.sleep(1)
elif counter > 0:
time.sleep_ms(10)
try:
c.check_msg()
except Exception as e:
print("检查MQTT消息错误:", e)
if key.value() == 0:
led_pin1.value(0)
time.sleep(1)
break
if counter == 300:
counter = 1
detect()
原文地址: https://www.cveoy.top/t/topic/nUxD 著作权归作者所有。请勿转载和采集!