Files
knowledge-inbox/bot/telegram_bot.py
joungmin 86a4104ae3 feat: initial knowledge-inbox pipeline implementation
- Oracle ADB queue table (sql/schema.sql)
- Queue CRUD: core/queue_db.py
- YouTube transcript: core/youtube.py
- Web page fetch: core/web.py
- LLM enrichment via OCI GenAI Gemini Flash: core/enricher.py
- Text chunker: core/chunker.py
- Obsidian note writer: core/obsidian.py
- Oracle vector store insertion: core/vector.py
- Polling daemon: daemon/worker.py
- Telegram bot: bot/telegram_bot.py
- Main runner: main.py
2026-02-28 08:16:11 +09:00

109 lines
3.5 KiB
Python

"""Telegram bot for receiving knowledge inbox items."""
import logging
import os
import re
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
from core.queue_db import get_status_counts, insert_item
# Configure the root logger when this module is imported: INFO level and a
# "timestamp [LEVEL] logger-name: message" line format.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
# Module-scoped logger used by the handlers below.
logger = logging.getLogger(__name__)
def detect_type(text: str) -> str:
    """Classify a user message as a YouTube link, a generic URL, or free text.

    Scheme and host matching is case-insensitive: URL schemes and host names
    are case-insensitive, and mobile keyboards frequently capitalise the
    first letter of a message (``Https://...``), which previously caused such
    links to be stored as plain text.

    Args:
        text: Raw message text from user.

    Returns:
        One of 'youtube', 'url', 'text'.
    """
    text = text.strip()
    # A YouTube watch/short link anywhere in the message takes precedence,
    # even when the message does not start with a scheme.
    if re.search(r"youtube\.com/watch|youtu\.be/", text, flags=re.IGNORECASE):
        return "youtube"
    # str.startswith is case-sensitive, so compare against a lowered prefix
    # ("https://" is 8 characters, the longest prefix we test for).
    if text[:8].lower().startswith(("http://", "https://")):
        return "url"
    return "text"
async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start command by replying with a short usage overview.

    Args:
        update: Incoming Telegram update that carried the command.
        context: Handler context supplied by the framework (unused here).
    """
    # CommandHandler also fires for edited commands, in which case
    # update.message is None; effective_message covers both cases.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "📚 *Knowledge Inbox Bot*\n\n"
        "다음을 전송하면 자동으로 처리하여 Obsidian에 저장합니다:\n\n"
        "• *YouTube URL* — 트랜스크립트 추출 후 요약\n"
        "• *웹 URL* — 페이지 내용 추출 후 요약\n"
        "• *자유 텍스트* — 그대로 저장 후 태그 추출\n\n"
        "/status — 처리 현황 조회",
        parse_mode="Markdown",
    )
async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /status command by replying with per-status queue counts.

    Counts are read via ``get_status_counts()``; missing statuses are shown
    as 0. If the lookup raises, the error is logged with its traceback and a
    failure notice is sent instead.

    Args:
        update: Incoming Telegram update that carried the command.
        context: Handler context supplied by the framework (unused here).
    """
    # CommandHandler also fires for edited commands, in which case
    # update.message is None; effective_message covers both cases.
    message = update.effective_message
    if message is None:
        return
    try:
        # NOTE(review): this looks like a synchronous call inside an async
        # handler; if it performs blocking I/O it stalls the event loop
        # while it runs — confirm against core.queue_db.
        counts = get_status_counts()
        msg = (
            "📊 *처리 현황*\n\n"
            f"⏳ 대기중: {counts.get('pending', 0)}\n"
            f"🔄 처리중: {counts.get('processing', 0)}\n"
            f"✅ 완료: {counts.get('done', 0)}\n"
            f"❌ 오류: {counts.get('error', 0)}"
        )
    except Exception as exc:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Status query failed: %s", exc)
        msg = "❌ 상태 조회에 실패했습니다."
    await message.reply_text(msg, parse_mode="Markdown")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle all non-command text messages by queueing them for processing.

    The stripped message text is classified with ``detect_type`` and stored
    via ``insert_item`` together with the chat id; the user then receives a
    confirmation containing the first 8 characters of the new row id.

    Args:
        update: Incoming Telegram update.
        context: Handler context supplied by the framework (unused here).
    """
    # filters.TEXT also lets edited messages and channel posts through, for
    # which update.message is None. Those are ignored rather than re-queued
    # so that editing a message does not create a duplicate item.
    message = update.message
    chat = update.effective_chat
    if message is None or chat is None:
        return

    content = (message.text or "").strip()
    if not content:
        return

    input_type = detect_type(content)
    type_labels = {"youtube": "YouTube", "url": "웹페이지", "text": "텍스트"}

    # Only the insert is guarded: a failure while sending the confirmation
    # must not be reported to the user (or logged) as a failed save.
    try:
        row_id = insert_item(input_type, content, str(chat.id))
    except Exception as exc:
        logger.exception("insert_item failed: %s", exc)
        await message.reply_text("❌ 저장에 실패했습니다. 잠시 후 다시 시도해주세요.")
        return

    label = type_labels[input_type]
    # str() keeps the slice safe if the row id is not already a string.
    short_id = str(row_id)[:8]
    await message.reply_text(
        f"📥 *{label}*이 큐에 추가됐습니다.\n"
        f"ID: `{short_id}`\n\n"
        "처리 완료 후 Obsidian에 저장됩니다.",
        parse_mode="Markdown",
    )
def build_app() -> Application:
    """Create the Telegram Application and register this module's handlers.

    The bot token is read from the ``TELEGRAM_BOT_TOKEN`` environment
    variable. Handlers are registered in this order: ``/start``,
    ``/status``, then the catch-all for non-command text messages.

    Returns:
        Configured Application instance ready to run.

    Raises:
        KeyError: If ``TELEGRAM_BOT_TOKEN`` is not set in the environment.
    """
    bot_token = os.environ["TELEGRAM_BOT_TOKEN"]
    application = Application.builder().token(bot_token).build()

    handlers = (
        CommandHandler("start", cmd_start),
        CommandHandler("status", cmd_status),
        MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message),
    )
    for handler in handlers:
        application.add_handler(handler)

    return application
if __name__ == "__main__":
    # Imported here so python-dotenv is only needed when run as a script.
    from dotenv import load_dotenv

    # Load variables from a .env file before build_app() reads the token.
    load_dotenv()
    application = build_app()
    application.run_polling()