Project layout:
- Oracle ADB queue table: sql/schema.sql
- Queue CRUD: core/queue_db.py
- YouTube transcript: core/youtube.py
- Web page fetch: core/web.py
- LLM enrichment via OCI GenAI Gemini Flash: core/enricher.py
- Text chunker: core/chunker.py
- Obsidian note writer: core/obsidian.py
- Oracle vector store insertion: core/vector.py
- Polling daemon: daemon/worker.py
- Telegram bot: bot/telegram_bot.py
- Main runner: main.py
186 lines · 5.0 KiB · Python
"""Oracle ADB connection pool and CRUD operations for knowledge_queue."""
|
|
|
|
import json
|
|
import os
|
|
from contextlib import contextmanager
|
|
from typing import Generator
|
|
|
|
import oracledb
|
|
|
|
_pool: oracledb.ConnectionPool | None = None
|
|
|
|
|
|
def _get_pool() -> oracledb.ConnectionPool:
|
|
"""Return (or lazily create) the module-level connection pool."""
|
|
global _pool
|
|
if _pool is None:
|
|
kwargs: dict = dict(
|
|
user=os.environ["ORACLE_USER"],
|
|
password=os.environ["ORACLE_PASSWORD"],
|
|
dsn=os.environ["ORACLE_DSN"],
|
|
min=1,
|
|
max=5,
|
|
increment=1,
|
|
)
|
|
wallet = os.environ.get("ORACLE_WALLET")
|
|
if wallet:
|
|
kwargs["config_dir"] = wallet # tnsnames.ora + cwallet.sso live here
|
|
_pool = oracledb.create_pool(**kwargs)
|
|
return _pool
|
|
|
|
|
|
@contextmanager
|
|
def _conn() -> Generator[oracledb.Connection, None, None]:
|
|
"""Context manager that acquires and releases a pooled connection."""
|
|
pool = _get_pool()
|
|
conn = pool.acquire()
|
|
try:
|
|
yield conn
|
|
conn.commit()
|
|
except Exception:
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
pool.release(conn)
|
|
|
|
|
|
def insert_item(input_type: str, content: str, chat_id: str = "") -> str:
|
|
"""Insert a new queue item and return its generated UUID.
|
|
|
|
Args:
|
|
input_type: One of 'youtube', 'url', 'text'.
|
|
content: The URL or raw text to process.
|
|
chat_id: Telegram chat ID for future notification support.
|
|
|
|
Returns:
|
|
The UUID of the newly inserted row.
|
|
"""
|
|
sql = """
|
|
INSERT INTO knowledge_queue (input_type, content, telegram_chat_id)
|
|
VALUES (:input_type, :content, :chat_id)
|
|
RETURNING id INTO :out_id
|
|
"""
|
|
with _conn() as conn:
|
|
cursor = conn.cursor()
|
|
out_id_var = cursor.var(oracledb.STRING)
|
|
cursor.execute(
|
|
sql,
|
|
{
|
|
"input_type": input_type,
|
|
"content": content,
|
|
"chat_id": chat_id,
|
|
"out_id": out_id_var,
|
|
},
|
|
)
|
|
return out_id_var.getvalue()[0]
|
|
|
|
|
|
def fetch_pending(limit: int = 5) -> list[dict]:
|
|
"""Fetch oldest pending items up to limit.
|
|
|
|
Args:
|
|
limit: Maximum number of rows to return.
|
|
|
|
Returns:
|
|
List of dicts with keys: id, input_type, content, telegram_chat_id.
|
|
"""
|
|
sql = """
|
|
SELECT id, input_type, content, telegram_chat_id
|
|
FROM knowledge_queue
|
|
WHERE status = 'pending'
|
|
ORDER BY created_at
|
|
FETCH FIRST :n ROWS ONLY
|
|
"""
|
|
with _conn() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(sql, {"n": limit})
|
|
rows = cursor.fetchall()
|
|
|
|
return [
|
|
{
|
|
"id": row[0],
|
|
"input_type": row[1],
|
|
"content": row[2].read() if hasattr(row[2], "read") else row[2],
|
|
"telegram_chat_id": row[3],
|
|
}
|
|
for row in rows
|
|
]
|
|
|
|
|
|
def set_processing(row_id: str) -> None:
|
|
"""Mark a queue item as processing.
|
|
|
|
Args:
|
|
row_id: The UUID of the row to update.
|
|
"""
|
|
sql = """
|
|
UPDATE knowledge_queue
|
|
SET status = 'processing', updated_at = SYSTIMESTAMP
|
|
WHERE id = :id
|
|
"""
|
|
with _conn() as conn:
|
|
conn.cursor().execute(sql, {"id": row_id})
|
|
|
|
|
|
def set_done(row_id: str, title: str, metadata: dict) -> None:
|
|
"""Mark a queue item as done with extracted metadata.
|
|
|
|
Args:
|
|
row_id: The UUID of the row to update.
|
|
title: LLM-extracted title.
|
|
metadata: Dict of enrichment results to store as JSON.
|
|
"""
|
|
sql = """
|
|
UPDATE knowledge_queue
|
|
SET status = 'done',
|
|
title = :title,
|
|
metadata_json = :meta_json,
|
|
updated_at = SYSTIMESTAMP
|
|
WHERE id = :id
|
|
"""
|
|
with _conn() as conn:
|
|
conn.cursor().execute(
|
|
sql,
|
|
{
|
|
"id": row_id,
|
|
"title": title[:500] if title else "",
|
|
"meta_json": json.dumps(metadata, ensure_ascii=False),
|
|
},
|
|
)
|
|
|
|
|
|
def set_error(row_id: str, error_msg: str) -> None:
|
|
"""Mark a queue item as error with a message.
|
|
|
|
Args:
|
|
row_id: The UUID of the row to update.
|
|
error_msg: Description of the error.
|
|
"""
|
|
sql = """
|
|
UPDATE knowledge_queue
|
|
SET status = 'error', error_msg = :error_msg, updated_at = SYSTIMESTAMP
|
|
WHERE id = :id
|
|
"""
|
|
with _conn() as conn:
|
|
conn.cursor().execute(sql, {"id": row_id, "error_msg": error_msg})
|
|
|
|
|
|
def get_status_counts() -> dict:
|
|
"""Return count of rows per status.
|
|
|
|
Returns:
|
|
Dict like {'pending': 3, 'processing': 1, 'done': 42, 'error': 0}.
|
|
"""
|
|
sql = """
|
|
SELECT status, COUNT(*) FROM knowledge_queue GROUP BY status
|
|
"""
|
|
with _conn() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(sql)
|
|
rows = cursor.fetchall()
|
|
|
|
counts = {"pending": 0, "processing": 0, "done": 0, "error": 0}
|
|
for status, count in rows:
|
|
counts[status] = count
|
|
return counts
|