Все для Arduino в одном месте
Адрес: Павлодар
Телефон: +7 705 336-36-03

whatsapp +7 705 336-36-03

Arduino Uno + DHT22 + Python: Система мониторинга температуры и влажности

Общее описание проекта


Система в реальном времени собирает данные с DHT22 через Arduino Uno, отображает интерактивные графики, позволяет:

Синхронизировать период опроса (1–30 сек) между интерфесов в Python и Arduino
Масштабировать и панорамировать графики
Записывать данные в лог в CSV
Возможнсть создать автономный .exe файл (запуск на ПК без Python)

Аппаратная часть

Компонент Подключение
Arduino Uno USB → ПК
DHT22 "VCC → 5V, GND → GND, Data → Пин 2, резистор 10 кОм (VCC ↔ Data)"

 

Шаг 3: Код файлов проекта   

 

 

Код для dht_graph.py
import serial
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk
import matplotlib.animation as animation
from datetime import datetime
import threading
import time
import csv
import os

class DHTReader:
    def __init__(self, root):
        self.root = root
        self.root.title("DHT22 Синхронизированный Мониторинг")
        self.root.geometry("1000x750")

        self.data = []
        self.ser = None
        self.reading = False
        self.log_enabled = False
        self.log_file = None
        self.csv_writer = None
        self.log_file_path = ""

        self.read_interval = 2  # секунды (синхронизировано с Arduino)

        self.setup_gui()

    def setup_gui(self):
        control_frame = ttk.Frame(self.root)
        control_frame.pack(pady=5, fill=tk.X)

        # COM
        ttk.Label(control_frame, text="COM:").grid(row=0, column=0, padx=5, sticky='w')
        self.port_var = tk.StringVar()
        self.port_combo = ttk.Combobox(control_frame, textvariable=self.port_var, width=10)
        self.port_combo.grid(row=0, column=1, padx=5)
        self.refresh_ports()

        ttk.Button(control_frame, text="Обновить", command=self.refresh_ports).grid(row=0, column=2, padx=5)
        ttk.Button(control_frame, text="Подключиться", command=self.connect).grid(row=0, column=3, padx=5)
        ttk.Button(control_frame, text="Отключиться", command=self.disconnect).grid(row=0, column=4, padx=5)

        # Период (синхронизация)
        ttk.Label(control_frame, text="Период (сек):").grid(row=0, column=5, padx=(20,5), sticky='w')
        self.interval_var = tk.IntVar(value=2)
        interval_combo = ttk.Combobox(control_frame, textvariable=self.interval_var, width=6,
                                     values=[1, 2, 5, 10, 30], state="readonly")
        interval_combo.grid(row=0, column=6, padx=5)
        interval_combo.bind("<>", self.sync_interval)

        # Лог
        self.log_var = tk.BooleanVar()
        log_check = ttk.Checkbutton(control_frame, text="Лог в CSV", variable=self.log_var, command=self.toggle_logging)
        log_check.grid(row=0, column=7, padx=(20,5))

        ttk.Button(control_frame, text="Файл", command=self.choose_log_file).grid(row=0, column=8, padx=5)
        self.log_path_label = ttk.Label(control_frame, text="не выбран", foreground="gray", font=("Arial", 8))
        self.log_path_label.grid(row=0, column=9, padx=5, sticky='w')

        # Графики
        self.fig, (self.ax1, self.ax2) = plt.subplots(2, 1, figsize=(11, 6), sharex=True)
        self.canvas = FigureCanvasTkAgg(self.fig, self.root)
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

        toolbar_frame = ttk.Frame(self.root)
        toolbar_frame.pack(fill=tk.X)
        self.toolbar = NavigationToolbar2Tk(self.canvas, toolbar_frame)
        self.toolbar.update()

        self.ani = animation.FuncAnimation(self.fig, self.update_plot, interval=1000, blit=False)

    def refresh_ports(self):
        import serial.tools.list_ports
        ports = [p.device for p in serial.tools.list_ports.comports()]
        self.port_combo['values'] = ports
        if ports:
            self.port_var.set(ports[0])

    def sync_interval(self, event=None):
        new_sec = self.interval_var.get()
        if self.ser and self.ser.is_open:
            try:
                self.ser.write(f"SET:{new_sec}\n".encode())
                response = self.ser.readline().decode().strip()
                if response.startswith("OK:"):
                    self.read_interval = new_sec
                    messagebox.showinfo("Синхронизация", f"Период установлен: {new_sec} сек")
                else:
                    messagebox.showerror("Ошибка", "Arduino не принял команду")
            except:
                messagebox.showerror("Ошибка", "Не удалось отправить команду")
        else:
            messagebox.showwarning("Не подключено", "Сначала подключитесь к COM-порту")

    def choose_log_file(self):
        filename = filedialog.asksaveasfilename(
            defaultextension=".csv",
            filetypes=[("CSV", "*.csv")],
            title="Лог-файл"
        )
        if filename:
            self.log_file_path = filename
            self.log_path_label.config(text=os.path.basename(filename), foreground="green")
            if self.log_enabled:
                self.start_logging()

    def toggle_logging(self):
        self.log_enabled = self.log_var.get()
        if self.log_enabled and not self.log_file_path:
            messagebox.showwarning("Выберите файл", "Укажите путь к CSV")
            self.log_var.set(False)
            return
        if self.log_enabled:
            self.start_logging()
        else:
            self.stop_logging()

    def start_logging(self):
        try:
            self.log_file = open(self.log_file_path, 'w', newline='', encoding='utf-8')
            self.csv_writer = csv.writer(self.log_file)
            self.csv_writer.writerow(["Время", "Температура (°C)", "Влажность (%)"])
        except Exception as e:
            messagebox.showerror("Ошибка", f"Не удалось создать лог:\n{e}")
            self.log_enabled = False
            self.log_var.set(False)

    def stop_logging(self):
        if self.log_file:
            self.log_file.close()
            self.log_file = None
            self.csv_writer = None

    def connect(self):
        if self.reading:
            return
        try:
            self.ser = serial.Serial(self.port_var.get(), 9600, timeout=2)
            time.sleep(2)
            # Проверка связи
            self.ser.write(b"GET\n")
            resp = self.ser.readline().decode().strip()
            if "INT:" in resp:
                current = int(resp.split(":")[1])
                self.interval_var.set(current)
                self.read_interval = current
            self.reading = True
            self.read_thread = threading.Thread(target=self.read_data, daemon=True)
            self.read_thread.start()
            messagebox.showinfo("Готово", f"Подключено. Текущий период: {self.read_interval} сек")
        except Exception as e:
            messagebox.showerror("Ошибка", f"COM-порт:\n{e}")

    def disconnect(self):
        self.reading = False
        if self.ser and self.ser.is_open:
            self.ser.close()
        self.stop_logging()
        messagebox.showinfo("Отключено", "Соединение закрыто")

    def read_data(self):
        while self.reading:
            if self.ser and self.ser.in_waiting > 0:
                try:
                    line = self.ser.readline().decode('utf-8').strip()
                    if ',' in line and not line.startswith(("OK:", "INT:", "ERR")):
                        temp, hum = map(float, line.split(','))
                        ts = datetime.now()
                        self.data.append((ts, temp, hum))

                        if self.log_enabled and self.csv_writer:
                            self.csv_writer.writerow([ts.strftime("%Y-%m-%d %H:%M:%S"), f"{temp:.2f}", f"{hum:.2f}"])
                            self.log_file.flush()

                        print(f"{ts.strftime('%H:%M:%S')} → {temp}°C, {hum}%")
                except:
                    pass
            time.sleep(0.1)

    def update_plot(self, frame):
        if not self.data:
            return
        times = [d[0] for d in self.data]
        temps = [d[1] for d in self.data]
        hums = [d[2] for d in self.data]

        self.ax1.clear()
        self.ax2.clear()
        self.ax1.plot(times, temps, 'r-', label='Температура')
        self.ax2.plot(times, hums, 'b-', label='Влажность')

        for ax in (self.ax1, self.ax2):
            ax.legend(loc='upper left')
            ax.grid(True, alpha=0.3)
        self.ax1.set_ylabel('°C')
        self.ax2.set_ylabel('%')
        self.ax2.set_xlabel('Время')

        self.fig.autofmt_xdate()
        plt.tight_layout()
        self.canvas.draw()


if __name__ == "__main__":
    root = tk.Tk()
    app = DHTReader(root)
    root.mainloop()

 

 

 

Код для dht_sync.ino
#include 

#define DHTPIN 2
#define DHTTYPE DHT22
DHT dht(DHTPIN, DHTTYPE);

unsigned long interval_ms = 2000;  // По умолчанию 2 сек
unsigned long last_send = 0;

void setup() {
  Serial.begin(9600);
  dht.begin();
  Serial.println("DHT22 READY");
  delay(2000);
}

void loop() {
  unsigned long now = millis();

  // Обработка команд от Python
  if (Serial.available()) {
    String cmd = Serial.readStringUntil('\n');
    cmd.trim();
    if (cmd.startsWith("SET:")) {
      int sec = cmd.substring(4).toInt();
      if (sec >= 1 && sec <= 60) {
        interval_ms = sec * 1000UL;
        Serial.print("OK:");
        Serial.println(sec);
      } else {
        Serial.println("ERR:BAD_INTERVAL");
      }
    } else if (cmd == "GET") {
      Serial.print("INT:");
      Serial.println(interval_ms / 1000);
    }
  }

  // Отправка данных по таймеру
  if (now - last_send >= interval_ms) {
    float t = dht.readTemperature();
    float h = dht.readHumidity();

    if (!isnan(t) && !isnan(h)) {
      Serial.print(t, 2);
      Serial.print(",");
      Serial.println(h, 2);
    } else {
      Serial.println("ERR:SENSOR");
    }
    last_send = now;
  }
}

 

 

 

Шаг 4: Загрузка скетча на Arduino

  1. Установи Arduino IDE: arduino.cc
  2. Подключи Arduino Uno по USB
  3. Создать файл dht_sync.ino
  4. Скопировать код
  5. В меню: Tools → Board → Arduino Uno
  6. Tools → Port → COMx (выбери свой порт)
  7. Нажми Upload
  8. Дождись: Done uploading

Шаг 5: Запуск Python-приложения

  1. В cmd перейди в папку проекта: cd D:\DHT22_Monitor
  2. Запустить: python dht_graph.py

Шаг 6: Сборка в EXE (один файл)

pyinstaller --onefile --windowed dht_graph.py

Файл появится: dist\dht_graph.exe
Запускай двойным кликом — без Python

Как пользоваться


Подключение: Выбери COM → Подключиться
Изменить период: Выбери 5 сек → Arduino примет SET:5
Включить лог: Поставь ☑ → выбери файл
Масштаб: Выдели область мышкой
Сброс: Кнопка "Home""

Формат CSV-лога


Время,Температура (°C),Влажность (%)
2025-11-14 15:30:00,24.30,57.80
2025-11-14 15:30:05,24.40,57.90