"""Authentication helpers — Google OAuth2 + JWT."""

from __future__ import annotations

import os
from datetime import datetime, timedelta, timezone

import jwt
import oracledb
from google.oauth2 import id_token as google_id_token
from google.auth.transport import requests as google_requests

from core.db import conn

# NOTE(review): the fallback value is a development placeholder (it literally
# says "change-me"); JWT_SECRET must be set in any real deployment, otherwise
# tokens are signed with a publicly known key.
JWT_SECRET = os.environ.get("JWT_SECRET", "tasteby-dev-secret-change-me")
JWT_ALGORITHM = "HS256"
JWT_EXPIRE_DAYS = 7

# OAuth client ID that incoming Google ID tokens must have been issued for.
# Optional: when unset (or empty) no audience check is performed, which is
# the historical behaviour of this module.
GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID") or None


def verify_google_token(token: str) -> dict:
    """Verify a Google ID token and return user info.

    The token's ``aud`` claim is checked against ``GOOGLE_CLIENT_ID`` when
    that setting is configured, so tokens issued to other OAuth clients are
    rejected.

    Args:
        token: The raw ID token string received from the client.

    Returns:
        dict with keys: sub, email, name, picture. Only ``sub`` is
        guaranteed; the others are ``None`` when absent from the token.

    Raises:
        ValueError: On invalid token.
    """
    # NOTE(review): google-auth may raise its own exception types for some
    # failures (e.g. an unexpected issuer) — confirm callers handle those.
    info = google_id_token.verify_oauth2_token(
        token,
        google_requests.Request(),
        audience=GOOGLE_CLIENT_ID,
    )
    return {
        "sub": info["sub"],
        "email": info.get("email"),
        "name": info.get("name"),
        "picture": info.get("picture"),
    }


def find_or_create_user(
    provider: str,
    provider_id: str,
    email: str | None = None,
    nickname: str | None = None,
    avatar_url: str | None = None,
) -> dict:
    """Find existing user or create new one.

    An existing user (matched on ``provider`` + ``provider_id``) gets its
    ``last_login_at`` refreshed and any non-null profile fields overwritten.
    Otherwise a new row is inserted.

    No explicit commit is issued here; presumably the ``conn()`` context
    manager takes care of the transaction — verify against ``core.db``.

    Args:
        provider: Identity provider name.
        provider_id: The user's identifier at that provider.
        email: Optional e-mail address to store.
        nickname: Optional display name to store.
        avatar_url: Optional avatar URL to store.

    Returns:
        User dict with keys: id, provider, provider_id, email, nickname,
        avatar_url.
    """
    sql_find = """
        SELECT id, provider, provider_id, email, nickname, avatar_url,
               created_at, last_login_at
        FROM tasteby_users
        WHERE provider = :provider AND provider_id = :provider_id
    """
    # The cursor is managed by the `with` so it is closed on every exit path.
    with conn() as c, c.cursor() as cur:
        # Try to find existing user
        cur.execute(sql_find, {"provider": provider, "provider_id": provider_id})
        row = cur.fetchone()

        if row:
            # Update last_login and optional fields. COALESCE keeps the
            # stored value whenever the caller passed None for a field.
            sql_update = """
                UPDATE tasteby_users
                SET last_login_at = SYSTIMESTAMP,
                    email = COALESCE(:email, email),
                    nickname = COALESCE(:nickname, nickname),
                    avatar_url = COALESCE(:avatar_url, avatar_url)
                WHERE id = :id
            """
            cur.execute(sql_update, {
                "email": email,
                "nickname": nickname,
                "avatar_url": avatar_url,
                "id": row[0],
            })
            # Mirror the COALESCE above: prefer the new value, fall back to
            # what was already stored.
            return {
                "id": row[0],
                "provider": row[1],
                "provider_id": row[2],
                "email": email or row[3],
                "nickname": nickname or row[4],
                "avatar_url": avatar_url or row[5],
            }

        # Create new user
        sql_insert = """
            INSERT INTO tasteby_users
                (provider, provider_id, email, nickname, avatar_url, last_login_at)
            VALUES
                (:provider, :provider_id, :email, :nickname, :avatar_url, SYSTIMESTAMP)
            RETURNING id INTO :out_id
        """
        out_id = cur.var(oracledb.STRING)
        cur.execute(sql_insert, {
            "provider": provider,
            "provider_id": provider_id,
            "email": email,
            "nickname": nickname,
            "avatar_url": avatar_url,
            "out_id": out_id,
        })
        # The RETURNING bind yields a sequence of values; exactly one row
        # was inserted, so take the first.
        new_id = out_id.getvalue()[0]
        return {
            "id": new_id,
            "provider": provider,
            "provider_id": provider_id,
            "email": email,
            "nickname": nickname,
            "avatar_url": avatar_url,
        }


def create_jwt(user: dict) -> str:
    """Create a JWT access token for the given user.

    Args:
        user: User dict; must contain ``id`` and may contain ``email`` and
            ``nickname``.

    Returns:
        The encoded token, valid for ``JWT_EXPIRE_DAYS`` days.

    Raises:
        KeyError: If ``user`` has no ``id``.
    """
    # Capture the clock once so that exp - iat is exactly JWT_EXPIRE_DAYS.
    now = datetime.now(timezone.utc)
    payload = {
        "sub": user["id"],
        "email": user.get("email"),
        "nickname": user.get("nickname"),
        "exp": now + timedelta(days=JWT_EXPIRE_DAYS),
        "iat": now,
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)


def verify_jwt(token: str) -> dict:
    """Verify a JWT and return the payload.

    Only ``JWT_ALGORITHM`` is accepted, so a token cannot choose a different
    signing algorithm via its header.

    Args:
        token: The encoded JWT.

    Returns:
        The decoded payload.

    Raises:
        jwt.InvalidTokenError: On failure.
    """
    return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])