#!/usr/bin/env python3 import cv2 import numpy as np import subprocess import os import json import time import requests # ---------- Camera ---------- CAMERA = os.environ.get("CAMERA", "/dev/video0") TMP_IMG = "/tmp/desk_current.jpg" PREV_IMG = "/tmp/desk_prev.jpg" # ---------- State files ---------- STATE_FILE = "/tmp/desk_presence_score.json" HA_STATE_FILE = "/tmp/desk_presence_last_ha.json" LIGHT_COOLDOWN_FILE = "/tmp/desk_light_cooldown.json" # ---------- Haar cascade ---------- FACE_CASCADE = cv2.CascadeClassifier( "/usr/share/opencv4/haarcascades/haarcascade_frontalface_default.xml" ) # ---------- Home Assistant ---------- HA_URL = os.environ.get("HA_URL", "http://192.168.0.202:8123") HA_ENTITY_ID = os.environ.get("HA_ENTITY_ID", "binary_sensor.desk_presence_vision") HA_TOKEN = os.environ.get("HA_TOKEN") # ---------- Presence logic ---------- MAX_SCORE = 5 FACE_BOOST = 2 MOTION_BOOST = 1 DECAY = 1 MOTION_AREA_THRESHOLD = 4000 LIGHT_COOLDOWN_SECONDS = 15 # Adaptive delays (seconds) SCORE_DELAYS = { 0: 30, 1: 15, 2: 60, 3: 120, 4: 240, 5: 480, } # -------------------------------------------------- def safe_delete(path): try: if os.path.exists(path): os.remove(path) print(f"[CLEANUP] Deleted {path}") except Exception as e: print(f"[WARN] Could not delete {path}: {e}") def capture(): subprocess.run( [ "ffmpeg", "-loglevel", "quiet", "-f", "v4l2", "-i", CAMERA, "-frames:v", "1", TMP_IMG, ], check=True, ) def load_json(path, default): if not os.path.exists(path): return default try: return json.load(open(path)) except Exception: return default def save_json(path, data): json.dump(data, open(path, "w")) # ---------- Presence score ---------- def load_score(): return load_json(STATE_FILE, {}).get("score", 0) def save_score(score): save_json(STATE_FILE, {"score": score}) # ---------- HA state ---------- def load_last_ha_state(): return load_json(HA_STATE_FILE, {}).get("state") def save_last_ha_state(state): save_json(HA_STATE_FILE, {"state": state}) def record_light_off_event(): save_json(LIGHT_COOLDOWN_FILE, {"ts": time.time()}) def ignore_motion_due_to_light(): data = load_json(LIGHT_COOLDOWN_FILE, {}) ts = data.get("ts", 0) remaining = LIGHT_COOLDOWN_SECONDS - (time.time() - ts) if remaining > 0: print(f"[INFO] Ignoring motion for {remaining:.1f}s (light cooldown)") return True return False def send_to_ha(present): new_state = "on" if present else "off" last_state = load_last_ha_state() if new_state == last_state: print("[HA] No state change") return headers = { "Authorization": f"Bearer {HA_TOKEN}", "Content-Type": "application/json", } payload = { "state": new_state, "attributes": { "friendly_name": "Desk Presence (Vision)", "source": "snapshot_camera", }, } try: r = requests.post( f"{HA_URL}/api/states/{HA_ENTITY_ID}", headers=headers, json=payload, timeout=5, ) if r.status_code in (200, 201): print(f"[HA] Updated state → {new_state}") save_last_ha_state(new_state) if new_state == "off": record_light_off_event() else: print(f"[HA] Error {r.status_code}: {r.text}") except Exception as e: print(f"[HA] Connection failed: {e}") # ---------- Detection ---------- def detect_face(gray): faces = FACE_CASCADE.detectMultiScale( gray, scaleFactor=1.2, minNeighbors=4, minSize=(60, 60) ) return len(faces) > 0 def detect_motion(gray): if ignore_motion_due_to_light(): return False if not os.path.exists(PREV_IMG): cv2.imwrite(PREV_IMG, gray) return False prev = cv2.imread(PREV_IMG, cv2.IMREAD_GRAYSCALE) cv2.imwrite(PREV_IMG, gray) diff = cv2.absdiff(prev, gray) _, thresh = cv2.threshold(diff, 30, 255, cv2.THRESH_BINARY) kernel = np.ones((5, 5), np.uint8) thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel) area = cv2.countNonZero(thresh) print("[DEBUG] Motion area:", area) return area > MOTION_AREA_THRESHOLD def get_delay(score): return SCORE_DELAYS.get(score, 30) / 10 # ---------- Main loop ---------- def main_loop(): print("=== SNAPSHOT DESK PRESENCE (STABLE) ===") while True: try: capture() frame = cv2.imread(TMP_IMG) if frame is None: print("[ERROR] Frame read failed") time.sleep(30) continue gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) face = detect_face(gray) motion = detect_motion(gray) score = load_score() print("\nPrevious score:", score) if face: score += FACE_BOOST print("Face detected → +", FACE_BOOST) elif motion: score += MOTION_BOOST print("Motion detected → +", MOTION_BOOST) else: score -= DECAY print("No signal → -", DECAY) score = max(0, min(MAX_SCORE, score)) save_score(score) present = score > 0 print("Current score:", score) print("PRESENCE:", "YES" if present else "NO") send_to_ha(present) finally: safe_delete(TMP_IMG) delay = get_delay(score) print(f"[SLEEP] Next check in {delay}s") time.sleep(delay) if __name__ == "__main__": main_loop()