commit 98e8eca0008d68c9a9813e0daec80fc701e7a648 Author: Aditya Gupta Date: Sun Feb 1 14:14:57 2026 +0530 first commit diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..7464ecf --- /dev/null +++ b/Dockerfile @@ -0,0 +1,21 @@ + +FROM python:3.11-slim + +# Install system deps for OpenCV + ffmpeg +RUN apt-get update && apt-get install -y \ + ffmpeg \ + libgl1 \ + libglib2.0-0 \ + && rm -rf /var/lib/apt/lists/* + +# Install Python deps +RUN pip install --no-cache-dir \ + opencv-python-headless \ + numpy \ + requests + +WORKDIR /app + +COPY desk_presence.py /app/desk_presence.py + +CMD ["python", "/app/desk_presence.py"] diff --git a/desk_presence.py b/desk_presence.py new file mode 100755 index 0000000..0d71671 --- /dev/null +++ b/desk_presence.py @@ -0,0 +1,251 @@ +#!/usr/bin/env python3 +import cv2 +import numpy as np +import subprocess +import os +import json +import time +import requests + +# ---------- Camera ---------- +CAMERA = os.environ.get("CAMERA", "/dev/video0") +TMP_IMG = "/tmp/desk_current.jpg" +PREV_IMG = "/tmp/desk_prev.jpg" + +# ---------- State files ---------- +STATE_FILE = "/tmp/desk_presence_score.json" +HA_STATE_FILE = "/tmp/desk_presence_last_ha.json" +LIGHT_COOLDOWN_FILE = "/tmp/desk_light_cooldown.json" + +# ---------- Haar cascade ---------- +FACE_CASCADE = cv2.CascadeClassifier( + "/usr/share/opencv4/haarcascades/haarcascade_frontalface_default.xml" +) + +# ---------- Home Assistant ---------- +HA_URL = os.environ.get("HA_URL", "http://192.168.0.202:8123") +HA_ENTITY_ID = os.environ.get("HA_ENTITY_ID", "binary_sensor.desk_presence_vision") +HA_TOKEN = os.environ.get("HA_TOKEN") + +# ---------- Presence logic ---------- +MAX_SCORE = 5 +FACE_BOOST = 2 +MOTION_BOOST = 1 +DECAY = 1 + +MOTION_AREA_THRESHOLD = 4000 +LIGHT_COOLDOWN_SECONDS = 15 + +# Adaptive delays (seconds) +SCORE_DELAYS = { + 0: 30, + 1: 15, + 2: 60, + 3: 120, + 4: 240, + 5: 480, +} + +# -------------------------------------------------- + + +def safe_delete(path): + try: + if os.path.exists(path): + os.remove(path) + print(f"[CLEANUP] Deleted {path}") + except Exception as e: + print(f"[WARN] Could not delete {path}: {e}") + + +def capture(): + subprocess.run( + [ + "ffmpeg", + "-loglevel", + "quiet", + "-f", + "v4l2", + "-i", + CAMERA, + "-frames:v", + "1", + TMP_IMG, + ], + check=True, + ) + + +def load_json(path, default): + if not os.path.exists(path): + return default + try: + return json.load(open(path)) + except Exception: + return default + + +def save_json(path, data): + json.dump(data, open(path, "w")) + + +# ---------- Presence score ---------- +def load_score(): + return load_json(STATE_FILE, {}).get("score", 0) + + +def save_score(score): + save_json(STATE_FILE, {"score": score}) + + +# ---------- HA state ---------- +def load_last_ha_state(): + return load_json(HA_STATE_FILE, {}).get("state") + + +def save_last_ha_state(state): + save_json(HA_STATE_FILE, {"state": state}) + + +def record_light_off_event(): + save_json(LIGHT_COOLDOWN_FILE, {"ts": time.time()}) + + +def ignore_motion_due_to_light(): + data = load_json(LIGHT_COOLDOWN_FILE, {}) + ts = data.get("ts", 0) + remaining = LIGHT_COOLDOWN_SECONDS - (time.time() - ts) + if remaining > 0: + print(f"[INFO] Ignoring motion for {remaining:.1f}s (light cooldown)") + return True + return False + + +def send_to_ha(present): + new_state = "on" if present else "off" + last_state = load_last_ha_state() + + if new_state == last_state: + print("[HA] No state change") + return + + headers = { + "Authorization": f"Bearer {HA_TOKEN}", + "Content-Type": "application/json", + } + + payload = { + "state": new_state, + "attributes": { + "friendly_name": "Desk Presence (Vision)", + "source": "snapshot_camera", + }, + } + + try: + r = requests.post( + f"{HA_URL}/api/states/{HA_ENTITY_ID}", + headers=headers, + json=payload, + timeout=5, + ) + + if r.status_code in (200, 201): + print(f"[HA] Updated state → {new_state}") + save_last_ha_state(new_state) + + if new_state == "off": + record_light_off_event() + else: + print(f"[HA] Error {r.status_code}: {r.text}") + + except Exception as e: + print(f"[HA] Connection failed: {e}") + + +# ---------- Detection ---------- +def detect_face(gray): + faces = FACE_CASCADE.detectMultiScale( + gray, scaleFactor=1.2, minNeighbors=4, minSize=(60, 60) + ) + return len(faces) > 0 + + +def detect_motion(gray): + if ignore_motion_due_to_light(): + return False + + if not os.path.exists(PREV_IMG): + cv2.imwrite(PREV_IMG, gray) + return False + + prev = cv2.imread(PREV_IMG, cv2.IMREAD_GRAYSCALE) + cv2.imwrite(PREV_IMG, gray) + + diff = cv2.absdiff(prev, gray) + _, thresh = cv2.threshold(diff, 30, 255, cv2.THRESH_BINARY) + + kernel = np.ones((5, 5), np.uint8) + thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel) + + area = cv2.countNonZero(thresh) + print("[DEBUG] Motion area:", area) + + return area > MOTION_AREA_THRESHOLD + + +def get_delay(score): + return SCORE_DELAYS.get(score, 30) / 10 + + +# ---------- Main loop ---------- +def main_loop(): + print("=== SNAPSHOT DESK PRESENCE (STABLE) ===") + + while True: + try: + capture() + + frame = cv2.imread(TMP_IMG) + if frame is None: + print("[ERROR] Frame read failed") + time.sleep(30) + continue + + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + + face = detect_face(gray) + motion = detect_motion(gray) + + score = load_score() + print("\nPrevious score:", score) + + if face: + score += FACE_BOOST + print("Face detected → +", FACE_BOOST) + elif motion: + score += MOTION_BOOST + print("Motion detected → +", MOTION_BOOST) + else: + score -= DECAY + print("No signal → -", DECAY) + + score = max(0, min(MAX_SCORE, score)) + save_score(score) + + present = score > 0 + print("Current score:", score) + print("PRESENCE:", "YES" if present else "NO") + + send_to_ha(present) + + finally: + safe_delete(TMP_IMG) + + delay = get_delay(score) + print(f"[SLEEP] Next check in {delay}s") + time.sleep(delay) + + +if __name__ == "__main__": + main_loop() diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..6306d83 --- /dev/null +++ b/readme.md @@ -0,0 +1,40 @@ +# Desk Presence Detection (Snapshot-Based, Home Assistant) + +This project provides a low-CPU desk presence detection system using a webcam and snapshot-based computer vision. It is designed to work reliably on weak hardware without running continuous video inference. + +Presence is detected using: + +- Face detection (Haar cascade) +- Frame-to-frame motion detection +- A confidence score with decay +- A cooldown to prevent feedback loops caused by lights turning on or off +- The detected presence is published to Home Assistant as a binary_sensor. + +## Installation + +### 1. Clone + +### 2. Build the image + +```sh +docker build -t desk-presence . +``` + +### 3. Home Assistant Setup + +#### A. Create Long Lived Access Token + +#### B. Run the container + +```sh +docker run -d \ + --name desk-presence \ + --restart unless-stopped \ + --device /dev/video0:/dev/video0 \ + -e HA_URL="http://192.168.0.202:8123" \ + -e HA_ENTITY_ID="binary_sensor.desk_presence_vision" \ + -e HA_TOKEN="YOUR_LONG_LIVED_TOKEN" \ + -e CAMERA="/dev/video0" + desk-presence + +```