Increased Motion Sens
This commit is contained in:
178
desk_presence.py
178
desk_presence.py
@@ -6,23 +6,51 @@ import os
|
||||
import json
|
||||
import time
|
||||
import requests
|
||||
import logging
|
||||
import sys
|
||||
|
||||
# ---------- Camera ----------
|
||||
# ==================================================
|
||||
# Logging
|
||||
# ==================================================
|
||||
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
|
||||
LOG_FILE = os.environ.get("LOG_FILE") # optional
|
||||
|
||||
handlers = [logging.StreamHandler(sys.stdout)]
|
||||
if LOG_FILE:
|
||||
handlers.append(logging.FileHandler(LOG_FILE))
|
||||
|
||||
logging.basicConfig(
|
||||
level=LOG_LEVEL,
|
||||
format="%(asctime)s | %(levelname)-8s | %(message)s",
|
||||
handlers=handlers,
|
||||
)
|
||||
|
||||
log = logging.getLogger("desk_presence")
|
||||
|
||||
# ==================================================
|
||||
# Camera
|
||||
# ==================================================
|
||||
CAMERA = "/dev/video0"
|
||||
TMP_IMG = "/tmp/desk_current.jpg"
|
||||
PREV_IMG = "/tmp/desk_prev.jpg"
|
||||
|
||||
# ---------- State files ----------
|
||||
# ==================================================
|
||||
# State files
|
||||
# ==================================================
|
||||
STATE_FILE = "/tmp/desk_presence_score.json"
|
||||
HA_STATE_FILE = "/tmp/desk_presence_last_ha.json"
|
||||
LIGHT_COOLDOWN_FILE = "/tmp/desk_light_cooldown.json"
|
||||
|
||||
# ---------- Haar cascade ----------
|
||||
# ==================================================
|
||||
# Haar cascade
|
||||
# ==================================================
|
||||
FACE_CASCADE = cv2.CascadeClassifier("cascades/haarcascade_frontalface_default.xml")
|
||||
if FACE_CASCADE.empty():
|
||||
raise RuntimeError("Failed to load Haar cascade")
|
||||
|
||||
# ---------- Home Assistant ----------
|
||||
# ==================================================
|
||||
# Home Assistant
|
||||
# ==================================================
|
||||
HA_URL = os.environ.get("HA_URL", "http://192.168.0.202:8123")
|
||||
HA_ENTITY_ID = os.environ.get("HA_ENTITY_ID", "binary_sensor.desk_presence_vision")
|
||||
HA_TOKEN = os.environ.get("HA_TOKEN")
|
||||
@@ -30,42 +58,46 @@ HA_TOKEN = os.environ.get("HA_TOKEN")
|
||||
if not HA_TOKEN:
|
||||
raise RuntimeError("HA_TOKEN environment variable not set")
|
||||
|
||||
# ---------- Presence logic ----------
|
||||
# ==================================================
|
||||
# Presence logic
|
||||
# ==================================================
|
||||
MAX_SCORE = 5
|
||||
FACE_BOOST = 2
|
||||
MOTION_BOOST = 0 # motion never increases score
|
||||
MOTION_BOOST = 1
|
||||
DECAY = 1
|
||||
|
||||
MOTION_AREA_THRESHOLD = 8000
|
||||
MOTION_AREA_THRESHOLD = 3500
|
||||
LIGHT_COOLDOWN_SECONDS = 15
|
||||
|
||||
# Adaptive delays (seconds)
|
||||
SCORE_DELAYS = {
|
||||
0: 30,
|
||||
1: 15,
|
||||
2: 60,
|
||||
3: 120,
|
||||
4: 240,
|
||||
5: 480,
|
||||
0: 5,
|
||||
1: 10,
|
||||
2: 20,
|
||||
3: 60,
|
||||
4: 120,
|
||||
5: 240,
|
||||
}
|
||||
|
||||
# --------------------------------------------------
|
||||
|
||||
|
||||
# ==================================================
|
||||
# Helpers
|
||||
# ==================================================
|
||||
def safe_delete(path):
|
||||
try:
|
||||
if os.path.exists(path):
|
||||
os.remove(path)
|
||||
log.debug("Deleted %s", path)
|
||||
except Exception:
|
||||
pass
|
||||
log.exception("Failed to delete %s", path)
|
||||
|
||||
|
||||
def capture():
|
||||
log.debug("Capturing frame from %s", CAMERA)
|
||||
subprocess.run(
|
||||
[
|
||||
"ffmpeg",
|
||||
"-loglevel",
|
||||
"quiet",
|
||||
"error",
|
||||
"-f",
|
||||
"v4l2",
|
||||
"-i",
|
||||
@@ -84,14 +116,20 @@ def load_json(path, default):
|
||||
try:
|
||||
return json.load(open(path))
|
||||
except Exception:
|
||||
log.warning("Failed to load JSON from %s", path)
|
||||
return default
|
||||
|
||||
|
||||
def save_json(path, data):
|
||||
json.dump(data, open(path, "w"))
|
||||
try:
|
||||
json.dump(data, open(path, "w"))
|
||||
except Exception:
|
||||
log.exception("Failed to save JSON to %s", path)
|
||||
|
||||
|
||||
# ---------- Presence score ----------
|
||||
# ==================================================
|
||||
# Presence score
|
||||
# ==================================================
|
||||
def load_score():
|
||||
return load_json(STATE_FILE, {}).get("score", 0)
|
||||
|
||||
@@ -100,7 +138,9 @@ def save_score(score):
|
||||
save_json(STATE_FILE, {"score": score})
|
||||
|
||||
|
||||
# ---------- HA state ----------
|
||||
# ==================================================
|
||||
# Home Assistant state
|
||||
# ==================================================
|
||||
def load_last_ha_state():
|
||||
return load_json(HA_STATE_FILE, {}).get("state")
|
||||
|
||||
@@ -110,6 +150,7 @@ def save_last_ha_state(state):
|
||||
|
||||
|
||||
def record_light_off_event():
|
||||
log.debug("Recording light-off cooldown event")
|
||||
save_json(LIGHT_COOLDOWN_FILE, {"ts": time.time()})
|
||||
|
||||
|
||||
@@ -124,44 +165,60 @@ def send_to_ha(present):
|
||||
last_state = load_last_ha_state()
|
||||
|
||||
if new_state == last_state:
|
||||
log.debug("HA state unchanged (%s)", new_state)
|
||||
return
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {HA_TOKEN}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
log.info("Sending HA state: %s", new_state)
|
||||
|
||||
payload = {
|
||||
"state": new_state,
|
||||
"attributes": {
|
||||
"friendly_name": "Desk Presence (Vision)",
|
||||
"source": "snapshot_camera",
|
||||
},
|
||||
}
|
||||
try:
|
||||
r = requests.post(
|
||||
f"{HA_URL}/api/states/{HA_ENTITY_ID}",
|
||||
headers={
|
||||
"Authorization": f"Bearer {HA_TOKEN}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
json={
|
||||
"state": new_state,
|
||||
"attributes": {
|
||||
"friendly_name": "Desk Presence (Vision)",
|
||||
"source": "snapshot_camera",
|
||||
},
|
||||
},
|
||||
timeout=5,
|
||||
)
|
||||
|
||||
r = requests.post(
|
||||
f"{HA_URL}/api/states/{HA_ENTITY_ID}",
|
||||
headers=headers,
|
||||
json=payload,
|
||||
timeout=5,
|
||||
)
|
||||
if r.status_code in (200, 201):
|
||||
log.info("HA state updated successfully")
|
||||
save_last_ha_state(new_state)
|
||||
if new_state == "off":
|
||||
record_light_off_event()
|
||||
else:
|
||||
log.error(
|
||||
"HA update failed (%s): %s",
|
||||
r.status_code,
|
||||
r.text,
|
||||
)
|
||||
|
||||
if r.status_code in (200, 201):
|
||||
save_last_ha_state(new_state)
|
||||
if new_state == "off":
|
||||
record_light_off_event()
|
||||
except Exception:
|
||||
log.exception("Failed to send update to Home Assistant")
|
||||
|
||||
|
||||
# ---------- Detection ----------
|
||||
# ==================================================
|
||||
# Detection
|
||||
# ==================================================
|
||||
def detect_face(gray):
|
||||
faces = FACE_CASCADE.detectMultiScale(
|
||||
gray, scaleFactor=1.2, minNeighbors=4, minSize=(60, 60)
|
||||
gray,
|
||||
scaleFactor=1.2,
|
||||
minNeighbors=4,
|
||||
minSize=(60, 60),
|
||||
)
|
||||
return len(faces) > 0
|
||||
|
||||
|
||||
def detect_motion(gray):
|
||||
if ignore_motion_due_to_light():
|
||||
log.debug("Ignoring motion due to light cooldown")
|
||||
return False
|
||||
|
||||
if not os.path.exists(PREV_IMG):
|
||||
@@ -172,10 +229,10 @@ def detect_motion(gray):
|
||||
cv2.imwrite(PREV_IMG, gray)
|
||||
|
||||
diff = cv2.absdiff(prev, gray)
|
||||
|
||||
# Ignore global brightness / exposure shifts
|
||||
mean_diff = np.mean(diff)
|
||||
if mean_diff > 12:
|
||||
|
||||
if mean_diff > 18:
|
||||
log.debug("Ignoring global brightness change (mean=%0.2f)", mean_diff)
|
||||
return False
|
||||
|
||||
_, thresh = cv2.threshold(diff, 30, 255, cv2.THRESH_BINARY)
|
||||
@@ -185,16 +242,20 @@ def detect_motion(gray):
|
||||
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
|
||||
max_area = max((cv2.contourArea(c) for c in contours), default=0)
|
||||
log.debug("Motion max contour area: %d", max_area)
|
||||
|
||||
return max_area > MOTION_AREA_THRESHOLD
|
||||
|
||||
|
||||
def get_delay(score):
|
||||
return SCORE_DELAYS.get(score, 30) / 10
|
||||
return SCORE_DELAYS.get(score, 30)
|
||||
|
||||
|
||||
# ---------- Main loop ----------
|
||||
# ==================================================
|
||||
# Main loop
|
||||
# ==================================================
|
||||
def main_loop():
|
||||
print("=== SNAPSHOT DESK PRESENCE ===")
|
||||
log.info("Snapshot desk presence started")
|
||||
|
||||
while True:
|
||||
try:
|
||||
@@ -202,6 +263,7 @@ def main_loop():
|
||||
|
||||
frame = cv2.imread(TMP_IMG)
|
||||
if frame is None:
|
||||
log.warning("Failed to read captured frame")
|
||||
time.sleep(30)
|
||||
continue
|
||||
|
||||
@@ -210,7 +272,14 @@ def main_loop():
|
||||
face = detect_face(gray)
|
||||
motion = detect_motion(gray)
|
||||
|
||||
log.debug(
|
||||
"Detection | face=%s motion=%s",
|
||||
face,
|
||||
motion,
|
||||
)
|
||||
|
||||
score = load_score()
|
||||
old_score = score
|
||||
|
||||
if face:
|
||||
score += FACE_BOOST
|
||||
@@ -220,10 +289,17 @@ def main_loop():
|
||||
score -= DECAY
|
||||
|
||||
score = max(0, min(MAX_SCORE, score))
|
||||
save_score(score)
|
||||
|
||||
if score != old_score:
|
||||
log.info("Score changed %d → %d", old_score, score)
|
||||
|
||||
save_score(score)
|
||||
send_to_ha(score > 0)
|
||||
|
||||
except Exception:
|
||||
log.exception("Unhandled error in main loop")
|
||||
time.sleep(30)
|
||||
|
||||
finally:
|
||||
safe_delete(TMP_IMG)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user