feat: replace single-pass enricher with 4-step pipeline

Upgrades content processing from a single LLM call to a structured
5-step document reconstruction pipeline:

  1. Normalize  — 구어체 정제, 문장부호 복원, 핵심 엔티티 추출
  2. Index Tree — 텍스트 전체 스캔 → 계층적 목차(JSON) 생성
  3. Leaf Summarize — 섹션별 상세 요약 (context overlap 300자 적용)
  4. Consistency Check — 누락 엔티티 검증 및 보완
  5. Assemble — 최종 Markdown 문서 조립 (LLM 불필요)

- Short texts (< 3000 chars): simple 1-pass fallback
- Long texts: full pipeline (N+4 LLM calls where N = section count)
- worker.py: uses body_md from enricher as Obsidian note body

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
joungmin
2026-03-02 18:02:00 +09:00
parent 128fde3ad6
commit d0c2aa3857
2 changed files with 479 additions and 68 deletions

View File

@@ -1,6 +1,19 @@
"""LLM-based content enrichment via OCI GenAI Gemini Flash.""" """LLM-based content enrichment via OCI GenAI Gemini Flash.
4단계 파이프라인:
1. Normalize — 구어체 정제 + 핵심 엔티티 추출
2. Index Tree — 계층적 목차(JSON) 생성
3. Leaf Summarize — 섹션별 상세 요약 (context overlap 적용)
4. Consistency — 엔티티 누락 검증 및 보완
5. Assemble — 최종 Markdown 문서 조립 (LLM 불필요)
짧은 텍스트(< 3000자)는 단순 1-pass 처리로 폴백.
"""
from __future__ import annotations
import json import json
import logging
import os import os
import re import re
@@ -14,16 +27,322 @@ from oci.generative_ai_inference.models import (
UserMessage, UserMessage,
) )
_PROMPT = """\ logger = logging.getLogger(__name__)
You are a knowledge extraction assistant. Analyze the content below and return ONLY a valid JSON object with these fields:
- "title": concise descriptive title for this content (string) # 텍스트 길이 임계값
- "summary": 3-5 sentence summary capturing key insights, written in English (string) _SHORT_THRESHOLD = 3_000 # 이하면 1-pass로 처리
- "summary_ko": the same summary translated into Korean (string) _SECTION_SIZE = 4_000 # 섹션별 청킹 크기
- "tags": list of 3-7 relevant keywords or topics (string[]) _OVERLAP = 300 # 인접 섹션 컨텍스트 오버랩
- "author": author or creator name, or null if not found (string | null)
- "date": publication date in ISO 8601 format (YYYY-MM-DD), or null if not found (string | null)
- "content_type": one of "youtube", "article", "documentation", "news", "forum", "code", "other" (string) # ── LLM 헬퍼 ─────────────────────────────────────────────────────────────────
- "language": primary language of the content, ISO 639-1 code, e.g. "en", "ko", "ja" (string)
def _get_client() -> GenerativeAiInferenceClient:
config = oci.config.from_file()
endpoint = os.environ.get("OCI_CHAT_ENDPOINT") or os.environ["OCI_GENAI_ENDPOINT"]
return GenerativeAiInferenceClient(config, service_endpoint=endpoint)
def _llm(prompt: str, max_tokens: int = 2048) -> str:
"""단일 LLM 호출. 응답 텍스트 반환."""
client = _get_client()
req = GenericChatRequest(
messages=[UserMessage(content=[TextContent(text=prompt)])],
max_tokens=max_tokens,
temperature=0,
)
det = ChatDetails(
compartment_id=os.environ["OCI_COMPARTMENT_ID"],
serving_mode=OnDemandServingMode(model_id=os.environ["OCI_CHAT_MODEL_ID"]),
chat_request=req,
)
resp = client.chat(det)
return resp.data.chat_response.choices[0].message.content[0].text.strip()
def _parse_json(raw: str) -> dict | list:
"""LLM 응답에서 JSON 파싱. 마크다운 코드블록 제거 후 시도."""
raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", raw, flags=re.MULTILINE).strip()
return json.loads(raw)
# ── Step 1: Normalize ─────────────────────────────────────────────────────────
def _normalize(text: str) -> tuple[str, list[str]]:
"""구어체 정제 + 핵심 엔티티 추출.
긴 텍스트는 4000자 청크로 분할 처리 후 합산.
Returns:
(정제된 텍스트, 엔티티 리스트)
"""
from .chunker import chunk_text
chunks = chunk_text(text, size=_SECTION_SIZE, overlap=0)
normalized_parts: list[str] = []
all_entities: list[str] = []
for i, chunk in enumerate(chunks):
prompt = f"""\
다음 텍스트를 아래 두 가지 작업으로 처리하세요.
[작업 1 - 정제]
- 구어체·추임새(음, 어, 그래서, 뭔가, 이제...)를 제거
- 중복 표현 압축
- 문장 부호(마침표, 물음표, 줄바꿈)를 복원해 읽기 좋은 문어체/보고서체로 변환
- 단, 정보값(수치·고유명사·전문용어)은 절대 삭제하지 말 것
[작업 2 - 엔티티 추출]
- 고유명사, 전문 용어, 수치, 날짜, 조직명 등 핵심 엔티티를 리스트업
JSON으로만 응답 (설명 없이):
{{"normalized": "정제된 텍스트", "entities": ["엔티티1", ...]}}
원문 (청크 {i+1}/{len(chunks)}):
{chunk}"""
try:
result = _parse_json(_llm(prompt, max_tokens=_SECTION_SIZE + 512))
normalized_parts.append(result.get("normalized", chunk))
all_entities.extend(result.get("entities", []))
except Exception as e:
logger.warning("Normalize chunk %d failed: %s", i, e)
normalized_parts.append(chunk)
# 엔티티 중복 제거 (순서 유지)
seen: set[str] = set()
unique_entities = [e for e in all_entities if not (e in seen or seen.add(e))]
return "\n\n".join(normalized_parts), unique_entities
# ── Step 2: Index Tree ────────────────────────────────────────────────────────
def _build_index_tree(text: str) -> list[dict]:
"""텍스트 전체를 스캔해 계층적 목차(JSON) 생성.
Returns:
[{"title": str, "level": int, "children": [...]}]
"""
# 긴 텍스트는 첫 8000자 + 마지막 1000자로 구조 파악
sample = text[:8_000]
if len(text) > 9_000:
sample += "\n...(중략)...\n" + text[-1_000:]
prompt = f"""\
다음 텍스트를 분석해 계층적 목차를 만드세요.
규칙:
- 텍스트의 주제 흐름을 파악해 대목차(level 1) > 중목차(level 2) > 소목차(level 3) 구조로 분류
- 각 섹션 제목은 명사형으로 간결하게 작성
- 소목차는 실제 내용이 있을 때만 생성 (과도한 세분화 금지)
- 전체 섹션 수는 3~10개 권장
JSON 배열로만 응답:
[
{{"title": "대목차 제목", "level": 1, "children": [
{{"title": "중목차", "level": 2, "children": [
{{"title": "소목차", "level": 3, "children": []}}
]}}
]}}
]
텍스트:
{sample}"""
try:
tree = _parse_json(_llm(prompt, max_tokens=1024))
if isinstance(tree, list):
return tree
except Exception as e:
logger.warning("Index tree build failed: %s", e)
# 폴백: 단일 섹션
return [{"title": "본문", "level": 1, "children": []}]
# ── Step 3: Leaf Summarize ────────────────────────────────────────────────────
def _collect_leaves(tree: list[dict]) -> list[dict]:
"""트리에서 말단(leaf) 노드만 수집."""
leaves: list[dict] = []
for node in tree:
if node.get("children"):
leaves.extend(_collect_leaves(node["children"]))
else:
leaves.append(node)
return leaves
def _split_by_sections(text: str, leaves: list[dict]) -> list[str]:
"""텍스트를 섹션 수에 맞게 균등 분할 (context overlap 포함).
Returns:
섹션별 텍스트 리스트 (인접 섹션 앞뒤 OVERLAP자 포함)
"""
n = len(leaves)
if n == 0:
return []
total = len(text)
base = total // n
sections: list[str] = []
for i in range(n):
start = max(0, i * base - _OVERLAP)
end = min(total, (i + 1) * base + _OVERLAP)
sections.append(text[start:end])
return sections
def _summarize_leaves(
text: str,
tree: list[dict],
content_type: str,
) -> list[dict]:
"""말단 섹션별로 상세 요약 생성 (context overlap 적용).
Returns:
[{"title": str, "summary": str}, ...]
"""
leaves = _collect_leaves(tree)
sections = _split_by_sections(text, leaves)
results: list[dict] = []
for i, (leaf, section_text) in enumerate(zip(leaves, sections)):
prompt = f"""\
다음은 "{leaf['title']}" 섹션에 해당하는 내용입니다.
(앞뒤 섹션과 {_OVERLAP}자 컨텍스트가 포함되어 있음)
작성 규칙:
- 해당 섹션의 핵심 내용을 상세하게 요약 (3~7문장)
- 수치·사례·고유명사는 반드시 포함
- 문어체 보고서 형식으로 작성
- 마크다운 볼드(**) 활용해 핵심어 강조
내용:
{section_text}
"{leaf['title']}" 섹션 상세 요약:"""
try:
summary = _llm(prompt, max_tokens=800)
results.append({"title": leaf["title"], "summary": summary})
logger.debug("Section %d/%d summarized: %s", i + 1, len(leaves), leaf["title"])
except Exception as e:
logger.warning("Leaf summarize failed [%s]: %s", leaf["title"], e)
results.append({"title": leaf["title"], "summary": section_text[:400] + "..."})
return results
# ── Step 4: Consistency Check ─────────────────────────────────────────────────
def _consistency_check(
assembled: str,
entities: list[str],
) -> str:
"""엔티티 누락 검증 — 빠진 핵심 정보를 문서에 보완.
엔티티가 없거나 문서가 짧으면 스킵.
"""
if not entities or len(assembled) < 200:
return assembled
# 엔티티 중 assembled에 없는 것만 추려서 검증 비용 절감
missing = [e for e in entities if e not in assembled]
if not missing:
logger.debug("Consistency check: all %d entities present.", len(entities))
return assembled
prompt = f"""\
아래 문서에 다음 핵심 엔티티들이 언급되지 않았습니다.
각 엔티티를 가장 적합한 섹션에 자연스럽게 추가하세요.
전체 문서 구조(제목, 섹션)는 절대 변경하지 마세요.
누락 엔티티:
{json.dumps(missing, ensure_ascii=False)}
문서:
{assembled}
수정된 문서 전체를 그대로 출력하세요:"""
try:
return _llm(prompt, max_tokens=len(assembled) + 512)
except Exception as e:
logger.warning("Consistency check failed: %s", e)
return assembled
# ── Step 5: Assemble ──────────────────────────────────────────────────────────
def _assemble(
overall_summary: str,
tree: list[dict],
sections: list[dict],
entities: list[str],
) -> str:
"""목차 구조 + 섹션 요약 → 최종 Markdown 문서 조립."""
lines: list[str] = []
# 전체 요약
lines.append("## 요약")
lines.append(overall_summary)
lines.append("")
# 목차
lines.append("## 목차")
for node in tree:
indent = " " * (node["level"] - 1)
lines.append(f"{indent}- {node['title']}")
for child in node.get("children", []):
indent2 = " " * (child["level"] - 1)
lines.append(f"{indent2}- {child['title']}")
lines.append("")
# 섹션별 상세
heading = {1: "#", 2: "##", 3: "###"}
for section in sections:
lvl = _find_level(tree, section["title"])
h = heading.get(lvl, "##")
lines.append(f"{h} {section['title']}")
lines.append(section["summary"])
lines.append("")
# 핵심 엔티티
if entities:
lines.append("## 핵심 키워드")
lines.append(", ".join(f"`{e}`" for e in entities[:20]))
lines.append("")
return "\n".join(lines)
def _find_level(tree: list[dict], title: str, default: int = 2) -> int:
"""트리에서 title에 해당하는 level 반환."""
for node in tree:
if node["title"] == title:
return node["level"]
found = _find_level(node.get("children", []), title, default)
if found != default:
return found
return default
# ── 단순 1-pass 처리 (짧은 텍스트) ────────────────────────────────────────────
_SIMPLE_PROMPT = """\
You are a knowledge extraction assistant. Analyze the content below and return ONLY a valid JSON object:
- "title": concise descriptive title (string)
- "summary": 3-5 sentence summary of key insights (string)
- "summary_ko": same summary in Korean (string)
- "body_md": well-structured Markdown document with proper headings and bullet points (string)
- "tags": 3-7 relevant keywords (string[])
- "author": author name or null (string | null)
- "date": publication date YYYY-MM-DD or null (string | null)
- "content_type": one of youtube/article/documentation/news/forum/code/other (string)
- "language": ISO 639-1 code e.g. "en", "ko" (string)
Content type: {content_type} Content type: {content_type}
Source URL: {url} Source URL: {url}
@@ -33,67 +352,158 @@ Content:
Return only the JSON object, no markdown, no explanation.""" Return only the JSON object, no markdown, no explanation."""
def _get_client() -> GenerativeAiInferenceClient: def _simple_enrich(content_type: str, title: str, url: str, text: str) -> dict:
config = oci.config.from_file() """짧은 텍스트용 단순 1-pass 처리."""
# Gemini models live in us-ashburn-1; use OCI_CHAT_ENDPOINT if set, prompt = _SIMPLE_PROMPT.format(
# otherwise fall back to OCI_GENAI_ENDPOINT.
endpoint = os.environ.get("OCI_CHAT_ENDPOINT") or os.environ["OCI_GENAI_ENDPOINT"]
return GenerativeAiInferenceClient(config, service_endpoint=endpoint)
def enrich(content_type: str, title: str, url: str, text: str) -> dict:
"""Extract structured metadata from content using Gemini Flash.
Args:
content_type: One of 'youtube', 'url', 'text'.
title: Initial title hint (may be empty).
url: Source URL (empty for plain text).
text: The full content text to analyze.
Returns:
Dict with keys: title, summary, tags, author, date, content_type.
Falls back to minimal defaults on LLM failure.
"""
prompt = _PROMPT.format(
content_type=content_type, content_type=content_type,
url=url or "(none)", url=url or "(none)",
text=text[:6000], text=text[:6000],
) )
raw = _llm(prompt, max_tokens=2048)
meta = _parse_json(raw)
meta.setdefault("title", title or url or text[:80])
meta.setdefault("summary", "")
meta.setdefault("summary_ko", "")
meta.setdefault("body_md", text)
meta.setdefault("tags", [])
meta.setdefault("author", None)
meta.setdefault("date", None)
meta.setdefault("content_type", content_type)
meta.setdefault("language", "en")
return meta
# ── 메타데이터 추출 (기본 정보) ───────────────────────────────────────────────
_META_PROMPT = """\
Analyze the content below and return ONLY a JSON object with these fields:
- "title": concise descriptive title (string)
- "summary": 3-5 sentence overall summary (string)
- "summary_ko": same summary in Korean (string)
- "tags": 3-7 relevant keywords (string[])
- "author": author or null (string | null)
- "date": YYYY-MM-DD or null (string | null)
- "content_type": youtube/article/documentation/news/forum/code/other (string)
- "language": ISO 639-1 code (string)
Content type: {content_type}
Source URL: {url}
Content (first 5000 chars):
{text}
Return only the JSON, no markdown."""
def _extract_meta(content_type: str, title: str, url: str, text: str) -> dict:
"""기본 메타데이터(제목·요약·태그 등) 추출."""
prompt = _META_PROMPT.format(
content_type=content_type,
url=url or "(none)",
text=text[:5000],
)
try: try:
client = _get_client() meta = _parse_json(_llm(prompt, max_tokens=1024))
req = GenericChatRequest( except Exception:
messages=[UserMessage(content=[TextContent(text=prompt)])], meta = {}
max_tokens=2048, meta.setdefault("title", title or url or text[:80])
temperature=0, meta.setdefault("summary", "")
meta.setdefault("summary_ko", "")
meta.setdefault("tags", [])
meta.setdefault("author", None)
meta.setdefault("date", None)
meta.setdefault("content_type", content_type)
meta.setdefault("language", "en")
return meta
# ── 공개 인터페이스 ──────────────────────────────────────────────────────────
def enrich(content_type: str, title: str, url: str, text: str) -> dict:
"""4단계 파이프라인으로 콘텐츠를 구조화된 문서로 변환.
Args:
content_type: 'youtube' | 'url' | 'text'
title: 초기 제목 힌트 (없으면 빈 문자열)
url: 소스 URL (텍스트 직접 입력이면 빈 문자열)
text: 처리할 전체 텍스트
Returns:
Dict with keys:
title, summary, summary_ko, tags, author, date,
content_type, language, body_md
body_md: 4단계 파이프라인으로 생성된 구조화 Markdown 문서
"""
# ── 짧은 텍스트: 단순 처리 ──────────────────────────────
if len(text) < _SHORT_THRESHOLD:
logger.info("Short text (%d chars) → simple 1-pass enrichment", len(text))
try:
return _simple_enrich(content_type, title, url, text)
except Exception as e:
logger.warning("Simple enrich failed: %s", e)
return _fallback(title, url, text, content_type)
# ── 긴 텍스트: 4단계 파이프라인 ────────────────────────
logger.info("Long text (%d chars) → 4-step pipeline", len(text))
# Step 1: Normalize + Entity Extraction
logger.info("[1/4] Normalize...")
try:
normalized, entities = _normalize(text)
except Exception as e:
logger.warning("Normalize failed, using raw: %s", e)
normalized, entities = text, []
# Step 2: Index Tree
logger.info("[2/4] Index Tree (%d entities found)...", len(entities))
try:
tree = _build_index_tree(normalized)
except Exception as e:
logger.warning("Index tree failed: %s", e)
tree = [{"title": "본문", "level": 1, "children": []}]
# Step 3: Leaf Summarize
logger.info("[3/4] Leaf Summarize (%d sections)...", len(_collect_leaves(tree)))
try:
sections = _summarize_leaves(normalized, tree, content_type)
except Exception as e:
logger.warning("Leaf summarize failed: %s", e)
sections = [{"title": "본문", "summary": normalized[:1000]}]
# 기본 메타데이터 추출 (제목·요약·태그)
logger.info("[meta] Extracting metadata...")
try:
meta = _extract_meta(content_type, title, url, normalized)
except Exception as e:
logger.warning("Meta extraction failed: %s", e)
meta = _fallback(title, url, text, content_type)
# Step 4: Assemble (LLM 없음)
logger.info("[4/4] Assemble...")
body_md = _assemble(
overall_summary=meta.get("summary", ""),
tree=tree,
sections=sections,
entities=entities,
) )
det = ChatDetails(
compartment_id=os.environ["OCI_COMPARTMENT_ID"], # Step 5 (optional): Consistency Check
serving_mode=OnDemandServingMode(model_id=os.environ["OCI_CHAT_MODEL_ID"]), logger.info("[5/5] Consistency Check...")
chat_request=req, body_md = _consistency_check(body_md, entities)
)
response = client.chat(det) meta["body_md"] = body_md
raw = response.data.chat_response.choices[0].message.content[0].text.strip() logger.info("Pipeline complete. body_md=%d chars", len(body_md))
raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", raw, flags=re.MULTILINE) return meta
metadata = json.loads(raw)
except Exception as exc:
metadata = { def _fallback(title: str, url: str, text: str, content_type: str) -> dict:
return {
"title": title or url or text[:80], "title": title or url or text[:80],
"summary": text[:300], "summary": text[:300],
"summary_ko": "",
"body_md": text,
"tags": [], "tags": [],
"author": None, "author": None,
"date": None, "date": None,
"content_type": content_type, "content_type": content_type,
"_error": str(exc), "language": "en",
} }
# Ensure required keys exist
metadata.setdefault("title", title or url or text[:80])
metadata.setdefault("summary", "")
metadata.setdefault("tags", [])
metadata.setdefault("author", None)
metadata.setdefault("date", None)
metadata.setdefault("content_type", content_type)
metadata.setdefault("language", "en")
return metadata

View File

@@ -55,11 +55,12 @@ def process_item(item: dict) -> None:
meta = enrich(input_type, yt_title, url, text) meta = enrich(input_type, yt_title, url, text)
title = meta.get("title") or yt_title or url or row_id[:8] title = meta.get("title") or yt_title or url or row_id[:8]
# body_md: 4단계 파이프라인이 생성한 구조화 문서 (없으면 원문 폴백)
note_path = save_note( note_path = save_note(
content_type=input_type, content_type=input_type,
title=title, title=title,
summary=meta.get("summary", ""), summary=meta.get("summary", ""),
body=text, body=meta.get("body_md") or text,
tags=meta.get("tags", []), tags=meta.get("tags", []),
source_url=url, source_url=url,
author=meta.get("author") or "", author=meta.get("author") or "",