- Add admin auth dependency and role checks - Expand channel and restaurant API routes - Improve YouTube transcript fetching - Enhance daemon worker with better error handling and scheduling Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
93 lines
3.0 KiB
Python
93 lines
3.0 KiB
Python
"""Daemon worker: config-driven channel scan + video processing."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
|
|
from core.db import conn
|
|
from core.youtube import scan_all_channels
|
|
from core.pipeline import process_pending
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
CHECK_INTERVAL = 30 # seconds between config checks
|
|
|
|
|
|
def _get_config() -> dict | None:
|
|
"""Read daemon config from DB."""
|
|
try:
|
|
with conn() as c:
|
|
cur = c.cursor()
|
|
cur.execute(
|
|
"SELECT scan_enabled, scan_interval_min, process_enabled, "
|
|
"process_interval_min, process_limit, last_scan_at, last_process_at "
|
|
"FROM daemon_config WHERE id = 1"
|
|
)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
return None
|
|
return {
|
|
"scan_enabled": bool(row[0]),
|
|
"scan_interval_min": row[1],
|
|
"process_enabled": bool(row[2]),
|
|
"process_interval_min": row[3],
|
|
"process_limit": row[4],
|
|
"last_scan_at": row[5],
|
|
"last_process_at": row[6],
|
|
}
|
|
except Exception as e:
|
|
logger.error("Failed to read daemon config: %s", e)
|
|
return None
|
|
|
|
|
|
def _should_run(last_at: datetime | None, interval_min: int) -> bool:
|
|
"""Check if enough time has passed since last run."""
|
|
if last_at is None:
|
|
return True
|
|
now = datetime.utcnow()
|
|
# Oracle TIMESTAMP comes as datetime
|
|
return now - last_at >= timedelta(minutes=interval_min)
|
|
|
|
|
|
def _update_last(field: str) -> None:
|
|
"""Update last_scan_at or last_process_at."""
|
|
with conn() as c:
|
|
c.cursor().execute(
|
|
f"UPDATE daemon_config SET {field} = SYSTIMESTAMP WHERE id = 1"
|
|
)
|
|
|
|
|
|
def run_once_if_due() -> None:
|
|
"""Check config and run tasks if their schedule is due."""
|
|
cfg = _get_config()
|
|
if not cfg:
|
|
return
|
|
|
|
if cfg["scan_enabled"] and _should_run(cfg["last_scan_at"], cfg["scan_interval_min"]):
|
|
logger.info("=== Scheduled scan start ===")
|
|
try:
|
|
new_count = scan_all_channels()
|
|
logger.info("Scan complete: %d new videos", new_count)
|
|
_update_last("last_scan_at")
|
|
except Exception as e:
|
|
logger.error("Channel scan failed: %s", e)
|
|
|
|
if cfg["process_enabled"] and _should_run(cfg["last_process_at"], cfg["process_interval_min"]):
|
|
logger.info("=== Scheduled processing start ===")
|
|
try:
|
|
rest_count = process_pending(limit=cfg["process_limit"])
|
|
logger.info("Processing complete: %d restaurants extracted", rest_count)
|
|
_update_last("last_process_at")
|
|
except Exception as e:
|
|
logger.error("Video processing failed: %s", e)
|
|
|
|
|
|
def run_loop() -> None:
|
|
"""Run daemon loop, checking config every CHECK_INTERVAL seconds."""
|
|
logger.info("Daemon started (config-driven, check every %ds)", CHECK_INTERVAL)
|
|
while True:
|
|
run_once_if_due()
|
|
time.sleep(CHECK_INTERVAL)
|