"""Video API routes."""
from __future__ import annotations

import asyncio
import json as _json
import logging
import random
import time
from concurrent.futures import ThreadPoolExecutor

from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import StreamingResponse

from core.db import conn
from core.pipeline import process_pending

logger = logging.getLogger(__name__)
router = APIRouter()
_executor = ThreadPoolExecutor(max_workers=4)


@router.get("")
def list_videos(
    status: str | None = None,
    limit: int = Query(50, le=500),
    offset: int = Query(0, ge=0),
):
    conditions = []
    params: dict = {"lim": limit, "off": offset}
    if status:
        conditions.append("v.status = :st")
        params["st"] = status
    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
    sql = f"""
        SELECT v.id, v.video_id, v.title, v.url, v.status, v.published_at,
               c.channel_name,
               CASE WHEN v.transcript_text IS NOT NULL
                         AND dbms_lob.getlength(v.transcript_text) > 0
                    THEN 1 ELSE 0 END AS has_transcript,
               CASE WHEN v.llm_raw_response IS NOT NULL
                         AND dbms_lob.getlength(v.llm_raw_response) > 0
                    THEN 1 ELSE 0 END AS has_llm,
               (SELECT COUNT(*) FROM video_restaurants vr
                 WHERE vr.video_id = v.id) AS restaurant_count,
               (SELECT COUNT(*) FROM video_restaurants vr
                  JOIN restaurants r ON r.id = vr.restaurant_id
                 WHERE vr.video_id = v.id
                   AND r.google_place_id IS NOT NULL) AS matched_count
        FROM videos v
        JOIN channels c ON c.id = v.channel_id
        {where}
        ORDER BY v.published_at DESC NULLS LAST
        OFFSET :off ROWS FETCH NEXT :lim ROWS ONLY
    """
    with conn() as c:
        cur = c.cursor()
        cur.execute(sql, params)
        cols = [d[0].lower() for d in cur.description]
        rows = cur.fetchall()
        results = []
        for row in rows:
            d = dict(zip(cols, row))
            if d.get("published_at"):
                d["published_at"] = d["published_at"].isoformat()
            d["has_transcript"] = bool(d.get("has_transcript"))
            d["has_llm"] = bool(d.get("has_llm"))
            d["restaurant_count"] = d.get("restaurant_count", 0)
            d["matched_count"] = d.get("matched_count", 0)
            results.append(d)
        return results


def _get_unprocessed_videos() -> list[dict]:
    """Get videos that have transcripts but no LLM extraction."""
    sql = """
        SELECT v.id, v.video_id, v.title, v.url, v.transcript_text
        FROM videos v
        WHERE v.transcript_text IS NOT NULL
          AND dbms_lob.getlength(v.transcript_text) > 0
          AND (v.llm_raw_response IS NULL
               OR dbms_lob.getlength(v.llm_raw_response) = 0)
          AND v.status != 'skip'
        ORDER BY v.published_at DESC
    """
    with conn() as c:
        cur = c.cursor()
        cur.execute(sql)
        rows = cur.fetchall()
        result = []
        for r in rows:
            transcript = r[4]
            # Read CLOB handles while the connection is still open
            if hasattr(transcript, "read"):
                transcript = transcript.read()
            result.append({
                "id": r[0],
                "video_id": r[1],
                "title": r[2],
                "url": r[3],
                "transcript": transcript,
            })
        return result


@router.get("/bulk-extract/pending")
def bulk_extract_pending_count():
    """Get count of videos pending LLM extraction."""
    videos = _get_unprocessed_videos()
    return {
        "count": len(videos),
        "videos": [{"id": v["id"], "title": v["title"]} for v in videos],
    }


@router.post("/bulk-extract")
def bulk_extract():
    """Process all unextracted videos with random delays. Streams SSE progress."""
    from core.pipeline import process_video_extract

    videos = _get_unprocessed_videos()

    def generate():
        total = len(videos)
        total_restaurants = 0
        yield f"data: {_json.dumps({'type': 'start', 'total': total})}\n\n"
        for i, v in enumerate(videos):
            # Random delay (3-8 seconds) between requests to avoid bot detection
            if i > 0:
                delay = random.uniform(3.0, 8.0)
                yield f"data: {_json.dumps({'type': 'wait', 'index': i, 'delay': round(delay, 1)})}\n\n"
                time.sleep(delay)
            yield f"data: {_json.dumps({'type': 'processing', 'index': i, 'title': v['title']})}\n\n"
            try:
                count = process_video_extract(
                    {"id": v["id"], "video_id": v["video_id"], "title": v["title"], "url": v["url"]},
                    v["transcript"],
                )
                total_restaurants += count
                yield f"data: {_json.dumps({'type': 'done', 'index': i, 'title': v['title'], 'restaurants': count})}\n\n"
            except Exception as e:
                logger.error("Bulk extract error for %s: %s", v["video_id"], e)
                yield f"data: {_json.dumps({'type': 'error', 'index': i, 'title': v['title'], 'message': str(e)})}\n\n"
        yield f"data: {_json.dumps({'type': 'complete', 'total': total, 'total_restaurants': total_restaurants})}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")


def _get_no_transcript_videos() -> list[dict]:
    """Get videos that have no transcript yet."""
    sql = """
        SELECT v.id, v.video_id, v.title
        FROM videos v
        WHERE (v.transcript_text IS NULL OR dbms_lob.getlength(v.transcript_text) = 0)
          AND v.status != 'skip'
        ORDER BY v.published_at DESC
    """
    with conn() as c:
        cur = c.cursor()
        cur.execute(sql)
        return [{"id": r[0], "video_id": r[1], "title": r[2]} for r in cur.fetchall()]


@router.get("/bulk-transcript/pending")
def bulk_transcript_pending_count():
    """Get count of videos without transcripts."""
    videos = _get_no_transcript_videos()
    return {
        "count": len(videos),
        "videos": [{"id": v["id"], "title": v["title"]} for v in videos],
    }


@router.post("/bulk-transcript")
def bulk_transcript():
    """Fetch transcripts for all videos missing them. Streams SSE progress."""
    from core.youtube import get_transcript

    videos = _get_no_transcript_videos()

    def generate():
        total = len(videos)
        success = 0
        yield f"data: {_json.dumps({'type': 'start', 'total': total})}\n\n"
        for i, v in enumerate(videos):
            # Random delay (5-15 seconds) to avoid bot detection; longer than the
            # LLM path since this hits YouTube directly
            if i > 0:
                delay = random.uniform(5.0, 15.0)
                yield f"data: {_json.dumps({'type': 'wait', 'index': i, 'delay': round(delay, 1)})}\n\n"
                time.sleep(delay)
            yield f"data: {_json.dumps({'type': 'processing', 'index': i, 'title': v['title']})}\n\n"
            try:
                transcript, source = get_transcript(v["video_id"])
                if transcript:
                    with conn() as c:
                        cur = c.cursor()
                        cur.execute(
                            "UPDATE videos SET transcript_text = :txt WHERE id = :vid",
                            {"txt": transcript, "vid": v["id"]},
                        )
                    success += 1
                    yield f"data: {_json.dumps({'type': 'done', 'index': i, 'title': v['title'], 'source': source, 'length': len(transcript)})}\n\n"
                else:
                    yield f"data: {_json.dumps({'type': 'error', 'index': i, 'title': v['title'], 'message': 'No transcript available'})}\n\n"
            except Exception as e:
                logger.error("Bulk transcript error for %s: %s", v["video_id"], e)
                yield f"data: {_json.dumps({'type': 'error', 'index': i, 'title': v['title'], 'message': str(e)})}\n\n"
        yield f"data: {_json.dumps({'type': 'complete', 'total': total, 'success': success})}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")


@router.get("/extract/prompt")
def get_extract_prompt():
    """Get the current LLM extraction prompt template."""
    from core.extractor import _EXTRACT_PROMPT
    return {"prompt": _EXTRACT_PROMPT}


def _do_process(limit: int):
    return {"restaurants_extracted": process_pending(limit)}


@router.post("/process")
async def trigger_processing(limit: int = Query(5, le=20)):
    """Manually trigger processing of pending videos (non-blocking)."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, _do_process, limit)


@router.get("/{video_db_id}")
def get_video_detail(video_db_id: str):
    """Get video detail including transcript and extracted restaurants."""
    def parse_json(val, default):
        if val is None:
            return default
        if isinstance(val, (list, dict)):
            return val
        try:
            return _json.loads(val)
        except (_json.JSONDecodeError, ValueError):
            return default

    with conn() as c:
        cur = c.cursor()
        cur.execute(
            """
            SELECT v.id, v.video_id, v.title, v.url, v.status, v.published_at,
                   v.transcript_text, c.channel_name
            FROM videos v
            JOIN channels c ON c.id = v.channel_id
            WHERE v.id = :vid
            """,
            {"vid": video_db_id},
        )
        row = cur.fetchone()
        if not row:
            raise HTTPException(404, "Video not found")
        transcript = row[6]
        if hasattr(transcript, "read"):
            transcript = transcript.read()

        # Get extracted restaurants for this video
        cur.execute(
            """
            SELECT r.id, r.name, r.address, r.cuisine_type, r.price_range, r.region,
                   vr.foods_mentioned, vr.evaluation, vr.guests,
                   r.google_place_id, r.latitude, r.longitude
            FROM video_restaurants vr
            JOIN restaurants r ON r.id = vr.restaurant_id
            WHERE vr.video_id = :vid
            """,
            {"vid": video_db_id},
        )
        restaurants = []
        for rr in cur.fetchall():
            foods_raw = rr[6].read() if hasattr(rr[6], "read") else rr[6]
            eval_raw = rr[7].read() if hasattr(rr[7], "read") else rr[7]
            guests_raw = rr[8].read() if hasattr(rr[8], "read") else rr[8]
            restaurants.append({
                "restaurant_id": rr[0],
                "name": rr[1],
                "address": rr[2],
                "cuisine_type": rr[3],
                "price_range": rr[4],
                "region": rr[5],
                "foods_mentioned": parse_json(foods_raw, []),
                "evaluation": parse_json(eval_raw, {}),
                "guests": parse_json(guests_raw, []),
                "google_place_id": rr[9],
                "has_location": rr[10] is not None and rr[11] is not None,
            })
        return {
            "id": row[0],
            "video_id": row[1],
            "title": row[2],
            "url": row[3],
            "status": row[4],
            "published_at": row[5].isoformat() if row[5] else None,
            "transcript": transcript,
            "channel_name": row[7],
            "restaurants": restaurants,
        }


def _do_fetch_transcript(video_db_id: str, mode: str):
    from core.youtube import get_transcript

    with conn() as c:
        cur = c.cursor()
        cur.execute("SELECT video_id FROM videos WHERE id = :vid", {"vid": video_db_id})
        row = cur.fetchone()
        if not row:
            return {"error": 404, "detail": "Video not found"}
        video_id = row[0]
    transcript, source = get_transcript(video_id, mode=mode)
    if not transcript:
        return {"error": 422, "detail": "Transcript unavailable for this video"}
    with conn() as c:
        cur = c.cursor()
        cur.execute(
            "UPDATE videos SET transcript_text = :txt WHERE id = :vid",
            {"txt": transcript, "vid": video_db_id},
        )
    return {"ok": True, "length": len(transcript), "source": source}


@router.post("/{video_db_id}/fetch-transcript")
async def fetch_transcript(video_db_id: str, mode: str = Query("auto")):
    """Fetch and save transcript for a video (non-blocking)."""
    if mode not in ("auto", "manual", "generated"):
        raise HTTPException(400, "mode must be auto, manual, or generated")
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(_executor, _do_fetch_transcript, video_db_id, mode)
    if "error" in result:
        raise HTTPException(result["error"], result["detail"])
    return result


def _do_extract(video_db_id: str, custom_prompt: str | None):
    from core.pipeline import process_video_extract

    with conn() as c:
        cur = c.cursor()
        cur.execute(
            "SELECT id, video_id, title, url, transcript_text FROM videos WHERE id = :vid",
            {"vid": video_db_id},
        )
        row = cur.fetchone()
        if not row:
            return {"error": 404, "detail": "Video not found"}
        transcript = row[4]
        if hasattr(transcript, "read"):
            transcript = transcript.read()
    if not transcript:
        return {"error": 422, "detail": "No transcript available for this video"}
    count = process_video_extract(
        {"id": row[0], "video_id": row[1], "title": row[2], "url": row[3]},
        transcript,
        custom_prompt=custom_prompt,
    )
    return {"ok": True, "restaurants_extracted": count}


@router.post("/{video_db_id}/extract")
async def extract_restaurants_from_video(video_db_id: str, body: dict | None = None):
    """Run LLM extraction on an existing transcript (non-blocking)."""
    custom_prompt = body.get("prompt") if body else None
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(_executor, _do_extract, video_db_id, custom_prompt)
    if "error" in result:
        raise HTTPException(result["error"], result["detail"])
    return result


@router.post("/{video_db_id}/skip")
def skip_video(video_db_id: str):
    """Mark a video as skipped."""
    with conn() as c:
        cur = c.cursor()
        cur.execute(
            "UPDATE videos SET status = 'skip' WHERE id = :vid",
            {"vid": video_db_id},
        )
        if cur.rowcount == 0:
            raise HTTPException(404, "Video not found")
    return {"ok": True}


@router.delete("/{video_db_id}")
def delete_video(video_db_id: str):
    """Delete a video and its related data."""
    with conn() as c:
        cur = c.cursor()
        # Delete vector embeddings for restaurants only linked to this video
        cur.execute(
            """
            DELETE FROM restaurant_vectors
            WHERE restaurant_id IN (
                SELECT vr.restaurant_id
                FROM video_restaurants vr
                WHERE vr.video_id = :vid
                  AND NOT EXISTS (
                      SELECT 1 FROM video_restaurants vr2
                      WHERE vr2.restaurant_id = vr.restaurant_id
                        AND vr2.video_id != :vid
                  )
            )
            """,
            {"vid": video_db_id},
        )
        # Delete reviews for restaurants only linked to this video
        cur.execute(
            """
            DELETE FROM user_reviews
            WHERE restaurant_id IN (
                SELECT vr.restaurant_id
                FROM video_restaurants vr
                WHERE vr.video_id = :vid
                  AND NOT EXISTS (
                      SELECT 1 FROM video_restaurants vr2
                      WHERE vr2.restaurant_id = vr.restaurant_id
                        AND vr2.video_id != :vid
                  )
            )
            """,
            {"vid": video_db_id},
        )
        # Delete restaurants only linked to this video
        cur.execute(
            """
            DELETE FROM restaurants
            WHERE id IN (
                SELECT vr.restaurant_id
                FROM video_restaurants vr
                WHERE vr.video_id = :vid
                  AND NOT EXISTS (
                      SELECT 1 FROM video_restaurants vr2
                      WHERE vr2.restaurant_id = vr.restaurant_id
                        AND vr2.video_id != :vid
                  )
            )
            """,
            {"vid": video_db_id},
        )
        # Delete video-restaurant links
        cur.execute("DELETE FROM video_restaurants WHERE video_id = :vid", {"vid": video_db_id})
        # Delete the video itself
        cur.execute("DELETE FROM videos WHERE id = :vid", {"vid": video_db_id})
        if cur.rowcount == 0:
            raise HTTPException(404, "Video not found")
    return {"ok": True}


@router.put("/{video_db_id}")
def update_video(video_db_id: str, body: dict):
    """Update video title."""
    title = body.get("title")
    if not title:
        raise HTTPException(400, "title is required")
    with conn() as c:
        cur = c.cursor()
        cur.execute(
            "UPDATE videos SET title = :title WHERE id = :vid",
            {"title": title, "vid": video_db_id},
        )
        if cur.rowcount == 0:
            raise HTTPException(404, "Video not found")
    return {"ok": True}


@router.delete("/{video_db_id}/restaurants/{restaurant_id}")
def delete_video_restaurant(video_db_id: str, restaurant_id: str):
    """Delete a video-restaurant mapping. Also cleans up orphaned restaurants."""
    with conn() as c:
        cur = c.cursor()
        cur.execute(
            "DELETE FROM video_restaurants WHERE video_id = :vid AND restaurant_id = :rid",
            {"vid": video_db_id, "rid": restaurant_id},
        )
        if cur.rowcount == 0:
            raise HTTPException(404, "Mapping not found")
        # Clean up orphaned restaurant (no other video links)
        cur.execute(
            """
            DELETE FROM restaurant_vectors
            WHERE restaurant_id = :rid
              AND NOT EXISTS (SELECT 1 FROM video_restaurants WHERE restaurant_id = :rid)
            """,
            {"rid": restaurant_id},
        )
        cur.execute(
            """
            DELETE FROM user_reviews
            WHERE restaurant_id = :rid
              AND NOT EXISTS (SELECT 1 FROM video_restaurants WHERE restaurant_id = :rid)
            """,
            {"rid": restaurant_id},
        )
        cur.execute(
            """
            DELETE FROM restaurants
            WHERE id = :rid
              AND NOT EXISTS (SELECT 1 FROM video_restaurants WHERE restaurant_id = :rid)
            """,
            {"rid": restaurant_id},
        )
    return {"ok": True}


@router.post("/{video_db_id}/restaurants/manual")
def add_manual_restaurant(video_db_id: str, body: dict):
    """Manually add a restaurant and link it to a video."""
    from core import restaurant as rest_mod
    from core.geocoding import geocode_restaurant

    name = (body.get("name") or "").strip()
    if not name:
        raise HTTPException(400, "Restaurant name is required")
    address = (body.get("address") or "").strip() or None
    region = (body.get("region") or "").strip() or None
    cuisine_type = (body.get("cuisine_type") or "").strip() or None
    price_range = (body.get("price_range") or "").strip() or None
    foods = body.get("foods_mentioned", [])
    evaluation = (body.get("evaluation") or "").strip() or None
    guests = body.get("guests", [])

    # Geocode to get lat/lng and Google data
    geo = geocode_restaurant(name, address or region or "")
    if not geo:
        raise HTTPException(400, f"Could not find a location for '{name}'. Please enter an address.")

    rid = rest_mod.upsert(
        name=name,
        address=geo.get("formatted_address") or address,
        region=region,
        latitude=geo["latitude"],
        longitude=geo["longitude"],
        cuisine_type=cuisine_type,
        price_range=price_range,
        google_place_id=geo.get("google_place_id"),
        phone=geo.get("phone"),
        website=geo.get("website"),
        business_status=geo.get("business_status"),
        rating=geo.get("rating"),
        rating_count=geo.get("rating_count"),
    )
    link_id = rest_mod.link_video_restaurant(
        video_db_id=video_db_id,
        restaurant_id=rid,
        foods=foods if isinstance(foods, list) else [],
        evaluation=evaluation,
        guests=guests if isinstance(guests, list) else [],
    )
    return {"ok": True, "restaurant_id": rid, "link_id": link_id}


@router.put("/{video_db_id}/restaurants/{restaurant_id}")
def update_video_restaurant(video_db_id: str, restaurant_id: str, body: dict):
    """Update restaurant info linked to a video.

    If the name changed, re-geocode and remap the link to a new restaurant record.
    """
    # Check whether the name changed; if so, the mapping must be remapped
    new_name = (body.get("name") or "").strip() if "name" in body else None
    if new_name:
        with conn() as c:
            cur = c.cursor()
            cur.execute("SELECT name FROM restaurants WHERE id = :rid", {"rid": restaurant_id})
            row = cur.fetchone()
            old_name = row[0] if row else ""
        if old_name != new_name:
            # Name changed: geocode the new restaurant and remap
            from core import restaurant as rest_mod
            from core.geocoding import geocode_restaurant

            address = (body.get("address") or "").strip() or (body.get("region") or "").strip() or ""
            geo = geocode_restaurant(new_name, address)
            if not geo:
                raise HTTPException(400, f"Could not find a location for '{new_name}'.")
            new_rid = rest_mod.upsert(
                name=new_name,
                address=geo.get("formatted_address") or body.get("address"),
                region=body.get("region"),
                latitude=geo["latitude"],
                longitude=geo["longitude"],
                cuisine_type=body.get("cuisine_type"),
                price_range=body.get("price_range"),
                google_place_id=geo.get("google_place_id"),
                phone=geo.get("phone"),
                website=geo.get("website"),
                business_status=geo.get("business_status"),
                rating=geo.get("rating"),
                rating_count=geo.get("rating_count"),
            )

            def _parse(val, default):
                if val is None:
                    return default
                if hasattr(val, "read"):
                    val = val.read()
                if isinstance(val, (list, dict)):
                    return val
                try:
                    return _json.loads(val)
                except Exception:
                    return default

            # Read the existing mapping data, then delete the old link
            with conn() as c:
                cur = c.cursor()
                cur.execute(
                    "SELECT foods_mentioned, evaluation, guests FROM video_restaurants "
                    "WHERE video_id = :vid AND restaurant_id = :rid",
                    {"vid": video_db_id, "rid": restaurant_id},
                )
                old_vr = cur.fetchone()
                # Parse LOB values while the connection is still open
                old_foods = _parse(old_vr[0], []) if old_vr else []
                old_eval = _parse(old_vr[1], {}) if old_vr else {}
                old_guests = _parse(old_vr[2], []) if old_vr else []
                cur.execute(
                    "DELETE FROM video_restaurants WHERE video_id = :vid AND restaurant_id = :rid",
                    {"vid": video_db_id, "rid": restaurant_id},
                )

            # Build new mapping values from the body, falling back to the old data
            foods = body.get("foods_mentioned", old_foods)
            evaluation = body.get("evaluation", old_eval)
            guests = body.get("guests", old_guests)
            eval_text = evaluation.get("text", "") if isinstance(evaluation, dict) else str(evaluation or "")
            rest_mod.link_video_restaurant(
                video_db_id=video_db_id,
                restaurant_id=new_rid,
                foods=foods if isinstance(foods, list) else [],
                evaluation=eval_text or None,
                guests=guests if isinstance(guests, list) else [],
            )
            return {"ok": True, "remapped": True, "new_restaurant_id": new_rid}

    # No name change: update in place
    with conn() as c:
        cur = c.cursor()
        r_sets = []
        r_params: dict = {"rid": restaurant_id}
        for field in ("name", "address", "region", "cuisine_type", "price_range"):
            if field in body:
                r_sets.append(f"{field} = :{field}")
                r_params[field] = body[field]
        if r_sets:
            r_sets.append("updated_at = SYSTIMESTAMP")
            sql = f"UPDATE restaurants SET {', '.join(r_sets)} WHERE id = :rid"
            cur.execute(sql, r_params)
        vr_params: dict = {"vid": video_db_id, "rid": restaurant_id}
        vr_sets = []
        for field in ("foods_mentioned", "evaluation", "guests"):
            if field in body:
                vr_sets.append(f"{field} = :{field}")
                val = body[field]
                vr_params[field] = (
                    _json.dumps(val, ensure_ascii=False) if isinstance(val, (list, dict)) else val
                )
        if vr_sets:
            sql = (
                f"UPDATE video_restaurants SET {', '.join(vr_sets)} "
                "WHERE video_id = :vid AND restaurant_id = :rid"
            )
            cur.execute(sql, vr_params)
    return {"ok": True}