diff --git a/backend/api/main.py b/backend/api/main.py index 368e567..4dd1d82 100644 --- a/backend/api/main.py +++ b/backend/api/main.py @@ -1,9 +1,11 @@ """FastAPI application entry point.""" +from __future__ import annotations + from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from api.routes import restaurants, channels, videos, search, auth, reviews +from api.routes import restaurants, channels, videos, search, auth, reviews, admin_users, stats app = FastAPI( title="Tasteby API", @@ -25,6 +27,8 @@ app.include_router(videos.router, prefix="/api/videos", tags=["videos"]) app.include_router(search.router, prefix="/api/search", tags=["search"]) app.include_router(auth.router, prefix="/api/auth", tags=["auth"]) app.include_router(reviews.router, prefix="/api", tags=["reviews"]) +app.include_router(admin_users.router, prefix="/api/admin/users", tags=["admin-users"]) +app.include_router(stats.router, prefix="/api/stats", tags=["stats"]) @app.get("/api/health") diff --git a/backend/api/routes/admin_users.py b/backend/api/routes/admin_users.py new file mode 100644 index 0000000..39bbe1c --- /dev/null +++ b/backend/api/routes/admin_users.py @@ -0,0 +1,93 @@ +"""Admin user management API routes.""" + +from __future__ import annotations + +from fastapi import APIRouter, Query + +from core.db import conn + +router = APIRouter() + + +@router.get("") +def list_users( + limit: int = Query(50, le=200), + offset: int = Query(0, ge=0), +): + """List all users with favorite/review counts.""" + sql = """ + SELECT u.id, u.email, u.nickname, u.avatar_url, u.provider, u.created_at, + NVL(fav.cnt, 0) AS favorite_count, + NVL(rev.cnt, 0) AS review_count + FROM tasteby_users u + LEFT JOIN ( + SELECT user_id, COUNT(*) AS cnt FROM user_favorites GROUP BY user_id + ) fav ON fav.user_id = u.id + LEFT JOIN ( + SELECT user_id, COUNT(*) AS cnt FROM user_reviews GROUP BY user_id + ) rev ON rev.user_id = u.id + ORDER BY u.created_at DESC + OFFSET :off ROWS FETCH 
NEXT :lim ROWS ONLY + """ + count_sql = "SELECT COUNT(*) FROM tasteby_users" + with conn() as c: + cur = c.cursor() + cur.execute(count_sql) + total = cur.fetchone()[0] + cur.execute(sql, {"off": offset, "lim": limit}) + cols = [d[0].lower() for d in cur.description] + rows = [dict(zip(cols, row)) for row in cur.fetchall()] + for r in rows: + if r.get("created_at"): + r["created_at"] = r["created_at"].isoformat() + return {"users": rows, "total": total} + + +@router.get("/{user_id}/favorites") +def get_user_favorites(user_id: str): + """Get a user's favorite restaurants.""" + sql = """ + SELECT r.id, r.name, r.address, r.region, r.cuisine_type, + r.rating, r.business_status, f.created_at + FROM user_favorites f + JOIN restaurants r ON r.id = f.restaurant_id + WHERE f.user_id = :u + ORDER BY f.created_at DESC + """ + with conn() as c: + cur = c.cursor() + cur.execute(sql, {"u": user_id}) + cols = [d[0].lower() for d in cur.description] + rows = [dict(zip(cols, row)) for row in cur.fetchall()] + for r in rows: + if r.get("created_at"): + r["created_at"] = r["created_at"].isoformat() + return rows + + +@router.get("/{user_id}/reviews") +def get_user_reviews(user_id: str): + """Get a user's reviews with restaurant names.""" + sql = """ + SELECT r.id, r.restaurant_id, r.rating, r.review_text, + r.visited_at, r.created_at, + rest.name AS restaurant_name + FROM user_reviews r + LEFT JOIN restaurants rest ON rest.id = r.restaurant_id + WHERE r.user_id = :u + ORDER BY r.created_at DESC + """ + with conn() as c: + cur = c.cursor() + cur.execute(sql, {"u": user_id}) + cols = [d[0].lower() for d in cur.description] + rows = [dict(zip(cols, row)) for row in cur.fetchall()] + for r in rows: + # Handle CLOB + if hasattr(r.get("review_text"), "read"): + r["review_text"] = r["review_text"].read() + if r.get("visited_at"): + r["visited_at"] = r["visited_at"].isoformat() + if r.get("created_at"): + r["created_at"] = r["created_at"].isoformat() + return rows diff --git 
a/backend/api/routes/channels.py b/backend/api/routes/channels.py index 507d595..00961d8 100644 --- a/backend/api/routes/channels.py +++ b/backend/api/routes/channels.py @@ -1,16 +1,24 @@ """Channel API routes.""" +from __future__ import annotations + +import asyncio +from concurrent.futures import ThreadPoolExecutor + from fastapi import APIRouter, HTTPException from pydantic import BaseModel from core import youtube +_executor = ThreadPoolExecutor(max_workers=4) + router = APIRouter() class ChannelCreate(BaseModel): channel_id: str channel_name: str + title_filter: str | None = None @router.get("") @@ -21,7 +29,7 @@ def list_channels(): @router.post("", status_code=201) def create_channel(body: ChannelCreate): try: - row_id = youtube.add_channel(body.channel_id, body.channel_name) + row_id = youtube.add_channel(body.channel_id, body.channel_name, body.title_filter) return {"id": row_id, "channel_id": body.channel_id} except Exception as e: if "UQ_CHANNELS_CID" in str(e).upper(): @@ -29,18 +37,53 @@ def create_channel(body: ChannelCreate): raise -@router.post("/{channel_id}/scan") -def scan_channel(channel_id: str): - """Trigger a scan for new videos from this channel.""" +def _do_scan(channel_id: str, full: bool): + """Sync scan logic, runs in thread pool.""" channels = youtube.get_active_channels() ch = next((c for c in channels if c["channel_id"] == channel_id), None) if not ch: - raise HTTPException(404, "Channel not found") + return None - videos = youtube.fetch_channel_videos(channel_id, max_results=50) - new_count = 0 - for v in videos: - row_id = youtube.save_video(ch["id"], v) - if row_id: - new_count += 1 - return {"total_fetched": len(videos), "new_videos": new_count} + after = None if full else youtube.get_latest_video_date(ch["id"]) + title_filter = ch.get("title_filter") + existing_vids = youtube.get_existing_video_ids(ch["id"]) + + candidates = [] + total_fetched = 0 + for videos_page in youtube.fetch_channel_videos_iter(channel_id, 
published_after=after): + total_fetched += len(videos_page) + new_in_page = 0 + for v in videos_page: + if title_filter and title_filter not in v["title"]: + continue + if v["video_id"] in existing_vids: + continue + candidates.append(v) + new_in_page += 1 + if not full and new_in_page == 0 and total_fetched > 50: + break + + new_count = youtube.save_videos_batch(ch["id"], candidates) + return {"total_fetched": total_fetched, "new_videos": new_count} + + +@router.post("/{channel_id}/scan") +async def scan_channel(channel_id: str, full: bool = False): + """Trigger a scan for new videos from this channel (non-blocking).""" + loop = asyncio.get_event_loop() + result = await loop.run_in_executor(_executor, _do_scan, channel_id, full) + if result is None: + raise HTTPException(404, "Channel not found") + return result + + +@router.delete("/{channel_id:path}") +def delete_channel(channel_id: str): + """Deactivate a channel. Accepts channel_id or DB id.""" + deleted = youtube.deactivate_channel(channel_id) + if not deleted: + # Try by DB id + deleted = youtube.deactivate_channel_by_db_id(channel_id) + if not deleted: + raise HTTPException(404, "Channel not found") + return {"ok": True} diff --git a/backend/api/routes/restaurants.py b/backend/api/routes/restaurants.py index f65fd1b..dfbbd7d 100644 --- a/backend/api/routes/restaurants.py +++ b/backend/api/routes/restaurants.py @@ -1,5 +1,7 @@ """Restaurant API routes.""" +from __future__ import annotations + from fastapi import APIRouter, HTTPException, Query from core import restaurant @@ -13,8 +15,9 @@ def list_restaurants( offset: int = Query(0, ge=0), cuisine: str | None = None, region: str | None = None, + channel: str | None = None, ): - return restaurant.get_all(limit=limit, offset=offset, cuisine=cuisine, region=region) + return restaurant.get_all(limit=limit, offset=offset, cuisine=cuisine, region=region, channel=channel) @router.get("/{restaurant_id}") @@ -25,6 +28,45 @@ def get_restaurant(restaurant_id: str): 
return r +@router.put("/{restaurant_id}") +def update_restaurant(restaurant_id: str, body: dict): + from core.db import conn + r = restaurant.get_by_id(restaurant_id) + if not r: + raise HTTPException(404, "Restaurant not found") + + allowed = ("name", "address", "region", "cuisine_type", "price_range", + "phone", "website", "latitude", "longitude") + sets = [] + params: dict = {"rid": restaurant_id} + for field in allowed: + if field in body: + sets.append(f"{field} = :{field}") + params[field] = body[field] if body[field] != "" else None + if not sets: + raise HTTPException(400, "No fields to update") + sets.append("updated_at = SYSTIMESTAMP") + sql = f"UPDATE restaurants SET {', '.join(sets)} WHERE id = :rid" + with conn() as c: + c.cursor().execute(sql, params) + return {"ok": True} + + +@router.delete("/{restaurant_id}") +def delete_restaurant(restaurant_id: str): + from core.db import conn + r = restaurant.get_by_id(restaurant_id) + if not r: + raise HTTPException(404, "Restaurant not found") + with conn() as c: + cur = c.cursor() + cur.execute("DELETE FROM restaurant_vectors WHERE restaurant_id = :rid", {"rid": restaurant_id}) + cur.execute("DELETE FROM user_reviews WHERE restaurant_id = :rid", {"rid": restaurant_id}) + cur.execute("DELETE FROM video_restaurants WHERE restaurant_id = :rid", {"rid": restaurant_id}) + cur.execute("DELETE FROM restaurants WHERE id = :rid", {"rid": restaurant_id}) + return {"ok": True} + + @router.get("/{restaurant_id}/videos") def get_restaurant_videos(restaurant_id: str): r = restaurant.get_by_id(restaurant_id) diff --git a/backend/api/routes/reviews.py b/backend/api/routes/reviews.py index e96d743..f52f79e 100644 --- a/backend/api/routes/reviews.py +++ b/backend/api/routes/reviews.py @@ -95,3 +95,74 @@ def list_my_reviews( ): """List current user's reviews.""" return review.get_user_reviews(current_user["sub"], limit=limit, offset=offset) + + +# --- Favorites --- + +@router.get("/restaurants/{restaurant_id}/favorite") +def 
get_favorite_status( + restaurant_id: str, + current_user: dict = Depends(get_current_user), +): + """Check if current user has favorited this restaurant.""" + from core.db import conn + with conn() as c: + cur = c.cursor() + cur.execute( + "SELECT id FROM user_favorites WHERE user_id = :u AND restaurant_id = :r", + {"u": current_user["sub"], "r": restaurant_id}, + ) + return {"favorited": cur.fetchone() is not None} + + +@router.post("/restaurants/{restaurant_id}/favorite") +def toggle_favorite( + restaurant_id: str, + current_user: dict = Depends(get_current_user), +): + """Toggle favorite. Returns new state.""" + from core.db import conn + user_id = current_user["sub"] + with conn() as c: + cur = c.cursor() + cur.execute( + "SELECT id FROM user_favorites WHERE user_id = :u AND restaurant_id = :r", + {"u": user_id, "r": restaurant_id}, + ) + row = cur.fetchone() + if row: + cur.execute("DELETE FROM user_favorites WHERE id = :fid", {"fid": row[0]}) + return {"favorited": False} + else: + cur.execute( + "INSERT INTO user_favorites (user_id, restaurant_id) VALUES (:u, :r)", + {"u": user_id, "r": restaurant_id}, + ) + return {"favorited": True} + + +@router.get("/users/me/favorites") +def list_my_favorites( + current_user: dict = Depends(get_current_user), +): + """List current user's favorite restaurants.""" + from core.db import conn + sql = """ + SELECT r.id, r.name, r.address, r.region, r.latitude, r.longitude, + r.cuisine_type, r.price_range, r.google_place_id, + r.business_status, r.rating, r.rating_count, + f.created_at + FROM user_favorites f + JOIN restaurants r ON r.id = f.restaurant_id + WHERE f.user_id = :u + ORDER BY f.created_at DESC + """ + with conn() as c: + cur = c.cursor() + cur.execute(sql, {"u": current_user["sub"]}) + cols = [d[0].lower() for d in cur.description] + rows = [dict(zip(cols, row)) for row in cur.fetchall()] + for r in rows: + if r.get("created_at"): + r["created_at"] = r["created_at"].isoformat() + return rows diff --git 
a/backend/api/routes/search.py b/backend/api/routes/search.py index bba36d1..7687237 100644 --- a/backend/api/routes/search.py +++ b/backend/api/routes/search.py @@ -1,5 +1,7 @@ """Search API routes — keyword + semantic search.""" +from __future__ import annotations + from fastapi import APIRouter, Query from core import restaurant, vector diff --git a/backend/api/routes/stats.py b/backend/api/routes/stats.py new file mode 100644 index 0000000..41473e9 --- /dev/null +++ b/backend/api/routes/stats.py @@ -0,0 +1,43 @@ +"""Site visit statistics API.""" + +from __future__ import annotations + +from fastapi import APIRouter + +from core.db import conn + +router = APIRouter() + + +@router.post("/visit") +def record_visit(): + """Record a page visit. Increments today's count.""" + sql = """ + MERGE INTO site_visits sv + USING (SELECT TRUNC(SYSDATE) AS d FROM dual) src + ON (sv.visit_date = src.d) + WHEN MATCHED THEN UPDATE SET sv.visit_count = sv.visit_count + 1 + WHEN NOT MATCHED THEN INSERT (visit_date, visit_count) VALUES (src.d, 1) + """ + with conn() as c: + c.cursor().execute(sql) + return {"ok": True} + + +@router.get("/visits") +def get_visits(): + """Return today's visit count and all-time total.""" + sql_today = """ + SELECT NVL(visit_count, 0) FROM site_visits WHERE visit_date = TRUNC(SYSDATE) + """ + sql_total = """ + SELECT NVL(SUM(visit_count), 0) FROM site_visits + """ + with conn() as c: + cur = c.cursor() + cur.execute(sql_today) + row = cur.fetchone() + today = int(row[0]) if row else 0 + cur.execute(sql_total) + total = int(cur.fetchone()[0]) + return {"today": today, "total": total} diff --git a/backend/api/routes/videos.py b/backend/api/routes/videos.py index f4eaba5..8c4bb06 100644 --- a/backend/api/routes/videos.py +++ b/backend/api/routes/videos.py @@ -1,17 +1,29 @@ """Video API routes.""" +from __future__ import annotations + +import asyncio +import json as _json +import logging +import random +import time +from concurrent.futures import 
ThreadPoolExecutor + from fastapi import APIRouter, Query +from fastapi.responses import StreamingResponse from core.db import conn from core.pipeline import process_pending +logger = logging.getLogger(__name__) router = APIRouter() +_executor = ThreadPoolExecutor(max_workers=4) @router.get("") def list_videos( status: str | None = None, - limit: int = Query(50, le=200), + limit: int = Query(50, le=500), offset: int = Query(0, ge=0), ): conditions = [] @@ -23,7 +35,11 @@ def list_videos( where = ("WHERE " + " AND ".join(conditions)) if conditions else "" sql = f""" SELECT v.id, v.video_id, v.title, v.url, v.status, - v.published_at, c.channel_name + v.published_at, c.channel_name, + CASE WHEN v.transcript_text IS NOT NULL AND dbms_lob.getlength(v.transcript_text) > 0 THEN 1 ELSE 0 END as has_transcript, + CASE WHEN v.llm_raw_response IS NOT NULL AND dbms_lob.getlength(v.llm_raw_response) > 0 THEN 1 ELSE 0 END as has_llm, + (SELECT COUNT(*) FROM video_restaurants vr WHERE vr.video_id = v.id) as restaurant_count, + (SELECT COUNT(*) FROM video_restaurants vr JOIN restaurants r ON r.id = vr.restaurant_id WHERE vr.video_id = v.id AND r.google_place_id IS NOT NULL) as matched_count FROM videos v JOIN channels c ON c.id = v.channel_id {where} @@ -41,12 +57,604 @@ def list_videos( d = dict(zip(cols, row)) if d.get("published_at"): d["published_at"] = d["published_at"].isoformat() + d["has_transcript"] = bool(d.get("has_transcript")) + d["has_llm"] = bool(d.get("has_llm")) + d["restaurant_count"] = d.get("restaurant_count", 0) + d["matched_count"] = d.get("matched_count", 0) results.append(d) return results +def _get_unprocessed_videos() -> list[dict]: + """Get videos that have transcripts but no LLM extraction.""" + sql = """ + SELECT v.id, v.video_id, v.title, v.url, v.transcript_text + FROM videos v + WHERE v.transcript_text IS NOT NULL + AND dbms_lob.getlength(v.transcript_text) > 0 + AND (v.llm_raw_response IS NULL OR dbms_lob.getlength(v.llm_raw_response) = 0) + AND 
v.status != 'skip' + ORDER BY v.published_at DESC + """ + with conn() as c: + cur = c.cursor() + cur.execute(sql) + rows = cur.fetchall() + result = [] + for r in rows: + transcript = r[4] + if hasattr(transcript, "read"): + transcript = transcript.read() + result.append({ + "id": r[0], "video_id": r[1], "title": r[2], "url": r[3], + "transcript": transcript, + }) + return result + + +@router.get("/bulk-extract/pending") +def bulk_extract_pending_count(): + """Get count of videos pending LLM extraction.""" + videos = _get_unprocessed_videos() + return {"count": len(videos), "videos": [{"id": v["id"], "title": v["title"]} for v in videos]} + + +@router.post("/bulk-extract") +def bulk_extract(): + """Process all unextracted videos with random delays. Streams SSE progress.""" + from core.pipeline import process_video_extract + + videos = _get_unprocessed_videos() + + def generate(): + total = len(videos) + total_restaurants = 0 + yield f"data: {_json.dumps({'type': 'start', 'total': total})}\n\n" + + for i, v in enumerate(videos): + # Random delay (3-8 seconds) between requests to avoid bot detection + if i > 0: + delay = random.uniform(3.0, 8.0) + yield f"data: {_json.dumps({'type': 'wait', 'index': i, 'delay': round(delay, 1)})}\n\n" + time.sleep(delay) + + yield f"data: {_json.dumps({'type': 'processing', 'index': i, 'title': v['title']})}\n\n" + + try: + count = process_video_extract( + {"id": v["id"], "video_id": v["video_id"], "title": v["title"], "url": v["url"]}, + v["transcript"], + ) + total_restaurants += count + yield f"data: {_json.dumps({'type': 'done', 'index': i, 'title': v['title'], 'restaurants': count})}\n\n" + except Exception as e: + logger.error("Bulk extract error for %s: %s", v["video_id"], e) + yield f"data: {_json.dumps({'type': 'error', 'index': i, 'title': v['title'], 'message': str(e)})}\n\n" + + yield f"data: {_json.dumps({'type': 'complete', 'total': total, 'total_restaurants': total_restaurants})}\n\n" + + return 
StreamingResponse(generate(), media_type="text/event-stream") + + +def _get_no_transcript_videos() -> list[dict]: + """Get videos that have no transcript yet.""" + sql = """ + SELECT v.id, v.video_id, v.title + FROM videos v + WHERE (v.transcript_text IS NULL OR dbms_lob.getlength(v.transcript_text) = 0) + AND v.status != 'skip' + ORDER BY v.published_at DESC + """ + with conn() as c: + cur = c.cursor() + cur.execute(sql) + return [{"id": r[0], "video_id": r[1], "title": r[2]} for r in cur.fetchall()] + + +@router.get("/bulk-transcript/pending") +def bulk_transcript_pending_count(): + """Get count of videos without transcripts.""" + videos = _get_no_transcript_videos() + return {"count": len(videos), "videos": [{"id": v["id"], "title": v["title"]} for v in videos]} + + +@router.post("/bulk-transcript") +def bulk_transcript(): + """Fetch transcripts for all videos missing them. Streams SSE progress.""" + from core.youtube import get_transcript + + videos = _get_no_transcript_videos() + + def generate(): + total = len(videos) + success = 0 + yield f"data: {_json.dumps({'type': 'start', 'total': total})}\n\n" + + for i, v in enumerate(videos): + # Random delay (5-15 seconds) to avoid bot detection — longer than LLM since this hits YouTube + if i > 0: + delay = random.uniform(5.0, 15.0) + yield f"data: {_json.dumps({'type': 'wait', 'index': i, 'delay': round(delay, 1)})}\n\n" + time.sleep(delay) + + yield f"data: {_json.dumps({'type': 'processing', 'index': i, 'title': v['title']})}\n\n" + + try: + transcript, source = get_transcript(v["video_id"]) + if transcript: + with conn() as c: + cur = c.cursor() + cur.execute( + "UPDATE videos SET transcript_text = :txt WHERE id = :vid", + {"txt": transcript, "vid": v["id"]}, + ) + success += 1 + yield f"data: {_json.dumps({'type': 'done', 'index': i, 'title': v['title'], 'source': source, 'length': len(transcript)})}\n\n" + else: + yield f"data: {_json.dumps({'type': 'error', 'index': i, 'title': v['title'], 'message': 'No 
transcript available'})}\n\n" + except Exception as e: + logger.error("Bulk transcript error for %s: %s", v["video_id"], e) + yield f"data: {_json.dumps({'type': 'error', 'index': i, 'title': v['title'], 'message': str(e)})}\n\n" + + yield f"data: {_json.dumps({'type': 'complete', 'total': total, 'success': success})}\n\n" + + return StreamingResponse(generate(), media_type="text/event-stream") + + +@router.get("/extract/prompt") +def get_extract_prompt(): + """Get the current LLM extraction prompt template.""" + from core.extractor import _EXTRACT_PROMPT + return {"prompt": _EXTRACT_PROMPT} + + +def _do_process(limit: int): + return {"restaurants_extracted": process_pending(limit)} + + @router.post("/process") -def trigger_processing(limit: int = Query(5, le=20)): - """Manually trigger processing of pending videos.""" - count = process_pending(limit) - return {"restaurants_extracted": count} +async def trigger_processing(limit: int = Query(5, le=20)): + """Manually trigger processing of pending videos (non-blocking).""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor(_executor, _do_process, limit) + + +@router.get("/{video_db_id}") +def get_video_detail(video_db_id: str): + """Get video detail including transcript and extracted restaurants.""" + from fastapi import HTTPException + import json + + with conn() as c: + cur = c.cursor() + cur.execute(""" + SELECT v.id, v.video_id, v.title, v.url, v.status, + v.published_at, v.transcript_text, + c.channel_name + FROM videos v + JOIN channels c ON c.id = v.channel_id + WHERE v.id = :vid + """, {"vid": video_db_id}) + row = cur.fetchone() + if not row: + raise HTTPException(404, "Video not found") + + transcript = row[6] + if hasattr(transcript, "read"): + transcript = transcript.read() + + # Get extracted restaurants for this video + cur.execute(""" + SELECT r.id, r.name, r.address, r.cuisine_type, r.price_range, r.region, + vr.foods_mentioned, vr.evaluation, vr.guests, + r.google_place_id, 
r.latitude, r.longitude + FROM video_restaurants vr + JOIN restaurants r ON r.id = vr.restaurant_id + WHERE vr.video_id = :vid + """, {"vid": video_db_id}) + + restaurants = [] + for rr in cur.fetchall(): + foods_raw = rr[6].read() if hasattr(rr[6], "read") else rr[6] + eval_raw = rr[7].read() if hasattr(rr[7], "read") else rr[7] + guests_raw = rr[8].read() if hasattr(rr[8], "read") else rr[8] + + def parse_json(val, default): + if val is None: + return default + if isinstance(val, (list, dict)): + return val + try: + return json.loads(val) + except (json.JSONDecodeError, ValueError): + return default + + restaurants.append({ + "restaurant_id": rr[0], + "name": rr[1], + "address": rr[2], + "cuisine_type": rr[3], + "price_range": rr[4], + "region": rr[5], + "foods_mentioned": parse_json(foods_raw, []), + "evaluation": parse_json(eval_raw, {}), + "guests": parse_json(guests_raw, []), + "google_place_id": rr[9], + "has_location": rr[10] is not None and rr[11] is not None, + }) + + return { + "id": row[0], + "video_id": row[1], + "title": row[2], + "url": row[3], + "status": row[4], + "published_at": row[5].isoformat() if row[5] else None, + "transcript": transcript, + "channel_name": row[7], + "restaurants": restaurants, + } + + +def _do_fetch_transcript(video_db_id: str, mode: str): + from core.youtube import get_transcript + + with conn() as c: + cur = c.cursor() + cur.execute("SELECT video_id FROM videos WHERE id = :vid", {"vid": video_db_id}) + row = cur.fetchone() + if not row: + return {"error": 404, "detail": "Video not found"} + video_id = row[0] + + transcript, source = get_transcript(video_id, mode=mode) + if not transcript: + return {"error": 422, "detail": "Transcript unavailable for this video"} + + with conn() as c: + cur = c.cursor() + cur.execute( + "UPDATE videos SET transcript_text = :txt WHERE id = :vid", + {"txt": transcript, "vid": video_db_id}, + ) + + return {"ok": True, "length": len(transcript), "source": source} + + 
+@router.post("/{video_db_id}/fetch-transcript") +async def fetch_transcript(video_db_id: str, mode: str = Query("auto")): + """Fetch and save transcript for a video (non-blocking).""" + from fastapi import HTTPException + + if mode not in ("auto", "manual", "generated"): + raise HTTPException(400, "mode must be auto, manual, or generated") + + loop = asyncio.get_event_loop() + result = await loop.run_in_executor(_executor, _do_fetch_transcript, video_db_id, mode) + if "error" in result: + raise HTTPException(result["error"], result["detail"]) + return result + + +def _do_extract(video_db_id: str, custom_prompt: str | None): + from core.pipeline import process_video_extract + + with conn() as c: + cur = c.cursor() + cur.execute( + "SELECT id, video_id, title, url, transcript_text FROM videos WHERE id = :vid", + {"vid": video_db_id}, + ) + row = cur.fetchone() + if not row: + return {"error": 404, "detail": "Video not found"} + transcript = row[4] + if hasattr(transcript, "read"): + transcript = transcript.read() + if not transcript: + return {"error": 422, "detail": "No transcript available for this video"} + + count = process_video_extract( + {"id": row[0], "video_id": row[1], "title": row[2], "url": row[3]}, + transcript, + custom_prompt=custom_prompt, + ) + return {"ok": True, "restaurants_extracted": count} + + +@router.post("/{video_db_id}/extract") +async def extract_restaurants_from_video(video_db_id: str, body: dict = None): + """Run LLM extraction on an existing transcript (non-blocking).""" + from fastapi import HTTPException + custom_prompt = body.get("prompt") if body else None + loop = asyncio.get_event_loop() + result = await loop.run_in_executor(_executor, _do_extract, video_db_id, custom_prompt) + if "error" in result: + raise HTTPException(result["error"], result["detail"]) + return result + + +@router.post("/{video_db_id}/skip") +def skip_video(video_db_id: str): + """Mark a video as skipped.""" + from fastapi import HTTPException + with conn() as 
c: + cur = c.cursor() + cur.execute( + "UPDATE videos SET status = 'skip' WHERE id = :vid", + {"vid": video_db_id}, + ) + if cur.rowcount == 0: + raise HTTPException(404, "Video not found") + return {"ok": True} + + +@router.delete("/{video_db_id}") +def delete_video(video_db_id: str): + """Delete a video and its related data.""" + from core.db import conn as get_conn + with get_conn() as c: + cur = c.cursor() + # Delete vector embeddings for restaurants only linked to this video + cur.execute(""" + DELETE FROM restaurant_vectors + WHERE restaurant_id IN ( + SELECT vr.restaurant_id FROM video_restaurants vr + WHERE vr.video_id = :vid + AND NOT EXISTS ( + SELECT 1 FROM video_restaurants vr2 + WHERE vr2.restaurant_id = vr.restaurant_id + AND vr2.video_id != :vid + ) + ) + """, {"vid": video_db_id}) + # Delete reviews for restaurants only linked to this video + cur.execute(""" + DELETE FROM user_reviews + WHERE restaurant_id IN ( + SELECT vr.restaurant_id FROM video_restaurants vr + WHERE vr.video_id = :vid + AND NOT EXISTS ( + SELECT 1 FROM video_restaurants vr2 + WHERE vr2.restaurant_id = vr.restaurant_id + AND vr2.video_id != :vid + ) + ) + """, {"vid": video_db_id}) + # Delete restaurants only linked to this video + cur.execute(""" + DELETE FROM restaurants + WHERE id IN ( + SELECT vr.restaurant_id FROM video_restaurants vr + WHERE vr.video_id = :vid + AND NOT EXISTS ( + SELECT 1 FROM video_restaurants vr2 + WHERE vr2.restaurant_id = vr.restaurant_id + AND vr2.video_id != :vid + ) + ) + """, {"vid": video_db_id}) + # Delete video-restaurant links + cur.execute("DELETE FROM video_restaurants WHERE video_id = :vid", {"vid": video_db_id}) + # Delete the video + cur.execute("DELETE FROM videos WHERE id = :vid", {"vid": video_db_id}) + if cur.rowcount == 0: + from fastapi import HTTPException + raise HTTPException(404, "Video not found") + return {"ok": True} + + +@router.put("/{video_db_id}") +def update_video(video_db_id: str, body: dict): + """Update video title.""" 
+ from fastapi import HTTPException + title = body.get("title") + if not title: + raise HTTPException(400, "title is required") + with conn() as c: + cur = c.cursor() + cur.execute( + "UPDATE videos SET title = :title WHERE id = :vid", + {"title": title, "vid": video_db_id}, + ) + if cur.rowcount == 0: + raise HTTPException(404, "Video not found") + return {"ok": True} + + +@router.delete("/{video_db_id}/restaurants/{restaurant_id}") +def delete_video_restaurant(video_db_id: str, restaurant_id: str): + """Delete a video-restaurant mapping. Also cleans up orphaned restaurant.""" + from fastapi import HTTPException + with conn() as c: + cur = c.cursor() + cur.execute( + "DELETE FROM video_restaurants WHERE video_id = :vid AND restaurant_id = :rid", + {"vid": video_db_id, "rid": restaurant_id}, + ) + if cur.rowcount == 0: + raise HTTPException(404, "Mapping not found") + # Clean up orphaned restaurant (no other video links) + cur.execute(""" + DELETE FROM restaurant_vectors WHERE restaurant_id = :rid + AND NOT EXISTS (SELECT 1 FROM video_restaurants WHERE restaurant_id = :rid) + """, {"rid": restaurant_id}) + cur.execute(""" + DELETE FROM user_reviews WHERE restaurant_id = :rid + AND NOT EXISTS (SELECT 1 FROM video_restaurants WHERE restaurant_id = :rid) + """, {"rid": restaurant_id}) + cur.execute(""" + DELETE FROM restaurants WHERE id = :rid + AND NOT EXISTS (SELECT 1 FROM video_restaurants WHERE restaurant_id = :rid) + """, {"rid": restaurant_id}) + return {"ok": True} + + +@router.post("/{video_db_id}/restaurants/manual") +def add_manual_restaurant(video_db_id: str, body: dict): + """Manually add a restaurant and link it to a video.""" + from fastapi import HTTPException + from core import restaurant as rest_mod + from core.geocoding import geocode_restaurant + + name = body.get("name", "").strip() + if not name: + raise HTTPException(400, "Restaurant name is required") + + address = body.get("address", "").strip() or None + region = body.get("region", "").strip() 
or None + cuisine_type = body.get("cuisine_type", "").strip() or None + price_range = body.get("price_range", "").strip() or None + foods = body.get("foods_mentioned", []) + evaluation = body.get("evaluation", "").strip() or None + guests = body.get("guests", []) + + # Geocode to get lat/lng and Google data + geo = geocode_restaurant(name, address or region or "") + if not geo: + raise HTTPException(400, f"'{name}' 위치를 찾을 수 없습니다. 주소를 입력해주세요.") + + rid = rest_mod.upsert( + name=name, + address=geo.get("formatted_address") or address, + region=region, + latitude=geo["latitude"], + longitude=geo["longitude"], + cuisine_type=cuisine_type, + price_range=price_range, + google_place_id=geo.get("google_place_id"), + phone=geo.get("phone"), + website=geo.get("website"), + business_status=geo.get("business_status"), + rating=geo.get("rating"), + rating_count=geo.get("rating_count"), + ) + + link_id = rest_mod.link_video_restaurant( + video_db_id=video_db_id, + restaurant_id=rid, + foods=foods if isinstance(foods, list) else [], + evaluation=evaluation, + guests=guests if isinstance(guests, list) else [], + ) + + return {"ok": True, "restaurant_id": rid, "link_id": link_id} + + +@router.put("/{video_db_id}/restaurants/{restaurant_id}") +def update_video_restaurant(video_db_id: str, restaurant_id: str, body: dict): + """Update restaurant info linked to a video. + + If name changed, re-geocode and remap to a new restaurant record. 
+ """ + from fastapi import HTTPException + import json as _json + + # Check if name changed — need to remap + new_name = body.get("name", "").strip() if "name" in body else None + if new_name: + with conn() as c: + cur = c.cursor() + cur.execute("SELECT name FROM restaurants WHERE id = :rid", {"rid": restaurant_id}) + row = cur.fetchone() + old_name = row[0] if row else "" + + if old_name != new_name: + # Name changed: geocode new restaurant, remap + from core import restaurant as rest_mod + from core.geocoding import geocode_restaurant + + address = body.get("address", "").strip() or body.get("region", "").strip() or "" + geo = geocode_restaurant(new_name, address) + if not geo: + raise HTTPException(400, f"'{new_name}' 위치를 찾을 수 없습니다.") + + new_rid = rest_mod.upsert( + name=new_name, + address=geo.get("formatted_address") or body.get("address"), + region=body.get("region"), + latitude=geo["latitude"], + longitude=geo["longitude"], + cuisine_type=body.get("cuisine_type"), + price_range=body.get("price_range"), + google_place_id=geo.get("google_place_id"), + phone=geo.get("phone"), + website=geo.get("website"), + business_status=geo.get("business_status"), + rating=geo.get("rating"), + rating_count=geo.get("rating_count"), + ) + + # Read existing mapping data, delete old, create new + with conn() as c: + cur = c.cursor() + cur.execute( + "SELECT foods_mentioned, evaluation, guests FROM video_restaurants WHERE video_id = :vid AND restaurant_id = :rid", + {"vid": video_db_id, "rid": restaurant_id}, + ) + old_vr = cur.fetchone() + + cur.execute( + "DELETE FROM video_restaurants WHERE video_id = :vid AND restaurant_id = :rid", + {"vid": video_db_id, "rid": restaurant_id}, + ) + + # Build new mapping values from body or old data + def _parse(val, default): + if val is None: + return default + if hasattr(val, "read"): + val = val.read() + if isinstance(val, (list, dict)): + return val + try: + return _json.loads(val) + except Exception: + return default + + old_foods = 
_parse(old_vr[0], []) if old_vr else [] + old_eval = _parse(old_vr[1], {}) if old_vr else {} + old_guests = _parse(old_vr[2], []) if old_vr else [] + + foods = body.get("foods_mentioned", old_foods) + evaluation = body.get("evaluation", old_eval) + guests = body.get("guests", old_guests) + + eval_text = evaluation.get("text", "") if isinstance(evaluation, dict) else str(evaluation or "") + + rest_mod.link_video_restaurant( + video_db_id=video_db_id, + restaurant_id=new_rid, + foods=foods if isinstance(foods, list) else [], + evaluation=eval_text or None, + guests=guests if isinstance(guests, list) else [], + ) + + return {"ok": True, "remapped": True, "new_restaurant_id": new_rid} + + # No name change — update in place + with conn() as c: + cur = c.cursor() + r_sets = [] + r_params: dict = {"rid": restaurant_id} + for field in ("name", "address", "region", "cuisine_type", "price_range"): + if field in body: + r_sets.append(f"{field} = :{field}") + r_params[field] = body[field] + if r_sets: + r_sets.append("updated_at = SYSTIMESTAMP") + sql = f"UPDATE restaurants SET {', '.join(r_sets)} WHERE id = :rid" + cur.execute(sql, r_params) + + vr_params: dict = {"vid": video_db_id, "rid": restaurant_id} + vr_sets = [] + for field in ("foods_mentioned", "evaluation", "guests"): + if field in body: + vr_sets.append(f"{field} = :{field}") + val = body[field] + vr_params[field] = _json.dumps(val, ensure_ascii=False) if isinstance(val, (list, dict)) else val + if vr_sets: + sql = f"UPDATE video_restaurants SET {', '.join(vr_sets)} WHERE video_id = :vid AND restaurant_id = :rid" + cur.execute(sql, vr_params) + + return {"ok": True} diff --git a/backend/core/extractor.py b/backend/core/extractor.py index 1d1e7dc..a1b3904 100644 --- a/backend/core/extractor.py +++ b/backend/core/extractor.py @@ -56,7 +56,7 @@ def _parse_json(raw: str) -> dict | list: return json.JSONDecoder(strict=False).decode(raw) except json.JSONDecodeError: pass - # recover truncated array + # recover truncated 
array — extract complete objects one by one if raw.lstrip().startswith("["): decoder = json.JSONDecoder(strict=False) items: list = [] @@ -71,8 +71,19 @@ def _parse_json(raw: str) -> dict | list: items.append(obj) idx = end except json.JSONDecodeError: + # Try to recover truncated last object by closing braces + remainder = raw[idx:] + for fix in ["}", "}]", '"}', '"}]', "null}", "null}]"]: + try: + patched = remainder.rstrip().rstrip(",") + fix + obj = json.loads(patched) + if isinstance(obj, dict) and obj.get("name"): + items.append(obj) + except (json.JSONDecodeError, ValueError): + continue break if items: + logger.info("Recovered %d restaurants from truncated JSON", len(items)) return items raise ValueError(f"JSON parse failed: {raw[:80]!r}") @@ -104,7 +115,7 @@ _EXTRACT_PROMPT = """\ JSON 배열:""" -def extract_restaurants(title: str, transcript: str) -> tuple[list[dict], str]: +def extract_restaurants(title: str, transcript: str, custom_prompt: str | None = None) -> tuple[list[dict], str]: """Extract restaurant info from a video transcript using LLM. Returns (list of restaurant dicts, raw LLM response text). 
@@ -113,10 +124,11 @@ def extract_restaurants(title: str, transcript: str) -> tuple[list[dict], str]: if len(transcript) > 8000: transcript = transcript[:7000] + "\n...(중략)...\n" + transcript[-1000:] - prompt = _EXTRACT_PROMPT.format(title=title, transcript=transcript) + template = custom_prompt if custom_prompt else _EXTRACT_PROMPT + prompt = template.format(title=title, transcript=transcript) try: - raw = _llm(prompt, max_tokens=4096) + raw = _llm(prompt, max_tokens=8192) result = _parse_json(raw) if isinstance(result, list): return result, raw diff --git a/backend/core/geocoding.py b/backend/core/geocoding.py index 0076b01..8cf6cc8 100644 --- a/backend/core/geocoding.py +++ b/backend/core/geocoding.py @@ -57,17 +57,53 @@ def _places_text_search(query: str) -> dict | None: if data.get("status") == "OK" and data.get("results"): place = data["results"][0] loc = place["geometry"]["location"] - return { + result = { "latitude": loc["lat"], "longitude": loc["lng"], "formatted_address": place.get("formatted_address", ""), "google_place_id": place.get("place_id", ""), + "business_status": place.get("business_status"), + "rating": place.get("rating"), + "rating_count": place.get("user_ratings_total"), } + # Fetch phone/website from Place Details + place_id = place.get("place_id") + if place_id: + details = _place_details(place_id) + if details: + result.update(details) + return result except Exception as e: logger.warning("Places text search failed for '%s': %s", query, e) return None +def _place_details(place_id: str) -> dict | None: + """Fetch phone and website from Google Place Details API.""" + try: + r = httpx.get( + "https://maps.googleapis.com/maps/api/place/details/json", + params={ + "place_id": place_id, + "key": _api_key(), + "language": "ko", + "fields": "formatted_phone_number,website", + }, + timeout=10, + ) + r.raise_for_status() + data = r.json() + if data.get("status") == "OK" and data.get("result"): + res = data["result"] + return { + "phone": 
res.get("formatted_phone_number"), + "website": res.get("website"), + } + except Exception as e: + logger.warning("Place details failed for '%s': %s", place_id, e) + return None + + def _geocode(query: str) -> dict | None: """Geocode an address string.""" try: diff --git a/backend/core/pipeline.py b/backend/core/pipeline.py index 3fde88c..8cf68f2 100644 --- a/backend/core/pipeline.py +++ b/backend/core/pipeline.py @@ -28,7 +28,7 @@ def process_video(video: dict) -> int: try: # 1. Transcript - transcript = youtube.get_transcript(video_id) + transcript, _src = youtube.get_transcript(video_id) if not transcript: logger.warning("No transcript for %s, marking done", video_id) youtube.update_video_status(video_db_id, "done") @@ -72,6 +72,11 @@ def process_video(video: dict) -> int: cuisine_type=rest_data.get("cuisine_type"), price_range=rest_data.get("price_range"), google_place_id=place_id, + phone=geo.get("phone") if geo else None, + website=geo.get("website") if geo else None, + business_status=geo.get("business_status") if geo else None, + rating=geo.get("rating") if geo else None, + rating_count=geo.get("rating_count") if geo else None, ) # Link video <-> restaurant @@ -101,6 +106,76 @@ def process_video(video: dict) -> int: return 0 +def process_video_extract(video: dict, transcript: str, custom_prompt: str | None = None) -> int: + """Run LLM extraction + geocode + save on an existing transcript. 
+ Returns number of restaurants found.""" + video_db_id = video["id"] + title = video["title"] + + logger.info("Extracting restaurants from video: %s", title) + + try: + restaurants, llm_raw = extractor.extract_restaurants(title, transcript, custom_prompt=custom_prompt) + if not restaurants: + youtube.update_video_status(video_db_id, "done", llm_raw=llm_raw) + return 0 + + count = 0 + for rest_data in restaurants: + name = rest_data.get("name") + if not name: + continue + + geo = geocoding.geocode_restaurant( + name, + address=rest_data.get("address"), + region=rest_data.get("region"), + ) + + lat = geo["latitude"] if geo else None + lng = geo["longitude"] if geo else None + addr = geo["formatted_address"] if geo else rest_data.get("address") + place_id = geo["google_place_id"] if geo else None + + rest_id = restaurant.upsert( + name=name, + address=addr, + region=rest_data.get("region"), + latitude=lat, + longitude=lng, + cuisine_type=rest_data.get("cuisine_type"), + price_range=rest_data.get("price_range"), + google_place_id=place_id, + phone=geo.get("phone") if geo else None, + website=geo.get("website") if geo else None, + business_status=geo.get("business_status") if geo else None, + rating=geo.get("rating") if geo else None, + rating_count=geo.get("rating_count") if geo else None, + ) + + restaurant.link_video_restaurant( + video_db_id=video_db_id, + restaurant_id=rest_id, + foods=rest_data.get("foods_mentioned"), + evaluation=rest_data.get("evaluation"), + guests=rest_data.get("guests"), + ) + + chunks = _build_chunks(name, rest_data, title) + if chunks: + vector.save_restaurant_vectors(rest_id, chunks) + + count += 1 + logger.info("Saved restaurant: %s (geocoded=%s)", name, bool(geo)) + + youtube.update_video_status(video_db_id, "done", llm_raw=llm_raw) + return count + + except Exception as e: + logger.error("Extract error for %s: %s", video["video_id"], e, exc_info=True) + return 0 + + def _build_chunks(name: str, data: dict, video_title: str) -> 
list[str]: """Build text chunks for vector embedding.""" parts = [f"식당: {name}"] diff --git a/backend/core/restaurant.py b/backend/core/restaurant.py index 2a12352..cffa37f 100644 --- a/backend/core/restaurant.py +++ b/backend/core/restaurant.py @@ -9,6 +9,16 @@ import oracledb from core.db import conn +def _truncate_bytes(val: str | None, max_bytes: int) -> str | None: + """Truncate a string to fit within max_bytes when encoded as UTF-8.""" + if not val: + return val + encoded = val.encode("utf-8") + if len(encoded) <= max_bytes: + return val + return encoded[:max_bytes].decode("utf-8", errors="ignore").rstrip() + + def find_by_name(name: str) -> dict | None: """Find a restaurant by exact name match.""" sql = "SELECT id, name, address, region, latitude, longitude FROM restaurants WHERE name = :n" @@ -33,8 +43,19 @@ def upsert( cuisine_type: str | None = None, price_range: str | None = None, google_place_id: str | None = None, + phone: str | None = None, + website: str | None = None, + business_status: str | None = None, + rating: float | None = None, + rating_count: int | None = None, ) -> str: """Insert or update a restaurant. 
Returns row id.""" + # Truncate fields to fit DB column byte limits (VARCHAR2 is byte-based) + price_range = _truncate_bytes(price_range, 50) + cuisine_type = _truncate_bytes(cuisine_type, 100) + region = _truncate_bytes(region, 100) + website = _truncate_bytes(website, 500) + existing = find_by_name(name) if existing: sql = """ @@ -46,6 +67,11 @@ def upsert( cuisine_type = COALESCE(:cuisine, cuisine_type), price_range = COALESCE(:price, price_range), google_place_id = COALESCE(:gid, google_place_id), + phone = COALESCE(:phone, phone), + website = COALESCE(:web, website), + business_status = COALESCE(:bstatus, business_status), + rating = COALESCE(:rating, rating), + rating_count = COALESCE(:rcnt, rating_count), updated_at = SYSTIMESTAMP WHERE id = :id """ @@ -54,14 +80,18 @@ def upsert( "addr": address, "reg": region, "lat": latitude, "lng": longitude, "cuisine": cuisine_type, "price": price_range, - "gid": google_place_id, "id": existing["id"], + "gid": google_place_id, "phone": phone, "web": website, + "bstatus": business_status, "rating": rating, "rcnt": rating_count, + "id": existing["id"], }) return existing["id"] sql = """ INSERT INTO restaurants (name, address, region, latitude, longitude, - cuisine_type, price_range, google_place_id) - VALUES (:name, :addr, :reg, :lat, :lng, :cuisine, :price, :gid) + cuisine_type, price_range, google_place_id, + phone, website, business_status, rating, rating_count) + VALUES (:name, :addr, :reg, :lat, :lng, :cuisine, :price, :gid, + :phone, :web, :bstatus, :rating, :rcnt) RETURNING id INTO :out_id """ with conn() as c: @@ -71,7 +101,9 @@ def upsert( "name": name, "addr": address, "reg": region, "lat": latitude, "lng": longitude, "cuisine": cuisine_type, "price": price_range, - "gid": google_place_id, "out_id": out_id, + "gid": google_place_id, "phone": phone, "web": website, + "bstatus": business_status, "rating": rating, "rcnt": rating_count, + "out_id": out_id, }) return out_id.getvalue()[0] @@ -116,38 +148,83 @@ def 
get_all( offset: int = 0, cuisine: str | None = None, region: str | None = None, + channel: str | None = None, ) -> list[dict]: """List restaurants with optional filters.""" - conditions = ["latitude IS NOT NULL"] + conditions = [ + "r.latitude IS NOT NULL", + "EXISTS (SELECT 1 FROM video_restaurants vr0 WHERE vr0.restaurant_id = r.id)", + ] params: dict = {"lim": limit, "off": offset} if cuisine: - conditions.append("cuisine_type = :cuisine") + conditions.append("r.cuisine_type = :cuisine") params["cuisine"] = cuisine if region: - conditions.append("region LIKE :region") + conditions.append("r.region LIKE :region") params["region"] = f"%{region}%" + join_clause = "" + if channel: + join_clause = """ + JOIN video_restaurants vr_f ON vr_f.restaurant_id = r.id + JOIN videos v_f ON v_f.id = vr_f.video_id + JOIN channels c_f ON c_f.id = v_f.channel_id + """ + conditions.append("c_f.channel_name = :channel") + params["channel"] = channel + where = " AND ".join(conditions) sql = f""" - SELECT id, name, address, region, latitude, longitude, - cuisine_type, price_range, google_place_id - FROM restaurants + SELECT DISTINCT r.id, r.name, r.address, r.region, r.latitude, r.longitude, + r.cuisine_type, r.price_range, r.google_place_id, + r.business_status, r.rating, r.rating_count, r.updated_at + FROM restaurants r + {join_clause} WHERE {where} - ORDER BY updated_at DESC + ORDER BY r.updated_at DESC OFFSET :off ROWS FETCH NEXT :lim ROWS ONLY """ with conn() as c: cur = c.cursor() cur.execute(sql, params) cols = [d[0].lower() for d in cur.description] - return [dict(zip(cols, row)) for row in cur.fetchall()] + restaurants = [dict(zip(cols, row)) for row in cur.fetchall()] + for r in restaurants: + r.pop("updated_at", None) + + if not restaurants: + return restaurants + + # Attach channel names for each restaurant + ids = [r["id"] for r in restaurants] + placeholders = ", ".join(f":id{i}" for i in range(len(ids))) + ch_sql = f""" + SELECT DISTINCT vr.restaurant_id, 
c.channel_name + FROM video_restaurants vr + JOIN videos v ON v.id = vr.video_id + JOIN channels c ON c.id = v.channel_id + WHERE vr.restaurant_id IN ({placeholders}) + """ + ch_params = {f"id{i}": rid for i, rid in enumerate(ids)} + ch_map: dict[str, list[str]] = {} + with conn() as c: + cur = c.cursor() + cur.execute(ch_sql, ch_params) + for row in cur.fetchall(): + ch_map.setdefault(row[0], []).append(row[1]) + + for r in restaurants: + r["channels"] = ch_map.get(r["id"], []) + + return restaurants def get_by_id(restaurant_id: str) -> dict | None: sql = """ SELECT r.id, r.name, r.address, r.region, r.latitude, r.longitude, - r.cuisine_type, r.price_range, r.phone, r.website, r.google_place_id + r.cuisine_type, r.price_range, r.phone, r.website, r.google_place_id, + r.business_status, r.rating, r.rating_count FROM restaurants r WHERE r.id = :id """ @@ -165,9 +242,11 @@ def get_video_links(restaurant_id: str) -> list[dict]: """Get all video appearances for a restaurant.""" sql = """ SELECT v.video_id, v.title, v.url, v.published_at, - vr.foods_mentioned, vr.evaluation, vr.guests + vr.foods_mentioned, vr.evaluation, vr.guests, + c.channel_name, c.channel_id FROM video_restaurants vr JOIN videos v ON v.id = vr.video_id + JOIN channels c ON c.id = v.channel_id WHERE vr.restaurant_id = :rid ORDER BY v.published_at DESC """ @@ -187,6 +266,8 @@ def get_video_links(restaurant_id: str) -> list[dict]: "foods_mentioned": _parse_json_field(foods_raw, []), "evaluation": _parse_json_field(eval_raw, {}), "guests": _parse_json_field(guests_raw, []), + "channel_name": r[7], + "channel_id": r[8], }) return results diff --git a/backend/core/review.py b/backend/core/review.py index 5961d5b..fbb98c2 100644 --- a/backend/core/review.py +++ b/backend/core/review.py @@ -131,13 +131,15 @@ def get_user_reviews( limit: int = 20, offset: int = 0, ) -> list[dict]: - """List reviews by a specific user.""" + """List reviews by a specific user, including restaurant name.""" sql = """ SELECT 
r.id, r.user_id, r.restaurant_id, r.rating, r.review_text, r.visited_at, r.created_at, r.updated_at, - u.nickname, u.avatar_url + u.nickname, u.avatar_url, + rest.name AS restaurant_name FROM user_reviews r JOIN tasteby_users u ON u.id = r.user_id + LEFT JOIN restaurants rest ON rest.id = r.restaurant_id WHERE r.user_id = :user_id ORDER BY r.created_at DESC OFFSET :off ROWS FETCH NEXT :lim ROWS ONLY @@ -149,7 +151,12 @@ def get_user_reviews( "off": offset, "lim": limit, }) - return [_row_to_dict(row) for row in cur.fetchall()] + rows = [] + for row in cur.fetchall(): + d = _row_to_dict(row) + d["restaurant_name"] = row[10] + rows.append(d) + return rows def get_restaurant_avg_rating(restaurant_id: str) -> dict: diff --git a/backend/core/youtube.py b/backend/core/youtube.py index eb6fc12..3fd6ee4 100644 --- a/backend/core/youtube.py +++ b/backend/core/youtube.py @@ -32,11 +32,11 @@ def extract_video_id(url: str) -> str: # -- Channel operations ------------------------------------------------------- -def add_channel(channel_id: str, channel_name: str) -> str: +def add_channel(channel_id: str, channel_name: str, title_filter: str | None = None) -> str: """Register a YouTube channel. Returns DB row id.""" sql = """ - INSERT INTO channels (channel_id, channel_name, channel_url) - VALUES (:cid, :cname, :curl) + INSERT INTO channels (channel_id, channel_name, channel_url, title_filter) + VALUES (:cid, :cname, :curl, :tf) RETURNING id INTO :out_id """ with conn() as c: @@ -47,45 +47,77 @@ def add_channel(channel_id: str, channel_name: str) -> str: "cid": channel_id, "cname": channel_name, "curl": f"https://www.youtube.com/channel/{channel_id}", + "tf": title_filter, "out_id": out_id, }) return out_id.getvalue()[0] +def deactivate_channel(channel_id: str) -> bool: + """Deactivate a channel by channel_id. 
Returns True if found.""" + sql = "UPDATE channels SET is_active = 0 WHERE channel_id = :cid AND is_active = 1" + with conn() as c: + cur = c.cursor() + cur.execute(sql, {"cid": channel_id}) + return cur.rowcount > 0 + + +def deactivate_channel_by_db_id(db_id: str) -> bool: + """Deactivate a channel by DB id. Returns True if found.""" + sql = "UPDATE channels SET is_active = 0 WHERE id = :did AND is_active = 1" + with conn() as c: + cur = c.cursor() + cur.execute(sql, {"did": db_id}) + return cur.rowcount > 0 + + def get_active_channels() -> list[dict]: - sql = "SELECT id, channel_id, channel_name FROM channels WHERE is_active = 1" + sql = "SELECT id, channel_id, channel_name, title_filter FROM channels WHERE is_active = 1" with conn() as c: cur = c.cursor() cur.execute(sql) return [ - {"id": r[0], "channel_id": r[1], "channel_name": r[2]} + {"id": r[0], "channel_id": r[1], "channel_name": r[2], "title_filter": r[3]} for r in cur.fetchall() ] # -- Video listing via YouTube Data API v3 ------------------------------------ -def fetch_channel_videos( - channel_id: str, - max_results: int = 50, - published_after: str | None = None, -) -> list[dict]: - """Fetch video list from a YouTube channel via Data API v3. +def get_latest_video_date(channel_db_id: str) -> str | None: + """Get the latest published_at for a channel's videos in ISO 8601 format.""" + sql = """ + SELECT MAX(published_at) FROM videos + WHERE channel_id = :ch_id AND published_at IS NOT NULL + """ + with conn() as c: + cur = c.cursor() + cur.execute(sql, {"ch_id": channel_db_id}) + row = cur.fetchone() + if row and row[0]: + return row[0].strftime("%Y-%m-%dT%H:%M:%SZ") + return None - Returns list of dicts: video_id, title, published_at, url. + +def fetch_channel_videos_iter( + channel_id: str, + published_after: str | None = None, +): + """Yield pages of videos from a YouTube channel via Data API v3. + + Each yield is a list of dicts for one API page (up to 50). 
""" params: dict = { "key": _api_key(), "channelId": channel_id, "part": "snippet", "order": "date", - "maxResults": min(max_results, 50), + "maxResults": 50, "type": "video", } if published_after: params["publishedAfter"] = published_after - videos: list[dict] = [] next_page = None while True: @@ -100,33 +132,337 @@ def fetch_channel_videos( r.raise_for_status() data = r.json() + page_videos = [] for item in data.get("items", []): snippet = item["snippet"] vid = item["id"]["videoId"] - videos.append({ + page_videos.append({ "video_id": vid, "title": snippet["title"], "published_at": snippet["publishedAt"], "url": f"https://www.youtube.com/watch?v={vid}", }) + if page_videos: + yield page_videos + next_page = data.get("nextPageToken") - if not next_page or len(videos) >= max_results: + if not next_page: break - return videos[:max_results] + +def fetch_channel_videos( + channel_id: str, + max_results: int = 0, + published_after: str | None = None, +) -> list[dict]: + """Fetch video list from a YouTube channel via Data API v3. + + Args: + max_results: 0 means fetch all available videos. + """ + videos: list[dict] = [] + for page in fetch_channel_videos_iter(channel_id, published_after=published_after): + videos.extend(page) + if max_results > 0 and len(videos) >= max_results: + break + return videos[:max_results] if max_results > 0 else videos # -- Transcript extraction ---------------------------------------------------- -def get_transcript(video_id: str) -> str | None: - """Fetch transcript text for a video. Returns None if unavailable.""" +def get_transcript(video_id: str, mode: str = "auto") -> tuple[str | None, str | None]: + """Fetch transcript using Playwright (headless browser). + + Args: + mode: "manual" = manual only, "generated" = auto-generated only, + "auto" = try API first, fallback to browser transcript panel. + + Returns: + (transcript_text, source) where source describes origin, or (None, None). 
+ """ + # Try youtube-transcript-api first (fast path) + text, source = _get_transcript_api(video_id, mode) + if text: + return text, source + + # Fallback: Playwright browser + logger.warning("API failed for %s, trying Playwright browser", video_id) + print(f"[TRANSCRIPT] API failed for {video_id}, trying Playwright browser", flush=True) + return _get_transcript_browser(video_id) + + +def _make_ytt() -> YouTubeTranscriptApi: + """Create YouTubeTranscriptApi with cookies if available.""" + cookie_file = os.path.join(os.path.dirname(__file__), "..", "cookies.txt") + if os.path.exists(cookie_file): + import http.cookiejar + import requests + jar = http.cookiejar.MozillaCookieJar(cookie_file) + jar.load(ignore_discard=True, ignore_expires=True) + session = requests.Session() + session.cookies = jar + return YouTubeTranscriptApi(http_client=session) + return YouTubeTranscriptApi() + + +def _get_transcript_api(video_id: str, mode: str = "auto") -> tuple[str | None, str | None]: + """Try youtube-transcript-api (fast but may be IP-blocked).""" + ytt = _make_ytt() + prefer = ["ko", "en"] + try: - fetched = YouTubeTranscriptApi().fetch(video_id, languages=["ko", "en"]) - return " ".join(seg.text for seg in fetched) + transcript_list = ytt.list(video_id) except Exception as e: - logger.warning("Transcript unavailable for %s: %s", video_id, e) - return None + logger.warning("Cannot list transcripts for %s: %s", video_id, e) + return None, None + + all_transcripts = list(transcript_list) + manual = [t for t in all_transcripts if not t.is_generated] + generated = [t for t in all_transcripts if t.is_generated] + + def _pick(candidates): + for lang in prefer: + for t in candidates: + if t.language_code == lang: + return t + return candidates[0] if candidates else None + + def _fetch(t): + try: + return " ".join(seg.text for seg in t.fetch()), t.language_code + except Exception: + return None, None + + if mode == "manual": + t = _pick(manual) + if t: + text, lang = _fetch(t) + 
return (text, f"manual ({lang})") if text else (None, None) + return None, None + elif mode == "generated": + t = _pick(generated) + if t: + text, lang = _fetch(t) + return (text, f"generated ({lang})") if text else (None, None) + return None, None + else: + t = _pick(manual) + if t: + text, lang = _fetch(t) + if text: + return text, f"manual ({lang})" + t = _pick(generated) + if t: + text, lang = _fetch(t) + if text: + return text, f"generated ({lang})" + return None, None + + +def _get_transcript_browser(video_id: str) -> tuple[str | None, str | None]: + """Fetch transcript via Playwright browser (bypasses IP blocks).""" + try: + from playwright.sync_api import sync_playwright + except ImportError: + logger.error("playwright not installed") + return None, None + + try: + with sync_playwright() as p: + browser = p.chromium.launch( + headless=False, + args=["--disable-blink-features=AutomationControlled"], + ) + ctx = browser.new_context(locale="ko-KR", viewport={"width": 1280, "height": 900}) + + # Load YouTube cookies if available + cookie_file = os.path.join(os.path.dirname(__file__), "..", "cookies.txt") + if os.path.exists(cookie_file): + import http.cookiejar + jar = http.cookiejar.MozillaCookieJar(cookie_file) + jar.load(ignore_discard=True, ignore_expires=True) + pw_cookies = [] + for c in jar: + if "youtube" in c.domain or "google" in c.domain: + pw_cookies.append({ + "name": c.name, "value": c.value, + "domain": c.domain, "path": c.path, + "secure": c.secure, "httpOnly": False, + }) + if pw_cookies: + ctx.add_cookies(pw_cookies) + print(f"[TRANSCRIPT] Loaded {len(pw_cookies)} cookies", flush=True) + + page = ctx.new_page() + page.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => false})") + + print(f"[TRANSCRIPT] Opening YouTube page for {video_id}", flush=True) + page.goto( + f"https://www.youtube.com/watch?v={video_id}", + wait_until="domcontentloaded", + timeout=30000, + ) + page.wait_for_timeout(5000) + + # Skip ads if present 
+ for ad_wait in range(12): # up to ~60s for ads + ad_status = page.evaluate("""() => { + const skipBtn = document.querySelector('.ytp-skip-ad-button, .ytp-ad-skip-button, .ytp-ad-skip-button-modern, button.ytp-ad-skip-button-modern'); + if (skipBtn) { skipBtn.click(); return 'skipped'; } + const adOverlay = document.querySelector('.ytp-ad-player-overlay, .ad-showing'); + if (adOverlay) return 'playing'; + const adBadge = document.querySelector('.ytp-ad-text'); + if (adBadge && adBadge.textContent) return 'badge'; + return 'none'; + }""") + if ad_status == "none": + break + print(f"[TRANSCRIPT] Ad detected: {ad_status}, waiting...", flush=True) + if ad_status == "skipped": + page.wait_for_timeout(2000) + break + page.wait_for_timeout(5000) + + page.wait_for_timeout(2000) + print(f"[TRANSCRIPT] Page loaded, looking for transcript button", flush=True) + + # Click "더보기" (more actions) button first to reveal transcript option + page.evaluate(""" + () => { + // Try clicking the "...더보기" button in description area + const moreBtn = document.querySelector('tp-yt-paper-button#expand'); + if (moreBtn) moreBtn.click(); + } + """) + page.wait_for_timeout(2000) + + # Click "스크립트 표시" button via JS + clicked = page.evaluate(""" + () => { + // Method 1: aria-label + for (const label of ['스크립트 표시', 'Show transcript']) { + const btns = document.querySelectorAll(`button[aria-label="${label}"]`); + for (const b of btns) { b.click(); return 'aria-label: ' + label; } + } + // Method 2: search all buttons by text content + const allBtns = document.querySelectorAll('button'); + for (const b of allBtns) { + const text = b.textContent.trim(); + if (text === '스크립트 표시' || text === 'Show transcript') { + b.click(); + return 'text: ' + text; + } + } + // Method 3: look for transcript button in engagement panel + const engBtns = document.querySelectorAll('ytd-button-renderer button, ytd-button-renderer a'); + for (const b of engBtns) { + const text = b.textContent.trim().toLowerCase(); + if 
(text.includes('transcript') || text.includes('스크립트')) { + b.click(); + return 'engagement: ' + text; + } + } + return false; + } + """) + print(f"[TRANSCRIPT] Clicked transcript button: {clicked}", flush=True) + if not clicked: + # Dump available buttons for debugging + btn_labels = page.evaluate(""" + () => { + const btns = document.querySelectorAll('button[aria-label]'); + return Array.from(btns).map(b => b.getAttribute('aria-label')).slice(0, 30); + } + """) + print(f"[TRANSCRIPT] Available buttons: {btn_labels}", flush=True) + browser.close() + return None, None + + # Wait for transcript panel segments to appear (max ~40s) + page.wait_for_timeout(3000) # initial wait for panel to render + for attempt in range(12): + page.wait_for_timeout(3000) + count = page.evaluate( + "() => document.querySelectorAll('ytd-transcript-segment-renderer').length" + ) + print(f"[TRANSCRIPT] Wait {(attempt+1)*3+3}s: {count} segments", flush=True) + if count > 0: + break + + # Select Korean if available (language selector in transcript panel) + page.evaluate(""" + () => { + // Open language dropdown and pick Korean if available + const menu = document.querySelector('ytd-transcript-renderer ytd-menu-renderer yt-dropdown-menu'); + if (!menu) return; + const trigger = menu.querySelector('button, tp-yt-paper-button'); + if (trigger) trigger.click(); + } + """) + page.wait_for_timeout(1000) + page.evaluate(""" + () => { + const items = document.querySelectorAll('tp-yt-paper-listbox a, tp-yt-paper-listbox tp-yt-paper-item'); + for (const item of items) { + const text = item.textContent.trim(); + if (text.includes('한국어') || text.includes('Korean')) { + item.click(); + return; + } + } + } + """) + page.wait_for_timeout(2000) + + # Scroll transcript panel to load all segments + segments = page.evaluate(""" + async () => { + const container = document.querySelector( + 'ytd-transcript-segment-list-renderer #segments-container, ' + + 'ytd-transcript-renderer #body' + ); + if (!container) { + 
// Fallback: just grab what's there + const segs = document.querySelectorAll('ytd-transcript-segment-renderer'); + return Array.from(segs).map(s => { + const txt = s.querySelector('.segment-text, yt-formatted-string.segment-text'); + return txt ? txt.textContent.trim() : ''; + }).filter(t => t); + } + + // Scroll to bottom repeatedly to load all virtual segments + let prevCount = 0; + for (let i = 0; i < 50; i++) { + container.scrollTop = container.scrollHeight; + await new Promise(r => setTimeout(r, 300)); + const segs = document.querySelectorAll('ytd-transcript-segment-renderer'); + if (segs.length === prevCount && i > 3) break; + prevCount = segs.length; + } + + const segs = document.querySelectorAll('ytd-transcript-segment-renderer'); + return Array.from(segs).map(s => { + const txt = s.querySelector('.segment-text, yt-formatted-string.segment-text'); + return txt ? txt.textContent.trim() : ''; + }).filter(t => t); + } + """) + + browser.close() + + print(f"[TRANSCRIPT] Got {len(segments) if segments else 0} segments for {video_id}", flush=True) + if segments: + text = " ".join(segments) + print(f"[TRANSCRIPT] Success: {len(text)} chars from {len(segments)} segments", flush=True) + return text, "browser" + return None, None + except Exception as e: + logger.error("Playwright transcript failed for %s: %s", video_id, e) + print(f"[TRANSCRIPT] Playwright FAILED for {video_id}: {e}", flush=True) + import traceback + traceback.print_exc() + return None, None # -- DB operations for videos ------------------------------------------------- @@ -163,6 +499,48 @@ def save_video(channel_db_id: str, video: dict) -> str | None: raise +def get_existing_video_ids(channel_db_id: str) -> set[str]: + """Get all video_ids already in DB for a channel.""" + with conn() as c: + cur = c.cursor() + cur.execute("SELECT video_id FROM videos WHERE channel_id = :cid", {"cid": channel_db_id}) + return {r[0] for r in cur.fetchall()} + + +def save_videos_batch(channel_db_id: str, videos: 
list[dict]) -> int: + """Insert multiple videos in a single DB connection. Returns count of new videos.""" + if not videos: + return 0 + import oracledb + sql = """ + INSERT INTO videos (channel_id, video_id, title, url, published_at, status) + VALUES (:ch_id, :vid, :title, :url, :pub_at, 'pending') + """ + new_count = 0 + with conn() as c: + cur = c.cursor() + for video in videos: + try: + pub_at = None + if video.get("published_at"): + pub_at = datetime.fromisoformat( + video["published_at"].replace("Z", "+00:00") + ) + cur.execute(sql, { + "ch_id": channel_db_id, + "vid": video["video_id"], + "title": video["title"], + "url": video["url"], + "pub_at": pub_at, + }) + new_count += 1 + except Exception as e: + if "UQ_VIDEOS_VID" in str(e).upper(): + continue + raise + return new_count + + def get_pending_videos(limit: int = 10) -> list[dict]: sql = """ SELECT id, video_id, title, url @@ -201,20 +579,28 @@ def update_video_status( # -- Scan: fetch new videos for all active channels --------------------------- -def scan_all_channels(max_per_channel: int = 50) -> int: +def scan_all_channels() -> int: """Scan all active channels for new videos. 
Returns count of new videos.""" channels = get_active_channels() total_new = 0 for ch in channels: try: - videos = fetch_channel_videos(ch["channel_id"], max_per_channel) - for v in videos: - row_id = save_video(ch["id"], v) - if row_id: - total_new += 1 + after = get_latest_video_date(ch["id"]) + title_filter = ch.get("title_filter") + new_count = 0 + fetched = 0 + for page in fetch_channel_videos_iter(ch["channel_id"], published_after=after): + fetched += len(page) + for v in page: + if title_filter and title_filter not in v["title"]: + continue + row_id = save_video(ch["id"], v) + if row_id: + new_count += 1 + total_new += new_count logger.info( - "Channel %s: fetched %d videos, %d new", - ch["channel_name"], len(videos), total_new, + "Channel %s: fetched %d videos (after=%s), %d new (filter=%s)", + ch["channel_name"], fetched, after or "all", new_count, title_filter or "none", ) except Exception as e: logger.error("Failed to scan channel %s: %s", ch["channel_name"], e) diff --git a/backend/run_api.py b/backend/run_api.py index 13d0271..69afd54 100644 --- a/backend/run_api.py +++ b/backend/run_api.py @@ -6,4 +6,11 @@ load_dotenv() import uvicorn if __name__ == "__main__": - uvicorn.run("api.main:app", host="0.0.0.0", port=8000, reload=True) + uvicorn.run( + "api.main:app", + host="0.0.0.0", + port=8000, + reload=True, + workers=1, + limit_concurrency=20, + ) diff --git a/ecosystem.config.js b/ecosystem.config.js index 8e4841d..4741516 100644 --- a/ecosystem.config.js +++ b/ecosystem.config.js @@ -2,25 +2,25 @@ module.exports = { apps: [ { name: "tasteby-api", - cwd: "./backend", + cwd: "/Users/joungmin/workspaces/tasteby/backend", script: "run_api.py", - interpreter: ".venv/bin/python", + interpreter: "/Users/joungmin/workspaces/tasteby/backend/.venv/bin/python", env: { PYTHONPATH: ".", }, }, { name: "tasteby-daemon", - cwd: "./backend", + cwd: "/Users/joungmin/workspaces/tasteby/backend", script: "run_daemon.py", - interpreter: ".venv/bin/python", + 
interpreter: "/Users/joungmin/workspaces/tasteby/backend/.venv/bin/python", env: { PYTHONPATH: ".", }, }, { name: "tasteby-web", - cwd: "./frontend", + cwd: "/Users/joungmin/workspaces/tasteby/frontend", script: "npm", args: "run dev", }, diff --git a/frontend/src/app/admin/page.tsx b/frontend/src/app/admin/page.tsx new file mode 100644 index 0000000..bd3829e --- /dev/null +++ b/frontend/src/app/admin/page.tsx @@ -0,0 +1,1798 @@ +"use client"; + +import { useCallback, useEffect, useState } from "react"; +import { api } from "@/lib/api"; +import type { Channel, Video, VideoDetail, VideoLink, Restaurant } from "@/lib/api"; + +type Tab = "channels" | "videos" | "restaurants" | "users"; + +export default function AdminPage() { + const [tab, setTab] = useState("channels"); + + return ( +
+
+
+

Tasteby Admin

+ + ← 메인으로 + +
+ +
+ +
+ {tab === "channels" && } + {tab === "videos" && } + {tab === "restaurants" && } + {tab === "users" && } +
+
+ ); +} + +/* ─── 채널 관리 ─── */ +function ChannelsPanel() { + const [channels, setChannels] = useState([]); + const [newId, setNewId] = useState(""); + const [newName, setNewName] = useState(""); + const [newFilter, setNewFilter] = useState(""); + const [loading, setLoading] = useState(false); + const [scanResult, setScanResult] = useState>({}); + + const load = useCallback(() => { + api.getChannels().then(setChannels).catch(console.error); + }, []); + + useEffect(() => { load(); }, [load]); + + const handleAdd = async () => { + if (!newId.trim() || !newName.trim()) return; + setLoading(true); + try { + await api.addChannel(newId.trim(), newName.trim(), newFilter.trim() || undefined); + setNewId(""); + setNewName(""); + setNewFilter(""); + load(); + } catch (e: unknown) { + alert(e instanceof Error ? e.message : "채널 추가 실패"); + } finally { + setLoading(false); + } + }; + + const handleDelete = async (channelId: string, channelName: string) => { + if (!confirm(`"${channelName}" 채널을 삭제하시겠습니까?`)) return; + try { + await api.deleteChannel(channelId); + load(); + } catch { + alert("채널 삭제 실패"); + } + }; + + const handleScan = async (channelId: string, full: boolean = false) => { + setScanResult((prev) => ({ ...prev, [channelId]: full ? "전체 스캔 중..." : "스캔 중..." })); + try { + const res = await api.scanChannel(channelId, full); + setScanResult((prev) => ({ + ...prev, + [channelId]: `${res.total_fetched}개 조회, ${res.new_videos}개 신규${(res as Record).filtered ? `, ${(res as Record).filtered}개 필터링` : ""}`, + })); + } catch { + setScanResult((prev) => ({ ...prev, [channelId]: "스캔 실패" })); + } + }; + + return ( +
+
+

채널 추가

+
+ setNewId(e.target.value)} + className="border rounded px-3 py-2 flex-1 text-sm" + /> + setNewName(e.target.value)} + className="border rounded px-3 py-2 flex-1 text-sm" + /> + setNewFilter(e.target.value)} + className="border rounded px-3 py-2 w-40 text-sm" + /> + +
+
+ +
+ + + + + + + + + + + + {channels.map((ch) => ( + + + + + + + + ))} + {channels.length === 0 && ( + + + + )} + +
채널 이름Channel ID제목 필터액션스캔 결과
{ch.channel_name} + {ch.channel_id} + + {ch.title_filter ? ( + + {ch.title_filter} + + ) : ( + 전체 + )} + + + + + + {scanResult[ch.channel_id] || "-"} +
+ 등록된 채널이 없습니다 +
+
+
+ ); +} + +/* ─── 영상 관리 ─── */ +type VideoSortKey = "status" | "channel_name" | "title" | "published_at"; + +function VideosPanel() { + const [videos, setVideos] = useState([]); + const [channels, setChannels] = useState([]); + const [channelFilter, setChannelFilter] = useState(""); + const [statusFilter, setStatusFilter] = useState(""); + const [titleSearch, setTitleSearch] = useState(""); + const [processing, setProcessing] = useState(false); + const [processResult, setProcessResult] = useState(""); + const [sortKey, setSortKey] = useState("published_at"); + const [sortAsc, setSortAsc] = useState(false); + const [selected, setSelected] = useState>(new Set()); + const [deleting, setDeleting] = useState(false); + const [page, setPage] = useState(0); + const perPage = 15; + const [detail, setDetail] = useState(null); + const [detailLoading, setDetailLoading] = useState(false); + const [fetchingTranscript, setFetchingTranscript] = useState(false); + const [transcriptMode, setTranscriptMode] = useState<"auto" | "manual" | "generated">("auto"); + const [extracting, setExtracting] = useState(false); + const [showPrompt, setShowPrompt] = useState(false); + const [prompt, setPrompt] = useState(""); + const [editingTitle, setEditingTitle] = useState(false); + const [editTitle, setEditTitle] = useState(""); + const [editingRestIdx, setEditingRestIdx] = useState(null); + const [editRest, setEditRest] = useState<{ + name: string; + cuisine_type: string; + foods_mentioned: string; + evaluation: string; + address: string; + region: string; + price_range: string; + guests: string; + } | null>(null); + const [saving, setSaving] = useState(false); + const [showManualAdd, setShowManualAdd] = useState(false); + const [manualForm, setManualForm] = useState({ name: "", address: "", region: "", cuisine_type: "", price_range: "", foods_mentioned: "", evaluation: "", guests: "" }); + const [manualAdding, setManualAdding] = useState(false); + const [bulkExtracting, setBulkExtracting] = 
useState(false); + const [bulkTranscripting, setBulkTranscripting] = useState(false); + const [bulkProgress, setBulkProgress] = useState<{ + label: string; + total: number; + current: number; + currentTitle: string; + results: { title: string; detail: string; error?: boolean }[]; + waiting?: number; + } | null>(null); + + useEffect(() => { + api.getChannels().then(setChannels).catch(console.error); + }, []); + + const load = useCallback((reset = true) => { + api + .getVideos({ status: statusFilter || undefined, limit: 500 }) + .then((data) => { + setVideos(data); + if (reset) { + setSelected(new Set()); + setPage(0); + } + }) + .catch(console.error); + }, [statusFilter]); + + useEffect(() => { load(); }, [load]); + + const handleSelectVideo = async (v: Video) => { + if (detail?.id === v.id) { + setDetail(null); + return; + } + setDetailLoading(true); + try { + const d = await api.getVideoDetail(v.id); + setDetail(d); + } catch { + alert("영상 상세 조회 실패"); + } finally { + setDetailLoading(false); + } + }; + + const handleDelete = async (id: string, title: string) => { + if (!confirm(`"${title}" 영상을 삭제하시겠습니까?\n연결된 식당 데이터도 함께 삭제됩니다.`)) return; + try { + await api.deleteVideo(id); + load(); + } catch { + alert("영상 삭제 실패"); + } + }; + + const handleSkip = async (id: string) => { + try { + await api.skipVideo(id); + load(); + } catch { + alert("건너뛰기 실패"); + } + }; + + const handleBulkSkip = async () => { + if (selected.size === 0) return; + if (!confirm(`선택한 ${selected.size}개 영상을 건너뛰시겠습니까?`)) return; + for (const id of selected) { + try { await api.skipVideo(id); } catch { /* ignore */ } + } + setSelected(new Set()); + load(); + }; + + const handleBulkDelete = async () => { + if (selected.size === 0) return; + if (!confirm(`선택한 ${selected.size}개 영상을 삭제하시겠습니까?\n연결된 식당 데이터도 함께 삭제됩니다.`)) return; + setDeleting(true); + let failed = 0; + for (const id of selected) { + try { await api.deleteVideo(id); } catch { failed++; } + } + setSelected(new Set()); + load(); + 
setDeleting(false); + if (failed > 0) alert(`${failed}개 삭제 실패`); + }; + + const toggleSelect = (id: string) => { + setSelected((prev) => { + const next = new Set(prev); + if (next.has(id)) next.delete(id); else next.add(id); + return next; + }); + }; + + const handleProcess = async () => { + setProcessing(true); + setProcessResult(""); + try { + const res = await api.triggerProcessing(10); + setProcessResult(`${res.restaurants_extracted}개 식당 추출 완료`); + load(); + } catch { + setProcessResult("처리 실패"); + } finally { + setProcessing(false); + } + }; + + const startBulkStream = async (mode: "transcript" | "extract") => { + const isTranscript = mode === "transcript"; + const setRunning = isTranscript ? setBulkTranscripting : setBulkExtracting; + + try { + const pending = isTranscript + ? await api.getBulkTranscriptPending() + : await api.getBulkExtractPending(); + if (pending.count === 0) { + alert(isTranscript ? "자막 없는 영상이 없습니다" : "추출 대기 중인 영상이 없습니다"); + return; + } + const msg = isTranscript + ? `자막 없는 영상 ${pending.count}개의 트랜스크립트를 수집하시겠습니까?\n(영상 당 5~15초 랜덤 딜레이)` + : `LLM 추출이 안된 영상 ${pending.count}개를 벌크 처리하시겠습니까?\n(영상 당 3~8초 랜덤 딜레이)`; + if (!confirm(msg)) return; + + setRunning(true); + setBulkProgress({ + label: isTranscript ? "벌크 자막 수집" : "벌크 LLM 추출", + total: pending.count, current: 0, currentTitle: "", results: [], + }); + + const apiBase = process.env.NEXT_PUBLIC_API_URL || ""; + const endpoint = isTranscript ? 
"/api/videos/bulk-transcript" : "/api/videos/bulk-extract"; + const resp = await fetch(`${apiBase}${endpoint}`, { method: "POST" }); + const reader = resp.body?.getReader(); + const decoder = new TextDecoder(); + if (!reader) { setRunning(false); return; } + + let buf = ""; + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buf += decoder.decode(value, { stream: true }); + const lines = buf.split("\n"); + buf = lines.pop() || ""; + for (const line of lines) { + if (!line.startsWith("data: ")) continue; + try { + const ev = JSON.parse(line.slice(6)); + if (ev.type === "processing") { + setBulkProgress((p) => p ? { ...p, current: ev.index + 1, currentTitle: ev.title, waiting: undefined } : p); + } else if (ev.type === "wait") { + setBulkProgress((p) => p ? { ...p, waiting: ev.delay } : p); + } else if (ev.type === "done") { + const detail = isTranscript + ? `${ev.source} / ${ev.length?.toLocaleString()}자` + : `${ev.restaurants}개 식당`; + setBulkProgress((p) => p ? { ...p, results: [...p.results, { title: ev.title, detail }] } : p); + } else if (ev.type === "error") { + setBulkProgress((p) => p ? { ...p, results: [...p.results, { title: ev.title, detail: ev.message, error: true }] } : p); + } else if (ev.type === "complete") { + setRunning(false); + load(); + } + } catch { /* ignore */ } + } + } + setRunning(false); + load(); + } catch { + setRunning(false); + } + }; + + const handleSort = (key: VideoSortKey) => { + if (sortKey === key) { + setSortAsc(!sortAsc); + } else { + setSortKey(key); + setSortAsc(true); + } + }; + + const filteredVideos = videos.filter((v) => { + if (titleSearch && !v.title.toLowerCase().includes(titleSearch.toLowerCase())) return false; + if (channelFilter && v.channel_name !== channelFilter) return false; + return true; + }); + + const sortedVideos = [...filteredVideos].sort((a, b) => { + const av = a[sortKey] ?? ""; + const bv = b[sortKey] ?? ""; + const cmp = av < bv ? -1 : av > bv ? 
1 : 0; + return sortAsc ? cmp : -cmp; + }); + + const totalPages = Math.max(1, Math.ceil(sortedVideos.length / perPage)); + const pagedVideos = sortedVideos.slice(page * perPage, (page + 1) * perPage); + + const toggleSelectAll = () => { + const pageIds = pagedVideos.map((v) => v.id); + const allSelected = pageIds.every((id) => selected.has(id)); + if (allSelected) { + setSelected((prev) => { + const next = new Set(prev); + pageIds.forEach((id) => next.delete(id)); + return next; + }); + } else { + setSelected((prev) => new Set([...prev, ...pageIds])); + } + }; + + const sortIcon = (key: VideoSortKey) => { + if (sortKey !== key) return " ↕"; + return sortAsc ? " ↑" : " ↓"; + }; + + const statusColor: Record = { + pending: "bg-yellow-100 text-yellow-800", + processing: "bg-blue-100 text-blue-800", + done: "bg-green-100 text-green-800", + error: "bg-red-100 text-red-800", + skip: "bg-gray-100 text-gray-600", + }; + + return ( +
+
+ + +
+ { setTitleSearch(e.target.value); setPage(0); }} + onKeyDown={(e) => e.key === "Escape" && setTitleSearch("")} + className="border border-r-0 rounded-l px-3 py-2 text-sm w-48" + /> + {titleSearch ? ( + + ) : ( + + )} +
+ + + + {processResult && ( + {processResult} + )} + {selected.size > 0 && ( + <> + + + + )} + + {(titleSearch || channelFilter) ? `${filteredVideos.length} / ` : ""}총 {videos.length}개 + +
+ +
+ + + + + + + + + + + + + + + {pagedVideos.map((v) => ( + + + + + + + + + + + ))} + {videos.length === 0 && ( + + + + )} + +
+ 0 && pagedVideos.every((v) => selected.has(v.id))} + onChange={toggleSelectAll} + className="rounded" + /> + handleSort("status")} + > + 상태{sortIcon("status")} + handleSort("channel_name")} + > + 채널{sortIcon("channel_name")} + handleSort("title")} + > + 제목{sortIcon("title")} + 처리식당 handleSort("published_at")} + > + 게시일{sortIcon("published_at")} + 액션
+ toggleSelect(v.id)} + className="rounded" + /> + + + {v.status} + + {v.channel_name} + + + + {v.has_transcript ? "T" : "-"} + + + {v.has_llm ? "L" : "-"} + + + {v.restaurant_count > 0 ? ( + 0 + ? "bg-yellow-100 text-yellow-700" + : "bg-red-100 text-red-600" + }`} + title={`매칭 ${v.matched_count}/${v.restaurant_count}`} + > + {v.matched_count}/{v.restaurant_count} + + ) : ( + - + )} + + {v.published_at?.slice(0, 10) || "-"} + + {v.status === "pending" && ( + + )} + +
+ 영상이 없습니다 +
+
+ + {totalPages > 1 && ( +
+ + + + {page + 1} / {totalPages} + + + +
+ )} + + {/* 벌크 진행 패널 */} + {bulkProgress && ( +
+
+

+ {bulkProgress.label} ({bulkProgress.current}/{bulkProgress.total}) +

+ {!bulkExtracting && !bulkTranscripting && ( + + )} +
+
+
0 ? (bulkProgress.current / bulkProgress.total) * 100 : 0}%` }} + /> +
+ {(bulkExtracting || bulkTranscripting) && bulkProgress.currentTitle && ( +

+ {bulkProgress.waiting + ? `⏳ ${bulkProgress.waiting}초 대기 중...` + : `처리 중: ${bulkProgress.currentTitle}`} +

+ )} + {bulkProgress.results.length > 0 && ( +
+ {bulkProgress.results.map((r, i) => ( +
+ {r.error ? "✗" : "✓"} + {r.title} + + {r.detail} + +
+ ))} +
+ )} + {!bulkExtracting && !bulkTranscripting && bulkProgress.results.length > 0 && ( +

+ 완료! 성공 {bulkProgress.results.filter((r) => !r.error).length}/{bulkProgress.total} +

+ )} +
+ )} + + {/* 영상 상세 패널 */} + {detailLoading && ( +
로딩 중...
+ )} + {detail && !detailLoading && ( +
+
+ {editingTitle ? ( +
+ setEditTitle(e.target.value)} + className="flex-1 border rounded px-2 py-1 text-sm font-semibold" + /> + + +
+ ) : ( +

{ setEditTitle(detail.title); setEditingTitle(true); }} + title="클릭하여 제목 수정" + > + {detail.title} +

+ )} + +
+ +
+ {/* 왼쪽: YouTube 임베드 */} +
+
+