Files
tasteby/backend/core/auth.py
joungmin 6c47d3c57d Backend enhancements: auth, channels, restaurants, daemon improvements
- Add admin auth dependency and role checks
- Expand channel and restaurant API routes
- Improve YouTube transcript fetching
- Enhance daemon worker with better error handling and scheduling

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 10:59:22 +09:00

129 lines
4.1 KiB
Python

"""Authentication helpers — Google OAuth2 + JWT."""
from __future__ import annotations
import os
from datetime import datetime, timedelta, timezone
import jwt
import oracledb
from google.oauth2 import id_token as google_id_token
from google.auth.transport import requests as google_requests
from core.db import conn
JWT_SECRET = os.environ.get("JWT_SECRET", "tasteby-dev-secret-change-me")
JWT_ALGORITHM = "HS256"
JWT_EXPIRE_DAYS = 7
def verify_google_token(token: str) -> dict:
"""Verify a Google ID token and return user info.
Returns dict with keys: sub, email, name, picture.
Raises ValueError on invalid token.
"""
info = google_id_token.verify_oauth2_token(
token, google_requests.Request(),
)
return {
"sub": info["sub"],
"email": info.get("email"),
"name": info.get("name"),
"picture": info.get("picture"),
}
def find_or_create_user(
provider: str,
provider_id: str,
email: str | None = None,
nickname: str | None = None,
avatar_url: str | None = None,
) -> dict:
"""Find existing user or create new one. Returns user dict."""
# Try to find existing user
sql_find = """
SELECT id, provider, provider_id, email, nickname, avatar_url, created_at, last_login_at
FROM tasteby_users
WHERE provider = :provider AND provider_id = :provider_id
"""
with conn() as c:
cur = c.cursor()
cur.execute(sql_find, {"provider": provider, "provider_id": provider_id})
row = cur.fetchone()
if row:
# Update last_login and optional fields
sql_update = """
UPDATE tasteby_users
SET last_login_at = SYSTIMESTAMP,
email = COALESCE(:email, email),
nickname = COALESCE(:nickname, nickname),
avatar_url = COALESCE(:avatar_url, avatar_url)
WHERE id = :id
"""
cur.execute(sql_update, {
"email": email, "nickname": nickname,
"avatar_url": avatar_url, "id": row[0],
})
# Fetch is_admin
cur.execute("SELECT is_admin FROM tasteby_users WHERE id = :id", {"id": row[0]})
is_admin = bool(cur.fetchone()[0])
return {
"id": row[0],
"provider": row[1],
"provider_id": row[2],
"email": email or row[3],
"nickname": nickname or row[4],
"avatar_url": avatar_url or row[5],
"is_admin": is_admin,
}
# Create new user
sql_insert = """
INSERT INTO tasteby_users (provider, provider_id, email, nickname, avatar_url, last_login_at)
VALUES (:provider, :provider_id, :email, :nickname, :avatar_url, SYSTIMESTAMP)
RETURNING id INTO :out_id
"""
out_id = cur.var(oracledb.STRING)
cur.execute(sql_insert, {
"provider": provider,
"provider_id": provider_id,
"email": email,
"nickname": nickname,
"avatar_url": avatar_url,
"out_id": out_id,
})
new_id = out_id.getvalue()[0]
return {
"id": new_id,
"provider": provider,
"provider_id": provider_id,
"email": email,
"nickname": nickname,
"avatar_url": avatar_url,
"is_admin": False,
}
def create_jwt(user: dict) -> str:
"""Create a JWT access token for the given user."""
payload = {
"sub": user["id"],
"email": user.get("email"),
"nickname": user.get("nickname"),
"is_admin": user.get("is_admin", False),
"exp": datetime.now(timezone.utc) + timedelta(days=JWT_EXPIRE_DAYS),
"iat": datetime.now(timezone.utc),
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def verify_jwt(token: str) -> dict:
"""Verify a JWT and return the payload.
Raises jwt.InvalidTokenError on failure.
"""
return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])