#!/usr/bin/env python3 import cv2 import numpy as np import subprocess import os import json import time import requests import logging import sys import argparse # ================================================== # Logging # ================================================== LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper() LOG_FILE = os.environ.get("LOG_FILE") # optional handlers = [logging.StreamHandler(sys.stdout)] if LOG_FILE: handlers.append(logging.FileHandler(LOG_FILE)) logging.basicConfig( level=LOG_LEVEL, format="%(asctime)s | %(levelname)-8s | %(message)s", handlers=handlers, ) log = logging.getLogger("desk_presence") # ================================================== # Arguments # ================================================== parser = argparse.ArgumentParser(description="Desk presence detection") parser.add_argument( "--vflip", action="store_true", help="Flip captured image vertically before processing", ) args = parser.parse_args() VFLIP = args.vflip if VFLIP: log.info("Vertical flip enabled") # ================================================== # Camera # ================================================== CAMERA = "/dev/video0" TMP_IMG = "/tmp/desk_current.jpg" PREV_IMG = "/tmp/desk_prev.jpg" # ================================================== # State files # ================================================== STATE_FILE = "/tmp/desk_presence_score.json" HA_STATE_FILE = "/tmp/desk_presence_last_ha.json" LIGHT_COOLDOWN_FILE = "/tmp/desk_light_cooldown.json" # ================================================== # Haar cascade # ================================================== FACE_CASCADE = cv2.CascadeClassifier("cascades/haarcascade_frontalface_default.xml") if FACE_CASCADE.empty(): raise RuntimeError("Failed to load Haar cascade") # ================================================== # Home Assistant # ================================================== HA_URL = os.environ.get("HA_URL", "http://192.168.0.202:8123") HA_ENTITY_ID = os.environ.get("HA_ENTITY_ID", "binary_sensor.desk_presence_vision") HA_TOKEN = os.environ.get("HA_TOKEN") if not HA_TOKEN: raise RuntimeError("HA_TOKEN environment variable not set") # ================================================== # Presence logic # ================================================== MAX_SCORE = 5 FACE_BOOST = 2 MOTION_BOOST = 1 DECAY = 1 MOTION_AREA_THRESHOLD = 3500 LIGHT_COOLDOWN_SECONDS = 15 SCORE_DELAYS = { 0: 5, 1: 10, 2: 20, 3: 20, 4: 20, 5: 20, } # ================================================== # Helpers # ================================================== def safe_delete(path): try: if os.path.exists(path): os.remove(path) log.debug("Deleted %s", path) except Exception: log.exception("Failed to delete %s", path) def capture(): log.debug("Capturing frame from %s", CAMERA) subprocess.run( [ "ffmpeg", "-loglevel", "error", "-f", "v4l2", "-i", CAMERA, "-frames:v", "1", TMP_IMG, ], check=True, ) def load_json(path, default): if not os.path.exists(path): return default try: return json.load(open(path)) except Exception: log.warning("Failed to load JSON from %s", path) return default def save_json(path, data): try: json.dump(data, open(path, "w")) except Exception: log.exception("Failed to save JSON to %s", path) # ================================================== # Presence score # ================================================== def load_score(): return load_json(STATE_FILE, {}).get("score", 0) def save_score(score): save_json(STATE_FILE, {"score": score}) # ================================================== # Home Assistant state # ================================================== def load_last_ha_state(): return load_json(HA_STATE_FILE, {}).get("state") def save_last_ha_state(state): save_json(HA_STATE_FILE, {"state": state}) def record_light_off_event(): log.debug("Recording light-off cooldown event") save_json(LIGHT_COOLDOWN_FILE, {"ts": time.time()}) def ignore_motion_due_to_light(): data = load_json(LIGHT_COOLDOWN_FILE, {}) ts = data.get("ts", 0) return (time.time() - ts) < LIGHT_COOLDOWN_SECONDS def send_to_ha(present): new_state = "on" if present else "off" last_state = load_last_ha_state() if new_state == last_state: log.debug("HA state unchanged (%s)", new_state) return log.info("Sending HA state: %s", new_state) try: r = requests.post( f"{HA_URL}/api/states/{HA_ENTITY_ID}", headers={ "Authorization": f"Bearer {HA_TOKEN}", "Content-Type": "application/json", }, json={ "state": new_state, "attributes": { "friendly_name": "Desk Presence (Vision)", "source": "snapshot_camera", }, }, timeout=5, ) if r.status_code in (200, 201): log.info("HA state updated successfully") save_last_ha_state(new_state) if new_state == "off": record_light_off_event() else: log.error( "HA update failed (%s): %s", r.status_code, r.text, ) except Exception: log.exception("Failed to send update to Home Assistant") # ================================================== # Detection # ================================================== def detect_face(gray): faces = FACE_CASCADE.detectMultiScale( gray, scaleFactor=1.2, minNeighbors=4, minSize=(60, 60), ) return len(faces) > 0 def detect_motion(gray): light_cooldown = ignore_motion_due_to_light() # If no previous frame exists, initialize it if not os.path.exists(PREV_IMG): cv2.imwrite(PREV_IMG, gray) log.debug("Initialized previous frame") return False prev = cv2.imread(PREV_IMG, cv2.IMREAD_GRAYSCALE) # Always update PREV_IMG so state stays aligned cv2.imwrite(PREV_IMG, gray) # If we're in light cooldown, ignore motion but accept the frame if light_cooldown: log.debug("Ignoring motion due to light cooldown (state synced)") return False diff = cv2.absdiff(prev, gray) mean_diff = np.mean(diff) if mean_diff > 60: log.debug("Ignoring global brightness change (mean=%.2f)", mean_diff) return False _, thresh = cv2.threshold(diff, 30, 255, cv2.THRESH_BINARY) kernel = np.ones((5, 5), np.uint8) thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel) contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) max_area = max((cv2.contourArea(c) for c in contours), default=0) log.debug("Motion max contour area: %d", max_area) return max_area > MOTION_AREA_THRESHOLD def get_delay(score): return SCORE_DELAYS.get(score, 30) # ================================================== # Main loop # ================================================== def main_loop(): log.info("Snapshot desk presence started") while True: try: capture() frame = cv2.imread(TMP_IMG) if frame is None: log.warning("Failed to read captured frame") time.sleep(30) continue if VFLIP: frame = cv2.flip(frame, 0) gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) face = detect_face(gray) motion = detect_motion(gray) log.debug( "Detection | face=%s motion=%s", face, motion, ) score = load_score() old_score = score if face: score += FACE_BOOST elif motion: score += MOTION_BOOST else: score -= DECAY score = max(0, min(MAX_SCORE, score)) if score != old_score: log.info("Score changed %d → %d", old_score, score) save_score(score) send_to_ha(score > 0) except Exception: log.exception("Unhandled error in main loop") time.sleep(30) finally: safe_delete(TMP_IMG) time.sleep(get_delay(score)) if __name__ == "__main__": main_loop()