From 86a4104ae3f0f35930d02c404539ab3faaad73a7 Mon Sep 17 00:00:00 2001
From: joungmin
Date: Sat, 28 Feb 2026 08:16:11 +0900
Subject: [PATCH] feat: initial knowledge-inbox pipeline implementation

- Oracle ADB queue table (sql/schema.sql)
- Queue CRUD: core/queue_db.py
- YouTube transcript: core/youtube.py
- Web page fetch: core/web.py
- LLM enrichment via OCI GenAI Gemini Flash: core/enricher.py
- Text chunker: core/chunker.py
- Obsidian note writer: core/obsidian.py
- Oracle vector store insertion: core/vector.py
- Polling daemon: daemon/worker.py
- Telegram bot: bot/telegram_bot.py
- Main runner: main.py
---
 .env.example        |  20 +++
 .gitignore          |   8 ++
 bot/__init__.py     |   0
 bot/telegram_bot.py | 108 ++++++++++++
 core/__init__.py    |   0
 core/chunker.py     |  28 +++
 core/enricher.py    |  96 ++++++++++
 core/obsidian.py    |  86 +++++++++
 core/queue_db.py    | 185 ++++++++++++++++++++
 core/vector.py      | 114 ++++++++++++
 core/web.py         |  59 +++++++
 core/youtube.py     |  50 ++++++
 daemon/__init__.py  |   0
 daemon/worker.py    | 108 ++++++++++++
 main.py             |  23 +++
 pyproject.toml      |  19 +++
 requirements.txt    |   6 ++
 sql/schema.sql      |  16 +++
 18 files changed, 926 insertions(+)
 create mode 100644 .env.example
 create mode 100644 .gitignore
 create mode 100644 bot/__init__.py
 create mode 100644 bot/telegram_bot.py
 create mode 100644 core/__init__.py
 create mode 100644 core/chunker.py
 create mode 100644 core/enricher.py
 create mode 100644 core/obsidian.py
 create mode 100644 core/queue_db.py
 create mode 100644 core/vector.py
 create mode 100644 core/web.py
 create mode 100644 core/youtube.py
 create mode 100644 daemon/__init__.py
 create mode 100644 daemon/worker.py
 create mode 100644 main.py
 create mode 100644 pyproject.toml
 create mode 100644 requirements.txt
 create mode 100644 sql/schema.sql

diff --git a/.env.example b/.env.example
new file mode 100644
index 0000000..aa62847
--- /dev/null
+++ b/.env.example
@@ -0,0 +1,20 @@
+# Telegram
+TELEGRAM_BOT_TOKEN=
+
+# Oracle ADB (queue + vector store shared)
+ORACLE_USER=admin
+ORACLE_PASSWORD=
+ORACLE_DSN=h8i4i0g8cxtd2lpf_high
+ORACLE_WALLET=/Users/joungmin/devkit/db_conn/Wallet_H8I4I0G8CXTD2LPF
+
+# OCI GenAI
+OCI_COMPARTMENT_ID=
+OCI_GENAI_ENDPOINT=https://inference.generativeai.us-ashburn-1.oci.oraclecloud.com
+OCI_EMBED_MODEL_ID=cohere.embed-v4.0
+OCI_CHAT_MODEL_ID=ocid1.generativeaimodel.oc1.iad.amaaaaaask7dceyaeo4ehrn25guuats5s45hnvswlhxo6riop275l2bkr2vq
+
+# Obsidian
+OBSIDIAN_VAULT=/Users/joungmin/Documents/Obsidian Vault
+
+# Daemon
+DAEMON_INTERVAL=30
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..5319c5f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,8 @@
+.env
+__pycache__/
+*.pyc
+*.pyo
+.venv/
+dist/
+*.egg-info/
+.DS_Store
diff --git a/bot/__init__.py b/bot/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/bot/telegram_bot.py b/bot/telegram_bot.py
new file mode 100644
index 0000000..e637054
--- /dev/null
+++ b/bot/telegram_bot.py
@@ -0,0 +1,108 @@
+"""Telegram bot for receiving knowledge inbox items."""
+
+import logging
+import os
+import re
+
+from telegram import Update
+from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
+
+from core.queue_db import get_status_counts, insert_item
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
+)
+logger = logging.getLogger(__name__)
+
+
+def detect_type(text: str) -> str:
+    """Detect the input type of a user message.
+
+    Args:
+        text: Raw message text from user.
+
+    Returns:
+        One of 'youtube', 'url', 'text'.
+    """
+    text = text.strip()
+    if re.search(r"youtube\.com/watch|youtu\.be/", text):
+        return "youtube"
+    if text.startswith(("http://", "https://")):
+        return "url"
+    return "text"
+
+
+async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+    """Handle /start command."""
+    await update.message.reply_text(
+        "πŸ“š *Knowledge Inbox Bot*\n\n"
+        "λ‹€μŒμ„ μ „μ†‘ν•˜λ©΄ μžλ™μœΌλ‘œ μ²˜λ¦¬ν•˜μ—¬ Obsidian에 μ €μž₯ν•©λ‹ˆλ‹€:\n\n"
+        "β€’ *YouTube URL* β€” 트랜슀크립트 μΆ”μΆœ ν›„ μš”μ•½\n"
+        "β€’ *μ›Ή URL* β€” νŽ˜μ΄μ§€ λ‚΄μš© μΆ”μΆœ ν›„ μš”μ•½\n"
+        "β€’ *자유 ν…μŠ€νŠΈ* β€” κ·ΈλŒ€λ‘œ μ €μž₯ ν›„ νƒœκ·Έ μΆ”μΆœ\n\n"
+        "/status β€” 처리 ν˜„ν™© 쑰회",
+        parse_mode="Markdown",
+    )
+
+
+async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+    """Handle /status command."""
+    try:
+        counts = get_status_counts()
+        msg = (
+            "πŸ“Š *처리 ν˜„ν™©*\n\n"
+            f"⏳ λŒ€κΈ°μ€‘: {counts.get('pending', 0)}\n"
+            f"πŸ”„ μ²˜λ¦¬μ€‘: {counts.get('processing', 0)}\n"
+            f"βœ… μ™„λ£Œ: {counts.get('done', 0)}\n"
+            f"❌ 였λ₯˜: {counts.get('error', 0)}"
+        )
+    except Exception as exc:
+        logger.error("Status query failed: %s", exc)
+        msg = "❌ μƒνƒœ μ‘°νšŒμ— μ‹€νŒ¨ν–ˆμŠ΅λ‹ˆλ‹€."
+    await update.message.reply_text(msg, parse_mode="Markdown")
+
+
+async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+    """Handle all non-command messages."""
+    text = update.message.text or ""
+    chat_id = str(update.effective_chat.id)
+
+    if not text.strip():
+        return
+
+    input_type = detect_type(text)
+    type_labels = {"youtube": "YouTube", "url": "μ›ΉνŽ˜μ΄μ§€", "text": "ν…μŠ€νŠΈ"}
+
+    try:
+        row_id = insert_item(input_type, text.strip(), chat_id)
+        label = type_labels[input_type]
+        await update.message.reply_text(
+            f"πŸ“₯ *{label}*이 큐에 μΆ”κ°€λμŠ΅λ‹ˆλ‹€.\n"
+            f"ID: `{row_id[:8]}`\n\n"
+            "처리 μ™„λ£Œ ν›„ Obsidian에 μ €μž₯λ©λ‹ˆλ‹€.",
+            parse_mode="Markdown",
+        )
+    except Exception as exc:
+        logger.error("insert_item failed: %s", exc)
+        await update.message.reply_text("❌ μ €μž₯에 μ‹€νŒ¨ν–ˆμŠ΅λ‹ˆλ‹€. μž μ‹œ ν›„ λ‹€μ‹œ μ‹œλ„ν•΄μ£Όμ„Έμš”.")
+
+
+def build_app() -> Application:
+    """Build and configure the Telegram Application.
+
+    Returns:
+        Configured Application instance ready to run.
+    """
+    token = os.environ["TELEGRAM_BOT_TOKEN"]
+    app = Application.builder().token(token).build()
+    app.add_handler(CommandHandler("start", cmd_start))
+    app.add_handler(CommandHandler("status", cmd_status))
+    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
+    return app
+
+
+if __name__ == "__main__":
+    from dotenv import load_dotenv
+    load_dotenv()
+    build_app().run_polling()
diff --git a/core/__init__.py b/core/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/core/chunker.py b/core/chunker.py
new file mode 100644
index 0000000..6c9f461
--- /dev/null
+++ b/core/chunker.py
@@ -0,0 +1,28 @@
+"""Simple sliding-window text chunking."""
+
+
+def chunk_text(text: str, size: int = 2000, overlap: int = 200) -> list[str]:
+    """Split text into overlapping chunks.
+
+    Args:
+        text: The full text to split.
+        size: Maximum characters per chunk.
+        overlap: Characters of overlap between consecutive chunks.
+ + Returns: + List of text chunks. Returns single-item list for short text. + """ + if len(text) <= size: + return [text] + + chunks: list[str] = [] + step = size - overlap + start = 0 + while start < len(text): + end = start + size + chunks.append(text[start:end]) + if end >= len(text): + break + start += step + + return chunks diff --git a/core/enricher.py b/core/enricher.py new file mode 100644 index 0000000..293cde2 --- /dev/null +++ b/core/enricher.py @@ -0,0 +1,96 @@ +"""LLM-based content enrichment via OCI GenAI Gemini Flash.""" + +import json +import os +import re + +import oci +from oci.generative_ai_inference import GenerativeAiInferenceClient +from oci.generative_ai_inference.models import ( + ChatDetails, + GenericChatRequest, + OnDemandServingMode, + TextContent, + UserMessage, +) + +_PROMPT = """\ +You are a knowledge extraction assistant. Analyze the content below and return ONLY a valid JSON object with these fields: +- "title": concise descriptive title for this content (string) +- "summary": 3-5 sentence summary capturing key insights (string) +- "tags": list of 3-7 relevant keywords or topics (string[]) +- "author": author or creator name, or null if not found (string | null) +- "date": publication date in ISO 8601 format (YYYY-MM-DD), or null if not found (string | null) +- "content_type": one of "youtube", "article", "documentation", "news", "forum", "code", "other" (string) + +Content type: {content_type} +Source URL: {url} +Content: +{text} + +Return only the JSON object, no markdown, no explanation.""" + + +def _get_client() -> GenerativeAiInferenceClient: + config = oci.config.from_file() + return GenerativeAiInferenceClient( + config, + service_endpoint=os.environ["OCI_GENAI_ENDPOINT"], + ) + + +def enrich(content_type: str, title: str, url: str, text: str) -> dict: + """Extract structured metadata from content using Gemini Flash. + + Args: + content_type: One of 'youtube', 'url', 'text'. + title: Initial title hint (may be empty). 
+        url: Source URL (empty for plain text).
+        text: The full content text to analyze.
+
+    Returns:
+        Dict with keys: title, summary, tags, author, date, content_type.
+        Falls back to minimal defaults on LLM failure.
+    """
+    prompt = _PROMPT.format(
+        content_type=content_type,
+        url=url or "(none)",
+        text=text[:6000],
+    )
+
+    try:
+        client = _get_client()
+        req = GenericChatRequest(
+            messages=[UserMessage(content=[TextContent(text=prompt)])],
+            max_tokens=1024,
+            temperature=0,
+        )
+        det = ChatDetails(
+            compartment_id=os.environ["OCI_COMPARTMENT_ID"],
+            serving_mode=OnDemandServingMode(model_id=os.environ["OCI_CHAT_MODEL_ID"]),
+            chat_request=req,
+        )
+        response = client.chat(det)
+        raw = response.data.chat_response.choices[0].message.content[0].text.strip()
+        raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", raw, flags=re.MULTILINE)
+        metadata = json.loads(raw)
+    except Exception as exc:
+        metadata = {
+            "title": title or url or text[:80],
+            "summary": text[:300],
+            "tags": [],
+            "author": None,
+            "date": None,
+            "content_type": content_type,
+            "_error": str(exc),
+        }
+
+    # Ensure required keys exist
+    metadata.setdefault("title", title or url or text[:80])
+    metadata.setdefault("summary", "")
+    metadata.setdefault("tags", [])
+    metadata.setdefault("author", None)
+    metadata.setdefault("date", None)
+    metadata.setdefault("content_type", content_type)
+
+    return metadata
diff --git a/core/obsidian.py b/core/obsidian.py
new file mode 100644
index 0000000..ddec6ad
--- /dev/null
+++ b/core/obsidian.py
@@ -0,0 +1,86 @@
+"""Save processed knowledge items as Obsidian markdown notes."""
+
+import os
+import re
+from datetime import datetime
+from pathlib import Path
+
+
+def _slugify(text: str, max_len: int = 50) -> str:
+    """Convert text to a filesystem-safe slug."""
+    text = re.sub(r"[^\w\s-]", "", text, flags=re.UNICODE)
+    text = re.sub(r"[\s_]+", "-", text).strip("-")
+    return text[:max_len].lower()
+
+
+def save_note(
+    content_type: str,
+    title: str,
+    summary: str,
+    body: str,
+    tags: list[str],
+    source_url: str = "",
+    author: str = "",
+    date: str = "",
+) -> Path:
+    """Save a processed knowledge item as an Obsidian markdown file.
+
+    Args:
+        content_type: One of 'youtube', 'url', 'text'.
+        title: The note title.
+        summary: LLM-generated summary.
+        body: Full content text.
+        tags: List of topic tags.
+        source_url: Original URL (empty for plain text).
+        author: Author name (may be empty).
+        date: Publication date in ISO 8601 format (may be empty).
+
+    Returns:
+        Path of the created markdown file.
+    """
+    vault = os.environ.get("OBSIDIAN_VAULT", "/Users/joungmin/Documents/Obsidian Vault")
+    today = datetime.now().strftime("%Y-%m-%d")
+    now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    slug = _slugify(title) or "untitled"
+
+    # Determine subfolder by content type
+    subfolder_map = {
+        "youtube": "20 Sources/YouTube",
+        "url": "20 Sources/Web",
+        "text": "20 Sources/Notes",
+    }
+    subfolder = subfolder_map.get(content_type, "20 Sources/Notes")
+    note_dir = Path(vault) / subfolder
+    note_dir.mkdir(parents=True, exist_ok=True)
+
+    filename = f"{today}-{slug}.md"
+    note_path = note_dir / filename
+
+    # Build YAML frontmatter tags
+    tags_yaml = ", ".join(tags) if tags else ""
+
+    content = f"""---
+title: {title}
+source_type: {content_type}
+url: {source_url}
+author: {author}
+date: {date}
+tags: [{tags_yaml}]
+created: {today}
+---
+
+# {title}
+
+## μš”μ•½
+{summary}
+
+## 원문
+{body}
+
+---
+*Source: {source_url}*
+*Saved: {now_str}*
+"""
+
+    note_path.write_text(content, encoding="utf-8")
+    return note_path
diff --git a/core/queue_db.py b/core/queue_db.py
new file mode 100644
index 0000000..bf63083
--- /dev/null
+++ b/core/queue_db.py
@@ -0,0 +1,185 @@
+"""Oracle ADB connection pool and CRUD operations for knowledge_queue."""
+
+import json
+import os
+from contextlib import contextmanager
+from typing import Generator
+
+import oracledb
+
+_pool: oracledb.ConnectionPool | None = None
+
+
+def _get_pool() -> oracledb.ConnectionPool:
+    """Return (or lazily create) the module-level connection pool."""
+    global _pool
+    if _pool is None:
+        kwargs: dict = dict(
+            user=os.environ["ORACLE_USER"],
+            password=os.environ["ORACLE_PASSWORD"],
+            dsn=os.environ["ORACLE_DSN"],
+            min=1,
+            max=5,
+            increment=1,
+        )
+        wallet = os.environ.get("ORACLE_WALLET")
+        if wallet:
+            kwargs["config_dir"] = wallet  # tnsnames.ora + cwallet.sso live here
+        _pool = oracledb.create_pool(**kwargs)
+    return _pool
+
+
+@contextmanager
+def _conn() -> Generator[oracledb.Connection, None, None]:
+    """Context manager that acquires and releases a pooled connection."""
+    pool = _get_pool()
+    conn = pool.acquire()
+    try:
+        yield conn
+        conn.commit()
+    except Exception:
+        conn.rollback()
+        raise
+    finally:
+        pool.release(conn)
+
+
+def insert_item(input_type: str, content: str, chat_id: str = "") -> str:
+    """Insert a new queue item and return its generated UUID.
+
+    Args:
+        input_type: One of 'youtube', 'url', 'text'.
+        content: The URL or raw text to process.
+        chat_id: Telegram chat ID for future notification support.
+
+    Returns:
+        The UUID of the newly inserted row.
+    """
+    sql = """
+        INSERT INTO knowledge_queue (input_type, content, telegram_chat_id)
+        VALUES (:input_type, :content, :chat_id)
+        RETURNING id INTO :out_id
+    """
+    with _conn() as conn:
+        cursor = conn.cursor()
+        out_id_var = cursor.var(oracledb.STRING)
+        cursor.execute(
+            sql,
+            {
+                "input_type": input_type,
+                "content": content,
+                "chat_id": chat_id,
+                "out_id": out_id_var,
+            },
+        )
+        return out_id_var.getvalue()[0]
+
+
+def fetch_pending(limit: int = 5) -> list[dict]:
+    """Fetch oldest pending items up to limit.
+
+    Args:
+        limit: Maximum number of rows to return.
+
+    Returns:
+        List of dicts with keys: id, input_type, content, telegram_chat_id.
+ """ + sql = """ + SELECT id, input_type, content, telegram_chat_id + FROM knowledge_queue + WHERE status = 'pending' + ORDER BY created_at + FETCH FIRST :n ROWS ONLY + """ + with _conn() as conn: + cursor = conn.cursor() + cursor.execute(sql, {"n": limit}) + rows = cursor.fetchall() + + return [ + { + "id": row[0], + "input_type": row[1], + "content": row[2].read() if hasattr(row[2], "read") else row[2], + "telegram_chat_id": row[3], + } + for row in rows + ] + + +def set_processing(row_id: str) -> None: + """Mark a queue item as processing. + + Args: + row_id: The UUID of the row to update. + """ + sql = """ + UPDATE knowledge_queue + SET status = 'processing', updated_at = SYSTIMESTAMP + WHERE id = :id + """ + with _conn() as conn: + conn.cursor().execute(sql, {"id": row_id}) + + +def set_done(row_id: str, title: str, metadata: dict) -> None: + """Mark a queue item as done with extracted metadata. + + Args: + row_id: The UUID of the row to update. + title: LLM-extracted title. + metadata: Dict of enrichment results to store as JSON. + """ + sql = """ + UPDATE knowledge_queue + SET status = 'done', + title = :title, + metadata_json = :meta_json, + updated_at = SYSTIMESTAMP + WHERE id = :id + """ + with _conn() as conn: + conn.cursor().execute( + sql, + { + "id": row_id, + "title": title[:500] if title else "", + "meta_json": json.dumps(metadata, ensure_ascii=False), + }, + ) + + +def set_error(row_id: str, error_msg: str) -> None: + """Mark a queue item as error with a message. + + Args: + row_id: The UUID of the row to update. + error_msg: Description of the error. + """ + sql = """ + UPDATE knowledge_queue + SET status = 'error', error_msg = :error_msg, updated_at = SYSTIMESTAMP + WHERE id = :id + """ + with _conn() as conn: + conn.cursor().execute(sql, {"id": row_id, "error_msg": error_msg}) + + +def get_status_counts() -> dict: + """Return count of rows per status. + + Returns: + Dict like {'pending': 3, 'processing': 1, 'done': 42, 'error': 0}. 
+ """ + sql = """ + SELECT status, COUNT(*) FROM knowledge_queue GROUP BY status + """ + with _conn() as conn: + cursor = conn.cursor() + cursor.execute(sql) + rows = cursor.fetchall() + + counts = {"pending": 0, "processing": 0, "done": 0, "error": 0} + for status, count in rows: + counts[status] = count + return counts diff --git a/core/vector.py b/core/vector.py new file mode 100644 index 0000000..8c4ff4f --- /dev/null +++ b/core/vector.py @@ -0,0 +1,114 @@ +"""Embedding generation and Oracle vector store insertion.""" + +import array +import os +from contextlib import contextmanager +from typing import Generator + +import oci +import oracledb +from oci.generative_ai_inference import GenerativeAiInferenceClient +from oci.generative_ai_inference.models import ( + EmbedTextDetails, + OnDemandServingMode, +) + +# Reuse same pool as queue_db but connect to same ADB instance +_pool: oracledb.ConnectionPool | None = None + + +def _get_pool() -> oracledb.ConnectionPool: + """Return (or lazily create) the module-level connection pool.""" + global _pool + if _pool is None: + kwargs: dict = dict( + user=os.environ["ORACLE_USER"], + password=os.environ["ORACLE_PASSWORD"], + dsn=os.environ["ORACLE_DSN"], + min=1, + max=5, + increment=1, + ) + wallet = os.environ.get("ORACLE_WALLET") + if wallet: + kwargs["config_dir"] = wallet + _pool = oracledb.create_pool(**kwargs) + return _pool + + +@contextmanager +def _conn() -> Generator[oracledb.Connection, None, None]: + """Context manager that acquires and releases a pooled connection.""" + pool = _get_pool() + conn = pool.acquire() + try: + yield conn + conn.commit() + except Exception: + conn.rollback() + raise + finally: + pool.release(conn) + + +def _to_vector_param(embedding: list[float]) -> array.array: + return array.array("f", embedding) + + +def _embed_texts(texts: list[str]) -> list[list[float]]: + """Generate embeddings using Cohere Embed v4 via OCI GenAI.""" + config = oci.config.from_file() + client = 
+        config,
+        service_endpoint=os.environ["OCI_GENAI_ENDPOINT"],
+    )
+    model_id = os.environ.get("OCI_EMBED_MODEL_ID", "cohere.embed-v4.0")
+    compartment_id = os.environ["OCI_COMPARTMENT_ID"]
+
+    details = EmbedTextDetails(
+        inputs=texts,
+        serving_mode=OnDemandServingMode(model_id=model_id),
+        compartment_id=compartment_id,
+        input_type="SEARCH_DOCUMENT",
+    )
+    response = client.embed_text(details)
+    return response.data.embeddings
+
+
+def save_to_vector(doc_id: str, chunks: list[str]) -> list[str]:
+    """Embed chunks and insert them into the Oracle vector store.
+
+    Args:
+        doc_id: Document identifier (e.g. 'youtube:abc12345').
+        chunks: List of text chunks to embed and store.
+
+    Returns:
+        List of inserted row UUIDs.
+    """
+    if not chunks:
+        return []
+
+    embeddings = _embed_texts(chunks)
+    inserted_ids: list[str] = []
+
+    sql = """
+        INSERT INTO vector_store (doc_id, chunk_text, embedding)
+        VALUES (:doc_id, :chunk_text, :embedding)
+        RETURNING id INTO :out_id
+    """
+    with _conn() as conn:
+        cursor = conn.cursor()
+        for chunk, embedding in zip(chunks, embeddings):
+            out_id_var = cursor.var(oracledb.STRING)
+            cursor.execute(
+                sql,
+                {
+                    "doc_id": doc_id,
+                    "chunk_text": chunk,
+                    "embedding": _to_vector_param(embedding),
+                    "out_id": out_id_var,
+                },
+            )
+            inserted_ids.append(out_id_var.getvalue()[0])
+
+    return inserted_ids
diff --git a/core/web.py b/core/web.py
new file mode 100644
index 0000000..d99a0d0
--- /dev/null
+++ b/core/web.py
@@ -0,0 +1,59 @@
+"""URL fetching and HTML-to-text extraction."""
+
+import re
+from html.parser import HTMLParser
+
+import httpx
+
+
+class _TextExtractor(HTMLParser):
+    _SKIP_TAGS = {"script", "style", "head", "nav", "footer", "noscript"}
+
+    def __init__(self) -> None:
+        super().__init__()
+        self._buf: list[str] = []
+        self._skip = 0
+
+    def handle_starttag(self, tag: str, attrs: list) -> None:
+        if tag in self._SKIP_TAGS:
+            self._skip += 1
+
+    def handle_endtag(self, tag: str) -> None:
+        if tag in self._SKIP_TAGS and self._skip:
+            self._skip -= 1
+
+    def handle_data(self, data: str) -> None:
+        if not self._skip:
+            text = data.strip()
+            if text:
+                self._buf.append(text)
+
+    def get_text(self) -> str:
+        return " ".join(self._buf)
+
+
+def _html_to_text(html: str) -> str:
+    parser = _TextExtractor()
+    parser.feed(html)
+    return re.sub(r"\s{3,}", " ", parser.get_text())
+
+
+_HEADERS = {"User-Agent": "Mozilla/5.0 (compatible; knowledge-inbox/1.0)"}
+
+
+def fetch_page_text(url: str, max_chars: int = 8000) -> str:
+    """Fetch a URL and return stripped plain text, truncated to max_chars.
+
+    Args:
+        url: The URL to fetch.
+        max_chars: Maximum characters to return.
+
+    Returns:
+        Extracted plain text, or empty string on failure.
+    """
+    try:
+        r = httpx.get(url, timeout=15, follow_redirects=True, headers=_HEADERS)
+        r.raise_for_status()
+        return _html_to_text(r.text)[:max_chars]
+    except Exception:
+        return ""
diff --git a/core/youtube.py b/core/youtube.py
new file mode 100644
index 0000000..0d2f0fe
--- /dev/null
+++ b/core/youtube.py
@@ -0,0 +1,50 @@
+"""YouTube transcript extraction via youtube-transcript-api."""
+
+import re
+
+from youtube_transcript_api import YouTubeTranscriptApi
+
+
+def _extract_video_id(url: str) -> str:
+    """Extract YouTube video ID from a URL.
+
+    Args:
+        url: YouTube URL (watch?v= or youtu.be/ formats).
+
+    Returns:
+        The video ID string.
+
+    Raises:
+        ValueError: If no video ID can be found in the URL.
+    """
+    match = re.search(r"(?:v=|youtu\.be/)([^&?/\s]+)", url)
+    if not match:
+        raise ValueError(f"Cannot extract video ID from URL: {url}")
+    return match.group(1)
+
+
+def get_transcript(url: str) -> dict:
+    """Fetch transcript text for a YouTube video.
+
+    Args:
+        url: YouTube video URL.
+
+    Returns:
+        Dict with keys: video_id, title, text, url.
+        title falls back to video_id if unavailable.
+ """ + video_id = _extract_video_id(url) + + fetched = YouTubeTranscriptApi.fetch(video_id, languages=["ko", "en"]) + segments = list(fetched) + text = " ".join(seg.text for seg in segments) + + # Try to get title from fetched transcript metadata + title = getattr(fetched, "title", None) or video_id + + return { + "video_id": video_id, + "title": title, + "text": text, + "url": url, + } diff --git a/daemon/__init__.py b/daemon/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/daemon/worker.py b/daemon/worker.py new file mode 100644 index 0000000..18190fd --- /dev/null +++ b/daemon/worker.py @@ -0,0 +1,108 @@ +"""Polling daemon that processes knowledge_queue items.""" + +import logging +import os +import time + +from core.chunker import chunk_text +from core.enricher import enrich +from core.obsidian import save_note +from core.queue_db import fetch_pending, set_done, set_error, set_processing +from core.vector import save_to_vector +from core.web import fetch_page_text +from core.youtube import get_transcript + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) +logger = logging.getLogger(__name__) + + +def process_item(item: dict) -> None: + """Process a single queue item end-to-end. + + Args: + item: Dict from fetch_pending() with keys: id, input_type, content. 
+ """ + row_id = item["id"] + input_type = item["input_type"] + content = item["content"] + + set_processing(row_id) + logger.info("Processing %s [%s]", row_id[:8], input_type) + + try: + url = "" + yt_title = "" + + if input_type == "youtube": + result = get_transcript(content) + text = result["text"] + url = content + yt_title = result["title"] + elif input_type == "url": + text = fetch_page_text(content) + url = content + else: # text + text = content + + if not text: + raise ValueError("No text content extracted") + + meta = enrich(input_type, yt_title, url, text) + title = meta.get("title") or yt_title or url or row_id[:8] + + note_path = save_note( + content_type=input_type, + title=title, + summary=meta.get("summary", ""), + body=text, + tags=meta.get("tags", []), + source_url=url, + author=meta.get("author") or "", + date=meta.get("date") or "", + ) + logger.info("Obsidian note saved: %s", note_path) + + chunks = chunk_text(text) + doc_id = f"{input_type}:{row_id[:8]}" + inserted = save_to_vector(doc_id, chunks) + logger.info("Vector store: inserted %d chunks for doc_id=%s", len(inserted), doc_id) + + set_done(row_id, title, meta) + logger.info("Done: %s β†’ %s", row_id[:8], title[:60]) + + except Exception as exc: + logger.error("Error processing %s: %s", row_id[:8], exc, exc_info=True) + set_error(row_id, str(exc)) + + +def run_loop(interval: int = 30) -> None: + """Poll for pending items indefinitely. + + Args: + interval: Seconds to sleep between polling cycles. 
+ """ + interval = int(os.environ.get("DAEMON_INTERVAL", interval)) + logger.info("Daemon started (interval=%ds)", interval) + + while True: + try: + items = fetch_pending(limit=5) + if items: + logger.info("Found %d pending item(s)", len(items)) + for item in items: + process_item(item) + else: + logger.debug("No pending items") + except Exception as exc: + logger.error("Polling error: %s", exc, exc_info=True) + + time.sleep(interval) + + +if __name__ == "__main__": + from dotenv import load_dotenv + load_dotenv() + run_loop() diff --git a/main.py b/main.py new file mode 100644 index 0000000..5d82525 --- /dev/null +++ b/main.py @@ -0,0 +1,23 @@ +"""Main entry point: starts daemon thread + Telegram bot.""" + +import threading + +from dotenv import load_dotenv + +load_dotenv() + +from bot.telegram_bot import build_app +from daemon.worker import run_loop + + +def main() -> None: + """Start the daemon in a background thread, then run the bot (blocking).""" + t = threading.Thread(target=run_loop, args=(30,), daemon=True) + t.start() + + app = build_app() + app.run_polling() + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..8be9e56 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,19 @@ +[project] +name = "knowledge-inbox" +version = "0.1.0" +requires-python = ">=3.11" +dependencies = [ + "python-telegram-bot>=21.0", + "youtube-transcript-api>=0.6", + "httpx>=0.27", + "oracledb>=2.0", + "oci>=2.100", + "python-dotenv>=1.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["bot", "core", "daemon"] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2f28839 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +python-telegram-bot>=21.0 +youtube-transcript-api>=0.6 +httpx>=0.27 +oracledb>=2.0 +oci>=2.100 +python-dotenv>=1.0 diff --git a/sql/schema.sql b/sql/schema.sql new file mode 100644 
index 0000000..5418fd9
--- /dev/null
+++ b/sql/schema.sql
@@ -0,0 +1,16 @@
+-- knowledge_queue table for processing pipeline
+CREATE TABLE knowledge_queue (
+    id               VARCHAR2(36)  DEFAULT SYS_GUID() PRIMARY KEY,
+    input_type       VARCHAR2(20)  NOT NULL,              -- 'youtube' | 'url' | 'text'
+    content          CLOB          NOT NULL,              -- URL or raw text
+    status           VARCHAR2(20)  DEFAULT 'pending' NOT NULL,
+                                                          -- pending | processing | done | error
+    title            VARCHAR2(500),                       -- LLM-extracted title (after processing)
+    error_msg        CLOB,
+    metadata_json    CLOB,                                -- JSON: summary, tags, author, etc.
+    telegram_chat_id VARCHAR2(50),                        -- for future notification support
+    created_at       TIMESTAMP     DEFAULT SYSTIMESTAMP NOT NULL,
+    updated_at       TIMESTAMP     DEFAULT SYSTIMESTAMP NOT NULL
+);
+
+CREATE INDEX idx_kq_status ON knowledge_queue (status, created_at);