- Admin: video management with Google Maps match status, manual restaurant mapping, restaurant remap on name change - Admin: user management tab with favorites/reviews detail - Admin: channel deletion fix for IDs with slashes - Frontend: responsive mobile layout (map top, list bottom, 2-row header) - Frontend: channel-colored map markers with legend - Frontend: my reviews list, favorites toggle, visit counter overlay - Frontend: force light mode for dark theme devices - Backend: visit tracking (site_visits table), user reviews endpoint - Backend: bulk transcript/extract streaming, geocode key fixes - Nginx config for production deployment Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
608 lines
22 KiB
Python
608 lines
22 KiB
Python
"""YouTube channel scanner + transcript extraction.
|
|
|
|
Uses YouTube Data API v3 for channel video listing,
|
|
youtube-transcript-api for transcript extraction.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
from datetime import datetime
|
|
|
|
import httpx
|
|
from youtube_transcript_api import YouTubeTranscriptApi
|
|
|
|
from core.db import conn
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _api_key() -> str:
    """Return the YouTube Data API key (raises KeyError when unset in the environment)."""
    key = os.environ["YOUTUBE_DATA_API_KEY"]
    return key
|
|
|
|
|
|
def extract_video_id(url: str) -> str:
    """Extract the video ID from a YouTube URL.

    Supports classic watch URLs (``?v=``), short ``youtu.be`` links, and —
    a backward-compatible generalization over the original pattern —
    ``/shorts/``, ``/embed/`` and ``/live/`` paths.

    Args:
        url: Any YouTube video URL.

    Returns:
        The raw video ID string.

    Raises:
        ValueError: if no video ID can be found in the URL.
    """
    match = re.search(r"(?:v=|youtu\.be/|/shorts/|/embed/|/live/)([^&?/\s]+)", url)
    if not match:
        raise ValueError(f"Cannot extract video ID from URL: {url}")
    return match.group(1)
|
|
|
|
|
|
# -- Channel operations -------------------------------------------------------
|
|
|
|
def add_channel(channel_id: str, channel_name: str, title_filter: str | None = None) -> str:
    """Register a YouTube channel. Returns DB row id."""
    import oracledb

    sql = """
        INSERT INTO channels (channel_id, channel_name, channel_url, title_filter)
        VALUES (:cid, :cname, :curl, :tf)
        RETURNING id INTO :out_id
    """
    binds = {
        "cid": channel_id,
        "cname": channel_name,
        "curl": f"https://www.youtube.com/channel/{channel_id}",
        "tf": title_filter,
    }
    with conn() as c:
        cursor = c.cursor()
        # Oracle RETURNING clause needs an output bind variable.
        row_id_var = cursor.var(oracledb.STRING)
        binds["out_id"] = row_id_var
        cursor.execute(sql, binds)
        return row_id_var.getvalue()[0]
|
|
|
|
|
|
def deactivate_channel(channel_id: str) -> bool:
    """Deactivate a channel by channel_id. Returns True if found."""
    with conn() as c:
        cursor = c.cursor()
        cursor.execute(
            "UPDATE channels SET is_active = 0 WHERE channel_id = :cid AND is_active = 1",
            {"cid": channel_id},
        )
        # rowcount > 0 means an active row existed and was flipped off.
        return cursor.rowcount > 0
|
|
|
|
|
|
def deactivate_channel_by_db_id(db_id: str) -> bool:
    """Deactivate a channel by DB id. Returns True if found."""
    with conn() as c:
        cursor = c.cursor()
        cursor.execute(
            "UPDATE channels SET is_active = 0 WHERE id = :did AND is_active = 1",
            {"did": db_id},
        )
        # rowcount > 0 means an active row existed and was flipped off.
        return cursor.rowcount > 0
|
|
|
|
|
|
def get_active_channels() -> list[dict]:
    """Return all active channels as dicts with id/channel_id/channel_name/title_filter keys."""
    query = "SELECT id, channel_id, channel_name, title_filter FROM channels WHERE is_active = 1"
    fields = ("id", "channel_id", "channel_name", "title_filter")
    with conn() as c:
        cursor = c.cursor()
        cursor.execute(query)
        return [dict(zip(fields, row)) for row in cursor.fetchall()]
|
|
|
|
|
|
# -- Video listing via YouTube Data API v3 ------------------------------------
|
|
|
|
def get_latest_video_date(channel_db_id: str) -> str | None:
    """Get the latest published_at for a channel's videos in ISO 8601 format."""
    query = """
        SELECT MAX(published_at) FROM videos
        WHERE channel_id = :ch_id AND published_at IS NOT NULL
    """
    with conn() as c:
        cursor = c.cursor()
        cursor.execute(query, {"ch_id": channel_db_id})
        row = cursor.fetchone()
    latest = row[0] if row else None
    # Format matches the YouTube Data API "publishedAfter" parameter.
    return latest.strftime("%Y-%m-%dT%H:%M:%SZ") if latest else None
|
|
|
|
|
|
def fetch_channel_videos_iter(
    channel_id: str,
    published_after: str | None = None,
):
    """Yield pages of videos from a YouTube channel via Data API v3.

    Each yield is a list of dicts for one API page (up to 50).
    """
    query: dict = {
        "key": _api_key(),
        "channelId": channel_id,
        "part": "snippet",
        "order": "date",
        "maxResults": 50,
        "type": "video",
    }
    if published_after:
        query["publishedAfter"] = published_after

    token = None
    while True:
        if token:
            query["pageToken"] = token

        resp = httpx.get(
            "https://www.googleapis.com/youtube/v3/search",
            params=query,
            timeout=15,
        )
        resp.raise_for_status()
        payload = resp.json()

        batch = []
        for item in payload.get("items", []):
            vid = item["id"]["videoId"]
            snip = item["snippet"]
            batch.append({
                "video_id": vid,
                "title": snip["title"],
                "published_at": snip["publishedAt"],
                "url": f"https://www.youtube.com/watch?v={vid}",
            })
        if batch:
            yield batch

        # Follow pagination until the API stops handing out tokens.
        token = payload.get("nextPageToken")
        if not token:
            break
|
|
|
|
|
|
def fetch_channel_videos(
    channel_id: str,
    max_results: int = 0,
    published_after: str | None = None,
) -> list[dict]:
    """Fetch video list from a YouTube channel via Data API v3.

    Args:
        max_results: 0 means fetch all available videos.
    """
    collected: list[dict] = []
    for batch in fetch_channel_videos_iter(channel_id, published_after=published_after):
        collected.extend(batch)
        # Stop paging once we have enough (only when a cap was requested).
        if 0 < max_results <= len(collected):
            break
    if max_results > 0:
        return collected[:max_results]
    return collected
|
|
|
|
|
|
# -- Transcript extraction ----------------------------------------------------
|
|
|
|
def get_transcript(video_id: str, mode: str = "auto") -> tuple[str | None, str | None]:
    """Fetch a transcript: youtube-transcript-api first, Playwright browser as fallback.

    Args:
        mode: "manual" = manual only, "generated" = auto-generated only,
            "auto" = try API first, fallback to browser transcript panel.

    Returns:
        (transcript_text, source) where source describes origin, or (None, None).
    """
    # Fast path: direct transcript API.
    api_text, api_source = _get_transcript_api(video_id, mode)
    if api_text:
        return api_text, api_source

    # Slow path: drive a real browser (works around IP blocks on the API).
    logger.warning("API failed for %s, trying Playwright browser", video_id)
    print(f"[TRANSCRIPT] API failed for {video_id}, trying Playwright browser", flush=True)
    return _get_transcript_browser(video_id)
|
|
|
|
|
|
def _make_ytt() -> YouTubeTranscriptApi:
    """Create YouTubeTranscriptApi with cookies if available."""
    cookie_path = os.path.join(os.path.dirname(__file__), "..", "cookies.txt")
    if not os.path.exists(cookie_path):
        # No cookie jar on disk: plain anonymous client.
        return YouTubeTranscriptApi()

    import http.cookiejar
    import requests

    cookies = http.cookiejar.MozillaCookieJar(cookie_path)
    cookies.load(ignore_discard=True, ignore_expires=True)
    http_session = requests.Session()
    http_session.cookies = cookies
    return YouTubeTranscriptApi(http_client=http_session)
|
|
|
|
|
|
def _get_transcript_api(video_id: str, mode: str = "auto") -> tuple[str | None, str | None]:
    """Try youtube-transcript-api (fast but may be IP-blocked)."""
    ytt = _make_ytt()
    preferred_langs = ["ko", "en"]

    try:
        available = list(ytt.list(video_id))
    except Exception as e:
        logger.warning("Cannot list transcripts for %s: %s", video_id, e)
        return None, None

    manual = [t for t in available if not t.is_generated]
    generated = [t for t in available if t.is_generated]

    def _choose(pool):
        # First transcript matching a preferred language, else the first one at all.
        for code in preferred_langs:
            hit = next((t for t in pool if t.language_code == code), None)
            if hit is not None:
                return hit
        return pool[0] if pool else None

    def _download(t):
        # Keep the join inside the try: any fetch/segment failure maps to (None, None).
        try:
            return " ".join(seg.text for seg in t.fetch()), t.language_code
        except Exception:
            return None, None

    # Build the ordered list of (pool, label) pairs to try for this mode.
    if mode == "manual":
        search_order = [(manual, "manual")]
    elif mode == "generated":
        search_order = [(generated, "generated")]
    else:
        search_order = [(manual, "manual"), (generated, "generated")]

    for pool, kind in search_order:
        candidate = _choose(pool)
        if candidate is None:
            continue
        text, lang = _download(candidate)
        if text:
            return text, f"{kind} ({lang})"
    return None, None
|
|
|
|
|
|
def _get_transcript_browser(video_id: str) -> tuple[str | None, str | None]:
    """Fetch transcript via Playwright browser (bypasses IP blocks).

    Drives a real Chromium window through the YouTube watch page UI:
    skips ads, opens the transcript panel, prefers Korean, scrolls the
    virtualized segment list, and joins the segment texts.

    Returns:
        (transcript_text, "browser") on success, else (None, None).
    """
    try:
        from playwright.sync_api import sync_playwright
    except ImportError:
        logger.error("playwright not installed")
        return None, None

    try:
        with sync_playwright() as p:
            # NOTE(review): headless=False launches a *visible* browser window —
            # presumably to evade bot detection; confirm this is intended on servers.
            browser = p.chromium.launch(
                headless=False,
                args=["--disable-blink-features=AutomationControlled"],
            )
            ctx = browser.new_context(locale="ko-KR", viewport={"width": 1280, "height": 900})

            # Load YouTube cookies if available (Netscape cookies.txt next to package root)
            cookie_file = os.path.join(os.path.dirname(__file__), "..", "cookies.txt")
            if os.path.exists(cookie_file):
                import http.cookiejar
                jar = http.cookiejar.MozillaCookieJar(cookie_file)
                jar.load(ignore_discard=True, ignore_expires=True)
                pw_cookies = []
                for c in jar:
                    # Only forward cookies relevant to YouTube/Google domains.
                    if "youtube" in c.domain or "google" in c.domain:
                        pw_cookies.append({
                            "name": c.name, "value": c.value,
                            "domain": c.domain, "path": c.path,
                            "secure": c.secure, "httpOnly": False,
                        })
                if pw_cookies:
                    ctx.add_cookies(pw_cookies)
                    print(f"[TRANSCRIPT] Loaded {len(pw_cookies)} cookies", flush=True)

            page = ctx.new_page()
            # Hide the webdriver flag that sites use to detect automation.
            page.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => false})")

            print(f"[TRANSCRIPT] Opening YouTube page for {video_id}", flush=True)
            page.goto(
                f"https://www.youtube.com/watch?v={video_id}",
                wait_until="domcontentloaded",
                timeout=30000,
            )
            page.wait_for_timeout(5000)

            # Skip ads if present
            for ad_wait in range(12):  # up to ~60s for ads
                ad_status = page.evaluate("""() => {
                    const skipBtn = document.querySelector('.ytp-skip-ad-button, .ytp-ad-skip-button, .ytp-ad-skip-button-modern, button.ytp-ad-skip-button-modern');
                    if (skipBtn) { skipBtn.click(); return 'skipped'; }
                    const adOverlay = document.querySelector('.ytp-ad-player-overlay, .ad-showing');
                    if (adOverlay) return 'playing';
                    const adBadge = document.querySelector('.ytp-ad-text');
                    if (adBadge && adBadge.textContent) return 'badge';
                    return 'none';
                }""")
                if ad_status == "none":
                    break
                print(f"[TRANSCRIPT] Ad detected: {ad_status}, waiting...", flush=True)
                if ad_status == "skipped":
                    page.wait_for_timeout(2000)
                    break
                page.wait_for_timeout(5000)

            page.wait_for_timeout(2000)
            print(f"[TRANSCRIPT] Page loaded, looking for transcript button", flush=True)

            # Click "더보기" (more actions) button first to reveal transcript option
            page.evaluate("""
                () => {
                    // Try clicking the "...더보기" button in description area
                    const moreBtn = document.querySelector('tp-yt-paper-button#expand');
                    if (moreBtn) moreBtn.click();
                }
            """)
            page.wait_for_timeout(2000)

            # Click "스크립트 표시" button via JS; three strategies are tried in order.
            clicked = page.evaluate("""
                () => {
                    // Method 1: aria-label
                    for (const label of ['스크립트 표시', 'Show transcript']) {
                        const btns = document.querySelectorAll(`button[aria-label="${label}"]`);
                        for (const b of btns) { b.click(); return 'aria-label: ' + label; }
                    }
                    // Method 2: search all buttons by text content
                    const allBtns = document.querySelectorAll('button');
                    for (const b of allBtns) {
                        const text = b.textContent.trim();
                        if (text === '스크립트 표시' || text === 'Show transcript') {
                            b.click();
                            return 'text: ' + text;
                        }
                    }
                    // Method 3: look for transcript button in engagement panel
                    const engBtns = document.querySelectorAll('ytd-button-renderer button, ytd-button-renderer a');
                    for (const b of engBtns) {
                        const text = b.textContent.trim().toLowerCase();
                        if (text.includes('transcript') || text.includes('스크립트')) {
                            b.click();
                            return 'engagement: ' + text;
                        }
                    }
                    return false;
                }
            """)
            print(f"[TRANSCRIPT] Clicked transcript button: {clicked}", flush=True)
            if not clicked:
                # Dump available buttons for debugging
                btn_labels = page.evaluate("""
                    () => {
                        const btns = document.querySelectorAll('button[aria-label]');
                        return Array.from(btns).map(b => b.getAttribute('aria-label')).slice(0, 30);
                    }
                """)
                print(f"[TRANSCRIPT] Available buttons: {btn_labels}", flush=True)
                browser.close()
                return None, None

            # Wait for transcript panel segments to appear (max ~40s)
            page.wait_for_timeout(3000)  # initial wait for panel to render
            for attempt in range(12):
                page.wait_for_timeout(3000)
                count = page.evaluate(
                    "() => document.querySelectorAll('ytd-transcript-segment-renderer').length"
                )
                print(f"[TRANSCRIPT] Wait {(attempt+1)*3+3}s: {count} segments", flush=True)
                if count > 0:
                    break

            # Select Korean if available (language selector in transcript panel)
            page.evaluate("""
                () => {
                    // Open language dropdown and pick Korean if available
                    const menu = document.querySelector('ytd-transcript-renderer ytd-menu-renderer yt-dropdown-menu');
                    if (!menu) return;
                    const trigger = menu.querySelector('button, tp-yt-paper-button');
                    if (trigger) trigger.click();
                }
            """)
            page.wait_for_timeout(1000)
            page.evaluate("""
                () => {
                    const items = document.querySelectorAll('tp-yt-paper-listbox a, tp-yt-paper-listbox tp-yt-paper-item');
                    for (const item of items) {
                        const text = item.textContent.trim();
                        if (text.includes('한국어') || text.includes('Korean')) {
                            item.click();
                            return;
                        }
                    }
                }
            """)
            page.wait_for_timeout(2000)

            # Scroll transcript panel to load all segments (the list is virtualized,
            # so segments only materialize as the container is scrolled).
            segments = page.evaluate("""
                async () => {
                    const container = document.querySelector(
                        'ytd-transcript-segment-list-renderer #segments-container, ' +
                        'ytd-transcript-renderer #body'
                    );
                    if (!container) {
                        // Fallback: just grab what's there
                        const segs = document.querySelectorAll('ytd-transcript-segment-renderer');
                        return Array.from(segs).map(s => {
                            const txt = s.querySelector('.segment-text, yt-formatted-string.segment-text');
                            return txt ? txt.textContent.trim() : '';
                        }).filter(t => t);
                    }

                    // Scroll to bottom repeatedly to load all virtual segments
                    let prevCount = 0;
                    for (let i = 0; i < 50; i++) {
                        container.scrollTop = container.scrollHeight;
                        await new Promise(r => setTimeout(r, 300));
                        const segs = document.querySelectorAll('ytd-transcript-segment-renderer');
                        if (segs.length === prevCount && i > 3) break;
                        prevCount = segs.length;
                    }

                    const segs = document.querySelectorAll('ytd-transcript-segment-renderer');
                    return Array.from(segs).map(s => {
                        const txt = s.querySelector('.segment-text, yt-formatted-string.segment-text');
                        return txt ? txt.textContent.trim() : '';
                    }).filter(t => t);
                }
            """)

            browser.close()

            print(f"[TRANSCRIPT] Got {len(segments) if segments else 0} segments for {video_id}", flush=True)
            if segments:
                text = " ".join(segments)
                print(f"[TRANSCRIPT] Success: {len(text)} chars from {len(segments)} segments", flush=True)
                return text, "browser"
            return None, None
    except Exception as e:
        # Any Playwright failure (navigation, selector, browser crash) is logged
        # and reported as "no transcript" so callers can fall through gracefully.
        logger.error("Playwright transcript failed for %s: %s", video_id, e)
        print(f"[TRANSCRIPT] Playwright FAILED for {video_id}: {e}", flush=True)
        import traceback
        traceback.print_exc()
        return None, None
|
|
|
|
|
|
# -- DB operations for videos -------------------------------------------------
|
|
|
|
def save_video(channel_db_id: str, video: dict) -> str | None:
    """Insert a video row if not exists. Returns row id or None if duplicate."""
    import oracledb

    sql = """
        INSERT INTO videos (channel_id, video_id, title, url, published_at, status)
        VALUES (:ch_id, :vid, :title, :url, :pub_at, 'pending')
        RETURNING id INTO :out_id
    """
    with conn() as c:
        cursor = c.cursor()
        new_id = cursor.var(oracledb.STRING)
        try:
            # YouTube timestamps end in 'Z'; fromisoformat wants an explicit offset.
            raw_ts = video.get("published_at")
            pub_at = (
                datetime.fromisoformat(raw_ts.replace("Z", "+00:00")) if raw_ts else None
            )
            cursor.execute(sql, {
                "ch_id": channel_db_id,
                "vid": video["video_id"],
                "title": video["title"],
                "url": video["url"],
                "pub_at": pub_at,
                "out_id": new_id,
            })
        except Exception as e:
            # Unique-constraint violation means the video is already stored.
            if "UQ_VIDEOS_VID" in str(e).upper():
                return None  # duplicate
            raise
        return new_id.getvalue()[0]
|
|
|
|
|
|
def get_existing_video_ids(channel_db_id: str) -> set[str]:
    """Get all video_ids already in DB for a channel."""
    query = "SELECT video_id FROM videos WHERE channel_id = :cid"
    with conn() as c:
        cursor = c.cursor()
        cursor.execute(query, {"cid": channel_db_id})
        return {video_id for (video_id,) in cursor.fetchall()}
|
|
|
|
|
|
def save_videos_batch(channel_db_id: str, videos: list[dict]) -> int:
    """Insert multiple videos in a single DB connection.

    Duplicates (rows violating the UQ_VIDEOS_VID unique constraint) are
    skipped silently; any other DB error propagates.

    Args:
        channel_db_id: DB row id of the owning channel.
        videos: dicts with video_id/title/url and optional published_at (ISO 8601).

    Returns:
        Count of newly inserted videos.
    """
    if not videos:
        return 0
    # Fix: removed an unused function-level `import oracledb` — unlike
    # save_video, this block has no RETURNING clause and never calls cur.var().
    sql = """
        INSERT INTO videos (channel_id, video_id, title, url, published_at, status)
        VALUES (:ch_id, :vid, :title, :url, :pub_at, 'pending')
    """
    new_count = 0
    with conn() as c:
        cur = c.cursor()
        for video in videos:
            try:
                pub_at = None
                if video.get("published_at"):
                    # YouTube timestamps end in 'Z'; fromisoformat wants +00:00.
                    pub_at = datetime.fromisoformat(
                        video["published_at"].replace("Z", "+00:00")
                    )
                cur.execute(sql, {
                    "ch_id": channel_db_id,
                    "vid": video["video_id"],
                    "title": video["title"],
                    "url": video["url"],
                    "pub_at": pub_at,
                })
                new_count += 1
            except Exception as e:
                if "UQ_VIDEOS_VID" in str(e).upper():
                    continue  # already stored — skip and keep going
                raise
    return new_count
|
|
|
|
|
|
def get_pending_videos(limit: int = 10) -> list[dict]:
    """Return up to `limit` of the oldest videos still awaiting processing."""
    query = """
        SELECT id, video_id, title, url
        FROM videos
        WHERE status = 'pending'
        ORDER BY created_at
        FETCH FIRST :n ROWS ONLY
    """
    fields = ("id", "video_id", "title", "url")
    with conn() as c:
        cursor = c.cursor()
        cursor.execute(query, {"n": limit})
        return [dict(zip(fields, row)) for row in cursor.fetchall()]
|
|
|
|
|
|
def update_video_status(
    video_db_id: str,
    status: str,
    transcript: str | None = None,
    llm_raw: str | None = None,
) -> None:
    """Update a video's status (and optionally transcript / raw LLM response).

    Only non-empty transcript/llm_raw values are written; processed_at is
    always stamped with the DB server time.
    """
    assignments = ["status = :st", "processed_at = SYSTIMESTAMP"]
    binds: dict = {"st": status, "vid": video_db_id}
    if transcript:
        assignments.append("transcript_text = :txt")
        binds["txt"] = transcript
    if llm_raw:
        assignments.append("llm_raw_response = :llm_resp")
        binds["llm_resp"] = llm_raw
    statement = f"UPDATE videos SET {', '.join(assignments)} WHERE id = :vid"
    with conn() as c:
        c.cursor().execute(statement, binds)
|
|
|
|
|
|
# -- Scan: fetch new videos for all active channels ---------------------------
|
|
|
|
def scan_all_channels() -> int:
    """Scan all active channels for new videos. Returns count of new videos."""
    grand_total = 0
    for channel in get_active_channels():
        name = channel["channel_name"]
        try:
            # Only ask the API for videos newer than what we already hold.
            since = get_latest_video_date(channel["id"])
            keyword = channel.get("title_filter")
            added = 0
            seen = 0
            for batch in fetch_channel_videos_iter(channel["channel_id"], published_after=since):
                seen += len(batch)
                for video in batch:
                    # Optional per-channel title filter (substring match).
                    if keyword and keyword not in video["title"]:
                        continue
                    if save_video(channel["id"], video):
                        added += 1
            grand_total += added
            logger.info(
                "Channel %s: fetched %d videos (after=%s), %d new (filter=%s)",
                name, seen, since or "all", added, keyword or "none",
            )
        except Exception as e:
            # One broken channel must not stop the scan of the others.
            logger.error("Failed to scan channel %s: %s", name, e)
    return grand_total
|