# coding=utf-8 import os import cv2 import logging import pymysql import threading import json from dotenv import load_dotenv class Camera: def __init__(self, id, rtsp, status): self.id = id self.rtsp = rtsp self.status = status def healthbeat(self): cap = cv2.VideoCapture(self.rtsp) ret, _ = cap.read() cap.release() # 摄像头状态: 1 在线, 2 离线 status = 1 if ret == True else 2 if status == 1: logging.info("摄像头ID: %s, 状态: 在线" % (self.id)) else: logging.warning("摄像头ID: %s, 状态: 离线" % (self.id)) if (status == 1 and self.status == 2) or (status == 2 and self.status == 1): conn = get_connection() with conn: with conn.cursor() as cursor: cursor.execute("update devices set state = %s where id = %s", (status, self.id)) def init_logger(): logging.basicConfig(filename='healthbeat.log', level=logging.INFO, format='[%(asctime)s] %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S') def get_connection(): return pymysql.connect( host=os.environ.get("DB_HOST"), port=int(os.environ.get("DB_PORT")), user=os.environ.get("DB_USERNAME"), password=os.environ.get("DB_PASSWORD"), database=os.environ.get("DB_DATABASE"), charset='utf8mb4' ) if __name__ == '__main__': load_dotenv() # 初始化日志格式 init_logger() conn = get_connection() with conn: with conn.cursor() as cursor: cursor.execute("SELECT id, state, extends FROM devices WHERE type = 1 AND state in (1, 2) ORDER BY id ASC") rows = cursor.fetchall() for row in rows: extends = json.loads(row[2]) c = Camera(row[0], extends["rtsp_url"], row[1]) t = threading.Thread(target=c.healthbeat) t.start()