#!/usr/bin/env python3 import cv2 import numpy as np import subprocess import os import json import time import requests # ---------- Camera ---------- CAMERA = "/dev/video0" TMP_IMG = "/tmp/desk_current.jpg" PREV_IMG = "/tmp/desk_prev.jpg" # ---------- State files ---------- STATE_FILE = "/tmp/desk_presence_score.json" HA_STATE_FILE = "/tmp/desk_presence_last_ha.json" LIGHT_COOLDOWN_FILE = "/tmp/desk_light_cooldown.json" # ---------- Haar cascade ---------- FACE_CASCADE = cv2.CascadeClassifier("cascades/haarcascade_frontalface_default.xml") if FACE_CASCADE.empty(): raise RuntimeError("Failed to load Haar cascade") # ---------- Home Assistant ---------- HA_URL = os.environ.get("HA_URL", "http://192.168.0.202:8123") HA_ENTITY_ID = os.environ.get("HA_ENTITY_ID", "binary_sensor.desk_presence_vision") HA_TOKEN = os.environ.get("HA_TOKEN") if not HA_TOKEN: raise RuntimeError("HA_TOKEN environment variable not set") # ---------- Presence logic ---------- MAX_SCORE = 5 FACE_BOOST = 2 MOTION_BOOST = 0 # motion never increases score DECAY = 1 MOTION_AREA_THRESHOLD = 8000 LIGHT_COOLDOWN_SECONDS = 15 # Adaptive delays (seconds) SCORE_DELAYS = { 0: 30, 1: 15, 2: 60, 3: 120, 4: 240, 5: 480, } # -------------------------------------------------- def safe_delete(path): try: if os.path.exists(path): os.remove(path) except Exception: pass def capture(): subprocess.run( [ "ffmpeg", "-loglevel", "quiet", "-f", "v4l2", "-i", CAMERA, "-frames:v", "1", TMP_IMG, ], check=True, ) def load_json(path, default): if not os.path.exists(path): return default try: return json.load(open(path)) except Exception: return default def save_json(path, data): json.dump(data, open(path, "w")) # ---------- Presence score ---------- def load_score(): return load_json(STATE_FILE, {}).get("score", 0) def save_score(score): save_json(STATE_FILE, {"score": score}) # ---------- HA state ---------- def load_last_ha_state(): return load_json(HA_STATE_FILE, {}).get("state") def save_last_ha_state(state): save_json(HA_STATE_FILE, {"state": state}) def record_light_off_event(): save_json(LIGHT_COOLDOWN_FILE, {"ts": time.time()}) def ignore_motion_due_to_light(): data = load_json(LIGHT_COOLDOWN_FILE, {}) ts = data.get("ts", 0) return (time.time() - ts) < LIGHT_COOLDOWN_SECONDS def send_to_ha(present): new_state = "on" if present else "off" last_state = load_last_ha_state() if new_state == last_state: return headers = { "Authorization": f"Bearer {HA_TOKEN}", "Content-Type": "application/json", } payload = { "state": new_state, "attributes": { "friendly_name": "Desk Presence (Vision)", "source": "snapshot_camera", }, } r = requests.post( f"{HA_URL}/api/states/{HA_ENTITY_ID}", headers=headers, json=payload, timeout=5, ) if r.status_code in (200, 201): save_last_ha_state(new_state) if new_state == "off": record_light_off_event() # ---------- Detection ---------- def detect_face(gray): faces = FACE_CASCADE.detectMultiScale( gray, scaleFactor=1.2, minNeighbors=4, minSize=(60, 60) ) return len(faces) > 0 def detect_motion(gray): if ignore_motion_due_to_light(): return False if not os.path.exists(PREV_IMG): cv2.imwrite(PREV_IMG, gray) return False prev = cv2.imread(PREV_IMG, cv2.IMREAD_GRAYSCALE) cv2.imwrite(PREV_IMG, gray) diff = cv2.absdiff(prev, gray) # Ignore global brightness / exposure shifts mean_diff = np.mean(diff) if mean_diff > 12: return False _, thresh = cv2.threshold(diff, 30, 255, cv2.THRESH_BINARY) kernel = np.ones((5, 5), np.uint8) thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel) contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) max_area = max((cv2.contourArea(c) for c in contours), default=0) return max_area > MOTION_AREA_THRESHOLD def get_delay(score): return SCORE_DELAYS.get(score, 30) / 10 # ---------- Main loop ---------- def main_loop(): print("=== SNAPSHOT DESK PRESENCE ===") while True: try: capture() frame = cv2.imread(TMP_IMG) if frame is None: time.sleep(30) continue gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) face = detect_face(gray) motion = detect_motion(gray) score = load_score() if face: score += FACE_BOOST elif motion: score += MOTION_BOOST else: score -= DECAY score = max(0, min(MAX_SCORE, score)) save_score(score) send_to_ha(score > 0) finally: safe_delete(TMP_IMG) time.sleep(get_delay(score)) if __name__ == "__main__": main_loop()