Files
desk-presence/desk_presence.py

337 lines
8.5 KiB
Python
Executable File

#!/usr/bin/env python3
import cv2
import numpy as np
import subprocess
import os
import json
import time
import requests
import logging
import sys
import argparse
# ==================================================
# Logging
# ==================================================
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
LOG_FILE = os.environ.get("LOG_FILE") # optional
handlers = [logging.StreamHandler(sys.stdout)]
if LOG_FILE:
handlers.append(logging.FileHandler(LOG_FILE))
logging.basicConfig(
level=LOG_LEVEL,
format="%(asctime)s | %(levelname)-8s | %(message)s",
handlers=handlers,
)
log = logging.getLogger("desk_presence")
# ==================================================
# Arguments
# ==================================================
parser = argparse.ArgumentParser(description="Desk presence detection")
parser.add_argument(
"--vflip",
action="store_true",
help="Flip captured image vertically before processing",
)
args = parser.parse_args()
VFLIP = args.vflip
if VFLIP:
log.info("Vertical flip enabled")
# ==================================================
# Camera
# ==================================================
CAMERA = "/dev/video0"
TMP_IMG = "/tmp/desk_current.jpg"
PREV_IMG = "/tmp/desk_prev.jpg"
# ==================================================
# State files
# ==================================================
STATE_FILE = "/tmp/desk_presence_score.json"
HA_STATE_FILE = "/tmp/desk_presence_last_ha.json"
LIGHT_COOLDOWN_FILE = "/tmp/desk_light_cooldown.json"
# ==================================================
# Haar cascade
# ==================================================
FACE_CASCADE = cv2.CascadeClassifier("cascades/haarcascade_frontalface_default.xml")
if FACE_CASCADE.empty():
raise RuntimeError("Failed to load Haar cascade")
# ==================================================
# Home Assistant
# ==================================================
HA_URL = os.environ.get("HA_URL", "http://192.168.0.202:8123")
HA_ENTITY_ID = os.environ.get("HA_ENTITY_ID", "binary_sensor.desk_presence_vision")
HA_TOKEN = os.environ.get("HA_TOKEN")
if not HA_TOKEN:
raise RuntimeError("HA_TOKEN environment variable not set")
# ==================================================
# Presence logic
# ==================================================
MAX_SCORE = 5
FACE_BOOST = 2
MOTION_BOOST = 1
DECAY = 1
MOTION_AREA_THRESHOLD = 3500
LIGHT_COOLDOWN_SECONDS = 15
SCORE_DELAYS = {
0: 5,
1: 10,
2: 20,
3: 20,
4: 20,
5: 20,
}
# ==================================================
# Helpers
# ==================================================
def safe_delete(path):
try:
if os.path.exists(path):
os.remove(path)
log.debug("Deleted %s", path)
except Exception:
log.exception("Failed to delete %s", path)
def capture():
log.debug("Capturing frame from %s", CAMERA)
subprocess.run(
[
"ffmpeg",
"-loglevel",
"error",
"-f",
"v4l2",
"-i",
CAMERA,
"-frames:v",
"1",
TMP_IMG,
],
check=True,
)
def load_json(path, default):
if not os.path.exists(path):
return default
try:
return json.load(open(path))
except Exception:
log.warning("Failed to load JSON from %s", path)
return default
def save_json(path, data):
try:
json.dump(data, open(path, "w"))
except Exception:
log.exception("Failed to save JSON to %s", path)
# ==================================================
# Presence score
# ==================================================
def load_score():
return load_json(STATE_FILE, {}).get("score", 0)
def save_score(score):
save_json(STATE_FILE, {"score": score})
# ==================================================
# Home Assistant state
# ==================================================
def load_last_ha_state():
return load_json(HA_STATE_FILE, {}).get("state")
def save_last_ha_state(state):
save_json(HA_STATE_FILE, {"state": state})
def record_light_off_event():
log.debug("Recording light-off cooldown event")
save_json(LIGHT_COOLDOWN_FILE, {"ts": time.time()})
def ignore_motion_due_to_light():
data = load_json(LIGHT_COOLDOWN_FILE, {})
ts = data.get("ts", 0)
return (time.time() - ts) < LIGHT_COOLDOWN_SECONDS
def send_to_ha(present):
new_state = "on" if present else "off"
last_state = load_last_ha_state()
if new_state == last_state:
log.debug("HA state unchanged (%s)", new_state)
return
log.info("Sending HA state: %s", new_state)
try:
r = requests.post(
f"{HA_URL}/api/states/{HA_ENTITY_ID}",
headers={
"Authorization": f"Bearer {HA_TOKEN}",
"Content-Type": "application/json",
},
json={
"state": new_state,
"attributes": {
"friendly_name": "Desk Presence (Vision)",
"source": "snapshot_camera",
},
},
timeout=5,
)
if r.status_code in (200, 201):
log.info("HA state updated successfully")
save_last_ha_state(new_state)
if new_state == "off":
record_light_off_event()
else:
log.error(
"HA update failed (%s): %s",
r.status_code,
r.text,
)
except Exception:
log.exception("Failed to send update to Home Assistant")
# ==================================================
# Detection
# ==================================================
def detect_face(gray):
faces = FACE_CASCADE.detectMultiScale(
gray,
scaleFactor=1.2,
minNeighbors=4,
minSize=(60, 60),
)
return len(faces) > 0
def detect_motion(gray):
light_cooldown = ignore_motion_due_to_light()
# If no previous frame exists, initialize it
if not os.path.exists(PREV_IMG):
cv2.imwrite(PREV_IMG, gray)
log.debug("Initialized previous frame")
return False
prev = cv2.imread(PREV_IMG, cv2.IMREAD_GRAYSCALE)
# Always update PREV_IMG so state stays aligned
cv2.imwrite(PREV_IMG, gray)
# If we're in light cooldown, ignore motion but accept the frame
if light_cooldown:
log.debug("Ignoring motion due to light cooldown (state synced)")
return False
diff = cv2.absdiff(prev, gray)
mean_diff = np.mean(diff)
if mean_diff > 60:
log.debug("Ignoring global brightness change (mean=%.2f)", mean_diff)
return False
_, thresh = cv2.threshold(diff, 30, 255, cv2.THRESH_BINARY)
kernel = np.ones((5, 5), np.uint8)
thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
max_area = max((cv2.contourArea(c) for c in contours), default=0)
log.debug("Motion max contour area: %d", max_area)
return max_area > MOTION_AREA_THRESHOLD
def get_delay(score):
return SCORE_DELAYS.get(score, 30)
# ==================================================
# Main loop
# ==================================================
def main_loop():
log.info("Snapshot desk presence started")
while True:
try:
capture()
frame = cv2.imread(TMP_IMG)
if frame is None:
log.warning("Failed to read captured frame")
time.sleep(30)
continue
if VFLIP:
frame = cv2.flip(frame, 0)
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
face = detect_face(gray)
motion = detect_motion(gray)
log.debug(
"Detection | face=%s motion=%s",
face,
motion,
)
score = load_score()
old_score = score
if face:
score += FACE_BOOST
elif motion:
score += MOTION_BOOST
else:
score -= DECAY
score = max(0, min(MAX_SCORE, score))
if score != old_score:
log.info("Score changed %d%d", old_score, score)
save_score(score)
send_to_ha(score > 0)
except Exception:
log.exception("Unhandled error in main loop")
time.sleep(30)
finally:
safe_delete(TMP_IMG)
time.sleep(get_delay(score))
if __name__ == "__main__":
main_loop()