Arduino Uno + DHT22 + Python: Система мониторинга температуры и влажности
Общее описание проекта
Система в реальном времени собирает данные с DHT22 через Arduino Uno, отображает интерактивные графики, позволяет:
Синхронизировать период опроса (1–30 сек) между интерфесов в Python и Arduino
Масштабировать и панорамировать графики
Записывать данные в лог в CSV
Возможнсть создать автономный .exe файл (запуск на ПК без Python)
Аппаратная часть
| Компонент | Подключение |
| Arduino Uno | USB → ПК |
| DHT22 | "VCC → 5V, GND → GND, Data → Пин 2, резистор 10 кОм (VCC ↔ Data)" |
Шаг 3: Код файлов проекта
Код для dht_graph.py
import serial
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk
import matplotlib.animation as animation
from datetime import datetime
import threading
import time
import csv
import os
class DHTReader:
def __init__(self, root):
self.root = root
self.root.title("DHT22 Синхронизированный Мониторинг")
self.root.geometry("1000x750")
self.data = []
self.ser = None
self.reading = False
self.log_enabled = False
self.log_file = None
self.csv_writer = None
self.log_file_path = ""
self.read_interval = 2 # секунды (синхронизировано с Arduino)
self.setup_gui()
def setup_gui(self):
control_frame = ttk.Frame(self.root)
control_frame.pack(pady=5, fill=tk.X)
# COM
ttk.Label(control_frame, text="COM:").grid(row=0, column=0, padx=5, sticky='w')
self.port_var = tk.StringVar()
self.port_combo = ttk.Combobox(control_frame, textvariable=self.port_var, width=10)
self.port_combo.grid(row=0, column=1, padx=5)
self.refresh_ports()
ttk.Button(control_frame, text="Обновить", command=self.refresh_ports).grid(row=0, column=2, padx=5)
ttk.Button(control_frame, text="Подключиться", command=self.connect).grid(row=0, column=3, padx=5)
ttk.Button(control_frame, text="Отключиться", command=self.disconnect).grid(row=0, column=4, padx=5)
# Период (синхронизация)
ttk.Label(control_frame, text="Период (сек):").grid(row=0, column=5, padx=(20,5), sticky='w')
self.interval_var = tk.IntVar(value=2)
interval_combo = ttk.Combobox(control_frame, textvariable=self.interval_var, width=6,
values=[1, 2, 5, 10, 30], state="readonly")
interval_combo.grid(row=0, column=6, padx=5)
interval_combo.bind("<>", self.sync_interval)
# Лог
self.log_var = tk.BooleanVar()
log_check = ttk.Checkbutton(control_frame, text="Лог в CSV", variable=self.log_var, command=self.toggle_logging)
log_check.grid(row=0, column=7, padx=(20,5))
ttk.Button(control_frame, text="Файл", command=self.choose_log_file).grid(row=0, column=8, padx=5)
self.log_path_label = ttk.Label(control_frame, text="не выбран", foreground="gray", font=("Arial", 8))
self.log_path_label.grid(row=0, column=9, padx=5, sticky='w')
# Графики
self.fig, (self.ax1, self.ax2) = plt.subplots(2, 1, figsize=(11, 6), sharex=True)
self.canvas = FigureCanvasTkAgg(self.fig, self.root)
self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
toolbar_frame = ttk.Frame(self.root)
toolbar_frame.pack(fill=tk.X)
self.toolbar = NavigationToolbar2Tk(self.canvas, toolbar_frame)
self.toolbar.update()
self.ani = animation.FuncAnimation(self.fig, self.update_plot, interval=1000, blit=False)
def refresh_ports(self):
import serial.tools.list_ports
ports = [p.device for p in serial.tools.list_ports.comports()]
self.port_combo['values'] = ports
if ports:
self.port_var.set(ports[0])
def sync_interval(self, event=None):
new_sec = self.interval_var.get()
if self.ser and self.ser.is_open:
try:
self.ser.write(f"SET:{new_sec}\n".encode())
response = self.ser.readline().decode().strip()
if response.startswith("OK:"):
self.read_interval = new_sec
messagebox.showinfo("Синхронизация", f"Период установлен: {new_sec} сек")
else:
messagebox.showerror("Ошибка", "Arduino не принял команду")
except:
messagebox.showerror("Ошибка", "Не удалось отправить команду")
else:
messagebox.showwarning("Не подключено", "Сначала подключитесь к COM-порту")
def choose_log_file(self):
filename = filedialog.asksaveasfilename(
defaultextension=".csv",
filetypes=[("CSV", "*.csv")],
title="Лог-файл"
)
if filename:
self.log_file_path = filename
self.log_path_label.config(text=os.path.basename(filename), foreground="green")
if self.log_enabled:
self.start_logging()
def toggle_logging(self):
self.log_enabled = self.log_var.get()
if self.log_enabled and not self.log_file_path:
messagebox.showwarning("Выберите файл", "Укажите путь к CSV")
self.log_var.set(False)
return
if self.log_enabled:
self.start_logging()
else:
self.stop_logging()
def start_logging(self):
try:
self.log_file = open(self.log_file_path, 'w', newline='', encoding='utf-8')
self.csv_writer = csv.writer(self.log_file)
self.csv_writer.writerow(["Время", "Температура (°C)", "Влажность (%)"])
except Exception as e:
messagebox.showerror("Ошибка", f"Не удалось создать лог:\n{e}")
self.log_enabled = False
self.log_var.set(False)
def stop_logging(self):
if self.log_file:
self.log_file.close()
self.log_file = None
self.csv_writer = None
def connect(self):
if self.reading:
return
try:
self.ser = serial.Serial(self.port_var.get(), 9600, timeout=2)
time.sleep(2)
# Проверка связи
self.ser.write(b"GET\n")
resp = self.ser.readline().decode().strip()
if "INT:" in resp:
current = int(resp.split(":")[1])
self.interval_var.set(current)
self.read_interval = current
self.reading = True
self.read_thread = threading.Thread(target=self.read_data, daemon=True)
self.read_thread.start()
messagebox.showinfo("Готово", f"Подключено. Текущий период: {self.read_interval} сек")
except Exception as e:
messagebox.showerror("Ошибка", f"COM-порт:\n{e}")
def disconnect(self):
self.reading = False
if self.ser and self.ser.is_open:
self.ser.close()
self.stop_logging()
messagebox.showinfo("Отключено", "Соединение закрыто")
def read_data(self):
while self.reading:
if self.ser and self.ser.in_waiting > 0:
try:
line = self.ser.readline().decode('utf-8').strip()
if ',' in line and not line.startswith(("OK:", "INT:", "ERR")):
temp, hum = map(float, line.split(','))
ts = datetime.now()
self.data.append((ts, temp, hum))
if self.log_enabled and self.csv_writer:
self.csv_writer.writerow([ts.strftime("%Y-%m-%d %H:%M:%S"), f"{temp:.2f}", f"{hum:.2f}"])
self.log_file.flush()
print(f"{ts.strftime('%H:%M:%S')} → {temp}°C, {hum}%")
except:
pass
time.sleep(0.1)
def update_plot(self, frame):
if not self.data:
return
times = [d[0] for d in self.data]
temps = [d[1] for d in self.data]
hums = [d[2] for d in self.data]
self.ax1.clear()
self.ax2.clear()
self.ax1.plot(times, temps, 'r-', label='Температура')
self.ax2.plot(times, hums, 'b-', label='Влажность')
for ax in (self.ax1, self.ax2):
ax.legend(loc='upper left')
ax.grid(True, alpha=0.3)
self.ax1.set_ylabel('°C')
self.ax2.set_ylabel('%')
self.ax2.set_xlabel('Время')
self.fig.autofmt_xdate()
plt.tight_layout()
self.canvas.draw()
if __name__ == "__main__":
root = tk.Tk()
app = DHTReader(root)
root.mainloop()
Код для dht_sync.ino
#include
#define DHTPIN 2
#define DHTTYPE DHT22
DHT dht(DHTPIN, DHTTYPE);
unsigned long interval_ms = 2000; // По умолчанию 2 сек
unsigned long last_send = 0;
void setup() {
Serial.begin(9600);
dht.begin();
Serial.println("DHT22 READY");
delay(2000);
}
void loop() {
unsigned long now = millis();
// Обработка команд от Python
if (Serial.available()) {
String cmd = Serial.readStringUntil('\n');
cmd.trim();
if (cmd.startsWith("SET:")) {
int sec = cmd.substring(4).toInt();
if (sec >= 1 && sec <= 60) {
interval_ms = sec * 1000UL;
Serial.print("OK:");
Serial.println(sec);
} else {
Serial.println("ERR:BAD_INTERVAL");
}
} else if (cmd == "GET") {
Serial.print("INT:");
Serial.println(interval_ms / 1000);
}
}
// Отправка данных по таймеру
if (now - last_send >= interval_ms) {
float t = dht.readTemperature();
float h = dht.readHumidity();
if (!isnan(t) && !isnan(h)) {
Serial.print(t, 2);
Serial.print(",");
Serial.println(h, 2);
} else {
Serial.println("ERR:SENSOR");
}
last_send = now;
}
}
Шаг 4: Загрузка скетча на Arduino
- Установи Arduino IDE: arduino.cc
- Подключи Arduino Uno по USB
- Создать файл dht_sync.ino
- Скопировать код
- В меню: Tools → Board → Arduino Uno
- Tools → Port → COMx (выбери свой порт)
- Нажми Upload
- Дождись: Done uploading
Шаг 5: Запуск Python-приложения
- В cmd перейди в папку проекта: cd D:\DHT22_Monitor
- Запустить: python dht_graph.py
Шаг 6: Сборка в EXE (один файл)
pyinstaller --onefile --windowed dht_graph.py
Файл появится: dist\dht_graph.exe
Запускай двойным кликом — без Python
Как пользоваться
Подключение: Выбери COM → Подключиться
Изменить период: Выбери 5 сек → Arduino примет SET:5
Включить лог: Поставь ☑ → выбери файл
Масштаб: Выдели область мышкой
Сброс: Кнопка "Home""
Формат CSV-лога
Время,Температура (°C),Влажность (%)
2025-11-14 15:30:00,24.30,57.80
2025-11-14 15:30:05,24.40,57.90


