"""Restaurant DB operations — save extracted data, link to videos."""

from __future__ import annotations

import json
import re

import oracledb

from core.db import conn

# ── Region parser: address → "country|city|district" ──

# Maps the province / metropolitan-city token found in a Korean address to the
# short city label stored in the region string.
_CITY_MAP = {
    "서울특별시": "서울", "서울": "서울",
    "부산광역시": "부산", "부산": "부산",
    "대구광역시": "대구", "대구": "대구",
    "인천광역시": "인천", "인천": "인천",
    "광주광역시": "광주", "광주": "광주",
    "대전광역시": "대전", "대전": "대전",
    "울산광역시": "울산", "울산": "울산",
    "세종특별자치시": "세종",
    "경기도": "경기", "경기": "경기",
    "강원특별자치도": "강원", "강원도": "강원",
    "충청북도": "충북", "충청남도": "충남",
    "전라북도": "전북", "전북특별자치도": "전북",
    "전라남도": "전남",
    "경상북도": "경북", "경상남도": "경남",
    "제주특별자치도": "제주",
}

# (keywords, city label) pairs checked in order; the first keyword hit wins.
_JAPAN_CITY_KEYWORDS = (
    (("Tokyo",), "도쿄"),
    (("Osaka",), "오사카"),
    (("Sapporo", "Hokkaido"), "삿포로"),
    (("Kyoto",), "교토"),
    (("Fukuoka",), "후쿠오카"),
)

# A token counts as a district only if it ends with one of these suffixes.
_DISTRICT_SUFFIXES = ("구", "군", "시")

# Standard Korean layout: "대한민국 <province/city> <district> ...".
_KR_STANDARD_RE = re.compile(r"대한민국\s+(\S+)\s+(\S+)")

# Columns returned by the find_by_* lookups, in SELECT order.
_SUMMARY_COLUMNS = ("id", "name", "address", "region", "latitude", "longitude")
_SUMMARY_SELECT = (
    "SELECT id, name, address, region, latitude, longitude FROM restaurants WHERE "
)

# Oracle rejects IN lists with more than 1000 expressions (ORA-01795), so id
# lookups are issued in batches of at most this size.
_MAX_IN_LIST = 1000


def _format_korean_region(city: str, district: str | None) -> str:
    """Return '한국|city|district', or '한국|city' when the district is unusable."""
    if district and district.endswith(_DISTRICT_SUFFIXES):
        return f"한국|{city}|{district}"
    # Not a district — just city level
    return f"한국|{city}"


def parse_region_from_address(address: str | None) -> str | None:
    """Parse address into 'country|city|district' format.

    Recognises Japanese addresses (prefix "일본" or containing "Japan"),
    Singapore addresses, and Korean addresses with or without the "대한민국"
    marker. Returns None for empty input or when nothing is recognised.
    """
    if not address:
        return None
    addr = address.strip()

    # Japanese
    if addr.startswith("일본") or "Japan" in addr:
        for keywords, city in _JAPAN_CITY_KEYWORDS:
            if any(keyword in addr for keyword in keywords):
                return f"일본|{city}"
        return "일본"

    # Singapore
    if "Singapore" in addr or "싱가포르" in addr:
        return "싱가포르"

    tokens = addr.split()

    # Korean standard: "대한민국 <province/city> <district> ..."
    if "대한민국" in addr:
        match = _KR_STANDARD_RE.match(addr)
        if match:
            city = _CITY_MAP.get(match.group(1))
            if city:
                return _format_korean_region(city, match.group(2))
        # Reversed: "... <district> <city> 대한민국" / "... <city> KR" — the
        # district, if any, is the token just before the city token.
        for idx, token in enumerate(tokens):
            city = _CITY_MAP.get(token)
            if city:
                previous = tokens[idx - 1] if idx > 0 else None
                return _format_korean_region(city, previous)
        return "한국"

    # Korean without prefix: the first token must be a known city/province.
    if tokens:
        city = _CITY_MAP.get(tokens[0])
        if city:
            following = tokens[1] if len(tokens) > 1 else None
            return _format_korean_region(city, following)
    return None


def _truncate_bytes(val: str | None, max_bytes: int) -> str | None:
    """Truncate a string to fit within max_bytes when encoded as UTF-8.

    Falsy values (None or "") are returned unchanged. A multi-byte character
    cut in half by the byte slice is dropped rather than raising, and trailing
    whitespace left by the cut is stripped.
    """
    if not val:
        return val
    encoded = val.encode("utf-8")
    if len(encoded) <= max_bytes:
        return val
    return encoded[:max_bytes].decode("utf-8", errors="ignore").rstrip()


def _read_lob(val):
    """Return val.read() for LOB-like values (anything with .read), else val."""
    return val.read() if hasattr(val, "read") else val


def _find_one_summary(condition: str, binds: dict) -> dict | None:
    """Fetch the first restaurant matching `condition` as a summary dict.

    `condition` is a fixed SQL fragment supplied by this module (never user
    input); user values travel only through `binds`.
    """
    with conn() as c:
        cur = c.cursor()
        cur.execute(_SUMMARY_SELECT + condition, binds)
        row = cur.fetchone()
    return dict(zip(_SUMMARY_COLUMNS, row)) if row else None


def find_by_place_id(google_place_id: str) -> dict | None:
    """Find a restaurant by Google Place ID.

    Returns a dict with id, name, address, region, latitude and longitude,
    or None when no row matches.
    """
    return _find_one_summary("google_place_id = :gid", {"gid": google_place_id})


def find_by_name(name: str) -> dict | None:
    """Find a restaurant by exact name match.

    Returns a dict with id, name, address, region, latitude and longitude,
    or None when no row matches. If several rows share the name, whichever
    the database returns first is used.
    """
    return _find_one_summary("name = :n", {"n": name})


def upsert(
    name: str,
    address: str | None = None,
    region: str | None = None,
    latitude: float | None = None,
    longitude: float | None = None,
    cuisine_type: str | None = None,
    price_range: str | None = None,
    google_place_id: str | None = None,
    phone: str | None = None,
    website: str | None = None,
    business_status: str | None = None,
    rating: float | None = None,
    rating_count: int | None = None,
) -> str:
    """Insert or update a restaurant. Returns row id.

    An existing row is looked up by google_place_id first (when given) and
    then by exact name. On update, `name` is always overwritten while every
    other column keeps its stored value when the new value is None
    (COALESCE). When no row matches, a new row is inserted.
    """
    # Auto-derive region from address if not provided
    if not region and address:
        region = parse_region_from_address(address)

    # Truncate fields to fit DB column byte limits (VARCHAR2 is byte-based)
    price_range = _truncate_bytes(price_range, 50)
    cuisine_type = _truncate_bytes(cuisine_type, 100)
    region = _truncate_bytes(region, 100)
    website = _truncate_bytes(website, 500)

    # 1) look up by google_place_id first, 2) then fall back to name.
    # NOTE(review): the name fallback can match a row that already carries a
    # different google_place_id (e.g. two branches sharing a name); the update
    # then replaces that id. Confirm this merge is intended.
    existing = None
    if google_place_id:
        existing = find_by_place_id(google_place_id)
    if not existing:
        existing = find_by_name(name)

    # Bind values shared by the UPDATE and INSERT statements.
    binds = {
        "name": name,
        "addr": address,
        "reg": region,
        "lat": latitude,
        "lng": longitude,
        "cuisine": cuisine_type,
        "price": price_range,
        "gid": google_place_id,
        "phone": phone,
        "web": website,
        "bstatus": business_status,
        "rating": rating,
        "rcnt": rating_count,
    }

    if existing:
        sql = """
            UPDATE restaurants
            SET name = :name,
                address = COALESCE(:addr, address),
                region = COALESCE(:reg, region),
                latitude = COALESCE(:lat, latitude),
                longitude = COALESCE(:lng, longitude),
                cuisine_type = COALESCE(:cuisine, cuisine_type),
                price_range = COALESCE(:price, price_range),
                google_place_id = COALESCE(:gid, google_place_id),
                phone = COALESCE(:phone, phone),
                website = COALESCE(:web, website),
                business_status = COALESCE(:bstatus, business_status),
                rating = COALESCE(:rating, rating),
                rating_count = COALESCE(:rcnt, rating_count),
                updated_at = SYSTIMESTAMP
            WHERE id = :id
        """
        with conn() as c:
            c.cursor().execute(sql, {**binds, "id": existing["id"]})
        return existing["id"]

    sql = """
        INSERT INTO restaurants
            (name, address, region, latitude, longitude, cuisine_type,
             price_range, google_place_id, phone, website, business_status,
             rating, rating_count)
        VALUES
            (:name, :addr, :reg, :lat, :lng, :cuisine,
             :price, :gid, :phone, :web, :bstatus,
             :rating, :rcnt)
        RETURNING id INTO :out_id
    """
    with conn() as c:
        cur = c.cursor()
        # Output bind that receives the id produced by RETURNING ... INTO.
        out_id = cur.var(oracledb.STRING)
        cur.execute(sql, {**binds, "out_id": out_id})
        # getvalue() yields a list for DML RETURNING; one row was inserted.
        return out_id.getvalue()[0]


def link_video_restaurant(
    video_db_id: str,
    restaurant_id: str,
    foods: list[str] | None = None,
    evaluation: str | None = None,
    guests: list[str] | None = None,
    citation: str | None = None,
) -> str | None:
    """Create video-restaurant mapping. Returns row id or None if duplicate.

    `foods` and `guests` are stored as JSON arrays and `evaluation` as a JSON
    object of the form {"text": ...} (an empty object when not given). A
    failure whose message names the UQ_VR_VIDEO_REST constraint is treated as
    "already linked" and yields None; any other error propagates.
    """
    sql = """
        INSERT INTO video_restaurants
            (video_id, restaurant_id, foods_mentioned, evaluation, guests,
             citation_text)
        VALUES (:vid, :rid, :foods, :eval, :guests, :cite)
        RETURNING id INTO :out_id
    """
    with conn() as c:
        cur = c.cursor()
        out_id = cur.var(oracledb.STRING)
        try:
            cur.execute(sql, {
                "vid": video_db_id,
                "rid": restaurant_id,
                "foods": json.dumps(foods or [], ensure_ascii=False),
                "eval": json.dumps(
                    {"text": evaluation} if evaluation else {},
                    ensure_ascii=False,
                ),
                "guests": json.dumps(guests or [], ensure_ascii=False),
                "cite": citation,
                "out_id": out_id,
            })
            return out_id.getvalue()[0]
        except Exception as e:
            # Only the unique-constraint violation means "duplicate link".
            if "UQ_VR_VIDEO_REST" in str(e).upper():
                return None
            raise


def _id_batches(ids: list) -> list[list]:
    """Split ids into consecutive chunks of at most _MAX_IN_LIST items."""
    return [
        ids[start:start + _MAX_IN_LIST]
        for start in range(0, len(ids), _MAX_IN_LIST)
    ]


def get_all(
    limit: int = 100,
    offset: int = 0,
    cuisine: str | None = None,
    region: str | None = None,
    channel: str | None = None,
) -> list[dict]:
    """List restaurants with optional filters.

    Only restaurants that have a latitude and at least one video link are
    returned, newest `updated_at` first. `cuisine` is an exact match, `region`
    a substring match, and `channel` an exact channel-name match. Each result
    dict additionally carries `channels` (channel names the restaurant appears
    on) and `foods_mentioned` (up to 10 distinct foods, first-seen order).
    """
    conditions = [
        "r.latitude IS NOT NULL",
        "EXISTS (SELECT 1 FROM video_restaurants vr0 WHERE vr0.restaurant_id = r.id)",
    ]
    params: dict = {"lim": limit, "off": offset}

    if cuisine:
        conditions.append("r.cuisine_type = :cuisine")
        params["cuisine"] = cuisine
    if region:
        conditions.append("r.region LIKE :region")
        params["region"] = f"%{region}%"

    join_clause = ""
    if channel:
        join_clause = """
            JOIN video_restaurants vr_f ON vr_f.restaurant_id = r.id
            JOIN videos v_f ON v_f.id = vr_f.video_id
            JOIN channels c_f ON c_f.id = v_f.channel_id
        """
        conditions.append("c_f.channel_name = :channel")
        params["channel"] = channel

    where = " AND ".join(conditions)
    # updated_at is selected only because ORDER BY with SELECT DISTINCT needs
    # it in the select list; it is removed from the result dicts below.
    sql = f"""
        SELECT DISTINCT r.id, r.name, r.address, r.region, r.latitude,
               r.longitude, r.cuisine_type, r.price_range, r.google_place_id,
               r.business_status, r.rating, r.rating_count, r.updated_at
        FROM restaurants r
        {join_clause}
        WHERE {where}
        ORDER BY r.updated_at DESC
        OFFSET :off ROWS FETCH NEXT :lim ROWS ONLY
    """
    with conn() as c:
        cur = c.cursor()
        cur.execute(sql, params)
        cols = [d[0].lower() for d in cur.description]
        restaurants = [dict(zip(cols, row)) for row in cur.fetchall()]

    for r in restaurants:
        r.pop("updated_at", None)

    if not restaurants:
        return restaurants

    ids = [r["id"] for r in restaurants]
    ch_map: dict[str, list[str]] = {}
    # Dict used as an insertion-ordered set: O(1) dedup, first-seen order kept.
    foods_map: dict[str, dict[str, None]] = {}

    with conn() as c:
        cur = c.cursor()
        for batch in _id_batches(ids):
            id_binds = {f"id{i}": rid for i, rid in enumerate(batch)}
            placeholders = ", ".join(f":{key}" for key in id_binds)

            # Attach channel names for each restaurant
            cur.execute(
                f"""
                SELECT DISTINCT vr.restaurant_id, c.channel_name
                FROM video_restaurants vr
                JOIN videos v ON v.id = vr.video_id
                JOIN channels c ON c.id = v.channel_id
                WHERE vr.restaurant_id IN ({placeholders})
                """,
                id_binds,
            )
            for rest_id, channel_name in cur.fetchall():
                ch_map.setdefault(rest_id, []).append(channel_name)

            # Attach aggregated foods_mentioned for each restaurant
            cur.execute(
                f"""
                SELECT vr.restaurant_id, vr.foods_mentioned
                FROM video_restaurants vr
                WHERE vr.restaurant_id IN ({placeholders})
                """,
                id_binds,
            )
            for rest_id, foods_col in cur.fetchall():
                # Malformed or non-list payloads are skipped (best-effort).
                items = _parse_json_field(_read_lob(foods_col), [])
                if not isinstance(items, list):
                    continue
                for food in items:
                    if isinstance(food, str):
                        foods_map.setdefault(rest_id, {}).setdefault(food, None)

    for r in restaurants:
        r["channels"] = ch_map.get(r["id"], [])
        r["foods_mentioned"] = list(foods_map.get(r["id"], ()))[:10]
    return restaurants


def get_by_id(restaurant_id: str) -> dict | None:
    """Fetch one restaurant by id as a dict keyed by lower-cased column name.

    Returns None when no row has the given id.
    """
    sql = """
        SELECT r.id, r.name, r.address, r.region, r.latitude, r.longitude,
               r.cuisine_type, r.price_range, r.phone, r.website,
               r.google_place_id, r.business_status, r.rating, r.rating_count
        FROM restaurants r
        WHERE r.id = :id
    """
    with conn() as c:
        cur = c.cursor()
        cur.execute(sql, {"id": restaurant_id})
        row = cur.fetchone()
        if not row:
            return None
        cols = [d[0].lower() for d in cur.description]
        return dict(zip(cols, row))


def get_video_links(restaurant_id: str) -> list[dict]:
    """Get all video appearances for a restaurant.

    Results are ordered by publish date, newest first. The JSON columns
    (foods_mentioned, evaluation, guests) are decoded, falling back to an
    empty list / dict when missing or malformed; published_at is returned as
    an ISO-8601 string or None.
    """
    sql = """
        SELECT v.video_id, v.title, v.url, v.published_at,
               vr.foods_mentioned, vr.evaluation, vr.guests,
               c.channel_name, c.channel_id
        FROM video_restaurants vr
        JOIN videos v ON v.id = vr.video_id
        JOIN channels c ON c.id = v.channel_id
        WHERE vr.restaurant_id = :rid
        ORDER BY v.published_at DESC
    """
    with conn() as c:
        cur = c.cursor()
        cur.execute(sql, {"rid": restaurant_id})
        results = []
        # LOB values are read while the connection is still open.
        for r in cur.fetchall():
            results.append({
                "video_id": r[0],
                "title": r[1],
                "url": r[2],
                "published_at": r[3].isoformat() if r[3] else None,
                "foods_mentioned": _parse_json_field(_read_lob(r[4]), []),
                "evaluation": _parse_json_field(_read_lob(r[5]), {}),
                "guests": _parse_json_field(_read_lob(r[6]), []),
                "channel_name": r[7],
                "channel_id": r[8],
            })
        return results


def _parse_json_field(val, default):
    """Parse a JSON field that may be a string, already-parsed object, or None.

    Lists and dicts are returned as-is, strings are decoded with json.loads,
    and anything else (including undecodable strings) yields `default`.
    """
    if val is None:
        return default
    if isinstance(val, (list, dict)):
        return val
    if isinstance(val, str):
        try:
            return json.loads(val)
        except (json.JSONDecodeError, ValueError):
            return default
    return default