UX improvements: mobile bottom sheet, cuisine taxonomy, search enhancements

- Add BottomSheet component for Google Maps-style restaurant detail on mobile
  (3-snap drag: 40%/55%/92%, velocity-based close, backdrop overlay)
- Mobile map mode now full-screen with bottom sheet overlay for details
- Collapsible filter panel on mobile with active filter badge count
- Standardized cuisine taxonomy (46 categories: 한식|국밥, 일식|스시 etc.)
  with LLM remap endpoint and admin UI button
- Enhanced search: keyword search now includes foods_mentioned + video title
- Search results include channels array for frontend filtering
- Channel filter moved to frontend filteredRestaurants (not API-level)
- LLM extraction prompt updated for pipe-delimited region + cuisine taxonomy
- Vector rebuild endpoint with rich JSON chunks per restaurant
- Geolocation-based auto region selection on page load
- Desktop filters split into two clean rows

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
joungmin
2026-03-09 10:54:28 +09:00
parent 3694730501
commit 2bddb0f764
16 changed files with 2277 additions and 308 deletions

View File

@@ -0,0 +1,98 @@
"""Daemon config & manual trigger API routes."""
from __future__ import annotations
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from api.deps import get_admin_user
from core.db import conn
from core import cache
router = APIRouter()
class DaemonConfigUpdate(BaseModel):
scan_enabled: bool | None = None
scan_interval_min: int | None = None
process_enabled: bool | None = None
process_interval_min: int | None = None
process_limit: int | None = None
@router.get("/config")
def get_config():
"""Get daemon config (read-only for all authenticated users)."""
with conn() as c:
cur = c.cursor()
cur.execute(
"SELECT scan_enabled, scan_interval_min, process_enabled, process_interval_min, "
"process_limit, last_scan_at, last_process_at, updated_at "
"FROM daemon_config WHERE id = 1"
)
row = cur.fetchone()
if not row:
return {}
return {
"scan_enabled": bool(row[0]),
"scan_interval_min": row[1],
"process_enabled": bool(row[2]),
"process_interval_min": row[3],
"process_limit": row[4],
"last_scan_at": str(row[5]) if row[5] else None,
"last_process_at": str(row[6]) if row[6] else None,
"updated_at": str(row[7]) if row[7] else None,
}
@router.put("/config")
def update_config(body: DaemonConfigUpdate, _admin: dict = Depends(get_admin_user)):
"""Update daemon schedule config (admin only)."""
sets = []
params: dict = {}
if body.scan_enabled is not None:
sets.append("scan_enabled = :se")
params["se"] = 1 if body.scan_enabled else 0
if body.scan_interval_min is not None:
sets.append("scan_interval_min = :si")
params["si"] = body.scan_interval_min
if body.process_enabled is not None:
sets.append("process_enabled = :pe")
params["pe"] = 1 if body.process_enabled else 0
if body.process_interval_min is not None:
sets.append("process_interval_min = :pi")
params["pi"] = body.process_interval_min
if body.process_limit is not None:
sets.append("process_limit = :pl")
params["pl"] = body.process_limit
if not sets:
return {"ok": True}
sets.append("updated_at = SYSTIMESTAMP")
sql = f"UPDATE daemon_config SET {', '.join(sets)} WHERE id = 1"
with conn() as c:
c.cursor().execute(sql, params)
return {"ok": True}
@router.post("/run/scan")
def run_scan(_admin: dict = Depends(get_admin_user)):
"""Manually trigger channel scan (admin only)."""
from core.youtube import scan_all_channels
new_count = scan_all_channels()
with conn() as c:
c.cursor().execute("UPDATE daemon_config SET last_scan_at = SYSTIMESTAMP WHERE id = 1")
if new_count > 0:
cache.flush()
return {"ok": True, "new_videos": new_count}
@router.post("/run/process")
def run_process(limit: int = 10, _admin: dict = Depends(get_admin_user)):
"""Manually trigger video processing (admin only)."""
from core.pipeline import process_pending
rest_count = process_pending(limit=limit)
with conn() as c:
c.cursor().execute("UPDATE daemon_config SET last_process_at = SYSTIMESTAMP WHERE id = 1")
if rest_count > 0:
cache.flush()
return {"ok": True, "restaurants_extracted": rest_count}

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
from fastapi import APIRouter, Query
from core import restaurant, vector
from core import restaurant, vector, cache
from core.db import conn
router = APIRouter()
@@ -17,8 +17,15 @@ def search_restaurants(
limit: int = Query(20, le=100),
):
"""Search restaurants by keyword, semantic similarity, or hybrid."""
key = cache.make_key("search", f"q={q}", f"m={mode}", f"l={limit}")
cached = cache.get(key)
if cached is not None:
return cached
if mode == "semantic":
return _semantic_search(q, limit)
result = _semantic_search(q, limit)
cache.set(key, result)
return result
elif mode == "hybrid":
kw = _keyword_search(q, limit)
sem = _semantic_search(q, limit)
@@ -29,21 +36,31 @@ def search_restaurants(
if r["id"] not in seen:
merged.append(r)
seen.add(r["id"])
return merged[:limit]
result = merged[:limit]
cache.set(key, result)
return result
else:
return _keyword_search(q, limit)
result = _keyword_search(q, limit)
cache.set(key, result)
return result
def _keyword_search(q: str, limit: int) -> list[dict]:
# JOIN video_restaurants to also search foods_mentioned and video title
sql = """
SELECT id, name, address, region, latitude, longitude,
cuisine_type, price_range
FROM restaurants
WHERE latitude IS NOT NULL
AND (UPPER(name) LIKE UPPER(:q)
OR UPPER(address) LIKE UPPER(:q)
OR UPPER(region) LIKE UPPER(:q)
OR UPPER(cuisine_type) LIKE UPPER(:q))
SELECT DISTINCT r.id, r.name, r.address, r.region, r.latitude, r.longitude,
r.cuisine_type, r.price_range, r.google_place_id,
r.business_status, r.rating, r.rating_count
FROM restaurants r
JOIN video_restaurants vr ON vr.restaurant_id = r.id
JOIN videos v ON v.id = vr.video_id
WHERE r.latitude IS NOT NULL
AND (UPPER(r.name) LIKE UPPER(:q)
OR UPPER(r.address) LIKE UPPER(:q)
OR UPPER(r.region) LIKE UPPER(:q)
OR UPPER(r.cuisine_type) LIKE UPPER(:q)
OR UPPER(vr.foods_mentioned) LIKE UPPER(:q)
OR UPPER(v.title) LIKE UPPER(:q))
FETCH FIRST :lim ROWS ONLY
"""
pattern = f"%{q}%"
@@ -51,18 +68,56 @@ def _keyword_search(q: str, limit: int) -> list[dict]:
cur = c.cursor()
cur.execute(sql, {"q": pattern, "lim": limit})
cols = [d[0].lower() for d in cur.description]
return [dict(zip(cols, row)) for row in cur.fetchall()]
rows = [dict(zip(cols, row)) for row in cur.fetchall()]
# Attach channel names
if rows:
_attach_channels(rows)
return rows
def _semantic_search(q: str, limit: int) -> list[dict]:
similar = vector.search_similar(q, top_k=limit)
similar = vector.search_similar(q, top_k=max(30, limit * 3))
if not similar:
return []
rest_ids = list({s["restaurant_id"] for s in similar})
# Deduplicate by restaurant_id, preserving distance order (best first)
seen: set[str] = set()
ordered_ids: list[str] = []
for s in similar:
rid = s["restaurant_id"]
if rid not in seen:
seen.add(rid)
ordered_ids.append(rid)
results = []
for rid in rest_ids[:limit]:
for rid in ordered_ids[:limit]:
r = restaurant.get_by_id(rid)
if r and r.get("latitude"):
results.append(r)
if results:
_attach_channels(results)
return results
def _attach_channels(rows: list[dict]):
"""Attach channel names to each restaurant dict."""
ids = [r["id"] for r in rows]
placeholders = ", ".join(f":id{i}" for i in range(len(ids)))
sql = f"""
SELECT DISTINCT vr.restaurant_id, c.channel_name
FROM video_restaurants vr
JOIN videos v ON v.id = vr.video_id
JOIN channels c ON c.id = v.channel_id
WHERE vr.restaurant_id IN ({placeholders})
"""
params = {f"id{i}": rid for i, rid in enumerate(ids)}
ch_map: dict[str, list[str]] = {}
with conn() as c:
cur = c.cursor()
cur.execute(sql, params)
for row in cur.fetchall():
ch_map.setdefault(row[0], []).append(row[1])
for r in rows:
r["channels"] = ch_map.get(r["id"], [])

View File

@@ -9,11 +9,14 @@ import random
import time
from concurrent.futures import ThreadPoolExecutor
from fastapi import APIRouter, Query
from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse
from api.deps import get_admin_user
from core.db import conn
from core.pipeline import process_pending
from core import cache
logger = logging.getLogger(__name__)
router = APIRouter()
@@ -23,11 +26,9 @@ _executor = ThreadPoolExecutor(max_workers=4)
@router.get("")
def list_videos(
status: str | None = None,
limit: int = Query(50, le=500),
offset: int = Query(0, ge=0),
):
conditions = []
params: dict = {"lim": limit, "off": offset}
params: dict = {}
if status:
conditions.append("v.status = :st")
params["st"] = status
@@ -44,7 +45,6 @@ def list_videos(
JOIN channels c ON c.id = v.channel_id
{where}
ORDER BY v.published_at DESC NULLS LAST
OFFSET :off ROWS FETCH NEXT :lim ROWS ONLY
"""
with conn() as c:
cur = c.cursor()
@@ -100,7 +100,7 @@ def bulk_extract_pending_count():
@router.post("/bulk-extract")
def bulk_extract():
def bulk_extract(_admin: dict = Depends(get_admin_user)):
"""Process all unextracted videos with random delays. Streams SSE progress."""
from core.pipeline import process_video_extract
@@ -131,6 +131,8 @@ def bulk_extract():
logger.error("Bulk extract error for %s: %s", v["video_id"], e)
yield f"data: {_json.dumps({'type': 'error', 'index': i, 'title': v['title'], 'message': str(e)})}\n\n"
if total_restaurants > 0:
cache.flush()
yield f"data: {_json.dumps({'type': 'complete', 'total': total, 'total_restaurants': total_restaurants})}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
@@ -159,7 +161,7 @@ def bulk_transcript_pending_count():
@router.post("/bulk-transcript")
def bulk_transcript():
def bulk_transcript(_admin: dict = Depends(get_admin_user)):
"""Fetch transcripts for all videos missing them. Streams SSE progress."""
from core.youtube import get_transcript
@@ -196,11 +198,133 @@ def bulk_transcript():
logger.error("Bulk transcript error for %s: %s", v["video_id"], e)
yield f"data: {_json.dumps({'type': 'error', 'index': i, 'title': v['title'], 'message': str(e)})}\n\n"
if success > 0:
cache.flush()
yield f"data: {_json.dumps({'type': 'complete', 'total': total, 'success': success})}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
@router.post("/remap-cuisine")
def remap_cuisine(_admin: dict = Depends(get_admin_user)):
"""Remap all restaurant cuisine_type using LLM. Streams SSE progress."""
from core.cuisine import build_remap_prompt, CUISINE_TYPES, VALID_PREFIXES
from core.extractor import _llm, _parse_json
from core.db import conn as db_conn
BATCH = 20 # restaurants per LLM call (smaller for better accuracy)
def _apply_batch(batch: list[dict], valid_set: set[str]) -> tuple[int, list[dict]]:
"""Run LLM on a batch. Returns (updated_count, missed_items)."""
prompt = build_remap_prompt(batch)
raw = _llm(prompt, max_tokens=4096)
result = _parse_json(raw)
if not isinstance(result, list):
result = []
result_map = {}
for item in result:
rid = item.get("id")
new_type = item.get("cuisine_type")
if rid and new_type:
result_map[rid] = new_type
updated = 0
missed = []
for r in batch:
rid = r["id"]
new_type = result_map.get(rid)
if not new_type:
missed.append(r)
continue
# Accept if exact match or valid prefix
if new_type not in valid_set and not new_type.startswith(VALID_PREFIXES):
missed.append(r)
continue
with db_conn() as c:
c.cursor().execute(
"UPDATE restaurants SET cuisine_type = :ct WHERE id = :id",
{"ct": new_type, "id": rid},
)
updated += 1
return updated, missed
def generate():
sql = """
SELECT r.id, r.name, r.cuisine_type,
(SELECT LISTAGG(vr.foods_mentioned, '|') WITHIN GROUP (ORDER BY vr.id)
FROM video_restaurants vr WHERE vr.restaurant_id = r.id) AS foods
FROM restaurants r
WHERE EXISTS (SELECT 1 FROM video_restaurants vr2 WHERE vr2.restaurant_id = r.id)
ORDER BY r.name
"""
with db_conn() as c:
cur = c.cursor()
cur.execute(sql)
rows = []
for row in cur.fetchall():
foods_raw = row[3].read() if hasattr(row[3], "read") else row[3]
rows.append({"id": row[0], "name": row[1], "cuisine_type": row[2], "foods_mentioned": foods_raw})
total = len(rows)
yield f"data: {_json.dumps({'type': 'start', 'total': total})}\n\n"
valid_set = set(CUISINE_TYPES)
updated = 0
all_missed: list[dict] = []
# Pass 1: process all in batches
for i in range(0, total, BATCH):
batch = rows[i : i + BATCH]
yield f"data: {_json.dumps({'type': 'processing', 'current': min(i + BATCH, total), 'total': total, 'pass': 1})}\n\n"
try:
cnt, missed = _apply_batch(batch, valid_set)
updated += cnt
all_missed.extend(missed)
yield f"data: {_json.dumps({'type': 'batch_done', 'current': min(i + BATCH, total), 'total': total, 'updated': updated, 'missed': len(all_missed)})}\n\n"
except Exception as e:
logger.error("Remap batch error at %d: %s", i, e, exc_info=True)
all_missed.extend(batch)
yield f"data: {_json.dumps({'type': 'error', 'message': str(e), 'current': i})}\n\n"
# Pass 2: retry missed items (smaller batches for accuracy)
if all_missed:
yield f"data: {_json.dumps({'type': 'retry', 'missed': len(all_missed)})}\n\n"
RETRY_BATCH = 10
for i in range(0, len(all_missed), RETRY_BATCH):
batch = all_missed[i : i + RETRY_BATCH]
try:
cnt, _ = _apply_batch(batch, valid_set)
updated += cnt
yield f"data: {_json.dumps({'type': 'batch_done', 'current': min(i + RETRY_BATCH, len(all_missed)), 'total': len(all_missed), 'updated': updated, 'pass': 2})}\n\n"
except Exception as e:
logger.error("Remap retry error at %d: %s", i, e, exc_info=True)
cache.flush()
yield f"data: {_json.dumps({'type': 'complete', 'total': total, 'updated': updated})}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
@router.post("/rebuild-vectors")
def rebuild_vectors(_admin: dict = Depends(get_admin_user)):
"""Rebuild all restaurant vector embeddings. Streams SSE progress."""
from core import vector
def generate():
yield f"data: {_json.dumps({'type': 'start'})}\n\n"
try:
for progress in vector.rebuild_all_vectors():
yield f"data: {_json.dumps({'type': progress.get('status', 'progress'), **progress})}\n\n"
cache.flush()
except Exception as e:
logger.error("Rebuild vectors error: %s", e, exc_info=True)
yield f"data: {_json.dumps({'type': 'error', 'message': str(e)})}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
@router.get("/extract/prompt")
def get_extract_prompt():
"""Get the current LLM extraction prompt template."""
@@ -209,11 +333,14 @@ def get_extract_prompt():
def _do_process(limit: int):
return {"restaurants_extracted": process_pending(limit)}
result = process_pending(limit)
if result > 0:
cache.flush()
return {"restaurants_extracted": result}
@router.post("/process")
async def trigger_processing(limit: int = Query(5, le=20)):
async def trigger_processing(limit: int = Query(5, le=20), _admin: dict = Depends(get_admin_user)):
"""Manually trigger processing of pending videos (non-blocking)."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(_executor, _do_process, limit)
@@ -318,11 +445,12 @@ def _do_fetch_transcript(video_db_id: str, mode: str):
{"txt": transcript, "vid": video_db_id},
)
cache.flush()
return {"ok": True, "length": len(transcript), "source": source}
@router.post("/{video_db_id}/fetch-transcript")
async def fetch_transcript(video_db_id: str, mode: str = Query("auto")):
async def fetch_transcript(video_db_id: str, mode: str = Query("auto"), _admin: dict = Depends(get_admin_user)):
"""Fetch and save transcript for a video (non-blocking)."""
from fastapi import HTTPException
@@ -359,11 +487,12 @@ def _do_extract(video_db_id: str, custom_prompt: str | None):
transcript,
custom_prompt=custom_prompt,
)
cache.flush()
return {"ok": True, "restaurants_extracted": count}
@router.post("/{video_db_id}/extract")
async def extract_restaurants_from_video(video_db_id: str, body: dict = None):
async def extract_restaurants_from_video(video_db_id: str, body: dict = None, _admin: dict = Depends(get_admin_user)):
"""Run LLM extraction on an existing transcript (non-blocking)."""
from fastapi import HTTPException
custom_prompt = body.get("prompt") if body else None
@@ -375,7 +504,7 @@ async def extract_restaurants_from_video(video_db_id: str, body: dict = None):
@router.post("/{video_db_id}/skip")
def skip_video(video_db_id: str):
def skip_video(video_db_id: str, _admin: dict = Depends(get_admin_user)):
"""Mark a video as skipped."""
from fastapi import HTTPException
with conn() as c:
@@ -386,11 +515,12 @@ def skip_video(video_db_id: str):
)
if cur.rowcount == 0:
raise HTTPException(404, "Video not found")
cache.flush()
return {"ok": True}
@router.delete("/{video_db_id}")
def delete_video(video_db_id: str):
def delete_video(video_db_id: str, _admin: dict = Depends(get_admin_user)):
"""Delete a video and its related data."""
from core.db import conn as get_conn
with get_conn() as c:
@@ -441,11 +571,12 @@ def delete_video(video_db_id: str):
if cur.rowcount == 0:
from fastapi import HTTPException
raise HTTPException(404, "Video not found")
cache.flush()
return {"ok": True}
@router.put("/{video_db_id}")
def update_video(video_db_id: str, body: dict):
def update_video(video_db_id: str, body: dict, _admin: dict = Depends(get_admin_user)):
"""Update video title."""
from fastapi import HTTPException
title = body.get("title")
@@ -459,11 +590,12 @@ def update_video(video_db_id: str, body: dict):
)
if cur.rowcount == 0:
raise HTTPException(404, "Video not found")
cache.flush()
return {"ok": True}
@router.delete("/{video_db_id}/restaurants/{restaurant_id}")
def delete_video_restaurant(video_db_id: str, restaurant_id: str):
def delete_video_restaurant(video_db_id: str, restaurant_id: str, _admin: dict = Depends(get_admin_user)):
"""Delete a video-restaurant mapping. Also cleans up orphaned restaurant."""
from fastapi import HTTPException
with conn() as c:
@@ -487,11 +619,12 @@ def delete_video_restaurant(video_db_id: str, restaurant_id: str):
DELETE FROM restaurants WHERE id = :rid
AND NOT EXISTS (SELECT 1 FROM video_restaurants WHERE restaurant_id = :rid)
""", {"rid": restaurant_id})
cache.flush()
return {"ok": True}
@router.post("/{video_db_id}/restaurants/manual")
def add_manual_restaurant(video_db_id: str, body: dict):
def add_manual_restaurant(video_db_id: str, body: dict, _admin: dict = Depends(get_admin_user)):
"""Manually add a restaurant and link it to a video."""
from fastapi import HTTPException
from core import restaurant as rest_mod
@@ -538,11 +671,12 @@ def add_manual_restaurant(video_db_id: str, body: dict):
guests=guests if isinstance(guests, list) else [],
)
cache.flush()
return {"ok": True, "restaurant_id": rid, "link_id": link_id}
@router.put("/{video_db_id}/restaurants/{restaurant_id}")
def update_video_restaurant(video_db_id: str, restaurant_id: str, body: dict):
def update_video_restaurant(video_db_id: str, restaurant_id: str, body: dict, _admin: dict = Depends(get_admin_user)):
"""Update restaurant info linked to a video.
If name changed, re-geocode and remap to a new restaurant record.
@@ -552,6 +686,9 @@ def update_video_restaurant(video_db_id: str, restaurant_id: str, body: dict):
# Check if name changed — need to remap
new_name = body.get("name", "").strip() if "name" in body else None
name_changed = False
active_rid = restaurant_id
if new_name:
with conn() as c:
cur = c.cursor()
@@ -560,101 +697,126 @@ def update_video_restaurant(video_db_id: str, restaurant_id: str, body: dict):
old_name = row[0] if row else ""
if old_name != new_name:
# Name changed: geocode new restaurant, remap
name_changed = True
from core import restaurant as rest_mod
from core.geocoding import geocode_restaurant
address = body.get("address", "").strip() or body.get("region", "").strip() or ""
address = (body.get("address") or "").strip() or (body.get("region") or "").strip() or ""
geo = geocode_restaurant(new_name, address)
if not geo:
raise HTTPException(400, f"'{new_name}' 위치를 찾을 수 없습니다.")
new_rid = rest_mod.upsert(
name=new_name,
address=geo.get("formatted_address") or body.get("address"),
region=body.get("region"),
latitude=geo["latitude"],
longitude=geo["longitude"],
cuisine_type=body.get("cuisine_type"),
price_range=body.get("price_range"),
google_place_id=geo.get("google_place_id"),
phone=geo.get("phone"),
website=geo.get("website"),
business_status=geo.get("business_status"),
rating=geo.get("rating"),
rating_count=geo.get("rating_count"),
)
# Read existing mapping data, delete old, create new
with conn() as c:
cur = c.cursor()
cur.execute(
"SELECT foods_mentioned, evaluation, guests FROM video_restaurants WHERE video_id = :vid AND restaurant_id = :rid",
{"vid": video_db_id, "rid": restaurant_id},
)
old_vr = cur.fetchone()
cur.execute(
"DELETE FROM video_restaurants WHERE video_id = :vid AND restaurant_id = :rid",
{"vid": video_db_id, "rid": restaurant_id},
# Geocode failed — just rename in place without remapping
with conn() as c:
cur = c.cursor()
cur.execute("UPDATE restaurants SET name = :name, updated_at = SYSTIMESTAMP WHERE id = :rid",
{"name": new_name, "rid": restaurant_id})
else:
new_rid = rest_mod.upsert(
name=new_name,
address=geo.get("formatted_address") or body.get("address"),
region=body.get("region"),
latitude=geo["latitude"],
longitude=geo["longitude"],
cuisine_type=body.get("cuisine_type"),
price_range=body.get("price_range"),
google_place_id=geo.get("google_place_id"),
phone=geo.get("phone"),
website=geo.get("website"),
business_status=geo.get("business_status"),
rating=geo.get("rating"),
rating_count=geo.get("rating_count"),
)
# Build new mapping values from body or old data
def _parse(val, default):
if val is None:
return default
if hasattr(val, "read"):
val = val.read()
if isinstance(val, (list, dict)):
return val
try:
return _json.loads(val)
except Exception:
return default
# Read existing mapping data, delete old, create new
with conn() as c:
cur = c.cursor()
cur.execute(
"SELECT foods_mentioned, evaluation, guests FROM video_restaurants WHERE video_id = :vid AND restaurant_id = :rid",
{"vid": video_db_id, "rid": restaurant_id},
)
old_vr = cur.fetchone()
old_foods = _parse(old_vr[0], []) if old_vr else []
old_eval = _parse(old_vr[1], {}) if old_vr else {}
old_guests = _parse(old_vr[2], []) if old_vr else []
cur.execute(
"DELETE FROM video_restaurants WHERE video_id = :vid AND restaurant_id = :rid",
{"vid": video_db_id, "rid": restaurant_id},
)
foods = body.get("foods_mentioned", old_foods)
evaluation = body.get("evaluation", old_eval)
guests = body.get("guests", old_guests)
def _parse(val, default):
if val is None:
return default
if hasattr(val, "read"):
val = val.read()
if isinstance(val, (list, dict)):
return val
try:
return _json.loads(val)
except Exception:
return default
eval_text = evaluation.get("text", "") if isinstance(evaluation, dict) else str(evaluation or "")
old_foods = _parse(old_vr[0], []) if old_vr else []
old_eval = _parse(old_vr[1], {}) if old_vr else {}
old_guests = _parse(old_vr[2], []) if old_vr else []
rest_mod.link_video_restaurant(
video_db_id=video_db_id,
restaurant_id=new_rid,
foods=foods if isinstance(foods, list) else [],
evaluation=eval_text or None,
guests=guests if isinstance(guests, list) else [],
)
foods = body.get("foods_mentioned", old_foods)
evaluation = body.get("evaluation", old_eval)
guests = body.get("guests", old_guests)
return {"ok": True, "remapped": True, "new_restaurant_id": new_rid}
eval_text = evaluation.get("text", "") if isinstance(evaluation, dict) else str(evaluation or "")
# No name change — update in place
with conn() as c:
cur = c.cursor()
r_sets = []
r_params: dict = {"rid": restaurant_id}
for field in ("name", "address", "region", "cuisine_type", "price_range"):
if field in body:
r_sets.append(f"{field} = :{field}")
r_params[field] = body[field]
if r_sets:
r_sets.append("updated_at = SYSTIMESTAMP")
sql = f"UPDATE restaurants SET {', '.join(r_sets)} WHERE id = :rid"
cur.execute(sql, r_params)
rest_mod.link_video_restaurant(
video_db_id=video_db_id,
restaurant_id=new_rid,
foods=foods if isinstance(foods, list) else [],
evaluation=eval_text or None,
guests=guests if isinstance(guests, list) else [],
)
vr_params: dict = {"vid": video_db_id, "rid": restaurant_id}
vr_sets = []
for field in ("foods_mentioned", "evaluation", "guests"):
if field in body:
vr_sets.append(f"{field} = :{field}")
val = body[field]
vr_params[field] = _json.dumps(val, ensure_ascii=False) if isinstance(val, (list, dict)) else val
if vr_sets:
sql = f"UPDATE video_restaurants SET {', '.join(vr_sets)} WHERE video_id = :vid AND restaurant_id = :rid"
cur.execute(sql, vr_params)
active_rid = new_rid
return {"ok": True}
# 기존 식당이 다른 영상 매핑이 없으면 고아 → 삭제
if new_rid != restaurant_id:
with conn() as c:
cur = c.cursor()
cur.execute(
"SELECT COUNT(*) FROM video_restaurants WHERE restaurant_id = :rid",
{"rid": restaurant_id},
)
remaining = cur.fetchone()[0]
if remaining == 0:
cur.execute("DELETE FROM restaurant_vectors WHERE restaurant_id = :rid", {"rid": restaurant_id})
cur.execute("DELETE FROM user_reviews WHERE restaurant_id = :rid", {"rid": restaurant_id})
cur.execute("DELETE FROM user_favorites WHERE restaurant_id = :rid", {"rid": restaurant_id})
cur.execute("DELETE FROM restaurants WHERE id = :rid", {"rid": restaurant_id})
# Update remaining fields in place (skip name if already remapped)
if not name_changed:
with conn() as c:
cur = c.cursor()
r_sets = []
r_params: dict = {"rid": active_rid}
for field in ("address", "region", "cuisine_type", "price_range"):
if field in body:
r_sets.append(f"{field} = :{field}")
r_params[field] = body[field]
if r_sets:
r_sets.append("updated_at = SYSTIMESTAMP")
sql = f"UPDATE restaurants SET {', '.join(r_sets)} WHERE id = :rid"
cur.execute(sql, r_params)
vr_params: dict = {"vid": video_db_id, "rid": active_rid}
vr_sets = []
for field in ("foods_mentioned", "evaluation", "guests"):
if field in body:
vr_sets.append(f"{field} = :{field}")
val = body[field]
vr_params[field] = _json.dumps(val, ensure_ascii=False) if isinstance(val, (list, dict)) else val
if vr_sets:
sql = f"UPDATE video_restaurants SET {', '.join(vr_sets)} WHERE video_id = :vid AND restaurant_id = :rid"
cur.execute(sql, vr_params)
cache.flush()
result: dict = {"ok": True}
if name_changed:
result["remapped"] = active_rid != restaurant_id
if active_rid != restaurant_id:
result["new_restaurant_id"] = active_rid
return result

107
backend/core/cache.py Normal file
View File

@@ -0,0 +1,107 @@
"""Redis cache layer — graceful fallback when Redis is unavailable."""
from __future__ import annotations
import json
import logging
import os
from typing import Any
import redis
logger = logging.getLogger(__name__)
_client: redis.Redis | None = None
_disabled = False
DEFAULT_TTL = 600 # 10 minutes
def _get_client() -> redis.Redis | None:
global _client, _disabled
if _disabled:
return None
if _client is None:
host = os.environ.get("REDIS_HOST", "192.168.0.147")
port = int(os.environ.get("REDIS_PORT", "6379"))
db = int(os.environ.get("REDIS_DB", "0"))
try:
_client = redis.Redis(
host=host, port=port, db=db,
socket_connect_timeout=2,
socket_timeout=2,
decode_responses=True,
)
_client.ping()
logger.info("Redis connected: %s:%s/%s", host, port, db)
except Exception as e:
logger.warning("Redis unavailable (%s), caching disabled", e)
_client = None
_disabled = True
return None
return _client
def make_key(*parts: Any) -> str:
"""Build a cache key like 'tasteby:restaurants:cuisine=한식:limit=100'."""
return "tasteby:" + ":".join(str(p) for p in parts if p is not None and p != "")
def get(key: str) -> Any | None:
"""Get cached value. Returns None on miss or error."""
try:
client = _get_client()
if not client:
return None
val = client.get(key)
if val is not None:
return json.loads(val)
except Exception as e:
logger.debug("Cache get error: %s", e)
return None
def set(key: str, value: Any, ttl: int = DEFAULT_TTL) -> None:
"""Cache a value as JSON with TTL."""
try:
client = _get_client()
if not client:
return
client.setex(key, ttl, json.dumps(value, ensure_ascii=False, default=str))
except Exception as e:
logger.debug("Cache set error: %s", e)
def flush() -> None:
"""Flush all tasteby cache keys."""
try:
client = _get_client()
if not client:
return
cursor = 0
while True:
cursor, keys = client.scan(cursor, match="tasteby:*", count=200)
if keys:
client.delete(*keys)
if cursor == 0:
break
logger.info("Cache flushed")
except Exception as e:
logger.debug("Cache flush error: %s", e)
def invalidate_prefix(prefix: str) -> None:
"""Delete all keys matching a prefix."""
try:
client = _get_client()
if not client:
return
cursor = 0
while True:
cursor, keys = client.scan(cursor, match=f"{prefix}*", count=200)
if keys:
client.delete(*keys)
if cursor == 0:
break
except Exception as e:
logger.debug("Cache invalidate error: %s", e)

102
backend/core/cuisine.py Normal file
View File

@@ -0,0 +1,102 @@
"""Standardized cuisine type taxonomy and LLM remapping."""
from __future__ import annotations
# ── Canonical cuisine types ──
# Format: "대분류|소분류"
CUISINE_TYPES = [
# 한식
"한식|백반/한정식",
"한식|국밥/해장국",
"한식|찌개/전골/탕",
"한식|삼겹살/돼지구이",
"한식|소고기/한우구이",
"한식|곱창/막창",
"한식|닭/오리구이",
"한식|족발/보쌈",
"한식|회/횟집",
"한식|해산물",
"한식|분식",
"한식|면",
"한식|죽/죽집",
"한식|순대/순대국",
"한식|장어/민물",
"한식|주점/포차",
# 일식
"일식|스시/오마카세",
"일식|라멘",
"일식|돈카츠",
"일식|텐동/튀김",
"일식|이자카야",
"일식|야키니쿠",
"일식|카레",
"일식|소바/우동",
# 중식
"중식|중화요리",
"중식|마라/훠궈",
"중식|딤섬/만두",
"중식|양꼬치",
# 양식
"양식|파스타/이탈리안",
"양식|스테이크",
"양식|햄버거",
"양식|피자",
"양식|프렌치",
"양식|바베큐",
"양식|브런치",
"양식|비건/샐러드",
# 아시아
"아시아|베트남",
"아시아|태국",
"아시아|인도/중동",
"아시아|동남아기타",
# 기타
"기타|치킨",
"기타|카페/디저트",
"기타|베이커리",
"기타|뷔페",
"기타|퓨전",
]
# For LLM prompt
CUISINE_LIST_TEXT = "\n".join(f" - {c}" for c in CUISINE_TYPES)
_REMAP_PROMPT = """\
아래 식당들의 cuisine_type을 표준 분류로 매핑하세요.
표준 분류 목록 (반드시 이 중 하나를 선택):
{cuisine_types}
식당 목록:
{restaurants}
규칙:
- 모든 식당에 대해 빠짐없이 결과를 반환 (총 {count}개 모두 반환해야 함)
- 반드시 위 표준 분류 목록의 값을 그대로 복사하여 사용 (오타 금지)
- 식당 이름, 현재 분류, 메뉴를 종합적으로 고려
- JSON 배열만 반환, 설명 없음
- 형식: [{{"id": "식당ID", "cuisine_type": "한식|국밥/해장국"}}, ...]
JSON 배열:"""
def build_remap_prompt(restaurants: list[dict]) -> str:
"""Build a prompt for remapping cuisine types."""
items = []
for r in restaurants:
items.append({
"id": r["id"],
"name": r["name"],
"current_cuisine_type": r.get("cuisine_type"),
"foods_mentioned": r.get("foods_mentioned"),
})
import json
return _REMAP_PROMPT.format(
cuisine_types=CUISINE_LIST_TEXT,
restaurants=json.dumps(items, ensure_ascii=False),
count=len(items),
)
# Valid prefixes for loose validation
VALID_PREFIXES = ("한식|", "일식|", "중식|", "양식|", "아시아|", "기타|")

View File

@@ -20,6 +20,8 @@ from oci.generative_ai_inference.models import (
UserMessage,
)
from core.cuisine import CUISINE_LIST_TEXT
logger = logging.getLogger(__name__)
@@ -101,18 +103,22 @@ _EXTRACT_PROMPT = """\
필드:
- name: 식당 이름 (string, 필수)
- address: 주소 또는 위치 힌트 (string | null)
- region: 지역 (예: 서울 강남, 부산 해운대) (string | null)
- cuisine_type: 음식 종류 (예: 한식, 일식, 중식, 양식, 카페) (string | null)
- region: 지역"나라|시/도|구/군/시" 파이프(|) 구분 형식으로 작성 (string | null)
- 한국 예시: "한국|서울|강남구", "한국|부산|해운대구", "한국|제주", "한국|강원|강릉시"
- 해외 예시: "일본|도쿄", "일본|오사카", "싱가포르", "미국|뉴욕", "태국|방콕"
- 나라는 한글로, 해외 도시도 한글로 표기
- cuisine_type: 아래 목록에서 가장 적합한 것을 선택 (string, 필수). 반드시 아래 목록 중 하나를 사용:
{cuisine_types}
- price_range: 가격대 (예: 1만원대, 2-3만원) (string | null)
- foods_mentioned: 언급된 메뉴들 (string[])
- evaluation: 평가 내용 (string | null)
- guests: 함께한 게스트 (string[])
영상 제목: {title}
영상 제목: {{title}}
자막:
{transcript}
{{transcript}}
JSON 배열:"""
JSON 배열:""".format(cuisine_types=CUISINE_LIST_TEXT)
def extract_restaurants(title: str, transcript: str, custom_prompt: str | None = None) -> tuple[list[dict], str]:

View File

@@ -3,12 +3,86 @@
from __future__ import annotations
import json
import re
import oracledb
from core.db import conn
# ── Region parser: address → "나라|시|구" ──
_CITY_MAP = {
"서울특별시": "서울", "서울": "서울",
"부산광역시": "부산", "부산": "부산",
"대구광역시": "대구", "대구": "대구",
"인천광역시": "인천", "인천": "인천",
"광주광역시": "광주", "광주": "광주",
"대전광역시": "대전", "대전": "대전",
"울산광역시": "울산", "울산": "울산",
"세종특별자치시": "세종",
"경기도": "경기", "경기": "경기",
"강원특별자치도": "강원", "강원도": "강원",
"충청북도": "충북", "충청남도": "충남",
"전라북도": "전북", "전북특별자치도": "전북",
"전라남도": "전남",
"경상북도": "경북", "경상남도": "경남",
"제주특별자치도": "제주",
}
def parse_region_from_address(address: str | None) -> str | None:
"""Parse address into 'country|city|district' format."""
if not address:
return None
addr = address.strip()
# Japanese
if addr.startswith("일본") or "Japan" in addr:
city = None
if "Tokyo" in addr: city = "도쿄"
elif "Osaka" in addr: city = "오사카"
elif "Sapporo" in addr or "Hokkaido" in addr: city = "삿포로"
elif "Kyoto" in addr: city = "교토"
elif "Fukuoka" in addr: city = "후쿠오카"
return f"일본|{city}" if city else "일본"
# Singapore
if "Singapore" in addr or "싱가포르" in addr:
return "싱가포르"
# Korean standard: "대한민국 시/도 구/시 ..."
if "대한민국" in addr:
m = re.match(r"대한민국\s+(\S+)\s+(\S+)", addr)
if m:
city = _CITY_MAP.get(m.group(1))
if city:
gu = m.group(2)
if gu.endswith(("", "", "")):
return f"한국|{city}|{gu}"
# Not a district — just city level
return f"한국|{city}"
# Reversed: "... 구 시 대한민국" / "... 시 KR"
parts = addr.split()
for i, p in enumerate(parts):
if p in _CITY_MAP:
city = _CITY_MAP[p]
gu = parts[i - 1] if i > 0 and parts[i - 1].endswith(("", "", "")) else None
return f"한국|{city}|{gu}" if gu else f"한국|{city}"
return "한국"
# Korean without prefix
parts = addr.split()
if parts:
city = _CITY_MAP.get(parts[0])
if city and len(parts) > 1 and parts[1].endswith(("", "", "")):
return f"한국|{city}|{parts[1]}"
elif city:
return f"한국|{city}"
return None
def _truncate_bytes(val: str | None, max_bytes: int) -> str | None:
"""Truncate a string to fit within max_bytes when encoded as UTF-8."""
if not val:
@@ -19,6 +93,21 @@ def _truncate_bytes(val: str | None, max_bytes: int) -> str | None:
return encoded[:max_bytes].decode("utf-8", errors="ignore").rstrip()
def find_by_place_id(google_place_id: str) -> dict | None:
"""Find a restaurant by Google Place ID."""
sql = "SELECT id, name, address, region, latitude, longitude FROM restaurants WHERE google_place_id = :gid"
with conn() as c:
cur = c.cursor()
cur.execute(sql, {"gid": google_place_id})
r = cur.fetchone()
if r:
return {
"id": r[0], "name": r[1], "address": r[2],
"region": r[3], "latitude": r[4], "longitude": r[5],
}
return None
def find_by_name(name: str) -> dict | None:
"""Find a restaurant by exact name match."""
sql = "SELECT id, name, address, region, latitude, longitude FROM restaurants WHERE name = :n"
@@ -50,17 +139,27 @@ def upsert(
rating_count: int | None = None,
) -> str:
"""Insert or update a restaurant. Returns row id."""
# Auto-derive region from address if not provided
if not region and address:
region = parse_region_from_address(address)
# Truncate fields to fit DB column byte limits (VARCHAR2 is byte-based)
price_range = _truncate_bytes(price_range, 50)
cuisine_type = _truncate_bytes(cuisine_type, 100)
region = _truncate_bytes(region, 100)
website = _truncate_bytes(website, 500)
existing = find_by_name(name)
# 1) google_place_id로 먼저 찾고, 2) 이름으로 찾기
existing = None
if google_place_id:
existing = find_by_place_id(google_place_id)
if not existing:
existing = find_by_name(name)
if existing:
sql = """
UPDATE restaurants
SET address = COALESCE(:addr, address),
SET name = :name,
address = COALESCE(:addr, address),
region = COALESCE(:reg, region),
latitude = COALESCE(:lat, latitude),
longitude = COALESCE(:lng, longitude),
@@ -77,6 +176,7 @@ def upsert(
"""
with conn() as c:
c.cursor().execute(sql, {
"name": name,
"addr": address, "reg": region,
"lat": latitude, "lng": longitude,
"cuisine": cuisine_type, "price": price_range,

View File

@@ -3,9 +3,12 @@
from __future__ import annotations
import array
import json
import logging
import os
import oci
import oracledb
from oci.generative_ai_inference import GenerativeAiInferenceClient
from oci.generative_ai_inference.models import (
EmbedTextDetails,
@@ -14,6 +17,10 @@ from oci.generative_ai_inference.models import (
from core.db import conn
logger = logging.getLogger(__name__)
_EMBED_BATCH_SIZE = 96 # Cohere embed v4 max batch size
def _embed_texts(texts: list[str]) -> list[list[float]]:
config = oci.config.from_file()
@@ -34,10 +41,148 @@ def _embed_texts(texts: list[str]) -> list[list[float]]:
return response.data.embeddings
def _embed_texts_batched(texts: list[str]) -> list[list[float]]:
"""Embed texts in batches to respect API limits."""
all_embeddings: list[list[float]] = []
for i in range(0, len(texts), _EMBED_BATCH_SIZE):
batch = texts[i : i + _EMBED_BATCH_SIZE]
all_embeddings.extend(_embed_texts(batch))
return all_embeddings
def _to_vec(embedding: list[float]) -> array.array:
return array.array("f", embedding)
def _parse_json_field(val, default):
if val is None:
return default
if isinstance(val, (list, dict)):
return val
if hasattr(val, "read"):
val = val.read()
if isinstance(val, str):
try:
return json.loads(val)
except (json.JSONDecodeError, ValueError):
return default
return default
def _build_rich_chunk(rest: dict, video_links: list[dict]) -> str:
"""Build a single JSON chunk per restaurant with all relevant info."""
# Collect all foods, evaluations, video titles from linked videos
all_foods: list[str] = []
all_evaluations: list[str] = []
video_titles: list[str] = []
channel_names: set[str] = set()
for vl in video_links:
if vl.get("title"):
video_titles.append(vl["title"])
if vl.get("channel_name"):
channel_names.add(vl["channel_name"])
foods = _parse_json_field(vl.get("foods_mentioned"), [])
if foods:
all_foods.extend(foods)
ev = _parse_json_field(vl.get("evaluation"), {})
if isinstance(ev, dict) and ev.get("text"):
all_evaluations.append(ev["text"])
elif isinstance(ev, str) and ev:
all_evaluations.append(ev)
doc = {
"name": rest.get("name"),
"cuisine_type": rest.get("cuisine_type"),
"region": rest.get("region"),
"address": rest.get("address"),
"price_range": rest.get("price_range"),
"menu": list(dict.fromkeys(all_foods)), # deduplicate, preserve order
"summary": all_evaluations,
"video_titles": video_titles,
"channels": sorted(channel_names),
}
# Remove None/empty values
doc = {k: v for k, v in doc.items() if v}
return json.dumps(doc, ensure_ascii=False)
def rebuild_all_vectors():
"""Rebuild vector embeddings for ALL restaurants.
Yields progress dicts: {"status": "progress", "current": N, "total": M, "name": "..."}
Final yield: {"status": "done", "total": N}
"""
# 1. Get all restaurants with video links
sql_restaurants = """
SELECT DISTINCT r.id, r.name, r.address, r.region, r.cuisine_type, r.price_range
FROM restaurants r
JOIN video_restaurants vr ON vr.restaurant_id = r.id
WHERE r.latitude IS NOT NULL
ORDER BY r.name
"""
sql_video_links = """
SELECT v.title, vr.foods_mentioned, vr.evaluation, c.channel_name
FROM video_restaurants vr
JOIN videos v ON v.id = vr.video_id
JOIN channels c ON c.id = v.channel_id
WHERE vr.restaurant_id = :rid
"""
# Load all restaurant data
restaurants_data: list[tuple[dict, str]] = [] # (rest_dict, chunk_text)
with conn() as c:
cur = c.cursor()
cur.execute(sql_restaurants)
cols = [d[0].lower() for d in cur.description]
all_rests = [dict(zip(cols, row)) for row in cur.fetchall()]
total = len(all_rests)
logger.info("Rebuilding vectors for %d restaurants", total)
for i, rest in enumerate(all_rests):
with conn() as c:
cur = c.cursor()
cur.execute(sql_video_links, {"rid": rest["id"]})
vl_cols = [d[0].lower() for d in cur.description]
video_links = [dict(zip(vl_cols, row)) for row in cur.fetchall()]
chunk = _build_rich_chunk(rest, video_links)
restaurants_data.append((rest, chunk))
yield {"status": "progress", "current": i + 1, "total": total, "phase": "prepare", "name": rest["name"]}
# 2. Delete all existing vectors
with conn() as c:
c.cursor().execute("DELETE FROM restaurant_vectors")
logger.info("Cleared existing vectors")
yield {"status": "progress", "current": 0, "total": total, "phase": "embed"}
# 3. Embed in batches and insert
chunks = [chunk for _, chunk in restaurants_data]
rest_ids = [rest["id"] for rest, _ in restaurants_data]
embeddings = _embed_texts_batched(chunks)
logger.info("Generated %d embeddings", len(embeddings))
insert_sql = """
INSERT INTO restaurant_vectors (restaurant_id, chunk_text, embedding)
VALUES (:rid, :chunk, :emb)
"""
with conn() as c:
cur = c.cursor()
for i, (rid, chunk, emb) in enumerate(zip(rest_ids, chunks, embeddings)):
cur.execute(insert_sql, {
"rid": rid,
"chunk": chunk,
"emb": _to_vec(emb),
})
if (i + 1) % 50 == 0 or i + 1 == total:
yield {"status": "progress", "current": i + 1, "total": total, "phase": "insert"}
logger.info("Rebuilt vectors for %d restaurants", total)
yield {"status": "done", "total": total}
def save_restaurant_vectors(restaurant_id: str, chunks: list[str]) -> list[str]:
"""Embed and store text chunks for a restaurant.
@@ -54,7 +199,6 @@ def save_restaurant_vectors(restaurant_id: str, chunks: list[str]) -> list[str]:
VALUES (:rid, :chunk, :emb)
RETURNING id INTO :out_id
"""
import oracledb
with conn() as c:
cur = c.cursor()
for chunk, emb in zip(chunks, embeddings):
@@ -69,10 +213,11 @@ def save_restaurant_vectors(restaurant_id: str, chunks: list[str]) -> list[str]:
return inserted
def search_similar(query: str, top_k: int = 10) -> list[dict]:
def search_similar(query: str, top_k: int = 10, max_distance: float = 0.57) -> list[dict]:
"""Semantic search: find restaurants similar to query text.
Returns list of dicts: restaurant_id, chunk_text, distance.
Only results with cosine distance <= max_distance are returned.
"""
embeddings = _embed_texts([query])
query_vec = _to_vec(embeddings[0])
@@ -81,12 +226,13 @@ def search_similar(query: str, top_k: int = 10) -> list[dict]:
SELECT rv.restaurant_id, rv.chunk_text,
VECTOR_DISTANCE(rv.embedding, :qvec, COSINE) AS dist
FROM restaurant_vectors rv
WHERE VECTOR_DISTANCE(rv.embedding, :qvec2, COSINE) <= :max_dist
ORDER BY dist
FETCH FIRST :k ROWS ONLY
"""
with conn() as c:
cur = c.cursor()
cur.execute(sql, {"qvec": query_vec, "k": top_k})
cur.execute(sql, {"qvec": query_vec, "qvec2": query_vec, "k": top_k, "max_dist": max_distance})
return [
{
"restaurant_id": r[0],