feat: initial knowledge-inbox pipeline implementation
- Oracle ADB queue table (sql/schema.sql) - Queue CRUD: core/queue_db.py - YouTube transcript: core/youtube.py - Web page fetch: core/web.py - LLM enrichment via OCI GenAI Gemini Flash: core/enricher.py - Text chunker: core/chunker.py - Obsidian note writer: core/obsidian.py - Oracle vector store insertion: core/vector.py - Polling daemon: daemon/worker.py - Telegram bot: bot/telegram_bot.py - Main runner: main.py
This commit is contained in:
185
core/queue_db.py
Normal file
185
core/queue_db.py
Normal file
@@ -0,0 +1,185 @@
|
||||
"""Oracle ADB connection pool and CRUD operations for knowledge_queue."""
|
||||
|
||||
import json
|
||||
import os
|
||||
from contextlib import contextmanager
|
||||
from typing import Generator
|
||||
|
||||
import oracledb
|
||||
|
||||
_pool: oracledb.ConnectionPool | None = None
|
||||
|
||||
|
||||
def _get_pool() -> oracledb.ConnectionPool:
    """Create the shared connection pool on first use and return it."""
    global _pool
    if _pool is not None:
        return _pool

    # Credentials and DSN are required; a missing variable fails fast
    # with KeyError rather than connecting with bad defaults.
    pool_args: dict = {
        "user": os.environ["ORACLE_USER"],
        "password": os.environ["ORACLE_PASSWORD"],
        "dsn": os.environ["ORACLE_DSN"],
        "min": 1,
        "max": 5,
        "increment": 1,
    }
    wallet_dir = os.environ.get("ORACLE_WALLET")
    if wallet_dir:
        # Wallet directory holds tnsnames.ora + cwallet.sso for ADB mTLS.
        pool_args["config_dir"] = wallet_dir
    _pool = oracledb.create_pool(**pool_args)
    return _pool
|
||||
|
||||
|
||||
@contextmanager
def _conn() -> Generator[oracledb.Connection, None, None]:
    """Yield a pooled connection with transaction semantics.

    Commits when the managed body completes normally, rolls back (and
    re-raises) on any exception, and always returns the connection to
    the pool.
    """
    pool = _get_pool()
    connection = pool.acquire()
    try:
        yield connection
        # Reached only if the body did not raise; commit is inside the
        # try so a failed commit also triggers the rollback path.
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        pool.release(connection)
|
||||
|
||||
|
||||
def insert_item(input_type: str, content: str, chat_id: str = "") -> str:
    """Queue a new item for processing and return its generated UUID.

    Args:
        input_type: One of 'youtube', 'url', 'text'.
        content: The URL or raw text to process.
        chat_id: Telegram chat ID for future notification support.

    Returns:
        The UUID of the newly inserted row.
    """
    insert_sql = """
        INSERT INTO knowledge_queue (input_type, content, telegram_chat_id)
        VALUES (:input_type, :content, :chat_id)
        RETURNING id INTO :out_id
    """
    with _conn() as connection:
        cursor = connection.cursor()
        returned_id = cursor.var(oracledb.STRING)
        bind_values = {
            "input_type": input_type,
            "content": content,
            "chat_id": chat_id,
            "out_id": returned_id,
        }
        cursor.execute(insert_sql, bind_values)
        # RETURNING INTO produces a list (one value per affected row);
        # a single-row INSERT always has exactly one entry.
        return returned_id.getvalue()[0]
|
||||
|
||||
|
||||
def fetch_pending(limit: int = 5) -> list[dict]:
    """Fetch the oldest pending queue items, up to ``limit``.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        List of dicts with keys: id, input_type, content, telegram_chat_id.
    """
    # A non-positive value bound into FETCH FIRST is an Oracle error;
    # short-circuit instead of surfacing a confusing ORA- failure.
    if limit <= 0:
        return []
    sql = """
        SELECT id, input_type, content, telegram_chat_id
        FROM knowledge_queue
        WHERE status = 'pending'
        ORDER BY created_at
        FETCH FIRST :n ROWS ONLY
    """
    with _conn() as conn:
        cursor = conn.cursor()
        cursor.execute(sql, {"n": limit})
        rows = cursor.fetchall()
        # Materialize LOB content (the .read() branch) while the pooled
        # connection is still held: LOB locators cannot be read after
        # the connection is released back to the pool.
        return [
            {
                "id": row[0],
                "input_type": row[1],
                "content": row[2].read() if hasattr(row[2], "read") else row[2],
                "telegram_chat_id": row[3],
            }
            for row in rows
        ]
|
||||
|
||||
|
||||
def set_processing(row_id: str) -> None:
    """Mark a queue item as processing.

    Args:
        row_id: The UUID of the row to update.
    """
    update_sql = """
        UPDATE knowledge_queue
        SET status = 'processing', updated_at = SYSTIMESTAMP
        WHERE id = :id
    """
    with _conn() as connection:
        cursor = connection.cursor()
        cursor.execute(update_sql, {"id": row_id})
|
||||
|
||||
|
||||
def set_done(row_id: str, title: str, metadata: dict) -> None:
    """Mark a queue item as done with extracted metadata.

    Args:
        row_id: The UUID of the row to update.
        title: LLM-extracted title.
        metadata: Dict of enrichment results to store as JSON.
    """
    # Clamp to 500 chars — presumably the title column's width; confirm
    # against sql/schema.sql. Empty/None titles are stored as "".
    safe_title = title[:500] if title else ""
    update_sql = """
        UPDATE knowledge_queue
        SET status = 'done',
            title = :title,
            metadata_json = :meta_json,
            updated_at = SYSTIMESTAMP
        WHERE id = :id
    """
    bind_values = {
        "id": row_id,
        "title": safe_title,
        # ensure_ascii=False keeps non-Latin text readable in the DB.
        "meta_json": json.dumps(metadata, ensure_ascii=False),
    }
    with _conn() as connection:
        connection.cursor().execute(update_sql, bind_values)
|
||||
|
||||
|
||||
def set_error(row_id: str, error_msg: str) -> None:
    """Mark a queue item as error with a message.

    Args:
        row_id: The UUID of the row to update.
        error_msg: Description of the error.
    """
    sql = """
        UPDATE knowledge_queue
        SET status = 'error', error_msg = :error_msg, updated_at = SYSTIMESTAMP
        WHERE id = :id
    """
    # Clamp like set_done() clamps title: a full traceback can exceed the
    # column width and raise ORA-12899, masking the original failure.
    # 4000 is the classic VARCHAR2 limit — TODO confirm against sql/schema.sql.
    safe_msg = error_msg[:4000] if error_msg else ""
    with _conn() as conn:
        conn.cursor().execute(sql, {"id": row_id, "error_msg": safe_msg})
|
||||
|
||||
|
||||
def get_status_counts() -> dict:
    """Return count of rows per status.

    Returns:
        Dict like {'pending': 3, 'processing': 1, 'done': 42, 'error': 0}.
    """
    count_sql = """
        SELECT status, COUNT(*) FROM knowledge_queue GROUP BY status
    """
    with _conn() as connection:
        cursor = connection.cursor()
        cursor.execute(count_sql)
        fetched = cursor.fetchall()

    # Seed all known statuses with zero so absent ones still appear,
    # then overlay the actual (status, count) pairs from the query.
    counts = {"pending": 0, "processing": 0, "done": 0, "error": 0}
    counts.update(dict(fetched))
    return counts
|
||||
Reference in New Issue
Block a user