feat: add Gemini 2.5 Pro enrichment to web_search results
Fetch full page content per result, then call Gemini 2.5 Pro (via OCI GenAI) to extract: abstract, date, author, tags, content_type, and uuid. Enrichment runs in parallel (ThreadPoolExecutor). enrich=False flag available to skip for raw/fast results.
This commit is contained in:
164
src/enricher.py
Normal file
164
src/enricher.py
Normal file
@@ -0,0 +1,164 @@
|
||||
"""Page fetching and LLM-based enrichment via Gemini 2.5 Pro on OCI GenAI."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import uuid
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from html.parser import HTMLParser
|
||||
|
||||
import httpx
|
||||
import oci
|
||||
from oci.generative_ai_inference import GenerativeAiInferenceClient
|
||||
from oci.generative_ai_inference.models import (
|
||||
ChatDetails,
|
||||
GenericChatRequest,
|
||||
OnDemandServingMode,
|
||||
TextContent,
|
||||
UserMessage,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HTML stripping
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class _TextExtractor(HTMLParser):
|
||||
_SKIP_TAGS = {"script", "style", "head", "nav", "footer", "noscript"}
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self._buf: list[str] = []
|
||||
self._skip = 0
|
||||
|
||||
def handle_starttag(self, tag: str, attrs: list) -> None:
|
||||
if tag in self._SKIP_TAGS:
|
||||
self._skip += 1
|
||||
|
||||
def handle_endtag(self, tag: str) -> None:
|
||||
if tag in self._SKIP_TAGS and self._skip:
|
||||
self._skip -= 1
|
||||
|
||||
def handle_data(self, data: str) -> None:
|
||||
if not self._skip:
|
||||
text = data.strip()
|
||||
if text:
|
||||
self._buf.append(text)
|
||||
|
||||
def get_text(self) -> str:
|
||||
return " ".join(self._buf)
|
||||
|
||||
|
||||
def _html_to_text(html: str) -> str:
    """Strip *html* down to visible plain text.

    Skip-tag content (script/style/etc.) is removed by ``_TextExtractor``;
    any run of 3+ whitespace characters in the joined text is collapsed
    to a single space.
    """
    parser = _TextExtractor()
    parser.feed(html)
    # close() flushes any partial data still buffered by HTMLParser, so
    # trailing text at the end of the document is not silently dropped.
    parser.close()
    return re.sub(r"\s{3,}", " ", parser.get_text())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Page fetching
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_HEADERS = {"User-Agent": "Mozilla/5.0 (compatible; searxng-mcp/1.0)"}
|
||||
|
||||
|
||||
def fetch_page_text(url: str, max_chars: int = 5000) -> str:
    """Fetch *url* and return its visible text, truncated to *max_chars*.

    Best-effort: any network, HTTP, or parsing error yields an empty
    string so the caller can fall back to the search snippet.
    """
    try:
        response = httpx.get(
            url,
            timeout=10,
            follow_redirects=True,
            headers=_HEADERS,
        )
        response.raise_for_status()
        text = _html_to_text(response.text)
    except Exception:
        return ""
    return text[:max_chars]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# OCI GenAI (Gemini 2.5 Pro) enrichment
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_PROMPT = """\
|
||||
You are an information extractor. Analyze the web page content below and return ONLY a valid JSON object with these fields:
|
||||
- "abstract": 2-3 sentence summary of the page content (string)
|
||||
- "date": publication or last-modified date in ISO 8601 format, or null if not found (string | null)
|
||||
- "author": author or organization name, or null if not found (string | null)
|
||||
- "tags": list of 3-5 relevant keywords (string[])
|
||||
- "content_type": one of "article", "documentation", "news", "forum", "code", "video", "other" (string)
|
||||
|
||||
Title: {title}
|
||||
URL: {url}
|
||||
Content:
|
||||
{content}
|
||||
|
||||
Return only the JSON object, no markdown, no explanation."""
|
||||
|
||||
|
||||
def _get_llm_client() -> GenerativeAiInferenceClient:
    """Build an OCI GenAI inference client from the default OCI config file.

    The service endpoint is taken from the OCI_GENAI_ENDPOINT environment
    variable (raises KeyError if unset).
    """
    cfg = oci.config.from_file()
    endpoint = os.environ["OCI_GENAI_ENDPOINT"]
    return GenerativeAiInferenceClient(cfg, service_endpoint=endpoint)
|
||||
|
||||
|
||||
def enrich_result(title: str, url: str, snippet: str) -> dict:
    """Fetch the page at *url*, then ask the LLM for structured metadata.

    Falls back to a snippet-based stub (with the failure recorded under
    ``_error``) when fetching, the chat call, or JSON parsing fails.
    A fresh ``uuid`` field is always attached.

    Args:
        title: Result title from the search engine.
        url: Result URL to fetch and summarize.
        snippet: Search-engine snippet, used when the page can't be fetched.

    Returns:
        Dict with keys abstract/date/author/tags/content_type/uuid
        (plus ``_error`` on failure).
    """
    page_text = fetch_page_text(url)
    content = page_text or snippet

    # Truncate so the prompt stays well within the model context window.
    prompt = _PROMPT.format(title=title, url=url, content=content[:4000])

    try:
        client = _get_llm_client()
        req = GenericChatRequest(
            messages=[UserMessage(content=[TextContent(text=prompt)])],
            max_tokens=512,
            temperature=0,  # deterministic extraction
        )
        det = ChatDetails(
            compartment_id=os.environ["OCI_COMPARTMENT_ID"],
            serving_mode=OnDemandServingMode(
                model_id=os.environ["OCI_CHAT_MODEL_ID"]
            ),
            chat_request=req,
        )
        response = client.chat(det)
        raw = response.data.chat_response.choices[0].message.content[0].text.strip()
        # Strip markdown code fences if present
        raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", raw, flags=re.MULTILINE)
        parsed = json.loads(raw)
        # The model may return valid JSON that is not an object (e.g. a bare
        # list or string).  Treat that as a failure so we take the fallback
        # path instead of raising an uncaught TypeError on the uuid
        # assignment below.
        if not isinstance(parsed, dict):
            raise ValueError(
                f"expected JSON object, got {type(parsed).__name__}"
            )
        metadata = parsed
    except Exception as exc:
        metadata = {
            "abstract": snippet,
            "date": None,
            "author": None,
            "tags": [],
            "content_type": "other",
            "_error": str(exc),
        }

    metadata["uuid"] = str(uuid.uuid4())
    return metadata
|
||||
|
||||
|
||||
def enrich_results_parallel(
    results: list[dict],
    max_workers: int = 5,
) -> list[dict]:
    """Enrich every search result concurrently and merge metadata back in.

    A failure on one result never aborts the batch: that entry still gets
    a fresh uuid plus an ``_error`` field.  Input order is preserved.
    """
    metadata_by_index: dict[int, dict] = {}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending: dict = {}
        for index, result in enumerate(results):
            fut = executor.submit(
                enrich_result, result["title"], result["url"], result["content"]
            )
            pending[fut] = index
        for fut in as_completed(pending):
            index = pending[fut]
            try:
                metadata_by_index[index] = fut.result()
            except Exception as exc:
                metadata_by_index[index] = {
                    "uuid": str(uuid.uuid4()),
                    "_error": str(exc),
                }

    combined = []
    for index, original in enumerate(results):
        combined.append({**original, **metadata_by_index.get(index, {})})
    return combined
|
||||
@@ -1,4 +1,4 @@
|
||||
"""MCP server wrapping the self-hosted SearXNG instance for free web search."""
|
||||
"""MCP server wrapping self-hosted SearXNG with Gemini 2.5 Pro enrichment."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
@@ -14,24 +14,38 @@ load_dotenv(os.path.join(_project_root, ".env"))
|
||||
import httpx
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
|
||||
from src.enricher import enrich_results_parallel
|
||||
|
||||
mcp = FastMCP(
|
||||
name="searxng",
|
||||
instructions="Web search via self-hosted SearXNG. Use this instead of built-in WebSearch.",
|
||||
instructions=(
|
||||
"Web search via self-hosted SearXNG + Gemini 2.5 Pro enrichment. "
|
||||
"Use this instead of the built-in WebSearch tool."
|
||||
),
|
||||
)
|
||||
|
||||
SEARXNG_URL = os.environ.get("SEARXNG_URL", "https://searxng.cloud-handson.com")
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def web_search(query: str, max_results: int = 10) -> list[dict]:
|
||||
def web_search(query: str, max_results: int = 10, enrich: bool = True) -> list[dict]:
|
||||
"""Search the web using the self-hosted SearXNG instance.
|
||||
|
||||
Each result is enriched by Gemini 2.5 Pro (via OCI GenAI) with:
|
||||
- abstract: 2-3 sentence summary of the page
|
||||
- date: publication date (ISO 8601) or null
|
||||
- author: author/org name or null
|
||||
- tags: 3-5 relevant keywords
|
||||
- content_type: article / documentation / news / forum / code / video / other
|
||||
- uuid: unique identifier for this result
|
||||
|
||||
Args:
|
||||
query: The search query string.
|
||||
max_results: Maximum number of results to return (default 10).
|
||||
enrich: Set to False to skip LLM enrichment and return raw results faster.
|
||||
|
||||
Returns:
|
||||
List of result dicts with keys: ``title``, ``url``, ``content`` (snippet).
|
||||
List of enriched result dicts.
|
||||
"""
|
||||
response = httpx.get(
|
||||
f"{SEARXNG_URL}/search",
|
||||
@@ -41,7 +55,7 @@ def web_search(query: str, max_results: int = 10) -> list[dict]:
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
return [
|
||||
results = [
|
||||
{
|
||||
"title": r.get("title", ""),
|
||||
"url": r.get("url", ""),
|
||||
@@ -50,6 +64,11 @@ def web_search(query: str, max_results: int = 10) -> list[dict]:
|
||||
for r in data.get("results", [])[:max_results]
|
||||
]
|
||||
|
||||
if enrich and results:
|
||||
results = enrich_results_parallel(results)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def main() -> None:
|
||||
mcp.run()
|
||||
|
||||
Reference in New Issue
Block a user