- LLM extraction prompt: foods_mentioned max 10, Korean only, prioritized - New /remap-foods API endpoint for bulk LLM re-extraction - Admin UI: "메뉴태그 재생성" button with SSE progress bar - Backend: attach foods_mentioned to restaurant list API response - Restaurant cards: display food tags (orange, max 5 visible) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
410 lines
14 KiB
Python
410 lines
14 KiB
Python
"""Restaurant DB operations — save extracted data, link to videos."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
|
|
import oracledb
|
|
|
|
from core.db import conn
|
|
|
|
|
|
# ── Region parser: address → "나라|시|구" ──
|
|
|
|
_CITY_MAP = {
|
|
"서울특별시": "서울", "서울": "서울",
|
|
"부산광역시": "부산", "부산": "부산",
|
|
"대구광역시": "대구", "대구": "대구",
|
|
"인천광역시": "인천", "인천": "인천",
|
|
"광주광역시": "광주", "광주": "광주",
|
|
"대전광역시": "대전", "대전": "대전",
|
|
"울산광역시": "울산", "울산": "울산",
|
|
"세종특별자치시": "세종",
|
|
"경기도": "경기", "경기": "경기",
|
|
"강원특별자치도": "강원", "강원도": "강원",
|
|
"충청북도": "충북", "충청남도": "충남",
|
|
"전라북도": "전북", "전북특별자치도": "전북",
|
|
"전라남도": "전남",
|
|
"경상북도": "경북", "경상남도": "경남",
|
|
"제주특별자치도": "제주",
|
|
}
|
|
|
|
|
|
# Japanese city lookup: (keywords searched for in the address, Korean label).
# Entries are checked in order and the first one with a matching keyword wins.
# Both English and Korean spellings are listed because the Japan branch is
# entered for Korean-language addresses (prefix "일본") as well as English ones.
_JAPAN_CITIES = (
    (("Tokyo", "도쿄"), "도쿄"),
    (("Osaka", "오사카"), "오사카"),
    (("Sapporo", "Hokkaido", "삿포로", "홋카이도"), "삿포로"),
    (("Kyoto", "교토"), "교토"),
    (("Fukuoka", "후쿠오카"), "후쿠오카"),
)


def parse_region_from_address(address: str | None) -> str | None:
    """Parse an address into a 'country|city|district' region string.

    The result has between one and three '|'-separated parts, depending on
    how much of the address could be recognised:

    * Japan: "일본|<city>" for the known cities, otherwise "일본".
    * Singapore: always "싱가포르".
    * Korea: "한국|<city>|<district>", "한국|<city>", or "한국" when the
      address mentions "대한민국" but no known city token is found.

    Args:
        address: Free-form address text; may be None or empty.

    Returns:
        The region string, or None when the address is empty or nothing in
        it could be recognised.
    """
    if not address:
        return None
    addr = address.strip()
    district_suffixes = ("구", "군", "시")

    # Japan
    if addr.startswith("일본") or "Japan" in addr:
        for keywords, label in _JAPAN_CITIES:
            if any(keyword in addr for keyword in keywords):
                return f"일본|{label}"
        return "일본"

    # Singapore
    if "Singapore" in addr or "싱가포르" in addr:
        return "싱가포르"

    # Korean standard order: "대한민국 <province/city> <district> ..."
    if "대한민국" in addr:
        m = re.match(r"대한민국\s+(\S+)\s+(\S+)", addr)
        if m:
            city = _CITY_MAP.get(m.group(1))
            if city:
                gu = m.group(2)
                if gu.endswith(district_suffixes):
                    return f"한국|{city}|{gu}"
                # Second token is not a district, so report city level only.
                return f"한국|{city}"
        # Reversed order: "... <district> <city> 대한민국" / "... <city> KR".
        # The first token that is a known city decides the result; the token
        # just before it is used as the district when it looks like one.
        parts = addr.split()
        for i, token in enumerate(parts):
            if token in _CITY_MAP:
                city = _CITY_MAP[token]
                prev = parts[i - 1] if i > 0 else None
                if prev is not None and prev.endswith(district_suffixes):
                    return f"한국|{city}|{prev}"
                return f"한국|{city}"
        return "한국"

    # Korean address without the country prefix: only the first token is
    # considered as the city.
    parts = addr.split()
    if parts:
        city = _CITY_MAP.get(parts[0])
        if city:
            if len(parts) > 1 and parts[1].endswith(district_suffixes):
                return f"한국|{city}|{parts[1]}"
            return f"한국|{city}"

    return None
|
|
|
|
|
|
def _truncate_bytes(val: str | None, max_bytes: int) -> str | None:
|
|
"""Truncate a string to fit within max_bytes when encoded as UTF-8."""
|
|
if not val:
|
|
return val
|
|
encoded = val.encode("utf-8")
|
|
if len(encoded) <= max_bytes:
|
|
return val
|
|
return encoded[:max_bytes].decode("utf-8", errors="ignore").rstrip()
|
|
|
|
|
|
def find_by_place_id(google_place_id: str) -> dict | None:
    """Look up one restaurant row by its Google Place ID.

    Returns a dict with the keys id, name, address, region, latitude and
    longitude for the first matching row, or None when no row matches.
    """
    fields = ("id", "name", "address", "region", "latitude", "longitude")
    query = "SELECT id, name, address, region, latitude, longitude FROM restaurants WHERE google_place_id = :gid"
    with conn() as c:
        cursor = c.cursor()
        cursor.execute(query, {"gid": google_place_id})
        match = cursor.fetchone()
        if not match:
            return None
        # Column order in the SELECT matches `fields`.
        return dict(zip(fields, match))
|
|
|
|
|
|
def find_by_name(name: str) -> dict | None:
    """Look up one restaurant row whose name equals *name* exactly.

    Returns a dict with the keys id, name, address, region, latitude and
    longitude for the first matching row, or None when no row matches.
    """
    fields = ("id", "name", "address", "region", "latitude", "longitude")
    query = "SELECT id, name, address, region, latitude, longitude FROM restaurants WHERE name = :n"
    with conn() as c:
        cursor = c.cursor()
        cursor.execute(query, {"n": name})
        match = cursor.fetchone()
        if not match:
            return None
        # Column order in the SELECT matches `fields`.
        return dict(zip(fields, match))
|
|
|
|
|
|
def upsert(
    name: str,
    address: str | None = None,
    region: str | None = None,
    latitude: float | None = None,
    longitude: float | None = None,
    cuisine_type: str | None = None,
    price_range: str | None = None,
    google_place_id: str | None = None,
    phone: str | None = None,
    website: str | None = None,
    business_status: str | None = None,
    rating: float | None = None,
    rating_count: int | None = None,
) -> str:
    """Insert a restaurant, or update the existing row that matches it.

    An existing row is looked up first by ``google_place_id`` (when given)
    and then by exact ``name``. On update, ``name`` is always overwritten;
    every other column keeps its stored value when the new value is NULL
    (``COALESCE``).

    When ``region`` is not supplied it is derived from ``address`` via
    ``parse_region_from_address``.

    Returns:
        The id of the updated or newly inserted row.
    """
    # Derive the region from the address when the caller did not give one.
    if address and not region:
        region = parse_region_from_address(address)

    # Clip values to the DB column byte limits (VARCHAR2 is byte-based).
    price_range = _truncate_bytes(price_range, 50)
    cuisine_type = _truncate_bytes(cuisine_type, 100)
    region = _truncate_bytes(region, 100)
    website = _truncate_bytes(website, 500)

    # Match order: 1) by google_place_id, 2) by name.
    match = find_by_place_id(google_place_id) if google_place_id else None
    if not match:
        match = find_by_name(name)

    # Bind values shared by the UPDATE and INSERT statements.
    binds = {
        "name": name,
        "addr": address,
        "reg": region,
        "lat": latitude,
        "lng": longitude,
        "cuisine": cuisine_type,
        "price": price_range,
        "gid": google_place_id,
        "phone": phone,
        "web": website,
        "bstatus": business_status,
        "rating": rating,
        "rcnt": rating_count,
    }

    if match:
        update_sql = """
            UPDATE restaurants
            SET name = :name,
                address = COALESCE(:addr, address),
                region = COALESCE(:reg, region),
                latitude = COALESCE(:lat, latitude),
                longitude = COALESCE(:lng, longitude),
                cuisine_type = COALESCE(:cuisine, cuisine_type),
                price_range = COALESCE(:price, price_range),
                google_place_id = COALESCE(:gid, google_place_id),
                phone = COALESCE(:phone, phone),
                website = COALESCE(:web, website),
                business_status = COALESCE(:bstatus, business_status),
                rating = COALESCE(:rating, rating),
                rating_count = COALESCE(:rcnt, rating_count),
                updated_at = SYSTIMESTAMP
            WHERE id = :id
        """
        with conn() as c:
            c.cursor().execute(update_sql, {**binds, "id": match["id"]})
        return match["id"]

    insert_sql = """
        INSERT INTO restaurants (name, address, region, latitude, longitude,
                                 cuisine_type, price_range, google_place_id,
                                 phone, website, business_status, rating, rating_count)
        VALUES (:name, :addr, :reg, :lat, :lng, :cuisine, :price, :gid,
                :phone, :web, :bstatus, :rating, :rcnt)
        RETURNING id INTO :out_id
    """
    with conn() as c:
        cur = c.cursor()
        # Output bind that receives the generated id from RETURNING.
        new_id = cur.var(oracledb.STRING)
        cur.execute(insert_sql, {**binds, "out_id": new_id})
        return new_id.getvalue()[0]
|
|
|
|
|
|
def link_video_restaurant(
    video_db_id: str,
    restaurant_id: str,
    foods: list[str] | None = None,
    evaluation: str | None = None,
    guests: list[str] | None = None,
    citation: str | None = None,
) -> str | None:
    """Insert a video ↔ restaurant mapping row.

    ``foods`` and ``guests`` are stored as JSON arrays (empty array when
    missing); ``evaluation`` is stored as ``{"text": ...}`` or ``{}``.

    Returns:
        The id of the new row, or None when the insert fails with an error
        whose message mentions ``UQ_VR_VIDEO_REST`` (presumably the unique
        constraint on the video/restaurant pair). Any other error is
        re-raised.
    """
    insert_sql = """
        INSERT INTO video_restaurants
            (video_id, restaurant_id, foods_mentioned, evaluation, guests, citation_text)
        VALUES (:vid, :rid, :foods, :eval, :guests, :cite)
        RETURNING id INTO :out_id
    """
    evaluation_doc = {"text": evaluation} if evaluation else {}
    binds = {
        "vid": video_db_id,
        "rid": restaurant_id,
        "foods": json.dumps(foods or [], ensure_ascii=False),
        "eval": json.dumps(evaluation_doc, ensure_ascii=False),
        "guests": json.dumps(guests or [], ensure_ascii=False),
        "cite": citation,
    }
    with conn() as c:
        cur = c.cursor()
        # Output bind that receives the generated id from RETURNING.
        new_id = cur.var(oracledb.STRING)
        binds["out_id"] = new_id
        try:
            cur.execute(insert_sql, binds)
            return new_id.getvalue()[0]
        except Exception as exc:
            # Only the duplicate-mapping error is treated as "already linked".
            if "UQ_VR_VIDEO_REST" not in str(exc).upper():
                raise
            return None
|
|
|
|
|
|
def _in_list_binds(ids: list[str]) -> tuple[str, dict]:
    """Return (':id0, :id1, ...', {'id0': ..., ...}) for an SQL IN (...) list."""
    binds = {f"id{i}": rid for i, rid in enumerate(ids)}
    return ", ".join(f":{key}" for key in binds), binds


def _channels_by_restaurant(ids: list[str]) -> dict[str, list[str]]:
    """Map each restaurant id to the distinct channel names linked to it."""
    placeholders, binds = _in_list_binds(ids)
    sql = f"""
        SELECT DISTINCT vr.restaurant_id, c.channel_name
        FROM video_restaurants vr
        JOIN videos v ON v.id = vr.video_id
        JOIN channels c ON c.id = v.channel_id
        WHERE vr.restaurant_id IN ({placeholders})
    """
    channels: dict[str, list[str]] = {}
    with conn() as c:
        cur = c.cursor()
        cur.execute(sql, binds)
        for rid, channel_name in cur.fetchall():
            channels.setdefault(rid, []).append(channel_name)
    return channels


def _foods_by_restaurant(ids: list[str]) -> dict[str, list[str]]:
    """Map each restaurant id to its de-duplicated foods, in first-seen order."""
    placeholders, binds = _in_list_binds(ids)
    sql = f"""
        SELECT vr.restaurant_id, vr.foods_mentioned
        FROM video_restaurants vr
        WHERE vr.restaurant_id IN ({placeholders})
    """
    # dict keys give O(1) de-duplication while preserving insertion order.
    seen_by_rid: dict[str, dict[str, None]] = {}
    with conn() as c:
        cur = c.cursor()
        cur.execute(sql, binds)
        for rid, payload in cur.fetchall():
            # LOB-like values expose read(); plain values pass through.
            raw = payload.read() if hasattr(payload, "read") else payload
            items = _parse_json_field(raw, [])
            if not isinstance(items, list):
                continue  # malformed or non-array payload: skip this row
            seen = seen_by_rid.setdefault(rid, {})
            for food in items:
                if isinstance(food, str):
                    seen.setdefault(food, None)
    return {rid: list(seen) for rid, seen in seen_by_rid.items()}


def get_all(
    limit: int = 100,
    offset: int = 0,
    cuisine: str | None = None,
    region: str | None = None,
    channel: str | None = None,
) -> list[dict]:
    """List restaurants with optional filters.

    Only restaurants that have a latitude and at least one
    ``video_restaurants`` row are returned, newest ``updated_at`` first.

    Args:
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.
        cuisine: Exact match on ``cuisine_type``.
        region: Substring match on ``region`` (``LIKE '%region%'``).
        channel: Exact match on the name of a channel that featured the
            restaurant.

    Returns:
        One dict per restaurant with the selected columns (without
        ``updated_at``) plus ``channels`` (list of channel names) and
        ``foods_mentioned`` (up to 10 distinct food names).
    """
    conditions = [
        "r.latitude IS NOT NULL",
        "EXISTS (SELECT 1 FROM video_restaurants vr0 WHERE vr0.restaurant_id = r.id)",
    ]
    params: dict = {"lim": limit, "off": offset}

    if cuisine:
        conditions.append("r.cuisine_type = :cuisine")
        params["cuisine"] = cuisine
    if region:
        conditions.append("r.region LIKE :region")
        params["region"] = f"%{region}%"

    join_clause = ""
    if channel:
        join_clause = """
            JOIN video_restaurants vr_f ON vr_f.restaurant_id = r.id
            JOIN videos v_f ON v_f.id = vr_f.video_id
            JOIN channels c_f ON c_f.id = v_f.channel_id
        """
        conditions.append("c_f.channel_name = :channel")
        params["channel"] = channel

    where = " AND ".join(conditions)
    # updated_at is selected only because it is used in ORDER BY together
    # with DISTINCT; it is removed from the result rows below.
    sql = f"""
        SELECT DISTINCT r.id, r.name, r.address, r.region, r.latitude, r.longitude,
               r.cuisine_type, r.price_range, r.google_place_id,
               r.business_status, r.rating, r.rating_count, r.updated_at
        FROM restaurants r
        {join_clause}
        WHERE {where}
        ORDER BY r.updated_at DESC
        OFFSET :off ROWS FETCH NEXT :lim ROWS ONLY
    """
    with conn() as c:
        cur = c.cursor()
        cur.execute(sql, params)
        cols = [d[0].lower() for d in cur.description]
        restaurants = [dict(zip(cols, row)) for row in cur.fetchall()]
    for r in restaurants:
        r.pop("updated_at", None)

    if not restaurants:
        return restaurants

    ids = [r["id"] for r in restaurants]
    ch_map = _channels_by_restaurant(ids)
    foods_map = _foods_by_restaurant(ids)

    for r in restaurants:
        r["channels"] = ch_map.get(r["id"], [])
        r["foods_mentioned"] = foods_map.get(r["id"], [])[:10]

    return restaurants
|
|
|
|
|
|
def get_by_id(restaurant_id: str) -> dict | None:
    """Fetch one restaurant by id.

    Returns a dict keyed by lower-cased column name, or None when no row
    has the given id.
    """
    query = """
        SELECT r.id, r.name, r.address, r.region, r.latitude, r.longitude,
               r.cuisine_type, r.price_range, r.phone, r.website, r.google_place_id,
               r.business_status, r.rating, r.rating_count
        FROM restaurants r
        WHERE r.id = :id
    """
    with conn() as c:
        cur = c.cursor()
        cur.execute(query, {"id": restaurant_id})
        record = cur.fetchone()
        if record is None:
            return None
        # Column names come from the cursor metadata, lower-cased for keys.
        names = [column[0].lower() for column in cur.description]
        return {key: value for key, value in zip(names, record)}
|
|
|
|
|
|
def get_video_links(restaurant_id: str) -> list[dict]:
    """Return every video appearance of a restaurant, newest first.

    Each entry holds the video's id, title, url and ISO-formatted publish
    time, the parsed ``foods_mentioned`` / ``evaluation`` / ``guests`` JSON
    fields, and the channel name and id.
    """

    def _materialize(cell):
        # LOB-like values expose read(); plain values pass through.
        return cell.read() if hasattr(cell, "read") else cell

    query = """
        SELECT v.video_id, v.title, v.url, v.published_at,
               vr.foods_mentioned, vr.evaluation, vr.guests,
               c.channel_name, c.channel_id
        FROM video_restaurants vr
        JOIN videos v ON v.id = vr.video_id
        JOIN channels c ON c.id = v.channel_id
        WHERE vr.restaurant_id = :rid
        ORDER BY v.published_at DESC
    """
    with conn() as c:
        cur = c.cursor()
        cur.execute(query, {"rid": restaurant_id})
        links = []
        for row in cur.fetchall():
            (video_id, title, url, published_at,
             foods_cell, eval_cell, guests_cell,
             channel_name, channel_id) = row
            foods_text = _materialize(foods_cell)
            eval_text = _materialize(eval_cell)
            guests_text = _materialize(guests_cell)
            links.append({
                "video_id": video_id,
                "title": title,
                "url": url,
                "published_at": published_at.isoformat() if published_at else None,
                "foods_mentioned": _parse_json_field(foods_text, []),
                "evaluation": _parse_json_field(eval_text, {}),
                "guests": _parse_json_field(guests_text, []),
                "channel_name": channel_name,
                "channel_id": channel_id,
            })
        return links
|
|
|
|
|
|
def _parse_json_field(val, default):
|
|
"""Parse a JSON field that may be a string, already-parsed object, or None."""
|
|
if val is None:
|
|
return default
|
|
if isinstance(val, (list, dict)):
|
|
return val
|
|
if isinstance(val, str):
|
|
try:
|
|
return json.loads(val)
|
|
except (json.JSONDecodeError, ValueError):
|
|
return default
|
|
return default
|