"""Data pipeline: process pending videos end-to-end.

For each pending video:
  1. Fetch transcript
  2. Extract restaurant info via LLM
  3. Geocode each restaurant
  4. Save to DB + generate vector embeddings
"""

from __future__ import annotations

import json
import logging

from core import youtube, extractor, geocoding, restaurant, vector

logger = logging.getLogger(__name__)


def process_video(video: dict) -> int:
    """Process a single pending video. Returns number of restaurants found.

    Status writes made via ``youtube.update_video_status``:
    ``"processing"`` on start (and again with the transcript once fetched),
    ``"done"`` on success or when there is nothing to extract, and
    ``"error"`` (best-effort) if any step raises.

    Args:
        video: Video row dict; reads ``"id"`` (DB id), ``"video_id"``
            (passed to the transcript fetcher) and ``"title"``.

    Returns:
        Number of restaurants saved. 0 when there is no transcript, no
        restaurants, or an error occurred (errors are logged, not raised).
    """
    video_db_id = video["id"]
    video_id = video["video_id"]
    title = video["title"]
    logger.info("Processing video: %s (%s)", title, video_id)

    try:
        # Inside the try so a failing status write is handled like any other
        # pipeline error instead of escaping and aborting process_pending.
        youtube.update_video_status(video_db_id, "processing")

        # 1. Transcript
        transcript, _src = youtube.get_transcript(video_id)
        if not transcript:
            logger.warning("No transcript for %s, marking done", video_id)
            youtube.update_video_status(video_db_id, "done")
            return 0
        youtube.update_video_status(video_db_id, "processing", transcript)

        # 2. LLM extraction
        restaurants, llm_raw = extractor.extract_restaurants(title, transcript)
        if not restaurants:
            logger.info("No restaurants found in %s", video_id)
            youtube.update_video_status(video_db_id, "done", llm_raw=llm_raw)
            return 0

        # 3-4. Geocode + save each restaurant
        count = _save_restaurants(video_db_id, title, restaurants)

        youtube.update_video_status(video_db_id, "done", llm_raw=llm_raw)
        logger.info("Video %s done: %d restaurants", video_id, count)
        return count
    except Exception as e:
        logger.exception("Pipeline error for %s: %s", video_id, e)
        _mark_error(video_db_id, video_id)
        return 0


def process_video_extract(video: dict, transcript: str, custom_prompt: str | None = None) -> int:
    """Run LLM extraction + geocode + save on an existing transcript.

    Returns number of restaurants found.

    Unlike ``process_video``, this does not set the ``"processing"`` status
    up front and does not set ``"error"`` on failure; on an exception the
    video's previous status is left unchanged.

    Args:
        video: Video row dict; reads ``"id"`` and ``"title"`` (and
            ``"video_id"`` for error logging only).
        transcript: Transcript text to extract restaurants from.
        custom_prompt: Optional prompt override forwarded to the extractor.

    Returns:
        Number of restaurants saved; 0 when none were found or on error.
    """
    video_db_id = video["id"]
    title = video["title"]
    logger.info("Extracting restaurants from video: %s", title)

    try:
        restaurants, llm_raw = extractor.extract_restaurants(
            title, transcript, custom_prompt=custom_prompt
        )
        if not restaurants:
            youtube.update_video_status(video_db_id, "done", llm_raw=llm_raw)
            return 0

        count = _save_restaurants(video_db_id, title, restaurants)

        youtube.update_video_status(video_db_id, "done", llm_raw=llm_raw)
        return count
    except Exception as e:
        # .get so a missing key here cannot mask the original exception.
        logger.exception("Extract error for %s: %s", video.get("video_id"), e)
        return 0


def _mark_error(video_db_id, video_id) -> None:
    """Best-effort: set a video's status to ``"error"`` without raising."""
    try:
        youtube.update_video_status(video_db_id, "error")
    except Exception:
        # A failing status write must not abort process_pending's batch loop.
        logger.exception("Failed to mark %s as error", video_id)


def _save_restaurants(video_db_id, title: str, restaurants: list) -> int:
    """Geocode, upsert, link and embed each extracted restaurant.

    Entries that are not dicts, or have no ``"name"``, are skipped.

    Args:
        video_db_id: DB id of the source video (``video["id"]``).
        title: Video title; included in the embedding text.
        restaurants: Items returned by ``extractor.extract_restaurants``.

    Returns:
        Number of restaurants saved.
    """
    count = 0
    for rest_data in restaurants:
        if not isinstance(rest_data, dict):
            # LLM output is not guaranteed to be well-formed; one bad item
            # should not fail the whole video.
            logger.warning("Skipping malformed restaurant entry: %r", rest_data)
            continue
        name = rest_data.get("name")
        if not name:
            continue

        # Geocode
        geo = geocoding.geocode_restaurant(
            name,
            address=rest_data.get("address"),
            region=rest_data.get("region"),
        )
        if geo:
            lat = geo["latitude"]
            lng = geo["longitude"]
            addr = geo["formatted_address"]
            place_id = geo["google_place_id"]
        else:
            # No geocode result: keep the LLM-provided address, no coordinates.
            lat = lng = place_id = None
            addr = rest_data.get("address")
        # Optional place details; all None when geocoding returned nothing.
        details = geo or {}

        # Upsert restaurant
        rest_id = restaurant.upsert(
            name=name,
            address=addr,
            region=rest_data.get("region"),
            latitude=lat,
            longitude=lng,
            cuisine_type=rest_data.get("cuisine_type"),
            price_range=rest_data.get("price_range"),
            google_place_id=place_id,
            phone=details.get("phone"),
            website=details.get("website"),
            business_status=details.get("business_status"),
            rating=details.get("rating"),
            rating_count=details.get("rating_count"),
        )

        # Link video <-> restaurant
        restaurant.link_video_restaurant(
            video_db_id=video_db_id,
            restaurant_id=rest_id,
            foods=rest_data.get("foods_mentioned"),
            evaluation=rest_data.get("evaluation"),
            guests=rest_data.get("guests"),
        )

        # Vector embeddings
        chunks = _build_chunks(name, rest_data, title)
        if chunks:
            vector.save_restaurant_vectors(rest_id, chunks)

        count += 1
        logger.info("Saved restaurant: %s (geocoded=%s)", name, bool(geo))
    return count


def _build_chunks(name: str, data: dict, video_title: str) -> list[str]:
    """Build text chunks for vector embedding.

    Produces a single chunk: newline-joined "label: value" lines (Korean
    labels) for the restaurant name, each non-empty optional field, and
    the source video title.

    Args:
        name: Restaurant name.
        data: Extracted restaurant dict; reads ``region``, ``cuisine_type``,
            ``foods_mentioned``, ``evaluation`` and ``price_range``.
        video_title: Title of the video the restaurant came from.

    Returns:
        A one-element list containing the chunk text.
    """
    parts = [f"식당: {name}"]
    if data.get("region"):
        parts.append(f"지역: {data['region']}")
    if data.get("cuisine_type"):
        parts.append(f"음식 종류: {data['cuisine_type']}")

    foods = data.get("foods_mentioned")
    if isinstance(foods, str):
        # A single menu item given as a plain string is still meaningful.
        foods = [foods]
    if isinstance(foods, list):
        # LLM output may contain None/non-str items; str.join would raise.
        menu = [str(food) for food in foods if food]
        if menu:
            parts.append(f"메뉴: {', '.join(menu)}")

    if data.get("evaluation"):
        parts.append(f"평가: {data['evaluation']}")
    if data.get("price_range"):
        parts.append(f"가격대: {data['price_range']}")
    parts.append(f"영상: {video_title}")
    return ["\n".join(parts)]


def process_pending(limit: int = 5) -> int:
    """Process up to `limit` pending videos. Returns total restaurants found.

    Videos are processed sequentially via ``process_video``.
    """
    videos = youtube.get_pending_videos(limit)
    if not videos:
        logger.info("No pending videos")
        return 0

    return sum(process_video(v) for v in videos)