knowledge-inbox/daemon/worker.py
joungmin d0c2aa3857 feat: replace single-pass enricher with 4-step pipeline
Upgrades content processing from a single LLM call to a structured
5-step document reconstruction pipeline (four LLM-driven steps plus
one LLM-free assembly step):

  1. Normalize  — colloquial-speech cleanup, punctuation restoration, key-entity extraction
  2. Index Tree — full-text scan → hierarchical table of contents (JSON)
  3. Leaf Summarize — detailed per-section summaries (300-char context overlap)
  4. Consistency Check — verify missing entities and fill them in
  5. Assemble — final Markdown document assembly (no LLM needed)

- Short texts (< 3000 chars): simple 1-pass fallback
- Long texts: full pipeline (N+4 LLM calls where N = section count; see the sketch below)
- worker.py: uses body_md from enricher as Obsidian note body

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-02 18:02:00 +09:00
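
For orientation, a minimal sketch of the dispatch described above. The 3000-char threshold, the 300-char overlap, and the step order come from this message; the helper prompts, the injected llm callable, and the JSON TOC schema are assumptions, not the real enricher API.

import json
from typing import Callable


def reconstruct(text: str, llm: Callable[[str], str], short_limit: int = 3000) -> str:
    """Sketch of the 1-pass fallback vs. full-pipeline dispatch (not the real enricher)."""
    if len(text) < short_limit:
        # Short input: simple 1-pass fallback, one LLM call total.
        return llm("Rewrite this as a clean Markdown note:\n" + text)
    # 1. Normalize: clean up colloquial speech, restore punctuation. (1 call;
    #    entity extraction may be a separate call in the real enricher)
    clean = llm("Normalize this transcript and fix punctuation:\n" + text)
    # 2. Index Tree: scan the full text, emit a hierarchical TOC as JSON. (1 call)
    #    Assumed schema: [{"title": str, "start": int, "end": int}, ...]
    toc = json.loads(llm("Emit a JSON array of sections with char offsets:\n" + clean))
    # 3. Leaf Summarize: one call per section with 300-char context overlap. (N calls)
    sections = []
    for node in toc:
        window = clean[max(0, node["start"] - 300):node["end"] + 300]
        sections.append((node["title"], llm("Summarize this section in detail:\n" + window)))
    # 4. Consistency Check: one call to verify key entities survived and patch gaps.
    merged = "\n\n".join(f"## {title}\n\n{body}" for title, body in sections)
    merged = llm("Restore any key entities from the source missing in this draft:\n" + merged)
    # 5. Assemble: wrap into the final Markdown document; no LLM call.
    return "# Reconstructed Document\n\n" + merged

As written this costs 1 + 1 + N + 1 = N + 3 calls for a long text; the message's N+4 suggests one more fixed call in the real enricher, likely a separate entity-extraction or metadata pass, which this sketch omits.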


"""Polling daemon that processes knowledge_queue items."""
import logging
import os
import time

from core.anki import add_vocab_cards
from core.chunker import chunk_text
from core.enricher import enrich
from core.obsidian import save_note
from core.queue_db import fetch_pending, set_done, set_error, set_processing
from core.vector import save_to_vector
from core.vocab import extract_vocab
from core.web import fetch_page_text
from core.youtube import get_transcript

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)


def process_item(item: dict) -> None:
    """Process a single queue item end-to-end.

    Args:
        item: Dict from fetch_pending() with keys: id, input_type, content.
    """
    row_id = item["id"]
    input_type = item["input_type"]
    content = item["content"]
    set_processing(row_id)
    logger.info("Processing %s [%s]", row_id[:8], input_type)
    try:
        url = ""
        yt_title = ""
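        # Resolve the raw text for each input type: YouTube transcript,
        # fetched web page, or plain text passed through as-is.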
        if input_type == "youtube":
            result = get_transcript(content)
            text = result["text"]
            url = content
            yt_title = result["title"]
        elif input_type == "url":
            text = fetch_page_text(content)
            url = content
        else:  # text
            text = content
        if not text:
            raise ValueError("No text content extracted")
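        # Short texts (< 3000 chars) get a single LLM pass; longer texts run
        # the full reconstruction pipeline (see commit message).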
        meta = enrich(input_type, yt_title, url, text)
        title = meta.get("title") or yt_title or url or row_id[:8]
        # body_md: structured document produced by the reconstruction pipeline
        # (falls back to the raw text if absent)
        note_path = save_note(
            content_type=input_type,
            title=title,
            summary=meta.get("summary", ""),
            body=meta.get("body_md") or text,
            tags=meta.get("tags", []),
            source_url=url,
            author=meta.get("author") or "",
            date=meta.get("date") or "",
            summary_ko=meta.get("summary_ko", ""),
        )
        logger.info("Obsidian note saved: %s", note_path)
        chunks = chunk_text(text)
        doc_id = f"{input_type}:{row_id[:8]}"
        inserted = save_to_vector(doc_id, chunks)
        logger.info("Vector store: inserted %d chunks for doc_id=%s", len(inserted), doc_id)
        # Add Anki vocabulary cards for English content
        if meta.get("language", "").startswith("en"):
            vocab = extract_vocab(text, title)
            if vocab:
                add_vocab_cards(vocab, source_title=title)
        set_done(row_id, title, meta)
        logger.info("Done: %s: %s", row_id[:8], title[:60])
    except Exception as exc:
        logger.error("Error processing %s: %s", row_id[:8], exc, exc_info=True)
        set_error(row_id, str(exc))


def run_loop(interval: int = 30) -> None:
    """Poll for pending items indefinitely.

    Args:
        interval: Seconds to sleep between polling cycles.
    """
    interval = int(os.environ.get("DAEMON_INTERVAL", interval))
    logger.info("Daemon started (interval=%ds)", interval)
    while True:
        try:
            items = fetch_pending(limit=5)
            if items:
                logger.info("Found %d pending item(s)", len(items))
                for item in items:
                    process_item(item)
            else:
                logger.debug("No pending items")
        except Exception as exc:
            logger.error("Polling error: %s", exc, exc_info=True)
        time.sleep(interval)


if __name__ == "__main__":
    from dotenv import load_dotenv

    load_dotenv()
    run_loop()