- Admin: video management with Google Maps match status, manual restaurant mapping, restaurant remap on name change - Admin: user management tab with favorites/reviews detail - Admin: channel deletion fix for IDs with slashes - Frontend: responsive mobile layout (map top, list bottom, 2-row header) - Frontend: channel-colored map markers with legend - Frontend: my reviews list, favorites toggle, visit counter overlay - Frontend: force light mode for dark theme devices - Backend: visit tracking (site_visits table), user reviews endpoint - Backend: bulk transcript/extract streaming, geocode key fixes - Nginx config for production deployment Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
210 lines
7.3 KiB
Python
210 lines
7.3 KiB
Python
"""Data pipeline: process pending videos end-to-end.
|
|
|
|
For each pending video:
|
|
1. Fetch transcript
|
|
2. Extract restaurant info via LLM
|
|
3. Geocode each restaurant
|
|
4. Save to DB + generate vector embeddings
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
|
|
from core import youtube, extractor, geocoding, restaurant, vector
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _save_restaurants(video_db_id, title: str, restaurants: list[dict]) -> int:
    """Geocode, store, link and embed each extracted restaurant.

    Shared by ``process_video`` and ``process_video_extract`` so both entry
    points persist restaurants in exactly the same way.

    Args:
        video_db_id: The video's ``"id"`` value, used to link each restaurant
            to the video.
        title: Video title; included in the text chunk that gets embedded.
        restaurants: Restaurant dicts as returned by
            ``extractor.extract_restaurants``.

    Returns:
        Number of restaurants saved. Entries without a ``name`` are skipped
        and not counted.

    Any exception raised by the geocoding, storage or vector helpers
    propagates to the caller.
    """
    count = 0
    for rest_data in restaurants:
        name = rest_data.get("name")
        if not name:
            continue

        geo = geocoding.geocode_restaurant(
            name,
            address=rest_data.get("address"),
            region=rest_data.get("region"),
        )

        if geo:
            lat = geo["latitude"]
            lng = geo["longitude"]
            addr = geo["formatted_address"]
            place_id = geo["google_place_id"]
        else:
            # No geocoding hit: keep the address from the extraction, if any.
            lat = lng = place_id = None
            addr = rest_data.get("address")

        # Optional place details; every field is None without a geocode hit.
        details = geo or {}

        rest_id = restaurant.upsert(
            name=name,
            address=addr,
            region=rest_data.get("region"),
            latitude=lat,
            longitude=lng,
            cuisine_type=rest_data.get("cuisine_type"),
            price_range=rest_data.get("price_range"),
            google_place_id=place_id,
            phone=details.get("phone"),
            website=details.get("website"),
            business_status=details.get("business_status"),
            rating=details.get("rating"),
            rating_count=details.get("rating_count"),
        )

        # Link video <-> restaurant
        restaurant.link_video_restaurant(
            video_db_id=video_db_id,
            restaurant_id=rest_id,
            foods=rest_data.get("foods_mentioned"),
            evaluation=rest_data.get("evaluation"),
            guests=rest_data.get("guests"),
        )

        # Vector embeddings
        chunks = _build_chunks(name, rest_data, title)
        if chunks:
            vector.save_restaurant_vectors(rest_id, chunks)

        count += 1
        logger.info("Saved restaurant: %s (geocoded=%s)", name, bool(geo))

    return count


def process_video(video: dict) -> int:
    """Process a single pending video. Returns number of restaurants found.

    Steps: mark the video "processing", fetch its transcript, extract
    restaurants via the LLM, then geocode/save each one and mark the video
    "done".

    Args:
        video: Dict with at least the keys ``"id"``, ``"video_id"`` and
            ``"title"``.

    Returns:
        Number of restaurants saved. 0 when there is no transcript, when no
        restaurants were extracted, or when an error occurred after the
        initial status update (the video is then marked "error").
    """
    video_db_id = video["id"]
    video_id = video["video_id"]
    title = video["title"]

    logger.info("Processing video: %s (%s)", title, video_id)
    youtube.update_video_status(video_db_id, "processing")

    try:
        # 1. Transcript
        transcript, _src = youtube.get_transcript(video_id)
        if not transcript:
            logger.warning("No transcript for %s, marking done", video_id)
            youtube.update_video_status(video_db_id, "done")
            return 0

        youtube.update_video_status(video_db_id, "processing", transcript)

        # 2. LLM extraction
        restaurants, llm_raw = extractor.extract_restaurants(title, transcript)
        if not restaurants:
            logger.info("No restaurants found in %s", video_id)
            youtube.update_video_status(video_db_id, "done", llm_raw=llm_raw)
            return 0

        # 3-4. Geocode + save each restaurant
        count = _save_restaurants(video_db_id, title, restaurants)

        youtube.update_video_status(video_db_id, "done", llm_raw=llm_raw)
        logger.info("Video %s done: %d restaurants", video_id, count)
        return count

    except Exception as e:
        logger.error("Pipeline error for %s: %s", video_id, e, exc_info=True)
        youtube.update_video_status(video_db_id, "error")
        return 0


def process_video_extract(video: dict, transcript: str, custom_prompt: str | None = None) -> int:
    """Run LLM extraction + geocode + save on an existing transcript.

    Args:
        video: Dict with at least the keys ``"id"`` and ``"title"``;
            ``"video_id"`` is only used for error logging.
        transcript: Transcript text to extract restaurants from.
        custom_prompt: Optional prompt forwarded to
            ``extractor.extract_restaurants``.

    Returns:
        Number of restaurants saved; 0 when none were extracted or when an
        error occurred.
    """
    video_db_id = video["id"]
    title = video["title"]

    logger.info("Extracting restaurants from video: %s", title)

    try:
        restaurants, llm_raw = extractor.extract_restaurants(title, transcript, custom_prompt=custom_prompt)
        if not restaurants:
            youtube.update_video_status(video_db_id, "done", llm_raw=llm_raw)
            return 0

        count = _save_restaurants(video_db_id, title, restaurants)

        youtube.update_video_status(video_db_id, "done", llm_raw=llm_raw)
        return count

    except Exception as e:
        # .get() so a missing "video_id" key cannot raise inside the handler
        # and hide the real error. Unlike process_video, the video status is
        # left untouched on failure.
        logger.error("Extract error for %s: %s", video.get("video_id"), e, exc_info=True)
        return 0
|
|
|
|
|
|
def _build_chunks(name: str, data: dict, video_title: str) -> list[str]:
|
|
"""Build text chunks for vector embedding."""
|
|
parts = [f"식당: {name}"]
|
|
if data.get("region"):
|
|
parts.append(f"지역: {data['region']}")
|
|
if data.get("cuisine_type"):
|
|
parts.append(f"음식 종류: {data['cuisine_type']}")
|
|
if data.get("foods_mentioned"):
|
|
foods = data["foods_mentioned"]
|
|
if isinstance(foods, list):
|
|
parts.append(f"메뉴: {', '.join(foods)}")
|
|
if data.get("evaluation"):
|
|
parts.append(f"평가: {data['evaluation']}")
|
|
if data.get("price_range"):
|
|
parts.append(f"가격대: {data['price_range']}")
|
|
parts.append(f"영상: {video_title}")
|
|
|
|
return ["\n".join(parts)]
|
|
|
|
|
|
def process_pending(limit: int = 5) -> int:
    """Run the pipeline over at most `limit` pending videos.

    Fetches the pending videos via ``youtube.get_pending_videos`` and hands
    each one to ``process_video`` in order.

    Returns:
        Sum of the restaurant counts reported by ``process_video``; 0 when
        there is nothing pending.
    """
    pending = youtube.get_pending_videos(limit)
    if pending:
        return sum(process_video(item) for item in pending)

    logger.info("No pending videos")
    return 0
|