"""YouTube channel scanner + transcript extraction. Uses YouTube Data API v3 for channel video listing, youtube-transcript-api for transcript extraction. """ from __future__ import annotations import logging import os import re from datetime import datetime import httpx from youtube_transcript_api import YouTubeTranscriptApi from core.db import conn logger = logging.getLogger(__name__) def _api_key() -> str: return os.environ["YOUTUBE_DATA_API_KEY"] def extract_video_id(url: str) -> str: match = re.search(r"(?:v=|youtu\.be/)([^&?/\s]+)", url) if not match: raise ValueError(f"Cannot extract video ID from URL: {url}") return match.group(1) # -- Channel operations ------------------------------------------------------- def add_channel(channel_id: str, channel_name: str, title_filter: str | None = None) -> str: """Register a YouTube channel. Returns DB row id.""" sql = """ INSERT INTO channels (channel_id, channel_name, channel_url, title_filter) VALUES (:cid, :cname, :curl, :tf) RETURNING id INTO :out_id """ with conn() as c: cur = c.cursor() import oracledb out_id = cur.var(oracledb.STRING) cur.execute(sql, { "cid": channel_id, "cname": channel_name, "curl": f"https://www.youtube.com/channel/{channel_id}", "tf": title_filter, "out_id": out_id, }) return out_id.getvalue()[0] def deactivate_channel(channel_id: str) -> bool: """Deactivate a channel by channel_id. Returns True if found.""" sql = "UPDATE channels SET is_active = 0 WHERE channel_id = :cid AND is_active = 1" with conn() as c: cur = c.cursor() cur.execute(sql, {"cid": channel_id}) return cur.rowcount > 0 def deactivate_channel_by_db_id(db_id: str) -> bool: """Deactivate a channel by DB id. Returns True if found.""" sql = "UPDATE channels SET is_active = 0 WHERE id = :did AND is_active = 1" with conn() as c: cur = c.cursor() cur.execute(sql, {"did": db_id}) return cur.rowcount > 0 def get_active_channels() -> list[dict]: sql = """ SELECT c.id, c.channel_id, c.channel_name, c.title_filter, (SELECT COUNT(*) FROM videos v WHERE v.channel_id = c.id) as video_count, (SELECT MAX(v.created_at) FROM videos v WHERE v.channel_id = c.id) as last_scanned_at FROM channels c WHERE c.is_active = 1 """ with conn() as c: cur = c.cursor() cur.execute(sql) return [ { "id": r[0], "channel_id": r[1], "channel_name": r[2], "title_filter": r[3], "video_count": r[4] or 0, "last_scanned_at": r[5].isoformat() if r[5] else None, } for r in cur.fetchall() ] # -- Video listing via YouTube Data API v3 ------------------------------------ def get_latest_video_date(channel_db_id: str) -> str | None: """Get the latest published_at for a channel's videos in ISO 8601 format.""" sql = """ SELECT MAX(published_at) FROM videos WHERE channel_id = :ch_id AND published_at IS NOT NULL """ with conn() as c: cur = c.cursor() cur.execute(sql, {"ch_id": channel_db_id}) row = cur.fetchone() if row and row[0]: return row[0].strftime("%Y-%m-%dT%H:%M:%SZ") return None def _parse_iso8601_duration(dur: str) -> int: """Parse ISO 8601 duration (e.g. PT1M30S, PT5M, PT1H2M) to seconds.""" import re m = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", dur or "") if not m: return 0 h, mn, s = (int(x) if x else 0 for x in m.groups()) return h * 3600 + mn * 60 + s def _filter_shorts(videos: list[dict]) -> list[dict]: """Filter out YouTube Shorts (<=60s) by checking video durations via API.""" if not videos: return videos video_ids = [v["video_id"] for v in videos] r = httpx.get( "https://www.googleapis.com/youtube/v3/videos", params={ "key": _api_key(), "id": ",".join(video_ids), "part": "contentDetails", }, timeout=30, ) r.raise_for_status() durations = {} for item in r.json().get("items", []): durations[item["id"]] = _parse_iso8601_duration( item.get("contentDetails", {}).get("duration", "") ) return [v for v in videos if durations.get(v["video_id"], 0) > 60] def fetch_channel_videos_iter( channel_id: str, published_after: str | None = None, exclude_shorts: bool = True, ): """Yield pages of videos from a YouTube channel via Data API v3. Each yield is a list of dicts for one API page (up to 50). If exclude_shorts is True, filters out videos <= 60 seconds. """ params: dict = { "key": _api_key(), "channelId": channel_id, "part": "snippet", "order": "date", "maxResults": 50, "type": "video", } if published_after: params["publishedAfter"] = published_after next_page = None while True: if next_page: params["pageToken"] = next_page r = httpx.get( "https://www.googleapis.com/youtube/v3/search", params=params, timeout=30, ) r.raise_for_status() data = r.json() page_videos = [] for item in data.get("items", []): snippet = item["snippet"] vid = item["id"]["videoId"] page_videos.append({ "video_id": vid, "title": snippet["title"], "published_at": snippet["publishedAt"], "url": f"https://www.youtube.com/watch?v={vid}", }) if page_videos and exclude_shorts: page_videos = _filter_shorts(page_videos) if page_videos: yield page_videos next_page = data.get("nextPageToken") if not next_page: break def fetch_channel_videos( channel_id: str, max_results: int = 0, published_after: str | None = None, ) -> list[dict]: """Fetch video list from a YouTube channel via Data API v3. Args: max_results: 0 means fetch all available videos. """ videos: list[dict] = [] for page in fetch_channel_videos_iter(channel_id, published_after=published_after): videos.extend(page) if max_results > 0 and len(videos) >= max_results: break return videos[:max_results] if max_results > 0 else videos # -- Transcript extraction ---------------------------------------------------- def get_transcript(video_id: str, mode: str = "auto") -> tuple[str | None, str | None]: """Fetch transcript using Playwright (headless browser). Args: mode: "manual" = manual only, "generated" = auto-generated only, "auto" = try API first, fallback to browser transcript panel. Returns: (transcript_text, source) where source describes origin, or (None, None). """ # Try youtube-transcript-api first (fast path) text, source = _get_transcript_api(video_id, mode) if text: return text, source # Fallback: Playwright browser logger.warning("API failed for %s, trying Playwright browser", video_id) print(f"[TRANSCRIPT] API failed for {video_id}, trying Playwright browser", flush=True) return _get_transcript_browser(video_id) def _make_ytt() -> YouTubeTranscriptApi: """Create YouTubeTranscriptApi with cookies if available.""" cookie_file = os.path.join(os.path.dirname(__file__), "..", "cookies.txt") if os.path.exists(cookie_file): import http.cookiejar import requests jar = http.cookiejar.MozillaCookieJar(cookie_file) jar.load(ignore_discard=True, ignore_expires=True) session = requests.Session() session.cookies = jar return YouTubeTranscriptApi(http_client=session) return YouTubeTranscriptApi() def _get_transcript_api(video_id: str, mode: str = "auto") -> tuple[str | None, str | None]: """Try youtube-transcript-api (fast but may be IP-blocked).""" ytt = _make_ytt() prefer = ["ko", "en"] try: transcript_list = ytt.list(video_id) except Exception as e: logger.warning("Cannot list transcripts for %s: %s", video_id, e) return None, None all_transcripts = list(transcript_list) manual = [t for t in all_transcripts if not t.is_generated] generated = [t for t in all_transcripts if t.is_generated] def _pick(candidates): for lang in prefer: for t in candidates: if t.language_code == lang: return t return candidates[0] if candidates else None def _fetch(t): try: return " ".join(seg.text for seg in t.fetch()), t.language_code except Exception: return None, None if mode == "manual": t = _pick(manual) if t: text, lang = _fetch(t) return (text, f"manual ({lang})") if text else (None, None) return None, None elif mode == "generated": t = _pick(generated) if t: text, lang = _fetch(t) return (text, f"generated ({lang})") if text else (None, None) return None, None else: t = _pick(manual) if t: text, lang = _fetch(t) if text: return text, f"manual ({lang})" t = _pick(generated) if t: text, lang = _fetch(t) if text: return text, f"generated ({lang})" return None, None def _get_transcript_browser(video_id: str) -> tuple[str | None, str | None]: """Fetch transcript via Playwright browser (bypasses IP blocks).""" try: from playwright.sync_api import sync_playwright except ImportError: logger.error("playwright not installed") return None, None try: with sync_playwright() as p: browser = p.chromium.launch( headless=False, args=["--disable-blink-features=AutomationControlled"], ) ctx = browser.new_context(locale="ko-KR", viewport={"width": 1280, "height": 900}) # Load YouTube cookies if available cookie_file = os.path.join(os.path.dirname(__file__), "..", "cookies.txt") if os.path.exists(cookie_file): import http.cookiejar jar = http.cookiejar.MozillaCookieJar(cookie_file) jar.load(ignore_discard=True, ignore_expires=True) pw_cookies = [] for c in jar: if "youtube" in c.domain or "google" in c.domain: pw_cookies.append({ "name": c.name, "value": c.value, "domain": c.domain, "path": c.path, "secure": c.secure, "httpOnly": False, }) if pw_cookies: ctx.add_cookies(pw_cookies) print(f"[TRANSCRIPT] Loaded {len(pw_cookies)} cookies", flush=True) page = ctx.new_page() page.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => false})") print(f"[TRANSCRIPT] Opening YouTube page for {video_id}", flush=True) page.goto( f"https://www.youtube.com/watch?v={video_id}", wait_until="domcontentloaded", timeout=30000, ) page.wait_for_timeout(5000) # Skip ads if present for ad_wait in range(12): # up to ~60s for ads ad_status = page.evaluate("""() => { const skipBtn = document.querySelector('.ytp-skip-ad-button, .ytp-ad-skip-button, .ytp-ad-skip-button-modern, button.ytp-ad-skip-button-modern'); if (skipBtn) { skipBtn.click(); return 'skipped'; } const adOverlay = document.querySelector('.ytp-ad-player-overlay, .ad-showing'); if (adOverlay) return 'playing'; const adBadge = document.querySelector('.ytp-ad-text'); if (adBadge && adBadge.textContent) return 'badge'; return 'none'; }""") if ad_status == "none": break print(f"[TRANSCRIPT] Ad detected: {ad_status}, waiting...", flush=True) if ad_status == "skipped": page.wait_for_timeout(2000) break page.wait_for_timeout(5000) page.wait_for_timeout(2000) print(f"[TRANSCRIPT] Page loaded, looking for transcript button", flush=True) # Click "더보기" (more actions) button first to reveal transcript option page.evaluate(""" () => { // Try clicking the "...더보기" button in description area const moreBtn = document.querySelector('tp-yt-paper-button#expand'); if (moreBtn) moreBtn.click(); } """) page.wait_for_timeout(2000) # Click "스크립트 표시" button via JS clicked = page.evaluate(""" () => { // Method 1: aria-label for (const label of ['스크립트 표시', 'Show transcript']) { const btns = document.querySelectorAll(`button[aria-label="${label}"]`); for (const b of btns) { b.click(); return 'aria-label: ' + label; } } // Method 2: search all buttons by text content const allBtns = document.querySelectorAll('button'); for (const b of allBtns) { const text = b.textContent.trim(); if (text === '스크립트 표시' || text === 'Show transcript') { b.click(); return 'text: ' + text; } } // Method 3: look for transcript button in engagement panel const engBtns = document.querySelectorAll('ytd-button-renderer button, ytd-button-renderer a'); for (const b of engBtns) { const text = b.textContent.trim().toLowerCase(); if (text.includes('transcript') || text.includes('스크립트')) { b.click(); return 'engagement: ' + text; } } return false; } """) print(f"[TRANSCRIPT] Clicked transcript button: {clicked}", flush=True) if not clicked: # Dump available buttons for debugging btn_labels = page.evaluate(""" () => { const btns = document.querySelectorAll('button[aria-label]'); return Array.from(btns).map(b => b.getAttribute('aria-label')).slice(0, 30); } """) print(f"[TRANSCRIPT] Available buttons: {btn_labels}", flush=True) browser.close() return None, None # Wait for transcript panel segments to appear (max ~40s) page.wait_for_timeout(3000) # initial wait for panel to render for attempt in range(12): page.wait_for_timeout(3000) count = page.evaluate( "() => document.querySelectorAll('ytd-transcript-segment-renderer').length" ) print(f"[TRANSCRIPT] Wait {(attempt+1)*3+3}s: {count} segments", flush=True) if count > 0: break # Select Korean if available (language selector in transcript panel) page.evaluate(""" () => { // Open language dropdown and pick Korean if available const menu = document.querySelector('ytd-transcript-renderer ytd-menu-renderer yt-dropdown-menu'); if (!menu) return; const trigger = menu.querySelector('button, tp-yt-paper-button'); if (trigger) trigger.click(); } """) page.wait_for_timeout(1000) page.evaluate(""" () => { const items = document.querySelectorAll('tp-yt-paper-listbox a, tp-yt-paper-listbox tp-yt-paper-item'); for (const item of items) { const text = item.textContent.trim(); if (text.includes('한국어') || text.includes('Korean')) { item.click(); return; } } } """) page.wait_for_timeout(2000) # Scroll transcript panel to load all segments segments = page.evaluate(""" async () => { const container = document.querySelector( 'ytd-transcript-segment-list-renderer #segments-container, ' + 'ytd-transcript-renderer #body' ); if (!container) { // Fallback: just grab what's there const segs = document.querySelectorAll('ytd-transcript-segment-renderer'); return Array.from(segs).map(s => { const txt = s.querySelector('.segment-text, yt-formatted-string.segment-text'); return txt ? txt.textContent.trim() : ''; }).filter(t => t); } // Scroll to bottom repeatedly to load all virtual segments let prevCount = 0; for (let i = 0; i < 50; i++) { container.scrollTop = container.scrollHeight; await new Promise(r => setTimeout(r, 300)); const segs = document.querySelectorAll('ytd-transcript-segment-renderer'); if (segs.length === prevCount && i > 3) break; prevCount = segs.length; } const segs = document.querySelectorAll('ytd-transcript-segment-renderer'); return Array.from(segs).map(s => { const txt = s.querySelector('.segment-text, yt-formatted-string.segment-text'); return txt ? txt.textContent.trim() : ''; }).filter(t => t); } """) browser.close() print(f"[TRANSCRIPT] Got {len(segments) if segments else 0} segments for {video_id}", flush=True) if segments: text = " ".join(segments) print(f"[TRANSCRIPT] Success: {len(text)} chars from {len(segments)} segments", flush=True) return text, "browser" return None, None except Exception as e: logger.error("Playwright transcript failed for %s: %s", video_id, e) print(f"[TRANSCRIPT] Playwright FAILED for {video_id}: {e}", flush=True) import traceback traceback.print_exc() return None, None # -- DB operations for videos ------------------------------------------------- def save_video(channel_db_id: str, video: dict) -> str | None: """Insert a video row if not exists. Returns row id or None if duplicate.""" sql = """ INSERT INTO videos (channel_id, video_id, title, url, published_at, status) VALUES (:ch_id, :vid, :title, :url, :pub_at, 'pending') RETURNING id INTO :out_id """ with conn() as c: cur = c.cursor() import oracledb out_id = cur.var(oracledb.STRING) try: pub_at = None if video.get("published_at"): pub_at = datetime.fromisoformat( video["published_at"].replace("Z", "+00:00") ) cur.execute(sql, { "ch_id": channel_db_id, "vid": video["video_id"], "title": video["title"], "url": video["url"], "pub_at": pub_at, "out_id": out_id, }) return out_id.getvalue()[0] except Exception as e: if "UQ_VIDEOS_VID" in str(e).upper(): return None # duplicate raise def get_existing_video_ids(channel_db_id: str) -> set[str]: """Get all video_ids already in DB for a channel.""" with conn() as c: cur = c.cursor() cur.execute("SELECT video_id FROM videos WHERE channel_id = :cid", {"cid": channel_db_id}) return {r[0] for r in cur.fetchall()} def save_videos_batch(channel_db_id: str, videos: list[dict]) -> int: """Insert multiple videos in a single DB connection. Returns count of new videos.""" if not videos: return 0 import oracledb sql = """ INSERT INTO videos (channel_id, video_id, title, url, published_at, status) VALUES (:ch_id, :vid, :title, :url, :pub_at, 'pending') """ new_count = 0 with conn() as c: cur = c.cursor() for video in videos: try: pub_at = None if video.get("published_at"): pub_at = datetime.fromisoformat( video["published_at"].replace("Z", "+00:00") ) cur.execute(sql, { "ch_id": channel_db_id, "vid": video["video_id"], "title": video["title"], "url": video["url"], "pub_at": pub_at, }) new_count += 1 except Exception as e: if "UQ_VIDEOS_VID" in str(e).upper(): continue raise return new_count def get_pending_videos(limit: int = 10) -> list[dict]: sql = """ SELECT id, video_id, title, url FROM videos WHERE status = 'pending' ORDER BY created_at FETCH FIRST :n ROWS ONLY """ with conn() as c: cur = c.cursor() cur.execute(sql, {"n": limit}) return [ {"id": r[0], "video_id": r[1], "title": r[2], "url": r[3]} for r in cur.fetchall() ] def update_video_status( video_db_id: str, status: str, transcript: str | None = None, llm_raw: str | None = None, ) -> None: sets = ["status = :st", "processed_at = SYSTIMESTAMP"] params: dict = {"st": status, "vid": video_db_id} if transcript: sets.append("transcript_text = :txt") params["txt"] = transcript if llm_raw: sets.append("llm_raw_response = :llm_resp") params["llm_resp"] = llm_raw sql = f"UPDATE videos SET {', '.join(sets)} WHERE id = :vid" with conn() as c: c.cursor().execute(sql, params) # -- Scan: fetch new videos for all active channels --------------------------- def scan_all_channels() -> int: """Scan all active channels for new videos. Returns count of new videos.""" channels = get_active_channels() total_new = 0 for ch in channels: try: after = get_latest_video_date(ch["id"]) title_filter = ch.get("title_filter") new_count = 0 fetched = 0 for page in fetch_channel_videos_iter(ch["channel_id"], published_after=after): fetched += len(page) for v in page: if title_filter and title_filter not in v["title"]: continue row_id = save_video(ch["id"], v) if row_id: new_count += 1 total_new += new_count logger.info( "Channel %s: fetched %d videos (after=%s), %d new (filter=%s)", ch["channel_name"], fetched, after or "all", new_count, title_filter or "none", ) except Exception as e: logger.error("Failed to scan channel %s: %s", ch["channel_name"], e) return total_new