diff --git a/core/enricher.py b/core/enricher.py index 66cee2b..490d874 100644 --- a/core/enricher.py +++ b/core/enricher.py @@ -1,6 +1,19 @@ -"""LLM-based content enrichment via OCI GenAI Gemini Flash.""" +"""LLM-based content enrichment via OCI GenAI Gemini Flash. + +4단계 파이프라인: + 1. Normalize — 구어체 정제 + 핵심 엔티티 추출 + 2. Index Tree — 계층적 목차(JSON) 생성 + 3. Leaf Summarize — 섹션별 상세 요약 (context overlap 적용) + 4. Consistency — 엔티티 누락 검증 및 보완 + 5. Assemble — 최종 Markdown 문서 조립 (LLM 불필요) + +짧은 텍스트(< 3000자)는 단순 1-pass 처리로 폴백. +""" + +from __future__ import annotations import json +import logging import os import re @@ -14,16 +27,322 @@ from oci.generative_ai_inference.models import ( UserMessage, ) -_PROMPT = """\ -You are a knowledge extraction assistant. Analyze the content below and return ONLY a valid JSON object with these fields: -- "title": concise descriptive title for this content (string) -- "summary": 3-5 sentence summary capturing key insights, written in English (string) -- "summary_ko": the same summary translated into Korean (string) -- "tags": list of 3-7 relevant keywords or topics (string[]) -- "author": author or creator name, or null if not found (string | null) -- "date": publication date in ISO 8601 format (YYYY-MM-DD), or null if not found (string | null) -- "content_type": one of "youtube", "article", "documentation", "news", "forum", "code", "other" (string) -- "language": primary language of the content, ISO 639-1 code, e.g. "en", "ko", "ja" (string) +logger = logging.getLogger(__name__) + +# 텍스트 길이 임계값 +_SHORT_THRESHOLD = 3_000 # 이하면 1-pass로 처리 +_SECTION_SIZE = 4_000 # 섹션별 청킹 크기 +_OVERLAP = 300 # 인접 섹션 컨텍스트 오버랩 + + +# ── LLM 헬퍼 ───────────────────────────────────────────────────────────────── + +def _get_client() -> GenerativeAiInferenceClient: + config = oci.config.from_file() + endpoint = os.environ.get("OCI_CHAT_ENDPOINT") or os.environ["OCI_GENAI_ENDPOINT"] + return GenerativeAiInferenceClient(config, service_endpoint=endpoint) + + +def _llm(prompt: str, max_tokens: int = 2048) -> str: + """단일 LLM 호출. 응답 텍스트 반환.""" + client = _get_client() + req = GenericChatRequest( + messages=[UserMessage(content=[TextContent(text=prompt)])], + max_tokens=max_tokens, + temperature=0, + ) + det = ChatDetails( + compartment_id=os.environ["OCI_COMPARTMENT_ID"], + serving_mode=OnDemandServingMode(model_id=os.environ["OCI_CHAT_MODEL_ID"]), + chat_request=req, + ) + resp = client.chat(det) + return resp.data.chat_response.choices[0].message.content[0].text.strip() + + +def _parse_json(raw: str) -> dict | list: + """LLM 응답에서 JSON 파싱. 마크다운 코드블록 제거 후 시도.""" + raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", raw, flags=re.MULTILINE).strip() + return json.loads(raw) + + +# ── Step 1: Normalize ───────────────────────────────────────────────────────── + +def _normalize(text: str) -> tuple[str, list[str]]: + """구어체 정제 + 핵심 엔티티 추출. + + 긴 텍스트는 4000자 청크로 분할 처리 후 합산. + Returns: + (정제된 텍스트, 엔티티 리스트) + """ + from .chunker import chunk_text + + chunks = chunk_text(text, size=_SECTION_SIZE, overlap=0) + normalized_parts: list[str] = [] + all_entities: list[str] = [] + + for i, chunk in enumerate(chunks): + prompt = f"""\ +다음 텍스트를 아래 두 가지 작업으로 처리하세요. + +[작업 1 - 정제] +- 구어체·추임새(음, 어, 그래서, 뭔가, 이제...)를 제거 +- 중복 표현 압축 +- 문장 부호(마침표, 물음표, 줄바꿈)를 복원해 읽기 좋은 문어체/보고서체로 변환 +- 단, 정보값(수치·고유명사·전문용어)은 절대 삭제하지 말 것 + +[작업 2 - 엔티티 추출] +- 고유명사, 전문 용어, 수치, 날짜, 조직명 등 핵심 엔티티를 리스트업 + +JSON으로만 응답 (설명 없이): +{{"normalized": "정제된 텍스트", "entities": ["엔티티1", ...]}} + +원문 (청크 {i+1}/{len(chunks)}): +{chunk}""" + + try: + result = _parse_json(_llm(prompt, max_tokens=_SECTION_SIZE + 512)) + normalized_parts.append(result.get("normalized", chunk)) + all_entities.extend(result.get("entities", [])) + except Exception as e: + logger.warning("Normalize chunk %d failed: %s", i, e) + normalized_parts.append(chunk) + + # 엔티티 중복 제거 (순서 유지) + seen: set[str] = set() + unique_entities = [e for e in all_entities if not (e in seen or seen.add(e))] + + return "\n\n".join(normalized_parts), unique_entities + + +# ── Step 2: Index Tree ──────────────────────────────────────────────────────── + +def _build_index_tree(text: str) -> list[dict]: + """텍스트 전체를 스캔해 계층적 목차(JSON) 생성. + + Returns: + [{"title": str, "level": int, "children": [...]}] + """ + # 긴 텍스트는 첫 8000자 + 마지막 1000자로 구조 파악 + sample = text[:8_000] + if len(text) > 9_000: + sample += "\n...(중략)...\n" + text[-1_000:] + + prompt = f"""\ +다음 텍스트를 분석해 계층적 목차를 만드세요. + +규칙: +- 텍스트의 주제 흐름을 파악해 대목차(level 1) > 중목차(level 2) > 소목차(level 3) 구조로 분류 +- 각 섹션 제목은 명사형으로 간결하게 작성 +- 소목차는 실제 내용이 있을 때만 생성 (과도한 세분화 금지) +- 전체 섹션 수는 3~10개 권장 + +JSON 배열로만 응답: +[ + {{"title": "대목차 제목", "level": 1, "children": [ + {{"title": "중목차", "level": 2, "children": [ + {{"title": "소목차", "level": 3, "children": []}} + ]}} + ]}} +] + +텍스트: +{sample}""" + + try: + tree = _parse_json(_llm(prompt, max_tokens=1024)) + if isinstance(tree, list): + return tree + except Exception as e: + logger.warning("Index tree build failed: %s", e) + + # 폴백: 단일 섹션 + return [{"title": "본문", "level": 1, "children": []}] + + +# ── Step 3: Leaf Summarize ──────────────────────────────────────────────────── + +def _collect_leaves(tree: list[dict]) -> list[dict]: + """트리에서 말단(leaf) 노드만 수집.""" + leaves: list[dict] = [] + for node in tree: + if node.get("children"): + leaves.extend(_collect_leaves(node["children"])) + else: + leaves.append(node) + return leaves + + +def _split_by_sections(text: str, leaves: list[dict]) -> list[str]: + """텍스트를 섹션 수에 맞게 균등 분할 (context overlap 포함). + + Returns: + 섹션별 텍스트 리스트 (인접 섹션 앞뒤 OVERLAP자 포함) + """ + n = len(leaves) + if n == 0: + return [] + + total = len(text) + base = total // n + sections: list[str] = [] + + for i in range(n): + start = max(0, i * base - _OVERLAP) + end = min(total, (i + 1) * base + _OVERLAP) + sections.append(text[start:end]) + + return sections + + +def _summarize_leaves( + text: str, + tree: list[dict], + content_type: str, +) -> list[dict]: + """말단 섹션별로 상세 요약 생성 (context overlap 적용). + + Returns: + [{"title": str, "summary": str}, ...] + """ + leaves = _collect_leaves(tree) + sections = _split_by_sections(text, leaves) + results: list[dict] = [] + + for i, (leaf, section_text) in enumerate(zip(leaves, sections)): + prompt = f"""\ +다음은 "{leaf['title']}" 섹션에 해당하는 내용입니다. +(앞뒤 섹션과 {_OVERLAP}자 컨텍스트가 포함되어 있음) + +작성 규칙: +- 해당 섹션의 핵심 내용을 상세하게 요약 (3~7문장) +- 수치·사례·고유명사는 반드시 포함 +- 문어체 보고서 형식으로 작성 +- 마크다운 볼드(**) 활용해 핵심어 강조 + +내용: +{section_text} + +"{leaf['title']}" 섹션 상세 요약:""" + + try: + summary = _llm(prompt, max_tokens=800) + results.append({"title": leaf["title"], "summary": summary}) + logger.debug("Section %d/%d summarized: %s", i + 1, len(leaves), leaf["title"]) + except Exception as e: + logger.warning("Leaf summarize failed [%s]: %s", leaf["title"], e) + results.append({"title": leaf["title"], "summary": section_text[:400] + "..."}) + + return results + + +# ── Step 4: Consistency Check ───────────────────────────────────────────────── + +def _consistency_check( + assembled: str, + entities: list[str], +) -> str: + """엔티티 누락 검증 — 빠진 핵심 정보를 문서에 보완. + + 엔티티가 없거나 문서가 짧으면 스킵. + """ + if not entities or len(assembled) < 200: + return assembled + + # 엔티티 중 assembled에 없는 것만 추려서 검증 비용 절감 + missing = [e for e in entities if e not in assembled] + if not missing: + logger.debug("Consistency check: all %d entities present.", len(entities)) + return assembled + + prompt = f"""\ +아래 문서에 다음 핵심 엔티티들이 언급되지 않았습니다. +각 엔티티를 가장 적합한 섹션에 자연스럽게 추가하세요. +전체 문서 구조(제목, 섹션)는 절대 변경하지 마세요. + +누락 엔티티: +{json.dumps(missing, ensure_ascii=False)} + +문서: +{assembled} + +수정된 문서 전체를 그대로 출력하세요:""" + + try: + return _llm(prompt, max_tokens=len(assembled) + 512) + except Exception as e: + logger.warning("Consistency check failed: %s", e) + return assembled + + +# ── Step 5: Assemble ────────────────────────────────────────────────────────── + +def _assemble( + overall_summary: str, + tree: list[dict], + sections: list[dict], + entities: list[str], +) -> str: + """목차 구조 + 섹션 요약 → 최종 Markdown 문서 조립.""" + lines: list[str] = [] + + # 전체 요약 + lines.append("## 요약") + lines.append(overall_summary) + lines.append("") + + # 목차 + lines.append("## 목차") + for node in tree: + indent = " " * (node["level"] - 1) + lines.append(f"{indent}- {node['title']}") + for child in node.get("children", []): + indent2 = " " * (child["level"] - 1) + lines.append(f"{indent2}- {child['title']}") + lines.append("") + + # 섹션별 상세 + heading = {1: "#", 2: "##", 3: "###"} + for section in sections: + lvl = _find_level(tree, section["title"]) + h = heading.get(lvl, "##") + lines.append(f"{h} {section['title']}") + lines.append(section["summary"]) + lines.append("") + + # 핵심 엔티티 + if entities: + lines.append("## 핵심 키워드") + lines.append(", ".join(f"`{e}`" for e in entities[:20])) + lines.append("") + + return "\n".join(lines) + + +def _find_level(tree: list[dict], title: str, default: int = 2) -> int: + """트리에서 title에 해당하는 level 반환.""" + for node in tree: + if node["title"] == title: + return node["level"] + found = _find_level(node.get("children", []), title, default) + if found != default: + return found + return default + + +# ── 단순 1-pass 처리 (짧은 텍스트) ──────────────────────────────────────────── + +_SIMPLE_PROMPT = """\ +You are a knowledge extraction assistant. Analyze the content below and return ONLY a valid JSON object: +- "title": concise descriptive title (string) +- "summary": 3-5 sentence summary of key insights (string) +- "summary_ko": same summary in Korean (string) +- "body_md": well-structured Markdown document with proper headings and bullet points (string) +- "tags": 3-7 relevant keywords (string[]) +- "author": author name or null (string | null) +- "date": publication date YYYY-MM-DD or null (string | null) +- "content_type": one of youtube/article/documentation/news/forum/code/other (string) +- "language": ISO 639-1 code e.g. "en", "ko" (string) Content type: {content_type} Source URL: {url} @@ -33,67 +352,158 @@ Content: Return only the JSON object, no markdown, no explanation.""" -def _get_client() -> GenerativeAiInferenceClient: - config = oci.config.from_file() - # Gemini models live in us-ashburn-1; use OCI_CHAT_ENDPOINT if set, - # otherwise fall back to OCI_GENAI_ENDPOINT. - endpoint = os.environ.get("OCI_CHAT_ENDPOINT") or os.environ["OCI_GENAI_ENDPOINT"] - return GenerativeAiInferenceClient(config, service_endpoint=endpoint) - - -def enrich(content_type: str, title: str, url: str, text: str) -> dict: - """Extract structured metadata from content using Gemini Flash. - - Args: - content_type: One of 'youtube', 'url', 'text'. - title: Initial title hint (may be empty). - url: Source URL (empty for plain text). - text: The full content text to analyze. - - Returns: - Dict with keys: title, summary, tags, author, date, content_type. - Falls back to minimal defaults on LLM failure. - """ - prompt = _PROMPT.format( +def _simple_enrich(content_type: str, title: str, url: str, text: str) -> dict: + """짧은 텍스트용 단순 1-pass 처리.""" + prompt = _SIMPLE_PROMPT.format( content_type=content_type, url=url or "(none)", text=text[:6000], ) + raw = _llm(prompt, max_tokens=2048) + meta = _parse_json(raw) + meta.setdefault("title", title or url or text[:80]) + meta.setdefault("summary", "") + meta.setdefault("summary_ko", "") + meta.setdefault("body_md", text) + meta.setdefault("tags", []) + meta.setdefault("author", None) + meta.setdefault("date", None) + meta.setdefault("content_type", content_type) + meta.setdefault("language", "en") + return meta + +# ── 메타데이터 추출 (기본 정보) ─────────────────────────────────────────────── + +_META_PROMPT = """\ +Analyze the content below and return ONLY a JSON object with these fields: +- "title": concise descriptive title (string) +- "summary": 3-5 sentence overall summary (string) +- "summary_ko": same summary in Korean (string) +- "tags": 3-7 relevant keywords (string[]) +- "author": author or null (string | null) +- "date": YYYY-MM-DD or null (string | null) +- "content_type": youtube/article/documentation/news/forum/code/other (string) +- "language": ISO 639-1 code (string) + +Content type: {content_type} +Source URL: {url} +Content (first 5000 chars): +{text} + +Return only the JSON, no markdown.""" + + +def _extract_meta(content_type: str, title: str, url: str, text: str) -> dict: + """기본 메타데이터(제목·요약·태그 등) 추출.""" + prompt = _META_PROMPT.format( + content_type=content_type, + url=url or "(none)", + text=text[:5000], + ) try: - client = _get_client() - req = GenericChatRequest( - messages=[UserMessage(content=[TextContent(text=prompt)])], - max_tokens=2048, - temperature=0, - ) - det = ChatDetails( - compartment_id=os.environ["OCI_COMPARTMENT_ID"], - serving_mode=OnDemandServingMode(model_id=os.environ["OCI_CHAT_MODEL_ID"]), - chat_request=req, - ) - response = client.chat(det) - raw = response.data.chat_response.choices[0].message.content[0].text.strip() - raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", raw, flags=re.MULTILINE) - metadata = json.loads(raw) - except Exception as exc: - metadata = { - "title": title or url or text[:80], - "summary": text[:300], - "tags": [], - "author": None, - "date": None, - "content_type": content_type, - "_error": str(exc), - } + meta = _parse_json(_llm(prompt, max_tokens=1024)) + except Exception: + meta = {} + meta.setdefault("title", title or url or text[:80]) + meta.setdefault("summary", "") + meta.setdefault("summary_ko", "") + meta.setdefault("tags", []) + meta.setdefault("author", None) + meta.setdefault("date", None) + meta.setdefault("content_type", content_type) + meta.setdefault("language", "en") + return meta - # Ensure required keys exist - metadata.setdefault("title", title or url or text[:80]) - metadata.setdefault("summary", "") - metadata.setdefault("tags", []) - metadata.setdefault("author", None) - metadata.setdefault("date", None) - metadata.setdefault("content_type", content_type) - metadata.setdefault("language", "en") - return metadata +# ── 공개 인터페이스 ────────────────────────────────────────────────────────── + +def enrich(content_type: str, title: str, url: str, text: str) -> dict: + """4단계 파이프라인으로 콘텐츠를 구조화된 문서로 변환. + + Args: + content_type: 'youtube' | 'url' | 'text' + title: 초기 제목 힌트 (없으면 빈 문자열) + url: 소스 URL (텍스트 직접 입력이면 빈 문자열) + text: 처리할 전체 텍스트 + + Returns: + Dict with keys: + title, summary, summary_ko, tags, author, date, + content_type, language, body_md + body_md: 4단계 파이프라인으로 생성된 구조화 Markdown 문서 + """ + # ── 짧은 텍스트: 단순 처리 ────────────────────────────── + if len(text) < _SHORT_THRESHOLD: + logger.info("Short text (%d chars) → simple 1-pass enrichment", len(text)) + try: + return _simple_enrich(content_type, title, url, text) + except Exception as e: + logger.warning("Simple enrich failed: %s", e) + return _fallback(title, url, text, content_type) + + # ── 긴 텍스트: 4단계 파이프라인 ──────────────────────── + logger.info("Long text (%d chars) → 4-step pipeline", len(text)) + + # Step 1: Normalize + Entity Extraction + logger.info("[1/4] Normalize...") + try: + normalized, entities = _normalize(text) + except Exception as e: + logger.warning("Normalize failed, using raw: %s", e) + normalized, entities = text, [] + + # Step 2: Index Tree + logger.info("[2/4] Index Tree (%d entities found)...", len(entities)) + try: + tree = _build_index_tree(normalized) + except Exception as e: + logger.warning("Index tree failed: %s", e) + tree = [{"title": "본문", "level": 1, "children": []}] + + # Step 3: Leaf Summarize + logger.info("[3/4] Leaf Summarize (%d sections)...", len(_collect_leaves(tree))) + try: + sections = _summarize_leaves(normalized, tree, content_type) + except Exception as e: + logger.warning("Leaf summarize failed: %s", e) + sections = [{"title": "본문", "summary": normalized[:1000]}] + + # 기본 메타데이터 추출 (제목·요약·태그) + logger.info("[meta] Extracting metadata...") + try: + meta = _extract_meta(content_type, title, url, normalized) + except Exception as e: + logger.warning("Meta extraction failed: %s", e) + meta = _fallback(title, url, text, content_type) + + # Step 4: Assemble (LLM 없음) + logger.info("[4/4] Assemble...") + body_md = _assemble( + overall_summary=meta.get("summary", ""), + tree=tree, + sections=sections, + entities=entities, + ) + + # Step 5 (optional): Consistency Check + logger.info("[5/5] Consistency Check...") + body_md = _consistency_check(body_md, entities) + + meta["body_md"] = body_md + logger.info("Pipeline complete. body_md=%d chars", len(body_md)) + return meta + + +def _fallback(title: str, url: str, text: str, content_type: str) -> dict: + return { + "title": title or url or text[:80], + "summary": text[:300], + "summary_ko": "", + "body_md": text, + "tags": [], + "author": None, + "date": None, + "content_type": content_type, + "language": "en", + } diff --git a/daemon/worker.py b/daemon/worker.py index ae1250f..e9051be 100644 --- a/daemon/worker.py +++ b/daemon/worker.py @@ -55,11 +55,12 @@ def process_item(item: dict) -> None: meta = enrich(input_type, yt_title, url, text) title = meta.get("title") or yt_title or url or row_id[:8] + # body_md: 4단계 파이프라인이 생성한 구조화 문서 (없으면 원문 폴백) note_path = save_note( content_type=input_type, title=title, summary=meta.get("summary", ""), - body=text, + body=meta.get("body_md") or text, tags=meta.get("tags", []), source_url=url, author=meta.get("author") or "",