"""Oracle ADB connection pool and CRUD operations for knowledge_queue.""" from __future__ import annotations import json import os from contextlib import contextmanager from typing import Generator, Optional import oracledb _pool: Optional[oracledb.ConnectionPool] = None def _get_pool() -> oracledb.ConnectionPool: """Return (or lazily create) the module-level connection pool.""" global _pool if _pool is None: kwargs: dict = dict( user=os.environ["ORACLE_USER"], password=os.environ["ORACLE_PASSWORD"], dsn=os.environ["ORACLE_DSN"], min=1, max=5, increment=1, ) wallet = os.environ.get("ORACLE_WALLET") if wallet: kwargs["config_dir"] = wallet # tnsnames.ora + cwallet.sso live here _pool = oracledb.create_pool(**kwargs) return _pool @contextmanager def _conn() -> Generator[oracledb.Connection, None, None]: """Context manager that acquires and releases a pooled connection.""" pool = _get_pool() conn = pool.acquire() try: yield conn conn.commit() except Exception: conn.rollback() raise finally: pool.release(conn) def insert_item(input_type: str, content: str, chat_id: str = "") -> str: """Insert a new queue item and return its generated UUID. Args: input_type: One of 'youtube', 'url', 'text'. content: The URL or raw text to process. chat_id: Telegram chat ID for future notification support. Returns: The UUID of the newly inserted row. """ sql = """ INSERT INTO knowledge_queue (input_type, content, telegram_chat_id) VALUES (:input_type, :content, :chat_id) RETURNING id INTO :out_id """ with _conn() as conn: cursor = conn.cursor() out_id_var = cursor.var(oracledb.STRING) cursor.execute( sql, { "input_type": input_type, "content": content, "chat_id": chat_id, "out_id": out_id_var, }, ) return out_id_var.getvalue()[0] def fetch_pending(limit: int = 5) -> list[dict]: """Fetch oldest pending items up to limit. Args: limit: Maximum number of rows to return. Returns: List of dicts with keys: id, input_type, content, telegram_chat_id. """ sql = """ SELECT id, input_type, content, telegram_chat_id FROM knowledge_queue WHERE status = 'pending' ORDER BY created_at FETCH FIRST :n ROWS ONLY """ with _conn() as conn: cursor = conn.cursor() cursor.execute(sql, {"n": limit}) rows = cursor.fetchall() return [ { "id": row[0], "input_type": row[1], "content": row[2].read() if hasattr(row[2], "read") else row[2], "telegram_chat_id": row[3], } for row in rows ] def set_processing(row_id: str) -> None: """Mark a queue item as processing. Args: row_id: The UUID of the row to update. """ sql = """ UPDATE knowledge_queue SET status = 'processing', updated_at = SYSTIMESTAMP WHERE id = :id """ with _conn() as conn: conn.cursor().execute(sql, {"id": row_id}) def set_done(row_id: str, title: str, metadata: dict) -> None: """Mark a queue item as done with extracted metadata. Args: row_id: The UUID of the row to update. title: LLM-extracted title. metadata: Dict of enrichment results to store as JSON. """ sql = """ UPDATE knowledge_queue SET status = 'done', title = :title, metadata_json = :meta_json, updated_at = SYSTIMESTAMP WHERE id = :id """ with _conn() as conn: conn.cursor().execute( sql, { "id": row_id, "title": title[:500] if title else "", "meta_json": json.dumps(metadata, ensure_ascii=False), }, ) def set_error(row_id: str, error_msg: str) -> None: """Mark a queue item as error with a message. Args: row_id: The UUID of the row to update. error_msg: Description of the error. """ sql = """ UPDATE knowledge_queue SET status = 'error', error_msg = :error_msg, updated_at = SYSTIMESTAMP WHERE id = :id """ with _conn() as conn: conn.cursor().execute(sql, {"id": row_id, "error_msg": error_msg}) def get_status_counts() -> dict: """Return count of rows per status. Returns: Dict like {'pending': 3, 'processing': 1, 'done': 42, 'error': 0}. """ sql = """ SELECT status, COUNT(*) FROM knowledge_queue GROUP BY status """ with _conn() as conn: cursor = conn.cursor() cursor.execute(sql) rows = cursor.fetchall() counts = {"pending": 0, "processing": 0, "done": 0, "error": 0} for status, count in rows: counts[status] = count return counts