lcny-cameras-healthbeat/Healthbeat.py

80 lines
2.4 KiB
Python

# coding=utf-8
import os
import cv2
import logging
import psycopg2
import threading
import datetime
from dotenv import load_dotenv
class Camera:
    """One RTSP camera whose online/offline state is mirrored in the ``devices`` table."""

    # Camera status codes: 1 = online, 2 = offline.
    STATUS_ONLINE = 1
    STATUS_OFFLINE = 2

    def __init__(self, id, rtsp, status):
        """Store the device identity and its last known status.

        Args:
            id: Value matched against ``devices.id`` when the status is updated.
            rtsp: Stream source handed to ``cv2.VideoCapture``.
            status: Status currently recorded for the device (1 online, 2 offline).
        """
        self.id = id
        self.rtsp = rtsp
        self.status = status

    def healthbeat(self):
        """Probe the stream once, log the result, and persist a status change.

        The ``devices`` row is only written when the observed status differs
        from the recorded one. Any exception is logged and swallowed, so this
        method is safe to use directly as a thread target.
        """
        try:
            status = self._probe()
            if status == self.STATUS_ONLINE:
                logging.info("摄像头ID: %s, 状态: 在线", self.id)
            else:
                logging.warning("摄像头ID: %s, 状态: 离线", self.id)

            # Only an online <-> offline transition is written back; a recorded
            # status outside {1, 2} is left alone, as before.
            known = (self.STATUS_ONLINE, self.STATUS_OFFLINE)
            if self.status in known and status != self.status:
                self._persist_status(status)
        except Exception as e:
            logging.error("摄像头ID: %s, 检测状态时发生错误: %s", self.id, e)

    def _probe(self):
        """Try to read a single frame and return the corresponding status code."""
        cap = cv2.VideoCapture(self.rtsp)
        try:
            ret, _ = cap.read()
        finally:
            # Release the capture even when read() raises.
            cap.release()
        return self.STATUS_ONLINE if ret else self.STATUS_OFFLINE

    def _persist_status(self, status):
        """Write ``status`` and the current timestamp to this device's row."""
        conn = get_connection()
        try:
            # psycopg2's connection context manager ends the transaction
            # (commit, or rollback on error) but does not close the connection.
            with conn:
                with conn.cursor() as cursor:
                    # Parameters are positional: they must follow the order of
                    # the placeholders (status, updated_at, id).
                    cursor.execute(
                        "update devices set status = %s, updated_at = %s where id = %s",
                        (status, datetime.datetime.now(), self.id),
                    )
        finally:
            conn.close()
def init_logger():
    """Configure the root logger to write INFO-and-above records to ``healthbeat.log``."""
    log_format = '[%(asctime)s] %(levelname)s - %(message)s'
    date_format = '%Y-%m-%d %H:%M:%S'
    logging.basicConfig(
        filename='healthbeat.log',
        level=logging.INFO,
        format=log_format,
        datefmt=date_format,
    )
def get_connection():
    """Open a new PostgreSQL connection configured from environment variables.

    Reads ``DB_HOST``, ``DB_PORT``, ``DB_DATABASE``, ``DB_USERNAME`` and
    ``DB_PASSWORD``.

    Returns:
        The connection object returned by ``psycopg2.connect``. The caller is
        responsible for closing it.
    """
    # Pass the settings as keyword arguments instead of interpolating them into
    # a DSN string: values containing spaces, quotes or backslashes (typically
    # the password) would otherwise corrupt the connection string.
    # NOTE(review): unset variables arrive here as None; psycopg2 is expected to
    # drop None-valued parameters so libpq defaults apply — confirm for the
    # deployed psycopg2 version.
    return psycopg2.connect(
        host=os.environ.get("DB_HOST"),
        port=os.environ.get("DB_PORT"),
        dbname=os.environ.get("DB_DATABASE"),
        user=os.environ.get("DB_USERNAME"),
        password=os.environ.get("DB_PASSWORD"),
    )
if __name__ == '__main__':
    # Entry point: check every camera device once, one thread per camera.
    load_dotenv()
    # Initialise the log format.
    init_logger()

    # Fetch the candidate devices, then release the connection right away:
    # the rows are fully materialised by fetchall(), and each Camera opens its
    # own connection if it needs to write a status change.
    conn = get_connection()
    try:
        with conn:
            with conn.cursor() as cursor:
                cursor.execute("SELECT id, status, extends FROM devices WHERE type = 1 AND status in (1, 2) ORDER BY id ASC")
                rows = cursor.fetchall()
    finally:
        conn.close()

    threads = []
    for device_id, device_status, extends in rows:
        # Skip devices whose ``extends`` column is not a mapping, or that have
        # no usable stream URL (missing key, None or empty string), instead of
        # letting a KeyError abort the whole run.
        if not isinstance(extends, dict):
            continue
        rtsp_url = extends.get("rtsp_url")
        if not rtsp_url:
            continue
        camera = Camera(device_id, rtsp_url, device_status)
        worker = threading.Thread(target=camera.healthbeat)
        threads.append(worker)
        worker.start()

    # Wait for every probe to finish before the process exits.
    for worker in threads:
        worker.join()