From 6c47d3c57d4960d2648b107cc76788691d085936 Mon Sep 17 00:00:00 2001 From: joungmin Date: Mon, 9 Mar 2026 10:59:22 +0900 Subject: [PATCH] Backend enhancements: auth, channels, restaurants, daemon improvements - Add admin auth dependency and role checks - Expand channel and restaurant API routes - Improve YouTube video fetching (filter out Shorts, add per-channel video stats) - Enhance daemon worker with better error handling and scheduling Co-Authored-By: Claude Opus 4.6 --- backend/api/deps.py | 8 +++ backend/api/main.py | 3 +- backend/api/routes/auth.py | 21 ++++++- backend/api/routes/channels.py | 27 ++++++--- backend/api/routes/restaurants.py | 32 +++++++++-- backend/core/auth.py | 6 ++ backend/core/youtube.py | 54 +++++++++++++++++- backend/daemon/worker.py | 95 ++++++++++++++++++++++++------- backend/run_daemon.py | 4 +- 9 files changed, 208 insertions(+), 42 deletions(-) diff --git a/backend/api/deps.py b/backend/api/deps.py index 2e144a3..d46165c 100644 --- a/backend/api/deps.py +++ b/backend/api/deps.py @@ -30,3 +30,11 @@ def get_optional_user(authorization: str = Header(None)) -> dict | None: return verify_jwt(token) except Exception: return None + + +def get_admin_user(authorization: str = Header(None)) -> dict: + """Require authenticated admin user. 
Raises 401/403.""" + user = get_current_user(authorization) + if not user.get("is_admin"): + raise HTTPException(403, "관리자 권한이 필요합니다") + return user diff --git a/backend/api/main.py b/backend/api/main.py index 4dd1d82..1c589bc 100644 --- a/backend/api/main.py +++ b/backend/api/main.py @@ -5,7 +5,7 @@ from __future__ import annotations from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from api.routes import restaurants, channels, videos, search, auth, reviews, admin_users, stats +from api.routes import restaurants, channels, videos, search, auth, reviews, admin_users, stats, daemon app = FastAPI( title="Tasteby API", @@ -29,6 +29,7 @@ app.include_router(auth.router, prefix="/api/auth", tags=["auth"]) app.include_router(reviews.router, prefix="/api", tags=["reviews"]) app.include_router(admin_users.router, prefix="/api/admin/users", tags=["admin-users"]) app.include_router(stats.router, prefix="/api/stats", tags=["stats"]) +app.include_router(daemon.router, prefix="/api/daemon", tags=["daemon"]) @app.get("/api/health") diff --git a/backend/api/routes/auth.py b/backend/api/routes/auth.py index 72337ae..cf1370f 100644 --- a/backend/api/routes/auth.py +++ b/backend/api/routes/auth.py @@ -36,5 +36,22 @@ def login_google(body: GoogleLoginRequest): @router.get("/me") def get_me(current_user: dict = Depends(get_current_user)): - """Return current authenticated user info.""" - return current_user + """Return current authenticated user info including admin status.""" + from core.db import conn + user_id = current_user.get("sub") or current_user.get("id") + with conn() as c: + cur = c.cursor() + cur.execute( + "SELECT id, email, nickname, avatar_url, is_admin FROM tasteby_users WHERE id = :id", + {"id": user_id}, + ) + row = cur.fetchone() + if not row: + raise HTTPException(404, "User not found") + return { + "id": row[0], + "email": row[1], + "nickname": row[2], + "avatar_url": row[3], + "is_admin": bool(row[4]), + } diff --git 
a/backend/api/routes/channels.py b/backend/api/routes/channels.py index 00961d8..a58d296 100644 --- a/backend/api/routes/channels.py +++ b/backend/api/routes/channels.py @@ -5,10 +5,12 @@ from __future__ import annotations import asyncio from concurrent.futures import ThreadPoolExecutor -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel -from core import youtube +from api.deps import get_admin_user + +from core import youtube, cache _executor = ThreadPoolExecutor(max_workers=4) @@ -23,13 +25,20 @@ class ChannelCreate(BaseModel): @router.get("") def list_channels(): - return youtube.get_active_channels() + key = cache.make_key("channels") + cached = cache.get(key) + if cached is not None: + return cached + result = youtube.get_active_channels() + cache.set(key, result) + return result @router.post("", status_code=201) -def create_channel(body: ChannelCreate): +def create_channel(body: ChannelCreate, _admin: dict = Depends(get_admin_user)): try: row_id = youtube.add_channel(body.channel_id, body.channel_name, body.title_filter) + cache.flush() return {"id": row_id, "channel_id": body.channel_id} except Exception as e: if "UQ_CHANNELS_CID" in str(e).upper(): @@ -63,12 +72,15 @@ def _do_scan(channel_id: str, full: bool): if not full and new_in_page == 0 and total_fetched > 50: break + filtered = total_fetched - len(candidates) - len([v for v in candidates if v["video_id"] in existing_vids]) new_count = youtube.save_videos_batch(ch["id"], candidates) - return {"total_fetched": total_fetched, "new_videos": new_count} + if new_count > 0: + cache.flush() + return {"total_fetched": total_fetched, "new_videos": new_count, "filtered": filtered if title_filter else 0} @router.post("/{channel_id}/scan") -async def scan_channel(channel_id: str, full: bool = False): +async def scan_channel(channel_id: str, full: bool = False, _admin: dict = Depends(get_admin_user)): """Trigger a scan for new videos 
from this channel (non-blocking).""" loop = asyncio.get_event_loop() result = await loop.run_in_executor(_executor, _do_scan, channel_id, full) @@ -78,7 +90,7 @@ async def scan_channel(channel_id: str, full: bool = False): @router.delete("/{channel_id:path}") -def delete_channel(channel_id: str): +def delete_channel(channel_id: str, _admin: dict = Depends(get_admin_user)): """Deactivate a channel. Accepts channel_id or DB id.""" deleted = youtube.deactivate_channel(channel_id) if not deleted: @@ -86,4 +98,5 @@ def delete_channel(channel_id: str): deleted = youtube.deactivate_channel_by_db_id(channel_id) if not deleted: raise HTTPException(404, "Channel not found") + cache.flush() return {"ok": True} diff --git a/backend/api/routes/restaurants.py b/backend/api/routes/restaurants.py index dfbbd7d..a7a6615 100644 --- a/backend/api/routes/restaurants.py +++ b/backend/api/routes/restaurants.py @@ -2,9 +2,10 @@ from __future__ import annotations -from fastapi import APIRouter, HTTPException, Query +from fastapi import APIRouter, Depends, HTTPException, Query -from core import restaurant +from api.deps import get_admin_user +from core import restaurant, cache router = APIRouter() @@ -17,19 +18,30 @@ def list_restaurants( region: str | None = None, channel: str | None = None, ): - return restaurant.get_all(limit=limit, offset=offset, cuisine=cuisine, region=region, channel=channel) + key = cache.make_key("restaurants", f"l={limit}", f"o={offset}", f"c={cuisine}", f"r={region}", f"ch={channel}") + cached = cache.get(key) + if cached is not None: + return cached + result = restaurant.get_all(limit=limit, offset=offset, cuisine=cuisine, region=region, channel=channel) + cache.set(key, result) + return result @router.get("/{restaurant_id}") def get_restaurant(restaurant_id: str): + key = cache.make_key("restaurant", restaurant_id) + cached = cache.get(key) + if cached is not None: + return cached r = restaurant.get_by_id(restaurant_id) if not r: raise HTTPException(404, 
"Restaurant not found") + cache.set(key, r) return r @router.put("/{restaurant_id}") -def update_restaurant(restaurant_id: str, body: dict): +def update_restaurant(restaurant_id: str, body: dict, _admin: dict = Depends(get_admin_user)): from core.db import conn r = restaurant.get_by_id(restaurant_id) if not r: @@ -49,11 +61,12 @@ def update_restaurant(restaurant_id: str, body: dict): sql = f"UPDATE restaurants SET {', '.join(sets)} WHERE id = :rid" with conn() as c: c.cursor().execute(sql, params) + cache.flush() return {"ok": True} @router.delete("/{restaurant_id}") -def delete_restaurant(restaurant_id: str): +def delete_restaurant(restaurant_id: str, _admin: dict = Depends(get_admin_user)): from core.db import conn r = restaurant.get_by_id(restaurant_id) if not r: @@ -64,12 +77,19 @@ def delete_restaurant(restaurant_id: str): cur.execute("DELETE FROM user_reviews WHERE restaurant_id = :rid", {"rid": restaurant_id}) cur.execute("DELETE FROM video_restaurants WHERE restaurant_id = :rid", {"rid": restaurant_id}) cur.execute("DELETE FROM restaurants WHERE id = :rid", {"rid": restaurant_id}) + cache.flush() return {"ok": True} @router.get("/{restaurant_id}/videos") def get_restaurant_videos(restaurant_id: str): + key = cache.make_key("restaurant_videos", restaurant_id) + cached = cache.get(key) + if cached is not None: + return cached r = restaurant.get_by_id(restaurant_id) if not r: raise HTTPException(404, "Restaurant not found") - return restaurant.get_video_links(restaurant_id) + result = restaurant.get_video_links(restaurant_id) + cache.set(key, result) + return result diff --git a/backend/core/auth.py b/backend/core/auth.py index 6ecc0a6..1ac04a8 100644 --- a/backend/core/auth.py +++ b/backend/core/auth.py @@ -67,6 +67,9 @@ def find_or_create_user( "email": email, "nickname": nickname, "avatar_url": avatar_url, "id": row[0], }) + # Fetch is_admin + cur.execute("SELECT is_admin FROM tasteby_users WHERE id = :id", {"id": row[0]}) + is_admin = 
bool(cur.fetchone()[0]) return { "id": row[0], "provider": row[1], @@ -74,6 +77,7 @@ def find_or_create_user( "email": email or row[3], "nickname": nickname or row[4], "avatar_url": avatar_url or row[5], + "is_admin": is_admin, } # Create new user @@ -99,6 +103,7 @@ def find_or_create_user( "email": email, "nickname": nickname, "avatar_url": avatar_url, + "is_admin": False, } @@ -108,6 +113,7 @@ def create_jwt(user: dict) -> str: "sub": user["id"], "email": user.get("email"), "nickname": user.get("nickname"), + "is_admin": user.get("is_admin", False), "exp": datetime.now(timezone.utc) + timedelta(days=JWT_EXPIRE_DAYS), "iat": datetime.now(timezone.utc), } diff --git a/backend/core/youtube.py b/backend/core/youtube.py index 3fd6ee4..bb17ea3 100644 --- a/backend/core/youtube.py +++ b/backend/core/youtube.py @@ -72,12 +72,22 @@ def deactivate_channel_by_db_id(db_id: str) -> bool: def get_active_channels() -> list[dict]: - sql = "SELECT id, channel_id, channel_name, title_filter FROM channels WHERE is_active = 1" + sql = """ + SELECT c.id, c.channel_id, c.channel_name, c.title_filter, + (SELECT COUNT(*) FROM videos v WHERE v.channel_id = c.id) as video_count, + (SELECT MAX(v.created_at) FROM videos v WHERE v.channel_id = c.id) as last_scanned_at + FROM channels c + WHERE c.is_active = 1 + """ with conn() as c: cur = c.cursor() cur.execute(sql) return [ - {"id": r[0], "channel_id": r[1], "channel_name": r[2], "title_filter": r[3]} + { + "id": r[0], "channel_id": r[1], "channel_name": r[2], "title_filter": r[3], + "video_count": r[4] or 0, + "last_scanned_at": r[5].isoformat() if r[5] else None, + } for r in cur.fetchall() ] @@ -99,13 +109,48 @@ def get_latest_video_date(channel_db_id: str) -> str | None: return None +def _parse_iso8601_duration(dur: str) -> int: + """Parse ISO 8601 duration (e.g. 
PT1M30S, PT5M, PT1H2M) to seconds.""" + import re + m = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", dur or "") + if not m: + return 0 + h, mn, s = (int(x) if x else 0 for x in m.groups()) + return h * 3600 + mn * 60 + s + + +def _filter_shorts(videos: list[dict]) -> list[dict]: + """Filter out YouTube Shorts (<=60s) by checking video durations via API.""" + if not videos: + return videos + video_ids = [v["video_id"] for v in videos] + r = httpx.get( + "https://www.googleapis.com/youtube/v3/videos", + params={ + "key": _api_key(), + "id": ",".join(video_ids), + "part": "contentDetails", + }, + timeout=30, + ) + r.raise_for_status() + durations = {} + for item in r.json().get("items", []): + durations[item["id"]] = _parse_iso8601_duration( + item.get("contentDetails", {}).get("duration", "") + ) + return [v for v in videos if durations.get(v["video_id"], 0) > 60] + + def fetch_channel_videos_iter( channel_id: str, published_after: str | None = None, + exclude_shorts: bool = True, ): """Yield pages of videos from a YouTube channel via Data API v3. Each yield is a list of dicts for one API page (up to 50). + If exclude_shorts is True, filters out videos <= 60 seconds. 
""" params: dict = { "key": _api_key(), @@ -127,7 +172,7 @@ def fetch_channel_videos_iter( r = httpx.get( "https://www.googleapis.com/youtube/v3/search", params=params, - timeout=15, + timeout=30, ) r.raise_for_status() data = r.json() @@ -143,6 +188,9 @@ def fetch_channel_videos_iter( "url": f"https://www.youtube.com/watch?v={vid}", }) + if page_videos and exclude_shorts: + page_videos = _filter_shorts(page_videos) + if page_videos: yield page_videos diff --git a/backend/daemon/worker.py b/backend/daemon/worker.py index bf61912..d4dcea0 100644 --- a/backend/daemon/worker.py +++ b/backend/daemon/worker.py @@ -1,37 +1,92 @@ -"""Daemon worker: periodic channel scan + video processing.""" +"""Daemon worker: config-driven channel scan + video processing.""" from __future__ import annotations import logging import time +from datetime import datetime, timedelta +from core.db import conn from core.youtube import scan_all_channels from core.pipeline import process_pending logger = logging.getLogger(__name__) +CHECK_INTERVAL = 30 # seconds between config checks -def run_once() -> None: - """Single daemon cycle: scan channels then process pending videos.""" - logger.info("=== Daemon cycle start ===") + +def _get_config() -> dict | None: + """Read daemon config from DB.""" try: - new_count = scan_all_channels() - logger.info("Scan complete: %d new videos", new_count) + with conn() as c: + cur = c.cursor() + cur.execute( + "SELECT scan_enabled, scan_interval_min, process_enabled, " + "process_interval_min, process_limit, last_scan_at, last_process_at " + "FROM daemon_config WHERE id = 1" + ) + row = cur.fetchone() + if not row: + return None + return { + "scan_enabled": bool(row[0]), + "scan_interval_min": row[1], + "process_enabled": bool(row[2]), + "process_interval_min": row[3], + "process_limit": row[4], + "last_scan_at": row[5], + "last_process_at": row[6], + } except Exception as e: - logger.error("Channel scan failed: %s", e) - - try: - rest_count = 
process_pending(limit=10) - logger.info("Processing complete: %d restaurants extracted", rest_count) - except Exception as e: - logger.error("Video processing failed: %s", e) - - logger.info("=== Daemon cycle end ===") + logger.error("Failed to read daemon config: %s", e) + return None -def run_loop(interval: int = 3600) -> None: - """Run daemon in a loop with configurable interval (default 1 hour).""" - logger.info("Daemon started (interval=%ds)", interval) +def _should_run(last_at: datetime | None, interval_min: int) -> bool: + """Check if enough time has passed since last run.""" + if last_at is None: + return True + now = datetime.utcnow() + # Oracle TIMESTAMP comes as datetime + return now - last_at >= timedelta(minutes=interval_min) + + +def _update_last(field: str) -> None: + """Update last_scan_at or last_process_at.""" + with conn() as c: + c.cursor().execute( + f"UPDATE daemon_config SET {field} = SYSTIMESTAMP WHERE id = 1" + ) + + +def run_once_if_due() -> None: + """Check config and run tasks if their schedule is due.""" + cfg = _get_config() + if not cfg: + return + + if cfg["scan_enabled"] and _should_run(cfg["last_scan_at"], cfg["scan_interval_min"]): + logger.info("=== Scheduled scan start ===") + try: + new_count = scan_all_channels() + logger.info("Scan complete: %d new videos", new_count) + _update_last("last_scan_at") + except Exception as e: + logger.error("Channel scan failed: %s", e) + + if cfg["process_enabled"] and _should_run(cfg["last_process_at"], cfg["process_interval_min"]): + logger.info("=== Scheduled processing start ===") + try: + rest_count = process_pending(limit=cfg["process_limit"]) + logger.info("Processing complete: %d restaurants extracted", rest_count) + _update_last("last_process_at") + except Exception as e: + logger.error("Video processing failed: %s", e) + + +def run_loop() -> None: + """Run daemon loop, checking config every CHECK_INTERVAL seconds.""" + logger.info("Daemon started (config-driven, check every %ds)", 
CHECK_INTERVAL) while True: - run_once() - time.sleep(interval) + run_once_if_due() + time.sleep(CHECK_INTERVAL) diff --git a/backend/run_daemon.py b/backend/run_daemon.py index 214ca55..9696c97 100644 --- a/backend/run_daemon.py +++ b/backend/run_daemon.py @@ -1,7 +1,6 @@ """Run the daemon worker.""" import logging -import os from dotenv import load_dotenv load_dotenv() @@ -14,5 +13,4 @@ logging.basicConfig( ) if __name__ == "__main__": - interval = int(os.environ.get("DAEMON_INTERVAL", "3600")) - run_loop(interval) + run_loop()