# coding=utf-8
"""Probe RTSP cameras listed in the ``devices`` table and record status flips.

Each camera row (``type = 1`` with status 1 or 2) is probed on its own thread
by trying to read one frame from its RTSP stream. The ``devices`` row is only
updated when the observed status differs from the stored one.
"""
import os
import cv2
import re
import logging
import psycopg2
import threading
import datetime
from dotenv import load_dotenv

# Camera status codes as stored in ``devices.status``: 1 = online, 2 = offline.
STATUS_ONLINE = 1
STATUS_OFFLINE = 2

# A usable stream address must start with the ``rtsp:`` scheme.
_RTSP_URL_RE = re.compile(r'^rtsp:')


class Camera:
    """A camera device together with its last status stored in the database."""

    def __init__(self, id, rtsp, status):
        """Store the device id, its RTSP URL and the status currently in the DB."""
        self.id = id
        self.rtsp = rtsp
        self.status = status

    def healthbeat(self):
        """Probe the camera, log the result and persist a status change.

        Any exception is logged and swallowed on purpose so that one failing
        camera never takes down its worker thread with an unhandled traceback.
        """
        try:
            status = self._probe()

            if status == STATUS_ONLINE:
                logging.info("摄像头ID: %s, 状态: 在线", self.id)
            else:
                logging.warning("摄像头ID: %s, 状态: 离线", self.id)

            if self._status_flipped(status):
                self._persist(status)
        except Exception as e:
            logging.error("摄像头ID: %s, 检测状态时发生错误: %s", self.id, e)

    def _probe(self):
        """Return STATUS_ONLINE if one frame can be read from the stream."""
        cap = cv2.VideoCapture(self.rtsp)
        try:
            if not cap.isOpened():
                return STATUS_OFFLINE
            ret, _ = cap.read()
            return STATUS_ONLINE if ret else STATUS_OFFLINE
        finally:
            # Always free the capture handle, including when the stream never
            # opened or the read raised.
            cap.release()

    def _status_flipped(self, status):
        """Return True when the stored status is online/offline and differs."""
        return (
            self.status in (STATUS_ONLINE, STATUS_OFFLINE)
            and status != self.status
        )

    def _persist(self, status):
        """Write the new status and the update timestamp to ``devices``."""
        conn = get_connection()
        try:
            # ``with conn`` commits or rolls back the transaction; it does not
            # close the connection, hence the explicit ``finally``.
            with conn:
                with conn.cursor() as cursor:
                    cursor.execute(
                        "update devices set status = %s, updated_at = %s where id = %s",
                        (status, datetime.datetime.now(), self.id),
                    )
        finally:
            conn.close()


def init_logger():
    """Configure root logging to append to ``healthbeat.log`` at INFO level."""
    logging.basicConfig(
        filename='healthbeat.log',
        level=logging.INFO,
        format='[%(asctime)s] %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )


def get_connection():
    """Open a new PostgreSQL connection from the ``DB_*`` environment variables.

    The parameters are passed as keyword arguments rather than interpolated
    into a DSN string, so values containing spaces or quotes (typically the
    password) need no escaping. Variables that are unset are passed as
    ``None``, which psycopg2 drops so that the libpq default applies.

    Returns:
        A new psycopg2 connection; the caller is responsible for closing it.
    """
    return psycopg2.connect(
        host=os.environ.get("DB_HOST"),
        port=os.environ.get("DB_PORT"),
        dbname=os.environ.get("DB_DATABASE"),
        user=os.environ.get("DB_USERNAME"),
        password=os.environ.get("DB_PASSWORD"),
    )


def _fetch_camera_rows():
    """Return ``(id, status, extends)`` rows for cameras that are on/offline."""
    conn = get_connection()
    try:
        with conn:
            with conn.cursor() as cursor:
                cursor.execute("SELECT id, status, extends FROM devices WHERE type = 1 AND status in (1, 2) ORDER BY id DESC")
                return cursor.fetchall()
    finally:
        # Close before the (potentially slow) probes start instead of holding
        # the connection open for the whole run.
        conn.close()


def _rtsp_url(extends):
    """Return the RTSP URL stored in *extends*, or None if it is unusable.

    *extends* is expected to arrive as a dict (presumably a JSON column
    decoded by psycopg2 -- confirm against the schema). A missing key, a
    non-string value or a URL without the ``rtsp:`` prefix all yield None.
    """
    if not isinstance(extends, dict):
        return None
    url = extends.get("rtsp_url")
    if not isinstance(url, str) or not _RTSP_URL_RE.search(url):
        return None
    return url


def main():
    """Load configuration, probe every camera concurrently and wait for all."""
    load_dotenv()
    # Set up the log format before any worker can emit a record.
    init_logger()

    threads = []
    for device_id, stored_status, extends in _fetch_camera_rows():
        url = _rtsp_url(extends)
        if url is None:
            continue
        camera = Camera(device_id, url, stored_status)
        worker = threading.Thread(target=camera.healthbeat)
        threads.append(worker)
        worker.start()

    for worker in threads:
        worker.join()


if __name__ == '__main__':
    main()